
Concurrent and Asynchronous Programming in Python

2022-09-09 05:16:22 · Pinghu Autumn Moon in Hangzhou


Preface

If a program contains many CPU-bound tasks, or many I/O-bound tasks, concurrent and asynchronous programming can greatly speed it up.

Note: the code below is demonstrated with Python 3.10. Some of the commands and parameters used may not be supported in earlier Python versions.


1. Several Related Terms

When working with concurrent and asynchronous programming, you will often encounter several terms that need to be distinguished.

1.1 Concurrency and parallelism

Concurrency means that multiple tasks exist at the same time; one or more of them may be running, or possibly all of them.
Parallelism means that multiple tasks exist at the same time and all of them are running simultaneously.

For example, a GPU has thousands of cores, and GPU computation usually runs on these cores in parallel (parallelism), i.e. the thousands of cores compute at the same time.
On a CPU, by contrast, with multithreaded concurrency usually only 1 thread is running at any moment while the other threads are blocked (pending).
So, viewed as sets, parallelism is a special case contained within concurrency.

1.2 Synchronous and asynchronous

Synchronous is the usual way a program runs: after the main program calls a subroutine, it stops at the call point and waits until the subroutine finishes, and only then continues with the code after the call point.
Synchronous execution is also called sequential, because the parts of the code run in order: one part finishes before the next part starts.

Asynchronous is a "no waiting" style: after the main program calls a subroutine, it immediately continues with the code after the call point, without waiting for the subroutine to finish.

If a subroutine takes a long time to finish, asynchronous programming can greatly improve the efficiency of the code.
The FastAPI documentation has an article that uses the example of "queuing for burgers with your crush" to explain concepts such as asynchrony and concurrency. See: → https://fastapi.tiangolo.com/async/

1.3 I/O bound and CPU bound

An I/O-bound program spends most of its time on I/O operations; I/O is the program's efficiency bottleneck.
There are 2 common kinds of I/O: network I/O and file I/O. In network I/O, sending a request and waiting for the response takes a long time; reading and writing data between memory and disk is file I/O.

A CPU-bound program spends most of its time on CPU computation; the CPU is the efficiency bottleneck. CPU bound is sometimes also called CPU-intensive.
Image processing, as well as audio and video processing, are CPU-bound programs: they all require heavy CPU computation.

1.4 Processes, threads, coroutines, and the GIL

Concurrency can be implemented with 3 kinds of execution units: processes, threads, and coroutines.
The relationship between processes, threads, coroutines, and the GIL (Global Interpreter Lock) has 4 key points:

  1. Every Python process has an interpreter, which interprets and executes Python code.
  2. Within each Python process, multiple threads can run concurrently.
  3. A thread runs only after it acquires the GIL. Every 5 ms, the interpreter forcibly releases the GIL, the system reassigns the GIL to one of the threads, and the thread that gets the GIL runs next.
  4. Within one thread, multiple coroutines can run concurrently, but only the coroutine currently assigned the interpreter actually runs. That is, at any moment, at most one coroutine in a thread is running.
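The 5 ms switch interval in point 3 is exposed by the standard library. Below is a minimal sketch using `sys`; the exact value shown assumes the interpreter's default has not been changed:

```python
import sys

# The interpreter's default thread switch interval is 5 ms (0.005 s).
interval = sys.getswitchinterval()
print(interval)

# The interval can be tuned, e.g. to reduce GIL hand-offs for long-running threads.
sys.setswitchinterval(0.01)
print(sys.getswitchinterval())
sys.setswitchinterval(interval)  # Restore the previous value.
```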

The relationship between them is shown in the figure below:
(Figure: processes, threads, coroutines, and the GIL.)

In the figure, thread_2 has acquired the GIL (shown as a white box), so thread_2 runs while the other 2 threads are blocked (pending).
Inside thread_2, multiple coroutines run concurrently. Suppose the interpreter is currently assigned to coroutine_2 (the white dashed box in the figure); then coroutine_2 is the one running.

2. Choosing the Right Concurrency Mechanism

Different task scenarios call for different concurrency mechanisms to maximize the efficiency of the program.

A simple selection method is as follows:

  1. For CPU-bound tasks with a multi-core CPU available, use multiprocessing.
  2. For I/O-bound tasks, use multithreading (threading) or coroutines; concurrency then happens within a single process.
    2.1 If the programmer needs full control over how the interpreter is allocated, use coroutines.
    2.2 If the programmer does not need that control and lets the system assign the interpreter automatically, use multithreading.
    2.3 Coroutines are somewhat faster than multithreading. This is because multithreading must repeatedly unlock and re-lock the GIL, and the threads must compete for the GIL, both of which cost time.
    (Figure: choosing a concurrency mechanism.)

In application code, when you need concurrency, prefer the high-level APIs, because they are simpler and easier to use.
The low-level APIs are only needed when writing low-level infrastructure and frameworks.

For multithreaded and multiprocess concurrency, use the high-level concurrent.futures module; for coroutine concurrency, use the asyncio module. The sections below introduce concurrency with these high-level APIs from the application side.

3. Multithreaded Concurrency Template: threading

For multithreaded concurrency you can use the concurrent.futures module, with ThreadPoolExecutor as a context manager.

3.1 Using executor.map

Using executor.map is the simplest approach. Below is a template; the 4 main steps are:

  1. Import concurrent.futures.
  2. Put the task of a single worker (i.e. the task to run concurrently, done by a single thread or a single process) into a function.
  3. Use a function workers_scheduler to dispatch all the workers.
    3.1 Create a ThreadPoolExecutor instance as a context manager.
    3.2 Call executor.map to run the tasks concurrently; it returns a generator of the results. map executes each task asynchronously.
  4. Iterate over the results generator in a for loop; the results come back in FIFO (first in, first out) order.
"""使用 ThreadPoolExecutor The multithreaded concurrency template for."""
# 1. 导入 concurrent.futures.
from concurrent import futures

# 2. 把单个 worker 的任务(i.e. tasks that require concurrent)放入一个函数中.
def one_worker_task(param):
    ...
    
# 3. 用一个函数 workers_scheduler to mobilize all workers.
def workers_scheduler(parameters):
    # 3.1 创建 ThreadPoolExecutor 的个体,作为 context manager.
    with futures.ThreadPoolExecutor() as executor:
        # 3.2 使用 executor.map method for concurrency,returns a result generator.map method can put each worker 
        # The tasks are executed asynchronously asynchronously.如果有多个任务,map methods may also execute tasks concurrently.
        results = executor.map(one_worker_task, parameters)  # 返回一个 generator.

    # 4. 在 for Iterating over generators in a loop results,以 FIFO(先进先出) way to return the result of the calculation.
    # 即先进入 executor.map 的输入,The result of its calculation will come out first.
    for i, result in enumerate(results, 1):
        print(f'\neach result: {
      i}, {
      result}')

if __name__ == '__main__':
    workers_scheduler()

3.2 Using executor.submit and futures.as_completed

Compared with executor.map, executor.submit is a more flexible approach. For example, you can submit different functions via executor.submit, or mix thread and process future objects in futures.as_completed.
futures.as_completed returns an iterator of the reordered future objects, sorted by the time each future completed: the futures that finish first come first.

The template for executor.submit is below. The main difference is in step 3, the step that dispatches all the workers. Specifically:

  3.1 Create a ThreadPoolExecutor instance as a context manager.
  3.2 Iterate over the inputs, calling executor.submit to get a concurrent.futures.Future object for each.
  3.3 Use futures.as_completed to get an iterator of the finished future objects, earliest-finished first.
    Iterate over this iterator and call future.result() to get each worker's return value.

Additionally, if you want to append some operations after a future object finishes, create a function for those operations and register it with future.add_done_callback.

"""使用 ThreadPoolExecutor The multithreaded concurrency template for,用到 Future 对象."""
# 1. 导入 concurrent.futures.
from concurrent import futures
# 2. 把每个 worker Tasks in a function.
def one_worker_task(parameters):
    ...
def done_callable(future):  # 可以直接把 future 对象作为参数传递进来.
	...
    print(f'Future object is done: {
      future}')    
# 3. 用一个函数 workers_scheduler to mobilize all workers.
def workers_scheduler(param):
    # 3.1 创建 ThreadPoolExecutor 的个体,作为 context manager.
    with futures.ThreadPoolExecutor() as executor:
        to_do: list[futures.Future] = []
        # 3.2 遍历每一个输入,使用 executor.submit 获得一个 future 对象.
        for param in sorted(parameters):            
            future = executor.submit(one_worker_task, param)
            to_do.append(future)

        # count 用于从 1 开始计算 futures 的数量,所以下面的 enumerate 设置 start 参数为 1.
        # 3.3 用 futures.as_completed 得到一个 iterator,where is the run has ended future 对象.先完成的 future Objects come first.
        # 遍历这个 iterator,用 future.result 方法,to get a single worker 的返回结果.
        for count, future in enumerate(futures.as_completed(to_do), 1):            
            res = future.result()
            ...
            # 如果想在 future After the object has finished running,Append some operations,可以用 add_done_callback. 
            future.add_done_callback(done_callable)           
            
if __name__ == '__main__':
    workers_scheduler()
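Filling this template with a trivial squaring worker (again a placeholder task) gives a runnable sketch. Since as_completed yields futures by completion time, the results are matched back to their inputs through the to_do mapping:

```python
from concurrent import futures

def one_worker_task(n):
    # A trivial stand-in worker: square the input.
    return n * n

def workers_scheduler(parameters):
    collected = {}
    with futures.ThreadPoolExecutor() as executor:
        # Map each future back to its input, since as_completed loses the input order.
        to_do = {executor.submit(one_worker_task, p): p for p in parameters}
        for future in futures.as_completed(to_do):
            collected[to_do[future]] = future.result()
    return collected

if __name__ == '__main__':
    print(workers_scheduler([2, 3, 4]))  # e.g. {2: 4, 3: 9, 4: 16} (key order varies)
```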

3.3 Other situations that release the GIL

As mentioned above, during multithreaded concurrency the Python interpreter suspends the currently running thread and releases the GIL every 5 ms. In addition, the following operations also release the GIL:

  1. All Python standard library functions that make a system call (syscall) also release the GIL during the call. Programs that initiate syscalls in this category include disk I/O, network I/O, and time.sleep().
  2. Some CPU-intensive functions in NumPy/SciPy can release the GIL.
  3. Some compress/decompress functions of the zlib and bz2 modules can also release the GIL.
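Point 1 can be verified with a rough timing sketch: because time.sleep releases the GIL, three threads each sleeping 0.5 s finish in roughly 0.5 s total rather than 1.5 s (exact timings vary by machine; the inputs here are made up for the demo):

```python
import time
from concurrent import futures

def sleeper(seconds):
    # time.sleep releases the GIL, so the sleeping threads overlap in time.
    time.sleep(seconds)
    return seconds

tic = time.perf_counter()
with futures.ThreadPoolExecutor() as executor:
    list(executor.map(sleeper, [0.5, 0.5, 0.5]))
elapsed = time.perf_counter() - tic
print(f'elapsed: {elapsed:.2f} s')  # roughly 0.5 s, not 1.5 s
```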

3.4 A tortoise-and-hare race

Here is a tortoise-and-hare race example to demonstrate multithreaded concurrency.
There are 2 concurrent threads in total: one thread runs the tortoise, the other runs the hare.
The tortoise and hare images are Unicode characters. The full Unicode charts can be found in the official document:
https://www.unicode.org/Public/UCD/latest/charts/CodeCharts.pdf
(Figure: an animation frame of the race.)

In the animation, the pictures of the tortoise and the hare keep switching, and only one of them is visible at a time. This is because only the thread that holds the GIL can print its picture, and that print overwrites the other thread's picture.
Both the tortoise and the hare can be seen running toward the finish line, which shows that the 2 threads do run concurrently.
The multithreaded code for the tortoise-and-hare race follows.

"""An example of the tortoise and the hare,to demonstrate multithreading thread Concurrent usage. 并发 2 个线程,a thread running turtle,Another thread running bunny. image of tortoise and rabbit Unicode 实现.完整的 Unicode The content can be found in the official documentation: https://www.unicode.org/Public/UCD/latest/charts/CodeCharts.pdf """

import time
import unicodedata
from concurrent import futures


def worker(unicode_name, frame_duration, duration_limit):
    """single concurrent worker,Can keep objects moving. arguments: unicode_name:一个字符串,是 Unicode The official name of the character,不区分大小写. frame_duration:一个浮点数,表示时间.每经过 frame_duration 秒,The object moves forward 一个位置. duration_limit:一个浮点数,表示时间,单位为 s.是 worker 的总运行时间. """
    object_position = -1  # 从最右边开始,即 -1 位置.
    duration_start = time.time()
    while True:
        tiles = ['_'] * 20  # Use the underline simulation of the road 20 块地砖 tiles
        # 用 unicodedata Draw objects,and put the object in the correct position.
        tiles[object_position] = unicodedata.lookup(f'{
      unicode_name}')

        if unicode_name == 'turtle':  # Draw the turtle and the hare in different positions,以便于区分.
            frame = f'\r{
      unicode_name}:\t'  # 注意要先使用 \r,Back to the front.
        else:
            frame = '\r' + '\t' * 6 + f'{
      unicode_name}:\t'

        frame += ''.join(tiles)  # 形成一帧 frame.
        print(frame, end='')  # 注意使用 end='',keep the screen on the same line.

        time.sleep(frame_duration)
        object_position -= 1  # Object moves forward one block.
        object_position = max(object_position, -len(tiles))  # Avoid index out of bounds.

        duration_stop = time.time()
        duration = duration_stop - duration_start
        if duration > duration_limit:
            break


def main():
    unicode_names = ['turtle', 'rabbit']  
    frame_durations = [0.5, 0.7]  # This is equivalent to the speed of the tortoise and the hare respectively.
    duration_limits = [10] * 2  # 一共跑 10 s.
    print('Start racing!')
    with futures.ThreadPoolExecutor() as executor:
        executor.map(worker, unicode_names, frame_durations, duration_limits)


if __name__ == '__main__':
    main()
    # If you need to see the movement of a single turtle,You can comment out the above line,Just use the code below.
    # worker('turtle', 0.5, 10)

4. Multiprocess Concurrency Template: multiprocessing

In concurrent.futures, multiprocess concurrency works the same way as multithreading: just take the 2 templates above (executor.map and executor.submit) and replace ThreadPoolExecutor with ProcessPoolExecutor.
Below is the example from the official Python documentation.

"""Python 官网中,使用 ProcessPoolExecutor Example of calculating prime numbers.Added some comments and modifications. https://docs.python.org/3/library/concurrent.futures.html#processpoolexecutor-example """
import concurrent.futures
import math
import time

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]


def worker(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    isqrt_n = math.isqrt(n)  # isqrt Is the square root of integer part,相当于 int(math.sqrt(n)).
    for i in range(3, isqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


def main():
    # The following can be modified max_workers=1,Compare the difference in the number of different processes.
    with concurrent.futures.ProcessPoolExecutor(max_workers=None) as executor:
        results = executor.map(worker, PRIMES)  # 并发多个进程.

    for number, prime in zip(PRIMES, results):
        print(f'{
      number} is prime: {
      prime}')


if __name__ == '__main__':
    tic = time.perf_counter()
    main()
    toc = time.perf_counter()
    print(f'\nDuration:\t{
      toc - tic:.2f} seconds.')  # 计时功能.When the number of concurrent can compare different process,time difference.

The signature of ProcessPoolExecutor is shown below. If the number of processes max_workers is not set, it defaults to all the processors on the machine.
It is generally recommended to run only one Python process per processor, to avoid multiple processes competing for the same processor and wasting time.
Use os.cpu_count() to check the number of local processors; e.g. an AMD 5950X has 16 cores and 32 logical processors.
(Figure: the ProcessPoolExecutor signature.)
mp_context is a context object that sets the start method for child processes. It can be obtained with get_context, as below, and is rarely needed.

import concurrent.futures
import multiprocessing

ctx = multiprocessing.get_context('forkserver')  # The start method for child processes.
with concurrent.futures.ProcessPoolExecutor(max_workers=32, mp_context=ctx) as executor:
    ...

4.1 A rabbit hole in multiprocess concurrency

When using multiprocess concurrency, there is a fairly well-hidden pit, a "rabbit hole", to watch out for so you don't fall in.
The symptom of this pit: even though multiple processes run concurrently, the program gets no faster, and may even be slower than a single process.

The cause of this problem has 3 parts:

  1. In the main process there are operations outside any function body, such as imports and the creation of global variables. Call these "global operations".
  2. Every time a child process is created, all of these "global operations" are executed again. If the "global operations" take time t, then n child processes cost n * t.
  3. If these "global operations" are slow, the total cost n * t becomes huge, far exceeding the child processes' actual computation, and the multiprocess version appears very slow.

An example follows; part of the code is the prime-checking example from the Python website.

"""This part is for display“坑”的代码,Each child process will“全局操作”执行一遍."""
import concurrent.futures
import math

# 1. 注意下面的 ONE_CONSTANT 和 try-else block,将会被执行 3 遍.
ONE_CONSTANT = 888  # ONE_CONSTANT Just as a global variable example,has no real role in the program.
try:
    PRIMES = [115797848077099, 1099726899285419]
except Exception as exc:
    print(f'Exception raised: \n{
      exc}')
else:
    print(f'\nPRIMES created:\t{
      PRIMES!r}')
    print(f'ONE_CONSTANT created:\t{
      ONE_CONSTANT!r}')


def is_prime(index):
    n = PRIMES[index]  # 2. Pay attention to in the process of each child,List can be used directly PRIMES.
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    isqrt_n = math.isqrt(n)  # isqrt Is the square root of integer part,等于 int(math.sqrt()).
    for i in range(3, isqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


def main():
    indices = range(len(PRIMES))
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # 3. 下面的 map,虽然没有把 PRIMES 传递给子进程,But each subprocess is actually
        # Created a copy PRIMES.
        results = executor.map(is_prime, indices)
    for number, prime in zip(PRIMES, results):
        print(f'\n{
      number:_}\t is prime: {
      prime}')


if __name__ == '__main__':
    print('Start the main:')
    main()    

The output is shown below. Note that the "global operations" outside any function body (including the print statements PRIMES created and ONE_CONSTANT created) were executed 3 times: once by the main process, and once by each of the 2 child processes.
So if operations such as creating global variables take a long time, multiprocessing will appear very slow.
(Figure: output showing the "global operations" run 3 times.)
The solution to this problem is to create a "preparation function" and move the global operations into it, as in the code example below. The 3 key operations are:

  1. Put the global operations into the "preparation function" preparation.
  2. Execute preparation once in the main process to obtain the parameters the child processes need.
  3. Pass the parameters the child processes need through the map function.
"""The ways to solve multi-process concurrent slower:Create a in the main process“准备函数” preparation,Put global operations 到“准备函数”中去. """
import concurrent.futures
import math


# 1. Put global operations into a“准备函数” preparation 中,This function only needs to be executed by the main process 1 遍.
def preparation():
    ONE_CONSTANT = 888
    try:
        PRIMES = [115797848077099, 1099726899285419]
    except Exception as exc:
        print(f'Exception raised: \n{
      exc}')
    else:
        print(f'\nPRIMES created:\t{
      PRIMES!r}')
        print(f'ONE_CONSTANT created:\t{
      ONE_CONSTANT!r}')
    return PRIMES


def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    isqrt_n = math.isqrt(n)
    for i in range(3, isqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


def main():
    PRIMES = preparation()  # 2. 执行 preparation,Get the parameters required by the child process.
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # 3. 修改 map 函数,Traverse to the child process is needed PRIMES 参数通过 map 传递进去.
        results = executor.map(is_prime, PRIMES)
    for number, prime in zip(PRIMES, results):
        print(f'\n{
      number:_}\t is prime: {
      prime}')


if __name__ == '__main__':
    print('Start the main:')
    main()

The output is shown below. Note that the print statements PRIMES created and ONE_CONSTANT created are executed only once, in the main process.
(Figure: output showing the "global operations" run once.)

PS: Fluent Python uses the term "rabbit hole" when discussing concurrent programming; looking it up shows that in American English it means a pit you fall into, an unexpectedly deep side problem.

4.2 An example using a queue

For multithreaded and multiprocess concurrency, the official Python recommendation is to prefer the high-level APIs, i.e. the recommended order is:
concurrent.futures > threading, multiprocessing > queue > shared state, synchronization.
In this order, the lower-level APIs toward the end serve these purposes:

  1. Queues are used to exchange data between different execution units. Python has a standalone queue module for exchanging data between threads, while multiprocessing.Queue is used to exchange data between processes.
  2. Shared state is used to share data between processes, and includes multiprocessing.Value, multiprocessing.Array, and multiprocessing.Manager. The official recommendation is to avoid shared state as much as possible.
  3. Synchronization primitives mainly prevent 2 or more threads or processes from writing to the same object at the same time. Synchronization objects include Lock, RLock, Semaphore, Barrier, etc.
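As a small illustration of point 3, a threading.Lock can protect a shared counter from simultaneous writes; the counter, loop sizes, and thread count below are invented for the demo:

```python
import threading

counter = 0
lock = threading.Lock()

def add_many(n):
    global counter
    for _ in range(n):
        with lock:  # The lock makes the read-modify-write on counter atomic.
            counter += 1

threads = [threading.Thread(target=add_many, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # → 40000
```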

Below is an example using a queue. As you can see, using a queue is more complicated: the programmer must manually match each child process's result with its input. That is why the code below adds an index variable, used to determine the order of the results. By contrast, the high-level executor.map does these steps for us automatically.

"""使用 multiprocessing.Queue Example of exchanging data among multiple processes."""
import multiprocessing
import queue
import time
import typing
from multiprocessing import Process


# 使用 type alias,定义 2 个数据类型 JobQueue 和 ResultQueue.
# When defining data types,必须用 queue.Queue,而不能用 multiprocessing.Queue.
JobQueue = queue.Queue[tuple[int, typing.Any]]
ResultQueue = queue.Queue[tuple[int, typing.Any]]


# When multiple processes are concurrent,Data cannot be exchanged directly between processes,所以要通过 multiprocessing.Queue 或者
# multiprocessing.SimpleQueue 来实现,Instead of directly using a variable to exchange data like ordinary programs.
# Each child process calls the following worker 程序,By putting the calculation results of each subprocess into the output queue ResultQueue
# 中,Has realized the data exchange between different processes.
def worker(jobs: JobQueue, results: ResultQueue) -> None:
    # 3. 在每个子进程中,Bind the output result to the input index.

    index, number = jobs.get()  # 3.1 from the input queue jobs 中,Get the index of the input.
    print(f'in worker:\t index, number={
      index}, {
      number}')
    # Use the following lines of delay code,to make the program appear bug.即输入 input_numbers = [25, 28],
    # The output should have been [25, 28].But the delay code makes the order of the output reversed,变为 [28, 25].
    delay = 29 - number
    for i in range(delay):
        print(f'number= {
      number},\tsleep: {
      i + 1}')
        time.sleep(1)
    # 3.2 put the index of the input index Put it on the first page of Yuanzu 0 位,The output results are placed in the first 1 位,bundle output to the result queue.
    results.put((index, number))


# Designing a Multi-Process Concurrency,make the final output output_numbers,和输入 input_numbers 完全相同.
input_numbers = [25, 28]


def main() -> None:
    # 1. Create input queue and output queue.
    jobs = multiprocessing.Queue()
    results = multiprocessing.Queue()
    # 在这个例子中,也可以使用 Queue 的简化版,即 SimpleQueue,用下面 2 line in place of the above 2 行.
    # jobs = multiprocessing.SimpleQueue()
    # results = multiprocessing.SimpleQueue()
    # 1.1 iterate over each input value.
    for index, number in enumerate(input_numbers):
        print(f'In main:\t index, number = {
      index}, {
      number}')
        # 1.2 put the input on the input queue jobs 中,and add an index to each input value index.
        jobs.put((index, number))

    # 2. Multiple child processes concurrently.
    sub_processes = []
    for _ in range(len(input_numbers)):
        # 2.1 Create a subprocess for each subtask.
        proc = Process(target=worker, args=(jobs, results))
        proc.start()  # 2.2 启动子进程.
        sub_processes.append(proc)  # 2.3 put all child processes in a list,进行记录.
    # 2.4 遍历所有子进程,用 join 等待它们完成.
    for sub_process in sub_processes:
        sub_process.join()

    # 4. Queue the returned results ResultQueue 进行排序,得到最终的输出结果.
    temp_output = []
    while not results.empty():  # 4.1 Constantly fetching values ​​from the result queue,until the result queue is empty.
        index, number = results.get()
        temp_output.append([index, number])  # 4.2 form a temporary result list.
        print(f'In result:\t index, number = {
      index}, {
      number}')

    # 4.3 for a temporary list of results temp_output 进行排序.The sorting method is by the index of the input,From first to last
    # 排序.output[0] is the first of each output tuple 0 位,i.e. the index of the input.
    temp_output.sort(key=lambda output: output[0])
    # 4.4 for a temporary list of results temp_output Do list comprehension,得到最终的输出结果.
    output_numbers = [number for _, number in temp_output]

    print(f'input_numbers:\t{
      input_numbers}')
    print(f'output_numbers:\t{
      output_numbers}')


if __name__ == '__main__':
    main()

The program output is shown below. The goal of this simple program is to make the output the same as the input [25, 28].
You can see that the number 25 takes longer to compute, so it finishes after the number 28: 28 is output first, then 25. Since a queue is FIFO (first in, first out), without the index the output would be wrong, namely [28, 25].
But with the index, the result is still correct.
(Figure: program output.)


FYI: the following introduction to the queues is probably rarely needed; for reference only.
The multiprocessing module has 3 kinds of queues: multiprocessing.Queue, multiprocessing.SimpleQueue, and multiprocessing.JoinableQueue.
All 3 are FIFO ("first in, first out") queues, built on the standard library's queue.Queue.
multiprocessing.SimpleQueue is a simplified version of multiprocessing.Queue, with only the 4 methods get(), put(), empty(), and close(). In simple cases multiprocessing.SimpleQueue suffices.
In addition, multiprocessing.Pipe can also be used to exchange data between multiple processes.

Python's built-in standard library queue provides queues for multithreading, with 4 kinds: queue.Queue, queue.SimpleQueue, queue.LifoQueue, and queue.PriorityQueue.
queue.Queue and queue.SimpleQueue are FIFO queues, while queue.LifoQueue is a "last in, first out" LIFO queue.
queue.PriorityQueue is a priority queue: higher-priority elements (the smaller the priority value, the higher the priority) are fetched first. Its usage is essentially the same as the index in the example above: the smaller index is fetched first.
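A quick sketch of queue.PriorityQueue, using (priority, value) tuples much like the index trick above; the items are made up for the demo:

```python
import queue

pq = queue.PriorityQueue()
# Put items in arbitrary order; each is a (priority, value) tuple.
pq.put((2, 'second'))
pq.put((3, 'third'))
pq.put((1, 'first'))

# get() always returns the entry with the smallest priority value first.
order = [pq.get()[1] for _ in range(3)]
print(order)  # → ['first', 'second', 'third']
```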


5. Coroutines

When using coroutines, 4 concepts come up frequently:

  1. A function defined with async def is a coroutine.
  2. The coroutine coro started with asyncio.run(coro()) is the main coroutine, also called the event loop. asyncio.run() is usually called in the main program.
  3. Apart from the main coroutine (the event loop), every other coroutine must run inside a coroutine. A coroutine can be run with await coro(), or scheduled concurrently with asyncio.create_task(coro()).
  4. The event loop and the sub-coroutines all run in the same thread.

Coroutines are usually used through Python's built-in asyncio module, which has 2 main purposes: 1. running multiple coroutines concurrently; 2. asynchronous programming.

5.1 A template for concurrent coroutines

Strictly speaking, coroutine concurrency is achieved with task objects: asyncio.create_task wraps a coroutine into a task.
The template for concurrent coroutines is below, with 4 steps:

  1. Define the sub-coroutines with async def.
  2. Make main the main coroutine (i.e. the event loop), creating the concurrent tasks in it with asyncio.create_task.
  3. Collect the results of the sub-coroutines.
  4. Run main with asyncio.run, i.e. run it as the event loop.
"""使用 asyncio 的并发、Asynchronous programming template template. """
import asyncio

# 1. 用 async def Define child coroutines sub-coroutine.
async def sub_coro(parameter):
    ...
# 2. 把 main as the main coroutine main coroutine(即 event loop).
async def main(parameters):
    task_list = []
    for parameter in parameters:
        # 2.1 用 asyncio.create_task,创建并发 task,并记录到“Concurrent plan”中,等待执行.
        one_task = asyncio.create_task(sub_coro(parameter))
        # 2.2 You can use a list to collect multiple concurrent sub-coroutines.
        task_list.append(one_task)
    
    # 3. Collect the result of the child coroutine.
    # asyncio.as_completed 的 2 个作用:1. Run the input concurrently awaitables iterable.
    # 2.返回一个 coroutines iterator,Its order is as follows coroutines The completion time of.
    # 而 asyncio.gather,It is to wait for all sub-coroutines to complete before returning the result,其顺序 create_task time sequence is the same.
    for finished_task in asyncio.as_completed(task_list):  # 3.1 开始执行“Concurrent plan”中的 tasks.
        result = await finished_task  # 3.2 对已完成的 task,Collect the results of its operation.
        
if __name__ == '__main__':
    # 4. 用 asyncio.run 运行 main 函数,The main function will be used as event loop,也是 asyncio 模块的常用方法.
    asyncio.run(main(parameters))

PS: the template above uses the term "concurrency plan". This is not official Python terminology, but a phrase I introduced to make the coroutine concurrency mechanism easier to understand.

5.2 Asynchronous operations with asyncio

A simple example below shows asyncio's asynchronous operation.

"""使用 asyncio Example of doing asynchronous operations."""
import asyncio


async def foo(n=2):
    print(f'Enter foo.')
    await asyncio.sleep(n)  # 3. 释放解释器.Interpreter back event loop,Form an asynchronous operation.
    print(f'Exit foo.')


async def main():
    asyncio.create_task(foo())  # 1. 创建一个 task 对象.
    print('Checkpoint 1.')

    await asyncio.sleep(1)  # 2. 释放解释器,让解释器运行 task 对象,即协程 foo.
    print('Checkpoint 2.')

    await asyncio.sleep(2)
    print('Checkpoint 3.')


if __name__ == '__main__':
    asyncio.run(main())

The output is shown below. Note that after the coroutine foo starts, the program does not wait for foo to finish; instead control returns to the event loop, which executes the event loop's subsequent code.
(Figure: program output.)

5.3 The role of asyncio.sleep()

An important feature of coroutine concurrency is full control over the interpreter: the programmer can assign the interpreter to a particular coroutine and have it execute that coroutine (whereas with multithreading, the OS scheduler assigns the GIL to a thread).
The asyncio.sleep() function makes the interpreter's allocation process clearly visible. Its characteristics are:

  1. asyncio.sleep() switches the interpreter between the event loop and the concurrent tasks.
    1.1 Calling asyncio.sleep() in the event loop hands the interpreter to the concurrent tasks.
    1.2 Calling asyncio.sleep() in the first task hands the interpreter to the second task waiting on the "concurrency plan", and so on, until all the tasks on the "concurrency plan" have run.
    1.3 After all the tasks on the "concurrency plan" have run, calling asyncio.sleep() returns the interpreter to the event loop.
  2. asyncio.sleep(t) suspends the current coroutine for at least t seconds; asyncio.sleep(0) suspends the current coroutine immediately, releasing the interpreter.
  3. asyncio.sleep() can only be used inside a coroutine, not in ordinary functions.

Also note the difference from time.sleep(). Although time.sleep() releases the GIL and lets it be reassigned among threads, time.sleep() cannot control execution inside a thread. Concurrent coroutines all live in the same thread, so time.sleep() cannot control how the interpreter is allocated among coroutines.
For a coroutine, time.sleep() simply blocks the current coroutine for the given time (and with it the whole event loop, since they share a thread).

5.4 Controlling interpreter allocation with concurrent coroutines

Knowing what asyncio.sleep() does, we can use it to demonstrate manual control of interpreter allocation.

"""在 asyncio Manually control the allocation of interpreters in."""
import asyncio
import time


async def foo():
    print('Enter foo.')
    await asyncio.sleep(0)  # 5. Release the interpreter and start executing all tasks on the "concurrency schedule".
    print('Exit foo.')  # 7. Execute the print statement; foo ends.


async def bar(n, name):
    print(f'Enter {name}.')  # 6.1 Print in bar_3_seconds and bar_1_seconds.
    await asyncio.sleep(n)  # 6.2 In bar_1_seconds, the interpreter is handed back to the event loop.
    print(f'Exit {name}.')  # 10. Execute the print statement of bar_1_seconds.


async def main():
    # 1. Schedule the 2 tasks below onto the "concurrency schedule", waiting to run.
    asyncio.create_task(bar(3, name='bar_3_seconds'))
    asyncio.create_task(bar(1, name='bar_1_seconds'))
    print('Check point 1: after create_tasks.')  # 2. The first print statement of the output.
    time.sleep(3)  # 3. time.sleep does not hand the interpreter back, so the whole program blocks here for 3 s.

    # 4. await foo() executes foo like an ordinary function call; foo effectively becomes part of the event loop.
    await foo()
    # 6. Execute the 2 concurrent tasks on the "concurrency schedule".
    print('Check point 2: after foo.')  # 8. Execute the print statement.

    # 9. The asyncio.sleep below releases the interpreter and runs the 1 task on the
    # "concurrency schedule", i.e. bar_1_seconds. Because bar_3_seconds has not finished
    # sleeping yet, it is not on the "concurrency schedule", so its last print
    # statement is never executed.
    await asyncio.sleep(2)

if __name__ == '__main__':
    asyncio.run(main())
    print('Check point 3: after asyncio.run.')  # 11. Execute the print statement.

The output is shown below; the interpreter allocation process has 3 main points:

  1. create_task() puts a concurrent task onto the "concurrency schedule", waiting to run.
  2. asyncio.sleep() releases the interpreter.
  3. Executing the coroutine foo in the event loop via await foo() effectively makes foo part of the event loop.

[figure: program output]

5.5 A flow chart of a simple asynchronous, concurrent program

The flow chart below outlines simple asynchronous, concurrent programming with asyncio and helps in understanding how it runs. It has 7 key points:

  1. There is only 1 interpreter (the purple part of the figure).
  2. Asynchronous programming usually involves 3 parties: the main coroutine (the event loop), the sub-coroutines, and the I/O devices.
  3. The red arrows show the flow of the interpreter allocation, i.e. the actual execution path of the program.
  4. The interpreter switches between the event loop and the sub-coroutines, and the programmer controls its allocation with functions such as await and as_completed.
  5. create_task() creates the multiple concurrent tasks, but the tasks have not started executing at that point, so the sub-coroutines are drawn with dotted lines.
  6. asyncio.as_completed() starts the tasks on the "concurrency schedule".
  7. The I/O parts are the white arrows and white boxes; because I/O operations can be handled by the DMAC (Direct Memory Access Controller) without the CPU/interpreter, they are drawn with dashed lines.

[figure: flow chart of a simple asynchronous, concurrent asyncio program]
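Point 6 above, asyncio.as_completed() yielding tasks in the order they finish, can be sketched with the standard library alone (the job coroutine below is hypothetical, with sleep times standing in for I/O):

```python
"""A minimal sketch of asyncio.as_completed: results arrive in completion order."""
import asyncio


async def job(n):
    await asyncio.sleep(n / 10)  # simulate an I/O operation taking n/10 seconds
    return n


async def main():
    # The tasks are created in the order 3, 1, 2 ...
    tasks = [asyncio.create_task(job(n)) for n in (3, 1, 2)]
    results = []
    # ... but as_completed yields them in the order they finish.
    for finished in asyncio.as_completed(tasks):
        results.append(await finished)
    return results


results = asyncio.run(main())
print(results)  # [1, 2, 3]: the fastest task finishes first
```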

5.6 An example in network programming

Asynchronous and concurrent programming abound in network programming; ideally both the client and the server support asynchronous operation. The client can use the third-party httpx library, and the server can use the third-party FastAPI.
Below, the asyncio concurrency/asynchronous programming template from Section 5.1 is applied to downloading with httpx.

"""Applying the asyncio concurrency/asynchronous programming template, downloading with httpx."""
import asyncio
import httpx
from pathlib import Path


# 1. The sub-coroutine additionally receives parameters such as httpx.AsyncClient and url.
async def sub_coro(client: httpx.AsyncClient, url: str,
                   downloaded: Path, parameter) -> bytes:
    # 1.1 Use await client.get to download asynchronously.
    response = await client.get(url, follow_redirects=True)
    ...  # Operations such as saving the file can also be performed here.
    return response.content  # 1.2 Return bytes or data in another format.


# 2. The main coroutine needs to set up the asynchronous httpx.AsyncClient(), etc.
async def main():
    # 2.1 Set the download address url, the save folder downloaded, etc.
    downloaded = Path(r'D:\downloaded')
    url = r'http://localhost:8000/flags'  # As an example, download from localhost.
    # 2.2 Use async with httpx.AsyncClient() as a context manager to get an asynchronous client.
    # Sharing one client improves efficiency: the underlying TCP connection is built only
    # once, and every concurrent task reuses it.
    async with httpx.AsyncClient() as client:
        task_list = []
        parameters = ...
        for parameter in parameters:
            # 2.3 When using asyncio.create_task, the async client must be passed into the sub-coroutine.
            one_task = asyncio.create_task(sub_coro(client, url, downloaded, parameter))
            task_list.append(one_task)
        # 2.4 Note that the asyncio.as_completed part must also stay inside the httpx.AsyncClient() scope.
        for finished_task in asyncio.as_completed(task_list):
            result = await finished_task


if __name__ == '__main__':
    asyncio.run(main())

6. Main references

concurrent.futures: https://docs.python.org/3/library/concurrent.futures.html
asyncio: https://docs.python.org/3/library/asyncio.html
threading: https://docs.python.org/3/library/threading.html
multiprocessing: https://docs.python.org/3/library/multiprocessing.html
《Fluent Python》: https://www.oreilly.com/library/view/fluent-python-2nd/9781492056348/
HTTPX: https://www.python-httpx.org/async/


—————————— Supplementary content ——————————

PS: Just For Fun

In some of the tortoise-and-hare race examples in Section 3.4 above, the pictures of the tortoise and the hare were displayed separately. To make it more fun, the code can be changed so that the two race in the same view.
The approach: in the 2 concurrent child threads, do not print; instead, extract their animation frames. Then, in the main thread, merge the 2 child threads' frames into one frame, and finally print the merged frame in the main thread. The result is shown below.

[figure: the tortoise and the hare racing in one view]

The code that displays the tortoise and the hare together is as follows:

"""A tortoise-and-hare example demonstrating concurrent multithreading.

To watch the tortoise's and the hare's progress at the same time, the 2 concurrent
child threads do not print; instead, their frames are extracted. The main thread
then merges the 2 child threads' frames into one frame and prints the merged frame.

2 threads run concurrently: one thread runs the tortoise, the other runs the rabbit.
The tortoise and rabbit images are Unicode characters. The full Unicode charts can
be found in the official documentation:
https://www.unicode.org/Public/UCD/latest/charts/CodeCharts.pdf
"""
import time
import unicodedata
from concurrent import futures


def worker(unicode_name, frame_duration):
    """A single concurrent worker that keeps its object moving.

    Note that this function returns a generator.

    arguments:
        unicode_name: a string, the official Unicode name of the character
            (case-insensitive).
        frame_duration: a float, in seconds. Every frame_duration seconds, the
            object moves forward one position.
    """

    duration_start = time.time()
    object_position = -1  # Start from the far right, i.e. position -1.

    while True:
        tiles = ['_'] * 20  # Simulate the road with 20 underscore tiles.
        # Use unicodedata to draw the object, and put it in the right position.
        tiles[object_position] = unicodedata.lookup(unicode_name)
        frame = f'{unicode_name}:\t'
        frame += ''.join(tiles)  # Form one frame.
        yield frame

        duration_stop = time.time()
        duration = duration_stop - duration_start
        if duration > frame_duration:
            object_position -= 1  # Move the object forward one tile.
            object_position = max(object_position, -len(tiles))
            duration_start = time.time()  # And restart the timer.


def draw_objects(gen_1, gen_2, timer=10):
    """Merge 2 frames into one, then print the merged frame.

    arguments:
        gen_1: a generator that continuously yields frames.
        gen_2: a generator that continuously yields frames.
        timer: a float, in seconds. Controls the total display time.
    """

    tic = time.time()
    print('Start racing!')
    for turtle, rabbit in zip(gen_1, gen_2):
        drawing = f'\r{turtle} \t{rabbit}'
        print(drawing, end='')

        toc = time.time()
        time.sleep(0.05)
        duration = toc - tic
        if duration > timer:
            break


def main():
    unicode_names = ['turtle', 'rabbit']
    frame_durations = [0.5, 0.7]
    with futures.ThreadPoolExecutor() as executor:
        result_gens = executor.map(worker, unicode_names, frame_durations)

    # Because result_gens is itself a generator, traversing it yields the 2
    # generators of the concurrent workers; those 2 generators are then passed
    # to draw_objects.
    gens = []
    for one_gen in result_gens:
        gens.append(one_gen)
    draw_objects(*gens)


if __name__ == '__main__':
    main()
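As the comment in main() notes, executor.map() returns a lazy iterator of results; since worker is a generator function, each element of that iterator is itself a generator. The same pattern can be seen in isolation in this minimal sketch (the counter function below is a hypothetical stand-in for worker):

```python
"""executor.map over a generator function yields an iterator of generators."""
from concurrent import futures


def counter(limit):
    # A generator function: calling it returns a generator object immediately.
    for i in range(limit):
        yield i


with futures.ThreadPoolExecutor() as executor:
    # Each element of result_gens is a generator, just like in main() above.
    result_gens = list(executor.map(counter, [3, 5]))

values = [list(g) for g in result_gens]
print(values)  # [[0, 1, 2], [0, 1, 2, 3, 4]]
```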


—————————— End of article ——————————

Copyright notice
Author: [Pinghu Autumn Moon in Hangzhou]. Please include the original link when reprinting. Thank you.
https://en.pythonmana.com/2022/252/202209090452273596.html
