
Python: several ways to implement multiprocessing


1/ The Pool class from the multiprocessing module

from multiprocessing.pool import Pool

The Pool class maintains a specified number of worker processes (set by the processes parameter) for executing tasks submitted by the user.
When a new task is submitted to the Pool and the pool is not yet full, a new process is created to execute it.
If the pool is already full, the task waits until a worker process becomes free and is then executed.

The main methods of the Pool class in the multiprocessing module are described below:

<1> apply(): synchronous execution mode
    Function prototype: apply(func[, args=()[, kwds={}]])
    Passes positional and keyword arguments to func, much like Python 2's built-in apply function (which no longer exists in 3.x). The main process blocks until the function finishes, so this mode is not recommended.
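As a minimal sketch (not from the original article; the function name square is illustrative), each apply() call below blocks until its worker returns, so the tasks run one after another:

import os
from multiprocessing import Pool

def square(x):
    return x * x, os.getpid()

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        # Each apply() blocks until the worker returns,
        # so these three calls run strictly one after another.
        for i in range(3):
            result, pid = pool.apply(square, args=(i,))
            print(result, pid)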

<2> apply_async(): asynchronous execution mode
    Function prototype: apply_async(func[, args=()[, kwds={}[, callback=None]]])
    Used like apply(), but non-blocking; it also supports a callback that runs after the result is returned.
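The article's example further down does not use the callback parameter, so here is a small illustrative sketch (the names cube and on_done are my own): the callback runs in the parent process as each result arrives, and .get() can still be used to collect the return values afterwards.

from multiprocessing import Pool

def cube(x):
    return x ** 3

def on_done(value):
    # Runs in the parent process whenever a worker result is ready
    print("got result:", value)

if __name__ == "__main__":
    pool = Pool(processes=3)
    async_results = [pool.apply_async(cube, args=(i,), callback=on_done)
                     for i in range(5)]
    pool.close()
    pool.join()
    # .get() also returns the value (and re-raises any exception from the worker)
    print([r.get() for r in async_results])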

<3> map()
    Function prototype: map(func, iterable[, chunksize=None])
    The Pool class's map method behaves much like the built-in map function; it blocks until all results have been returned.
    Note: although the second parameter is an iterable, in practice the whole iterable is consumed before the subprocesses start running the tasks.
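A minimal illustrative sketch of map() (the function double is not from the article): the call blocks until every item has been processed and returns the results in input order.

from multiprocessing import Pool

def double(x):
    return x * 2

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        # Blocks until all items are processed; results keep input order.
        results = pool.map(double, range(10), chunksize=2)
    print(results)  # [0, 2, 4, ..., 18]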

<4> map_async()
    Function prototype: map_async(func, iterable[, chunksize[, callback]])
    Used like map(), but non-blocking; it returns an AsyncResult immediately.
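By contrast with map(), a hedged sketch of map_async() (again with an illustrative function): the call returns an AsyncResult right away, and the values are collected later with .get(), which blocks.

from multiprocessing import Pool

def double(x):
    return x * 2

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        async_result = pool.map_async(double, range(10))
        # The main process is free to do other work here...
        print(async_result.get(timeout=10))  # blocks until all results arrive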

<5> close()
    Closes the process pool so that it accepts no new tasks.

<6> terminate()
    Terminates the worker processes immediately; tasks that have not started are not processed.
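A small illustrative sketch of how this differs from close(): terminate() stops the workers without letting queued tasks run (the slow function and the timings here are only for demonstration).

import time
from multiprocessing import Pool

def slow(x):
    time.sleep(1)
    return x

if __name__ == "__main__":
    pool = Pool(processes=2)
    pool.map_async(slow, range(20))
    time.sleep(2)        # let a few tasks run
    pool.terminate()     # stop the workers immediately; queued tasks are dropped
    pool.join()          # wait for the worker processes to exit
    print("pool terminated")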

<7> join()
    The main process blocks until the child processes exit. join() must be called after close() or terminate().
# -*- coding:utf-8 -*-

import os
import time
from multiprocessing import Pool


def worker(arg):
    print("The subprocess starts executing >>> pid={},ppid={}, Number {}".format(os.getpid(), os.getppid(), arg))
    time.sleep(0.5)
    print("Child process termination >>> pid={},ppid={}, Number {}".format(os.getpid(), os.getppid(), arg))


def main():
    print("The main process starts executing >>> pid={}".format(os.getpid()))

    process_pool = Pool(processes=5)
    for i in range(10):
        # process_pool.apply(worker, args=(i,))      # synchronous execution
        process_pool.apply_async(worker, args=(i,))  # asynchronous execution

    # Close the pool so it accepts no new tasks
    process_pool.close()

    # Block until every task in the pool has finished
    process_pool.join()

    print("The main process terminates")


if __name__ == "__main__":
    main()


# Test results
# Synchronous execution mode: apply(worker, args=(i,))
# The output below shows that the synchronous mode blocks on every call, so no parallelism is achieved.
The main process starts executing >>> pid=6688
The subprocess starts executing >>> pid=6689,ppid=6688, Number 0
Child process termination >>> pid=6689,ppid=6688, Number 0
The subprocess starts executing >>> pid=6690,ppid=6688, Number 1
Child process termination >>> pid=6690,ppid=6688, Number 1
The subprocess starts executing >>> pid=6691,ppid=6688, Number 2
Child process termination >>> pid=6691,ppid=6688, Number 2
The subprocess starts executing >>> pid=6692,ppid=6688, Number 3
Child process termination >>> pid=6692,ppid=6688, Number 3
The subprocess starts executing >>> pid=6693,ppid=6688, Number 4
Child process termination >>> pid=6693,ppid=6688, Number 4
The subprocess starts executing >>> pid=6689,ppid=6688, Number 5
Child process termination >>> pid=6689,ppid=6688, Number 5
The subprocess starts executing >>> pid=6690,ppid=6688, Number 6
Child process termination >>> pid=6690,ppid=6688, Number 6
The subprocess starts executing >>> pid=6691,ppid=6688, Number 7
Child process termination >>> pid=6691,ppid=6688, Number 7
The subprocess starts executing >>> pid=6692,ppid=6688, Number 8
Child process termination >>> pid=6692,ppid=6688, Number 8
The subprocess starts executing >>> pid=6693,ppid=6688, Number 9
Child process termination >>> pid=6693,ppid=6688, Number 9
The main process terminates

# Asynchronous execution mode: apply_async(worker, args=(i,))
# The output below shows that the asynchronous mode does not block, so parallelism is achieved.
The main process starts executing >>> pid=8449
The subprocess starts executing >>> pid=8451,ppid=8449, Number 0
The subprocess starts executing >>> pid=8452,ppid=8449, Number 1
The subprocess starts executing >>> pid=8453,ppid=8449, Number 2
The subprocess starts executing >>> pid=8454,ppid=8449, Number 3
The subprocess starts executing >>> pid=8455,ppid=8449, Number 4
Child process termination >>> pid=8451,ppid=8449, Number 0
The subprocess starts executing >>> pid=8451,ppid=8449, Number 5
Child process termination >>> pid=8454,ppid=8449, Number 3
Child process termination >>> pid=8452,ppid=8449, Number 1
Child process termination >>> pid=8453,ppid=8449, Number 2
Child process termination >>> pid=8455,ppid=8449, Number 4
The subprocess starts executing >>> pid=8455,ppid=8449, Number 6
The subprocess starts executing >>> pid=8454,ppid=8449, Number 7
The subprocess starts executing >>> pid=8452,ppid=8449, Number 8
The subprocess starts executing >>> pid=8453,ppid=8449, Number 9
Child process termination >>> pid=8451,ppid=8449, Number 5
Child process termination >>> pid=8454,ppid=8449, Number 7
Child process termination >>> pid=8455,ppid=8449, Number 6
Child process termination >>> pid=8453,ppid=8449, Number 9
Child process termination >>> pid=8452,ppid=8449, Number 8
The main process terminates


# -*- coding:utf-8 -*-

import os
import time
from multiprocessing import Pool


def worker(arg):
    print("The subprocess starts executing >>> pid={},ppid={}, Number {}".format(os.getpid(), os.getppid(), arg))
    time.sleep(0.5)
    print("Child process termination >>> pid={},ppid={}, Number {}".format(os.getpid(), os.getppid(), arg))


def main():
    print("The main process starts executing >>> pid={}".format(os.getpid()))

    process_pool = Pool(processes=5)
    data = range(5)
    # process_pool.map(worker, data)       # blocks until all results are back
    process_pool.map_async(worker, data)   # returns immediately with an AsyncResult

    # Close the pool so it accepts no new tasks
    process_pool.close()

    # Block until every task in the pool has finished
    process_pool.join()

    print("The main process terminates")


if __name__ == "__main__":
    main()

2/ The ProcessPoolExecutor process pool from concurrent.futures

The standard library module concurrent.futures provides two classes, ProcessPoolExecutor and ThreadPoolExecutor, which are higher-level abstractions built on top of multiprocessing and threading.
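As a brief illustrative sketch (not from the original article), the two executor classes expose the same interface, so switching between processes and threads is usually just a matter of changing the class name:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def square(x):
    return x * x

if __name__ == "__main__":
    # The two executors are interchangeable; only the class name changes.
    with ProcessPoolExecutor(max_workers=4) as executor:
        print(list(executor.map(square, range(10))))
    with ThreadPoolExecutor(max_workers=4) as executor:
        print(list(executor.map(square, range(10))))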

There are two ways to submit tasks: synchronous submission and asynchronous submission.

<1> Synchronous submission: submit a task, wait in place for it to finish, fetch its return value with .result(), and only then submit the next task.
    Advantage: decoupled (each result is in hand before the next task is submitted)
    Disadvantage: slow, because each task must finish before the next one starts
   
import os
from concurrent.futures import ProcessPoolExecutor


def f(name):
    print('%s %s is running' % (name, os.getpid()))


if __name__ == '__main__':
    # Set the number of processes in the pool
    process_pool = ProcessPoolExecutor(4)

    for i in range(10):
        # Synchronous call: submit the task, then wait for its result.
        # Arguments are passed as (func, *args, **kwargs), positionally or by keyword.
        obj = process_pool.submit(f, "process pid:")

        # .result() blocks until the task finishes and returns its value;
        # if f() has no return statement, obj.result() is None.
        res = obj.result()

    # Stop accepting new tasks and wait for queued tasks to finish
    # before the main process continues.
    process_pool.shutdown(wait=True)

    print("The main process")
<2> Asynchronous submission: only call the function f; do not wait for its return value.
    Advantage: fast
    Disadvantage: there is coupling
import os
import random
import time
from concurrent.futures import ProcessPoolExecutor


def f(name):
    print("%s %s is running" % (name, os.getpid()))
    time.sleep(random.randint(1, 3))


if __name__ == '__main__':
    # Set the number of processes in the pool
    process_pool = ProcessPoolExecutor(4)

    for i in range(10):
        # Asynchronous submission: just submit the call, do not wait for the result.
        # Arguments are passed as (func, *args, **kwargs), positionally or by keyword.
        process_pool.submit(f, 'process pid:')

    # Stop accepting new tasks and wait for queued tasks to finish
    # before the main process continues.
    process_pool.shutdown(wait=True)

    print('The main process')
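The asynchronous example above never collects the return values. A hedged sketch of one common way to do so (using as_completed, which is my addition, not the article's): each future is handled as soon as its task finishes, regardless of submission order.

import os
import random
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

def f(name):
    time.sleep(random.randint(1, 3))
    return "%s %s finished" % (name, os.getpid())

if __name__ == "__main__":
    with ProcessPoolExecutor(4) as process_pool:
        futures = [process_pool.submit(f, "process pid:") for _ in range(10)]
        # as_completed yields each future as soon as its task finishes,
        # so results are handled without waiting for earlier submissions.
        for future in as_completed(futures):
            print(future.result())
    print("The main process")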
