Introduction and internal principles of Python's widely used concurrent processing Library Futures
2022-02-01 06:46:18 · waws520
This article is reproduced from Lao Qian's blog to help you understand how to use the concurrent-processing library futures. The author's original post: juejin.cn/post/684490…
When handling tasks in Python, the processing capacity of a single thread is limited, so tasks need to be parallelized and distributed to multiple threads or multiple processes for execution.
concurrent.futures is exactly such a library: it lets users parallelize tasks easily. Since the name is a bit long, below I will simply write concurrent instead of concurrent.futures.
concurrent provides two concurrency models: multithreading via ThreadPoolExecutor, and multiprocessing via ProcessPoolExecutor. For IO-intensive tasks you should use the multithreading model; for compute-intensive tasks you should use the multiprocessing model.
Why choose this way? Because of Python's GIL, the Python virtual machine cannot make effective use of multiple cores. A pure computation task can only ever saturate a single CPU core; to break through that bottleneck, you must fork several child processes to share the computing work. For IO-intensive tasks, on the other hand, CPU utilization is usually very low. Multithreading can multiply CPU usage, but it stays far from the saturation point (100%); as long as a single core can handle the overall computation, we should naturally choose the mode that consumes fewer resources, namely multithreading.
Next, let's try parallel computing with both modes.
The multithreading model suits IO-intensive operations. Here I use sleep to simulate a slow IO task. To make it easy to write a command-line program, I also use Google's open-source fire library to simplify command-line argument handling.
```python
# coding: utf8
# t.py
import time
import fire
import threading
from concurrent.futures import ThreadPoolExecutor, wait


# Split subtasks
def each_task(index):
    time.sleep(1)  # sleep 1s to simulate IO
    print("thread %s square %d" % (threading.current_thread().ident, index))
    return index * index  # return the result


def run(thread_num, task_num):
    # Instantiate a pool of thread_num threads
    executor = ThreadPoolExecutor(thread_num)
    start = time.time()
    fs = []  # future list
    for i in range(task_num):
        fs.append(executor.submit(each_task, i))  # submit tasks
    wait(fs)  # wait for the computation to finish
    end = time.time()
    duration = end - start
    s = sum(f.result() for f in fs)  # sum up the results
    print("total result=%s cost: %.2fs" % (s, duration))
    executor.shutdown()  # destroy the thread pool


if __name__ == '__main__':
    fire.Fire(run)
```
Run python t.py 2 10, that is, 2 threads running 10 tasks, and observe the output:
```
thread 123145422131200 square 0
thread 123145426337792 square 1
thread 123145426337792 square 2
thread 123145422131200 square 3
thread 123145426337792 square 4
thread 123145422131200 square 5
thread 123145426337792 square 6
thread 123145422131200 square 7
thread 123145426337792 square 8
thread 123145422131200 square 9
total result=285 cost: 5.02s
```
We see that the whole computation took about 5s: 10s of sleep in total, shared by two threads, hence 5s. Readers may ask why the output is messy. That is because print is not atomic: it is two consecutive write operations, the first writing the content and the second writing the newline. Each write is atomic by itself, but in a multithreaded environment the two writes from different threads can interleave, so the output is untidy. If we modify the code slightly and turn the print into a single write, the output becomes tidy (whether write is strictly atomic deserves further discussion).
```python
# Split subtasks
def each_task(index):
    time.sleep(1)  # sleep 1s to simulate IO
    import sys
    sys.stdout.write("thread %s square %d\n" % (threading.current_thread().ident, index))
    return index * index  # return the result
```
Run python t.py 2 10 again and observe the output:
```
thread 123145438244864 square 0
thread 123145442451456 square 1
thread 123145442451456 square 2
thread 123145438244864 square 3
thread 123145438244864 square 4
thread 123145442451456 square 5
thread 123145438244864 square 6
thread 123145442451456 square 7
thread 123145442451456 square 9
thread 123145438244864 square 8
total result=285 cost: 5.02s
```
Next, let's change the parameters, scale up to 10 threads, and see how long all the tasks take:
```
> python t.py 10 10
thread 123145327464448 square 0
thread 123145335877632 square 2
thread 123145331671040 square 1
thread 123145344290816 square 4
thread 123145340084224 square 3
thread 123145348497408 square 5
thread 123145352704000 square 6
thread 123145356910592 square 7
thread 123145365323776 square 9
thread 123145361117184 square 8
total result=285 cost: 1.01s
```
You can see that all the tasks finish in about 1s. This is the charm of multithreading: multiple IO operations are parallelized, reducing overall processing time.
While multithreading suits IO-intensive tasks, multiprocessing suits compute-intensive ones. Next, let's simulate a compute-intensive task. My PC has 2 cores, so we can experience the advantage of multi-core computing.
How do we simulate an intensive computation? We can use a series formula for π (the Basel series): π = sqrt(6 · (1/1² + 1/2² + … + 1/n²)).
By increasing the length n of the series, we can approach π arbitrarily closely. When n is very large, the computation becomes slow and the CPU stays busy, which is exactly what we want.
Good, let's write the multi-process parallel computing code:
```python
# coding: utf8
# p.py
import os
import sys
import math
import time
import fire
from concurrent.futures import ProcessPoolExecutor, wait


# Split subtasks
def each_task(n):
    # Approximate pi with the series formula
    s = 0.0
    for i in range(n):
        s += 1.0 / (i + 1) / (i + 1)
    pi = math.sqrt(6 * s)
    # os.getpid() returns the child process id
    sys.stdout.write("process %s n=%d pi=%s\n" % (os.getpid(), n, pi))
    return pi


def run(process_num, *ns):
    # Take several values of n and split them into subtasks
    # Instantiate a pool of process_num processes
    executor = ProcessPoolExecutor(process_num)
    start = time.time()
    fs = []  # future list
    for n in ns:
        fs.append(executor.submit(each_task, int(n)))  # submit tasks
    wait(fs)  # wait for the computation to finish
    end = time.time()
    duration = end - start
    print("total cost: %.2fs" % duration)
    executor.shutdown()  # destroy the process pool


if __name__ == '__main__':
    fire.Fire(run)
```
From the code we can see that the multi-process mode hardly differs from multithreading: change one class name and everything else stays the same. This is also the charm of the concurrent library: it abstracts the multithreading and multiprocessing models behind the same interface.
Let's run it: python p.py 1 5000000 5001000 5002000 5003000 computes pi 4 times in total, with only one process. Observe the output:
```
process 96354 n=5000000 pi=3.1415924626
process 96354 n=5001000 pi=3.14159246264
process 96354 n=5002000 pi=3.14159246268
process 96354 n=5003000 pi=3.14159246272
total cost: 9.45s
```
We can see that as n grows, the result gets closer and closer to π. Because only one process is used, the tasks execute serially, taking about 9.5s.
Next, add another process and observe the output:
```
> python p.py 2 5000000 5001000 5002000 5003000
process 96529 n=5001000 pi=3.14159246264
process 96530 n=5000000 pi=3.1415924626
process 96529 n=5002000 pi=3.14159246268
process 96530 n=5003000 pi=3.14159246272
total cost: 4.98s
```
The elapsed time is close to half, showing that multiprocessing really does parallelize the computation. If you now watch the processes with the top command, the CPU utilization of both processes is close to 100%.
If we add 2 more processes, can we compress the computing time further?
```
> python p.py 4 5000000 5001000 5002000 5003000
process 96864 n=5002000 pi=3.14159246268
process 96862 n=5000000 pi=3.1415924626
process 96863 n=5001000 pi=3.14159246264
process 96865 n=5003000 pi=3.14159246272
total cost: 4.86s
```
Apparently not: with only 2 computing cores, 2 processes are enough to saturate them. Even with more processes, there are still only 2 cores available.
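Before moving on to the internals, note again how little the code changed between the two experiments. Here is a minimal sketch of that interface parity (my own illustration; `square` and `run_with` are hypothetical names, not from the original article):

```python
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def square(x):
    return x * x


def run_with(executor_cls, n=5):
    # Only the executor class differs between the multithreading
    # and multiprocessing models; everything else is identical.
    with executor_cls(2) as executor:
        return list(executor.map(square, range(n)))


if __name__ == '__main__':
    print(run_with(ThreadPoolExecutor))   # [0, 1, 4, 9, 16]
    print(run_with(ProcessPoolExecutor))  # [0, 1, 4, 9, 16]
```

Note that with a process pool, `square` must live at module level so it can be pickled and sent to the child processes.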
concurrent is very simple to use, but its internals are not so easy to understand. Before delving into the internal structure, we need to understand the Future object. In the previous examples, the executor returns a Future object after a task is submitted (submit). Think of it as a pit for a result: when the task has just been submitted, the pit is empty; once a child thread finishes running the task, it plugs the result into the pit, and the main thread can fetch it through the Future object. In a nutshell, the Future object is the medium through which the main thread and the child threads communicate.
The internal logic of the Future object is relatively simple and can be represented by the following code:
```python
class Future(object):
    def __init__(self):
        self._condition = threading.Condition()  # condition variable
        self._result = None

    def result(self, timeout=None):
        with self._condition:  # the lock must be held before wait()
            self._condition.wait(timeout)
        return self._result

    def set_result(self, result):
        with self._condition:  # and before notify_all()
            self._result = result
            self._condition.notify_all()
```
After the main thread inserts a task into the thread pool, it gets this Future object, whose internal _result is still empty. If the main thread calls the result() method to fetch the result, it blocks on the condition variable. When a child thread finishes the computation, it immediately calls set_result() to fill the result into the Future object and wakes up the thread blocked on the condition variable, i.e. the main thread. The main thread then wakes up and returns the result normally.
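The real concurrent.futures.Future follows exactly this contract. A small usage sketch (the 0.1s sleep simulates work done by the child thread):

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x):
    time.sleep(0.1)  # simulate work done by the child thread
    return x * x


with ThreadPoolExecutor(1) as executor:
    future = executor.submit(slow_square, 7)  # the result "pit" is still empty
    result = future.result()  # main thread blocks on the condition variable
    print(result)             # 49
    print(future.done())      # True: set_result() has filled the pit
```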
Thread pool internal structure
The interaction between the main thread and the child threads has two parts: first, how the main thread hands tasks to the child threads; second, how the child threads hand results back to the main thread. The second part, as described, is done through the Future object. How is the first part done?
As shown in the figure above, the secret lies in a queue: the main thread passes tasks to the child threads through a queue. Once the main thread pushes a task into the task queue, the child threads start competing for it; only one thread grabs the task and executes it immediately, and after execution it puts the result into the Future object, completing the whole life cycle of the task.
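A stripped-down sketch of this queue mechanism (illustrative only; the real implementation lives in concurrent.futures.thread, and the names below are my own):

```python
import threading
import queue
from concurrent.futures import Future

task_queue = queue.Queue()  # the main thread pushes (future, fn, args) here


def worker():
    while True:
        item = task_queue.get()  # worker threads compete for tasks
        if item is None:         # sentinel: shut this worker down
            break
        future, fn, args = item
        future.set_result(fn(*args))  # stuff the result into the Future


threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()

futures = []
for i in range(5):
    f = Future()
    task_queue.put((f, lambda x: x * x, (i,)))  # push task into the queue
    futures.append(f)

results = [f.result() for f in futures]  # main thread collects the results
print(results)  # [0, 1, 4, 9, 16]

for _ in threads:
    task_queue.put(None)  # stop the workers
for t in threads:
    t.join()
```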
Disadvantages of thread pool
The thread pool in concurrent has one major design problem: the task queue is unbounded. If the producer submits tasks too fast and the thread pool consumes them too slowly, tasks pile up. If the pile-up continues, memory keeps growing until OOM, at which point every task accumulated in the queue is lost. Users must watch out for this and apply proper flow control.
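One common workaround (my own sketch, not part of the library) is to bound the number of in-flight tasks with a semaphore, so the producer blocks instead of letting the queue grow without limit:

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class BoundedExecutor:
    """Wrap an executor so that at most `bound` tasks are in flight."""

    def __init__(self, max_workers, bound):
        self._executor = ThreadPoolExecutor(max_workers)
        self._semaphore = threading.BoundedSemaphore(bound)

    def submit(self, fn, *args, **kwargs):
        self._semaphore.acquire()  # block the producer at the bound
        future = self._executor.submit(fn, *args, **kwargs)
        # Release one slot whenever a task finishes.
        future.add_done_callback(lambda f: self._semaphore.release())
        return future

    def shutdown(self):
        self._executor.shutdown()


executor = BoundedExecutor(max_workers=2, bound=4)
futures = [executor.submit(lambda x: x * x, i) for i in range(10)]
total = sum(f.result() for f in futures)
print(total)  # 285, same as the earlier thread pool example
executor.shutdown()
```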
Internal structure of process pool
The internal structure of the process pool is complicated; even the author of the concurrent library found it particularly complex, so he drew an ASCII diagram in the code to explain the internal structure of the model.
I don't find the author's diagram easy enough to understand, so I drew a separate one. Please study the two diagrams together as we walk through a complete task-processing flow.
- The main thread puts the task into TaskQueue (an ordinary in-memory queue) and gets back a Future object.
- The sole management thread fetches the task from TaskQueue and puts it into CallQueue (a cross-process queue).
- The child processes compete for tasks from CallQueue.
- A child process puts its processing result into ResultQueue (a cross-process queue).
- The management thread fetches the result from ResultQueue and stuffs it into the Future object.
- The main thread fetches the result from the Future object.
This intricate flow involves 3 queues, plus a management thread in the middle. Why did the author design it so elaborately? What's the advantage of such a design?
First, look at the left half of the diagram. It is not much different from the thread pool flow; the only difference is that there is just one management thread, while a thread pool has multiple worker threads. This design keeps the multi-process model consistent with the multi-thread model in usage, which is why the two interfaces show no difference: the intermediate management thread hides the multi-process interaction logic behind it.
Then look at the right half. The management thread interacts with the child processes through two queues, both of which are cross-process queues (multiprocessing.Queue). CallQueue is single-producer multi-consumer; ResultQueue is multi-producer single-consumer.
CallQueue is a bounded queue, its limit hard-coded in the library as "number of child processes + 1". If the child processes can't keep up, CallQueue fills up and the management thread stops stuffing data into it. But the same problem as with the thread pool remains: TaskQueue is an unbounded queue whose content keeps growing regardless of whether the consumer (the management thread) keeps up, so it can still eventually lead to OOM.
Cross process queue
The cross-process queues in the process pool model are implemented with multiprocessing.Queue. What are the internals of this cross-process queue? What technology makes it work?
After carefully reading the multiprocessing.Queue source code, I found that it uses an anonymous socket pair (socketpair) for cross-process communication. The difference between a socketpair and an ordinary socket is that a socketpair needs no port and does not go through the network protocol stack; the processes communicate directly through the kernel's socket read/write buffers.
When the parent process wants to pass a task to a child process, it first serializes the task object into a byte array with pickle, then writes the byte array into the kernel buffer through the socketpair's write descriptor. The child process can then read the byte array from the buffer, deserialize it with pickle to recover the task object, and finally execute the task. A child process passing a result back to the parent follows the same flow, except that the socketpair involved is the anonymous socket created inside ResultQueue.
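The mechanism is easy to demonstrate in isolation (a simplified sketch within a single process; the real multiprocessing.Queue adds locking and a feeder thread on top of this idea):

```python
import pickle
import socket

# An anonymous socket pair: no port, no network protocol stack,
# just a kernel buffer shared between the two descriptors.
parent_sock, child_sock = socket.socketpair()

task = {"fn": "each_task", "args": (42,)}
parent_sock.sendall(pickle.dumps(task))  # serialize, then write

data = child_sock.recv(4096)             # read the byte array back
received = pickle.loads(data)            # deserialize into an object
print(received)  # {'fn': 'each_task', 'args': (42,)}

parent_sock.close()
child_sock.close()
```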
multiprocessing.Queue supports duplex communication: data can flow from parent to child or from child to parent. In the concurrent process pool implementation, however, only simplex communication is used: CallQueue goes from parent to child, and ResultQueue from child to parent.
The concurrent.futures framework is very easy to use. Although its internal implementation is rather complex, readers can use it without fully understanding the details. Just remember that the task queue inside the thread pool or process pool is unbounded: make sure the consumers keep up, or memory will keep rising.
Author: waws520. Please include a link to the original when reprinting. Thank you.