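ProcessPoolExecutor Class in Python
Prerequisite: Multiprocessing
Multiprocessing allows code to run in parallel. Python offers two ways to run code concurrently: the first is the multiprocessing module and the second is the threading module. Python 3.2 introduced a new class called ProcessPoolExecutor in the concurrent.futures module to create and manage processes efficiently. But wait, if Python already has a built-in multiprocessing module, why introduce a new one? Let me answer that question first.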
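- Spawning a new process on the fly is not a problem when the number of processes is small, but managing processes becomes very cumbersome when we deal with many of them. Besides, creating that many processes is computationally inefficient and reduces throughput. One way to keep throughput up is to create a pool of idle processes in advance and reuse the processes in that pool for each new task. This reduces the overhead of creating new processes.
- In addition, the pool tracks and manages the process life cycle and schedules work on the programmer's behalf, which makes the code simpler and less error-prone.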
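The reuse described above can be observed directly. Here is a minimal sketch (the helper names current_pid and worker_pids are made up for illustration) that runs many small tasks on a small pool and collects the process IDs of the workers that handled them:

```python
import os
from concurrent.futures import ProcessPoolExecutor


def current_pid(_):
    # Runs inside a worker process and reports that worker's process ID.
    return os.getpid()


def worker_pids(n_tasks, pool_size):
    # Run n_tasks small tasks on a pool of pool_size workers and return
    # the set of distinct worker process IDs that handled them.
    with ProcessPoolExecutor(max_workers=pool_size) as exe:
        return set(exe.map(current_pid, range(n_tasks)))


if __name__ == "__main__":
    pids = worker_pids(n_tasks=20, pool_size=2)
    # No more than 2 distinct PIDs can appear: the 20 tasks share the pool.
    print(f"{len(pids)} worker process(es) handled 20 tasks")
```

The number of distinct PIDs never exceeds the pool size, no matter how many tasks are submitted, because tasks are handed to existing workers instead of to new processes.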
Syntax:
concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
Parameters:
- max_workers: The number of worker processes, i.e. the size of the pool. If the value is None, it defaults to the number of processors on the machine. On Windows, max_workers must be at most 61; if it is None there, at most 61 processes are created by default, even if more processors are available.
- mp_context: The multiprocessing context. If None, the default multiprocessing context is used. It lets the user control the start method of the worker processes.
- initializer: An optional callable that is invoked at the start of each worker process.
- initargs: A tuple of arguments passed to the initializer.
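To make the parameters concrete, here is a small sketch that passes all four. The names init_worker, tag, tag_all and _worker_prefix are hypothetical, and multiprocessing.get_context() with no argument simply returns the platform's default context, so this behaves like leaving mp_context as None:

```python
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

# Per-worker state, filled in by the initializer (hypothetical name).
_worker_prefix = None


def init_worker(prefix):
    # Called once in each worker process when that worker starts.
    global _worker_prefix
    _worker_prefix = prefix


def tag(item):
    # Uses the value that init_worker stored in this worker process.
    return f"{_worker_prefix}-{item}"


def tag_all(items, prefix):
    with ProcessPoolExecutor(
        max_workers=2,
        # Default context; get_context("spawn") or another start method
        # could be passed instead, depending on what the platform supports.
        mp_context=multiprocessing.get_context(),
        initializer=init_worker,
        initargs=(prefix,),
    ) as exe:
        return list(exe.map(tag, items))


if __name__ == "__main__":
    print(tag_all(["a", "b", "c"], prefix="job"))  # ['job-a', 'job-b', 'job-c']
```

The initializer is a convenient place for one-time setup per worker, such as opening a connection or loading a lookup table, so that individual tasks do not repeat it.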
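ProcessPoolExecutor methods: The ProcessPoolExecutor class exposes the following methods for executing calls asynchronously. A detailed explanation is given below.
- submit(fn, *args, **kwargs): Schedules the callable fn to be executed as fn(*args, **kwargs) and returns a Future object representing the execution of the call.
- map(fn, *iterables, timeout=None, chunksize=1): Maps fn over the iterables. The iterables are collected immediately rather than lazily, and the calls may run concurrently. It returns an iterator over the results; concurrent.futures.TimeoutError is raised if a result is not available within timeout seconds of the original call to map().
  - If the iterables are very long, a chunksize greater than 1 can improve performance when using ProcessPoolExecutor.
- shutdown(wait=True, *, cancel_futures=False):
  - Signals the executor to free all resources once the currently pending futures have finished executing.
  - Calls to executor.submit() and executor.map() made after shutdown() raise RuntimeError.
  - wait=True makes the method block until all pending futures have finished executing and the resources have been freed.
  - cancel_futures=True makes the executor cancel all pending futures that have not started running.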
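The Future object returned by submit() provides the following methods:
- cancel(): Attempts to cancel the call. Returns False if the call cannot be cancelled, otherwise True.
- cancelled(): Returns True if the call was successfully cancelled.
- running(): Returns True if the call is currently being executed and cannot be cancelled.
- done(): Returns True if the call was cancelled or has finished executing.
- result(timeout=None): Returns the value returned by the call. If the call has not finished yet, it waits up to timeout seconds and then raises TimeoutError; if timeout is None, it waits indefinitely for the call to complete.
- add_done_callback(fn): Attaches a callable that is invoked, with the future as its only argument, when the call is cancelled or finishes executing.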
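The methods listed above can be tried out together in one short script. This is only a sketch: slow_square and demo are made-up names, and the delays and timeouts are arbitrary values chosen so that the timeout case triggers reliably.

```python
import time
from concurrent.futures import ProcessPoolExecutor, TimeoutError


def slow_square(x, delay=0.0):
    # Sleep to simulate work, then return the square of x.
    time.sleep(delay)
    return x * x


def demo():
    callback_results = []
    exe = ProcessPoolExecutor(max_workers=2)

    # submit() returns a Future at once; result() blocks until it is ready.
    fut = exe.submit(slow_square, 7)
    # The callback runs in the parent process once the call has finished.
    fut.add_done_callback(lambda f: callback_results.append(f.result()))
    value = fut.result(timeout=30)

    # A short timeout raises TimeoutError while the call is still running.
    slow = exe.submit(slow_square, 3, 1.0)
    try:
        slow.result(timeout=0.05)
        timed_out = False
    except TimeoutError:
        timed_out = True

    # map() yields results in input order; chunksize batches the items.
    squares = list(exe.map(slow_square, range(10), chunksize=5))

    # Block until pending work is done and the workers are released.
    exe.shutdown(wait=True)

    # Submitting after shutdown() raises RuntimeError.
    try:
        exe.submit(slow_square, 1)
        rejected = False
    except RuntimeError:
        rejected = True

    return {
        "value": value,
        "callback_results": callback_results,
        "timed_out": timed_out,
        "slow_done": slow.done(),
        "slow_result": slow.result(),
        "squares": squares,
        "rejected": rejected,
    }


if __name__ == "__main__":
    for key, val in demo().items():
        print(f"{key}: {val}")
```

Note that a timed-out result() call does not cancel the underlying work: the slow call still completes, and its value is available once shutdown(wait=True) has returned.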
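Example 1
The code below demonstrates the use of ProcessPoolExecutor. Notice that, unlike with the multiprocessing module, we do not have to start processes explicitly in a loop, keep track of them in a list, wait for them with join for synchronization, or release resources after the processes finish. The executor, used here as a context manager, handles all of this under the hood, which keeps the code compact and less error-prone.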
Python3
from concurrent.futures import ProcessPoolExecutor

values = [3, 4, 5, 6]

def cube(x):
    # Runs in a worker process and prints the result there.
    print(f'Cube of {x}:{x*x*x}')

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=5) as exe:
        # Schedule a single call.
        exe.submit(cube, 2)
        # Map the function 'cube' over an iterable.
        # cube() returns None, so the values yielded by map() are not used here.
        exe.map(cube, values)
    # Leaving the 'with' block waits for all pending calls to finish.
Output (the order of the lines may vary from run to run):
Cube of 2:8
Cube of 3:27
Cube of 6:216
Cube of 4:64
Cube of 5:125
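As a variant of the example above, the worker function can return its result instead of printing it. The sketch below (the helper name cubes is made up) relies on the fact that map() yields results in the order of the inputs, even when the calls finish in a different order:

```python
from concurrent.futures import ProcessPoolExecutor


def cube(x):
    # Return the value instead of printing it in the worker.
    return x * x * x


def cubes(values):
    with ProcessPoolExecutor(max_workers=4) as exe:
        # list() collects the results; map() preserves input order.
        return list(exe.map(cube, values))


if __name__ == "__main__":
    values = [3, 4, 5, 6]
    for x, c in zip(values, cubes(values)):
        print(f"Cube of {x}:{c}")
```

Because printing now happens in the parent process, the lines come out in input order on every run.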
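Example 2
The code below fetches images over the Internet by making HTTP requests; I am using the requests library for this. The first part of the code downloads the images one at a time, which is slow, while the second part uses multiple processes to make the requests in parallel.
You can experiment with the parameters discussed above to see how they affect the speedup. For example, if I make the process pool 6 instead of 3, the speedup is more significant.
Python3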
import requests
import time
import os
import concurrent.futures

img_urls = [
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623210949/download21.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623211125/d11.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623211655/d31.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623212213/d4.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623212607/d5.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623235904/d6.jpg',
]

def download_image(img_url):
    # Fetch the image; the bytes are discarded in this demo.
    img_bytes = requests.get(img_url).content
    print("Downloading..")

def download_image_mp(img_url):
    # Same download, but also report which worker process handled it.
    img_bytes = requests.get(img_url).content
    print(f"[Process ID]:{os.getpid()} Downloading..")

# The guard keeps worker processes from re-running the script body
# on platforms that start workers by importing the main module.
if __name__ == '__main__':
    # Part 1: download the images one by one in a single process.
    t1 = time.time()
    print("Downloading images with single process")
    for img in img_urls:
        download_image(img)
    t2 = time.time()
    print(f'Single Process Code Took :{t2-t1} seconds')
    print('*'*50)

    # Part 2: download the images in parallel with a pool of 3 processes.
    t1 = time.time()
    print("Downloading images with Multiprocess")
    with concurrent.futures.ProcessPoolExecutor(3) as exe:
        exe.map(download_image_mp, img_urls)
    t2 = time.time()
    print(f'Multiprocess Code Took:{t2-t1} seconds')
Output:
Downloading images with single process
Downloading..
Downloading..
Downloading..
Downloading..
Downloading..
Downloading..
Single Process Code Took :1.2382981777191162 seconds
**************************************************
Downloading images with Multiprocess
[Process ID]:118741 Downloading..
[Process ID]:118742 Downloading..
[Process ID]:118740 Downloading..
[Process ID]:118741 Downloading..
[Process ID]:118742 Downloading..
[Process ID]:118740 Downloading..
Multiprocess Code Took:0.8398590087890625 seconds
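To experiment with the pool size without depending on the network, the download can be replaced by a stand-in. The following is a rough sketch under that assumption: fake_download and timed_run are made-up names, and the 0.2 second sleep is an arbitrary substitute for network latency, so the timings it prints are not comparable to the ones above.

```python
import time
from concurrent.futures import ProcessPoolExecutor


def fake_download(item):
    # Stand-in for requests.get(); the sleep simulates network latency.
    time.sleep(0.2)
    return item


def timed_run(items, pool_size):
    # Return (results, elapsed seconds) for one pool size.
    start = time.perf_counter()
    with ProcessPoolExecutor(max_workers=pool_size) as exe:
        results = list(exe.map(fake_download, items))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    items = list(range(6))
    for pool_size in (1, 3, 6):
        _, elapsed = timed_run(items, pool_size)
        print(f"pool size {pool_size}: {elapsed:.2f} seconds")
```

The measured time includes starting the worker processes, so for very short tasks a larger pool does not necessarily finish sooner.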