📜  Python的ProcessPoolExecutor 类

📅  最后修改于: 2022-05-13 01:55:48.610000             🧑  作者: Mango

Python的ProcessPoolExecutor 类

先决条件 -多处理

它允许代码并行, Python语言有两种方法来实现它的第一种是通过多处理模块,第二种是通过多线程模块。从Python 3.2 开始,在Python并发中引入了一个名为ProcessPoolExecutor的新类。有效管理和创建流程的期货模块。但是等等,如果Python已经内置了一个多处理模块,那么为什么要引入一个新模块。让我先回答这个问题。

  • 当进程数量较少时,动态生成一个新进程不是问题,但是如果我们处理许多进程,管理进程就会变得非常麻烦。除此之外,创建如此多的进程在计算上效率低下,这将导致吞吐量下降。保持吞吐量的一种方法是预先创建并实例化一个空闲进程池,然后重用该池中的进程,直到所有进程都用完为止。这样可以减少创建新进程的开销。
  • 此外,该池会跟踪和管理 Process 生命周期并代表程序员对其进行调度,从而使代码更简单,错误更少。

句法:

ProcessPoolExecutor 方法: ProcessPoolExecutor 类公开了以下方法来异步执行 Process。下面给出详细的解释。

  • submit(fn, *args, **kwargs):它运行一个可调用或一个方法,并返回一个表示方法执行状态的 Future 对象。
  • map(fn, *iterables, timeout=None, chunksize=1):它立即将方法和可迭代对象映射到一起,并发引发异常。如果在超时限制内未能这样做,则 futures.TimeoutError 。
    • 如果可迭代对象非常大,那么在使用 ProcessPoolExecutor 时,块大小大于 1 可以提高性能。
  • 关机(等待=真,*,cancel_futures=假):
    • 它向执行者发出信号,在期货执行完毕后释放所有资源。
    • 它必须在 exectuor.submit() 和 executor.map() 方法之前调用,否则会抛出 RuntimeError。
    • wait=True 使该方法在所有线程执行完毕并释放资源之前不返回。
    • cancel_futures=True 那么执行程序将取消所有尚未启动的未来线程。
  • Cancel():尝试取消调用,如果调用无法取消则返回False,否则返回True。
  • cancelled():如果调用被取消,则返回 True。
  • running():如果进程正在运行且无法取消,则返回 True。
  • done():如果进程已完成执行,则返回 True。
  • result(timeout=None):返回进程返回的值,如果进程仍在执行,则等待指定的超时时间,否则引发 TimeoutError,如果指定 None ,它将永远等待进程完成。
  • add_done_callback(fn):附加一个回调函数,该函数在进程执行完成时被调用。

示例 1

下面的代码演示了 ProcessPoolExecutor 的使用,注意与 multiprocessing 模块不同,我们不必使用循环显式调用,使用列表跟踪进程或使用 join 等待进程进行同步,或在之后释放资源过程完成后,构造函数本身将所有内容都隐藏起来,从而使代码紧凑且无错误。

Python3
from concurrent.futures import ProcessPoolExecutor
from time import sleep
  
values = [3,4,5,6]
def cube(x):
    print(f'Cube of {x}:{x*x*x}')
  
  
if __name__ == '__main__':
    result =[]
    with ProcessPoolExecutor(max_workers=5) as exe:
        exe.submit(cube,2)
          
        # Maps the method 'cube' with a iterable
        result = exe.map(cube,values)
      
    for r in result:
      print(r)


Python3
import requests
import time
import os
import concurrent.futures
   
img_urls = [
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623210949/download21.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623211125/d11.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623211655/d31.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623212213/d4.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623212607/d5.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623235904/d6.jpg',
]
   
t1 = time.time()
print("Downloading images with single process")
def download_image(img_url):
    img_bytes = requests.get(img_url).content
    print("Downloading..")
       
for img in img_urls:
    download_image(img)
   
t2 = time.time()
print(f'Single Process Code Took :{t2-t1} seconds')
print('*'*50)
   
t1 = time.time()
print("Downloading images with Multiprocess")
   
def download_image(img_url):
    img_bytes = requests.get(img_url).content
    print(f"[Process ID]:{os.getpid()} Downloading..")
   
with concurrent.futures.ProcessPoolExecutor(3) as exe:
    exe.map(download_image, img_urls)
   
t2 = time.time()
print(f'Multiprocess Code Took:{t2-t1} seconds')


输出:

Cube of 2:8
Cube of 3:27
Cube of 6:216
Cube of 4:64
Cube of 5:125

示例 2

下面的代码是通过发出 HTTP 请求在 Internet 上获取图像,我正在使用相同的请求库。代码的第一部分对 API 进行一对一调用,即下载速度很慢,而代码的第二部分使用多个进程进行并行请求以获取 API。

您可以尝试上面讨论的所有各种参数,以查看它如何调整加速,例如,如果我将进程池设为 6 而不是 3,则加速更显着。

蟒蛇3

import requests
import time
import os
import concurrent.futures
   
img_urls = [
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623210949/download21.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623211125/d11.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623211655/d31.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623212213/d4.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623212607/d5.jpg',
    'https://media.geeksforgeeks.org/wp-content/uploads/20190623235904/d6.jpg',
]
   
t1 = time.time()
print("Downloading images with single process")
def download_image(img_url):
    img_bytes = requests.get(img_url).content
    print("Downloading..")
       
for img in img_urls:
    download_image(img)
   
t2 = time.time()
print(f'Single Process Code Took :{t2-t1} seconds')
print('*'*50)
   
t1 = time.time()
print("Downloading images with Multiprocess")
   
def download_image(img_url):
    img_bytes = requests.get(img_url).content
    print(f"[Process ID]:{os.getpid()} Downloading..")
   
with concurrent.futures.ProcessPoolExecutor(3) as exe:
    exe.map(download_image, img_urls)
   
t2 = time.time()
print(f'Multiprocess Code Took:{t2-t1} seconds')

输出:

Downloading images with single process
Downloading..
Downloading..
Downloading..
Downloading..
Downloading..
Downloading..
Single Process Code Took :1.2382981777191162 seconds
**************************************************
Downloading images with Multiprocess
[Process ID]:118741 Downloading..
[Process ID]:118742 Downloading..
[Process ID]:118740 Downloading..
[Process ID]:118741 Downloading..
[Process ID]:118742 Downloading..
[Process ID]:118740 Downloading..
Multiprocess Code Took:0.8398590087890625 seconds