如何在 Python3 中使用 ThreadPoolExecutor ?
先决条件:多线程
线程允许代码并行, Python语言有两种方法来实现它的第一个是通过多处理模块,第二个是通过多线程模块。多线程非常适合加速 I/O 绑定任务,例如发出 Web 请求、数据库操作或读取/写入文件。与这种 CPU 密集型任务(如数学计算任务)相比,使用多处理受益最多。这是由于 GIL(全局解释器锁定)造成的。
从Python 3.2 开始,在Python中的concurrent.futures模块中引入了一个名为ThreadPoolExecutor的新类,以有效地管理和创建线程。但是等等,如果Python已经内置了一个线程模块,那么为什么要引入一个新模块。让我先回答这个问题。
- 当线程数较少时,动态生成新线程不是问题,但如果我们处理许多线程,管理线程就变得非常麻烦。除此之外,创建如此多的线程在计算上效率低下,这将导致吞吐量下降。保持吞吐量的一种方法是预先创建并实例化一个空闲线程池,然后重用该池中的线程,直到所有线程都用完为止。这样可以减少创建新线程的开销。
- 此外,池会跟踪和管理线程生命周期并代表程序员对其进行调度,从而使代码更简单,错误更少。
Syntax: concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix=”, initializer=None, initargs=())
Parameters:
- max_workers: It is a number of Threads aka size of pool. From 3.8 onwards default value is min(32, os.cpu_count() + 4). Out of these 5 threads are preserved for I/O bound task.
- thread_name_prefix : thread_name_prefix was added from python 3.6 onwards to give names to thread for easier debugging purpose.
- initializer: initializer takes a callable which is invoked on start of each worker thread.
- initargs: It’s a tuple of arguments passed to intializer.
线程池执行器方法:
ThreadPoolExecutor 类公开了三个异步执行线程的方法。下面给出详细的解释。
- submit(fn, *args, **kwargs):它运行一个可调用或一个方法,并返回一个表示方法执行状态的 Future 对象。
- map(fn, *iterables, timeout = None, chunksize = 1) :
- 它立即将方法和可迭代对象映射到一起,并将引发并发异常。如果在超时限制内未能这样做,则 futures.TimeoutError 。
- 如果可迭代对象非常大,那么在使用 ProcessPoolExecutor 时,块大小大于 1 可以提高性能,但使用 ThreadPoolExecutor 则没有这样的优势,即可以保留其默认值。
- 关闭(等待 = True,*,cancel_futures = False):
- 它向执行者发出信号,在期货执行完毕后释放所有资源。
- 它必须在 exectuor.submit() 和 executor.map() 方法之前调用,否则会抛出 RuntimeError。
- wait=True 使该方法在所有线程执行完毕并释放资源之前不返回。
- cancel_futures=True 那么执行程序将取消所有尚未启动的未来线程。
示例 1:
下面的代码演示了 ThreadPoolExecutor 的使用,注意与线程模块不同,我们不必使用循环显式调用,使用列表跟踪线程或使用 join 等待线程进行同步,或在线程之后释放资源完成后,构造函数本身将所有内容都隐藏起来,从而使代码紧凑且无错误。
Python3
from concurrent.futures import ThreadPoolExecutor
from time import sleep
values = [3,4,5,6]
def cube(x):
print(f'Cube of {x}:{x*x*x}')
if __name__ == '__main__':
result =[]
with ThreadPoolExecutor(max_workers=5) as exe:
exe.submit(cube,2)
# Maps the method 'cube' with a list of values.
result = exe.map(cube,values)
for r in result:
print(r)
Python3
import requests
import time
import concurrent.futures
img_urls = [
'https://media.geeksforgeeks.org/wp-content/uploads/20190623210949/download21.jpg',
'https://media.geeksforgeeks.org/wp-content/uploads/20190623211125/d11.jpg',
'https://media.geeksforgeeks.org/wp-content/uploads/20190623211655/d31.jpg',
'https://media.geeksforgeeks.org/wp-content/uploads/20190623212213/d4.jpg',
'https://media.geeksforgeeks.org/wp-content/uploads/20190623212607/d5.jpg',
'https://media.geeksforgeeks.org/wp-content/uploads/20190623235904/d6.jpg',
]
t1 = time.perf_counter()
def download_image(img_url):
img_bytes = requests.get(img_url).content
print("Downloading..")
# Download images 1 by 1 => slow
for img in img_urls:
download_image(img)
t2 = time.perf_counter()
print(f'Single Threaded Code Took :{t2 - t1} seconds')
print('*'*50)
t1 = time.perf_counter()
def download_image(img_url):
img_bytes = requests.get(img_url).content
print("Downloading..")
# Fetching images concurrently thus speeds up the download.
with concurrent.futures.ThreadPoolExecutor(3) as executor:
executor.map(download_image, img_urls)
t2 = time.perf_counter()
print(f'MultiThreaded Code Took:{t2 - t1} seconds')
输出:
Output:
Cube of 2:8
Cube of 3:27
Cube of 4:64
Cube of 5:125
Cube of 6:216
示例 2:
下面的代码是通过发出 HTTP 请求在 Internet 上获取图像,我正在使用相同的请求库。代码的第一部分对 API 进行一对一调用,即下载很慢,而代码的第二部分使用线程来获取 API 进行并行请求。
您可以尝试上面讨论的所有各种参数,以查看它如何调整加速,例如,如果我将线程池设为 6 而不是 3,则加速更显着。
蟒蛇3
import requests
import time
import concurrent.futures
img_urls = [
'https://media.geeksforgeeks.org/wp-content/uploads/20190623210949/download21.jpg',
'https://media.geeksforgeeks.org/wp-content/uploads/20190623211125/d11.jpg',
'https://media.geeksforgeeks.org/wp-content/uploads/20190623211655/d31.jpg',
'https://media.geeksforgeeks.org/wp-content/uploads/20190623212213/d4.jpg',
'https://media.geeksforgeeks.org/wp-content/uploads/20190623212607/d5.jpg',
'https://media.geeksforgeeks.org/wp-content/uploads/20190623235904/d6.jpg',
]
t1 = time.perf_counter()
def download_image(img_url):
img_bytes = requests.get(img_url).content
print("Downloading..")
# Download images 1 by 1 => slow
for img in img_urls:
download_image(img)
t2 = time.perf_counter()
print(f'Single Threaded Code Took :{t2 - t1} seconds')
print('*'*50)
t1 = time.perf_counter()
def download_image(img_url):
img_bytes = requests.get(img_url).content
print("Downloading..")
# Fetching images concurrently thus speeds up the download.
with concurrent.futures.ThreadPoolExecutor(3) as executor:
executor.map(download_image, img_urls)
t2 = time.perf_counter()
print(f'MultiThreaded Code Took:{t2 - t1} seconds')
输出:
Downloading..
Downloading..
Downloading..
Downloading..
Downloading..
Downloading..
Single Threaded Code Took :2.5529379630024778 seconds
**************************************************
Downloading..
Downloading..
Downloading..
Downloading..
Downloading..
Downloading..
MultiThreaded Code Took:0.5221083430078579 seconds