📜  池映射迭代器 (1)

📅  最后修改于: 2023-12-03 15:11:03.538000             🧑  作者: Mango

池映射迭代器

池映射迭代器是一个实现了线程池和迭代器两种功能的工具类,可以在多线程环境下对数据集合进行并行处理,提高数据处理效率。

如何使用

首先需要导入from multiprocessing import Poolitertools两个库,然后创建一个池映射迭代器对象。

from multiprocessing import Pool
import itertools

def func(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        data = [1, 2, 3, 4, 5]
        it = p.imap(func, data)
        
        for result in itertools.chain.from_iterable(iter(it)):
            print(result)
API
Pool(n)

初始化一个池映射迭代器对象,n为线程池的大小。

imap(func, iterable, chunksize=1)

对一个可迭代对象进行并行处理,返回一个迭代器对象。

  • func: 处理每个元素的函数。
  • iterable: 可迭代对象。
  • chunksize: 每个线程处理的元素个数,默认为1。
imap_unordered(func, iterable, chunksize=1)

对一个可迭代对象进行乱序并行处理,返回一个迭代器对象。

  • func: 处理每个元素的函数。
  • iterable: 可迭代对象。
  • chunksize: 每个线程处理的元素个数,默认为1。
starmap(func, iterable, chunksize=1)

对一个可迭代对象进行并行处理,每个元素作为*args传入处理函数中,返回一个迭代器对象。

  • func: 处理每个元素的函数,接受*args作为参数。
  • iterable: 可迭代对象,每个元素都是一个tuple。
  • chunksize: 每个线程处理的元素个数,默认为1。
starmap_async(func, iterable, chunksize=1, callback=None)

对一个可迭代对象进行异步并行处理,每个元素作为*args传入处理函数中,返回一个AsyncResult对象。

  • func: 处理每个元素的函数,接受*args作为参数。
  • iterable: 可迭代对象,每个元素都是一个tuple。
  • chunksize: 每个线程处理的元素个数,默认为1。
  • callback: 结果返回时调用的回调函数。
map(func, iterable)

对一个可迭代对象进行线性处理。

  • func: 处理每个元素的函数。
  • iterable: 可迭代对象。
注意事项
  • 池映射迭代器不能处理有状态的函数,否则可能出现不确定的结果。
  • 池映射迭代器需要在if __name__ == '__main__':内部调用。
  • 当线程池中的线程数等于CPU的数量时,处理效率最高。