I need to write a program that reads a PDF, converts each page to a PNG file, and then runs some code in parallel on each page image. I'm looking for a way to create a parent process with a fixed number of child processes.
The parent process has to send work to the children. If the parent has 21 pages to process and only 5 children, it has to manage a queue so it can hand out work without killing the 5 children and creating new ones. When a child finishes its work, it sends a message to the parent asking for more work.
I don't want to kill the children, because I think reusing them is faster than killing them and creating new subprocesses. Am I going in the wrong direction?
I'm trying to do this using multiprocessing.Pool.apply_async, but I haven't found a way to do what I need.
Any advice or a tutorial?
Sorry for my poor English.
Here is some code showing what I'm trying to do:
from multiprocessing import Pool
import time
import random

def BarcodeSearcher(x):
    # Here goes the image processing
    return x*x

def resultCollector(result):
    # Runs in the parent process each time a task finishes
    print(result)

def main():
    pool = Pool(processes=3)
    for pag in range(3):
        pool.apply_async(BarcodeSearcher, args=(pag,), callback=resultCollector)
    pool.close()  # no more tasks will be submitted
    pool.join()   # wait for the workers to finish

if __name__ == '__main__':
    main()
The way you're currently doing it should work just fine. multiprocessing.Pool creates a pool with a constant number of worker processes, all of which will stay alive for the lifetime of the Pool. The Pool has an internal queue that is used to send work items to the worker processes as soon as one of them finishes doing work. So, all you need to do is feed all the work you want done into the Pool, and then the Pool will handle distributing it all to your three worker processes.
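If you don't need a callback per page, the same idea can be sketched with Pool.map, which queues every item, blocks until all of them are done, and returns the results in input order. This is only a minimal sketch using the 21 pages and 5 workers from the question; barcode_searcher and process_pages are illustrative names, and the squaring is a stand-in for the real image processing.

```python
from multiprocessing import Pool


def barcode_searcher(page):
    # Placeholder for the real image processing (hypothetical stand-in).
    return page * page


def process_pages(num_pages, num_workers):
    # The pool starts num_workers processes once and reuses them for every page.
    pool = Pool(processes=num_workers)
    try:
        # map() queues all pages and returns the results in input order.
        return pool.map(barcode_searcher, range(num_pages))
    finally:
        pool.close()
        pool.join()


if __name__ == '__main__':
    print(process_pages(21, 5))
```

apply_async with a callback, as in your code, is the better fit when you want to handle each result as soon as it is ready rather than waiting for the whole batch.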
Consider your example, except now we're feeding it 30 work items:
from multiprocessing import Pool, current_process
import time
import random

def BarcodeSearcher(x):
    # Show which worker process picked up this item
    print("Process %s: handling %s" % (current_process(), x))
    # Here goes the image processing
    return x*x

def resultCollector(result):
    # Runs in the parent process each time a task finishes
    print(result)

def main():
    pool = Pool(processes=3)
    for pag in range(30):
        pool.apply_async(BarcodeSearcher, args=(pag,), callback=resultCollector)
    pool.close()  # no more tasks will be submitted
    pool.join()   # wait for the workers to finish

if __name__ == '__main__':
    main()
Here's the output:
Process <Process(PoolWorker-1, started daemon)>: handling 0
Process <Process(PoolWorker-3, started daemon)>: handling 2
Process <Process(PoolWorker-1, started daemon)>: handling 3
Process <Process(PoolWorker-1, started daemon)>: handling 4
Process <Process(PoolWorker-3, started daemon)>: handling 5
0
Process <Process(PoolWorker-2, started daemon)>: handling 1
9
4
Process <Process(PoolWorker-1, started daemon)>: handling 6
Process <Process(PoolWorker-1, started daemon)>: handling 7
16
Process <Process(PoolWorker-2, started daemon)>: handling 8
25
1
Process <Process(PoolWorker-3, started daemon)>: handling 9
36
49
64
81
Process <Process(PoolWorker-1, started daemon)>: handling 10
100
Process <Process(PoolWorker-2, started daemon)>: handling 11
Process <Process(PoolWorker-3, started daemon)>: handling 12
121
144
Process <Process(PoolWorker-2, started daemon)>: handling 13
Process <Process(PoolWorker-1, started daemon)>: handling 14
169
Process <Process(PoolWorker-2, started daemon)>: handling 15
196
Process <Process(PoolWorker-1, started daemon)>: handling 16
225
Process <Process(PoolWorker-3, started daemon)>: handling 17
256
Process <Process(PoolWorker-3, started daemon)>: handling 18
Process <Process(PoolWorker-1, started daemon)>: handling 19
Process <Process(PoolWorker-1, started daemon)>: handling 20
289
Process <Process(PoolWorker-1, started daemon)>: handling 21
324
Process <Process(PoolWorker-3, started daemon)>: handling 22
361
Process <Process(PoolWorker-3, started daemon)>: handling 24
400
Process <Process(PoolWorker-1, started daemon)>: handling 25
441
Process <Process(PoolWorker-3, started daemon)>: handling 26
Process <Process(PoolWorker-1, started daemon)>: handling 27
484
576
Process <Process(PoolWorker-3, started daemon)>: handling 28
Process <Process(PoolWorker-1, started daemon)>: handling 29
625
676
729
784
841
Process <Process(PoolWorker-2, started daemon)>: handling 23
529
As you can see, the work got distributed between your workers without you having to do anything special.
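If you want to confirm that the same worker processes are being reused rather than recreated, one way is to have each task report the PID of the process that ran it and count the distinct PIDs. The following is a minimal sketch under that assumption; report_pid and worker_pids are hypothetical helper names, not part of multiprocessing.

```python
import os
from multiprocessing import Pool


def report_pid(page):
    # Return the page number together with the PID of the worker that handled it.
    return page, os.getpid()


def worker_pids(num_pages, num_workers):
    pool = Pool(processes=num_workers)
    try:
        results = pool.map(report_pid, range(num_pages))
    finally:
        pool.close()
        pool.join()
    # Collect the distinct worker PIDs seen across all tasks.
    return set(pid for _, pid in results)


if __name__ == '__main__':
    pids = worker_pids(30, 3)
    print("30 pages were handled by %d worker process(es)" % len(pids))
```

With the default settings the count never exceeds the pool size, because workers are only replaced if you ask for it with the maxtasksperchild argument of Pool.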