自然语言处理 |使用 execnet 进行并行列表处理
本文介绍了一种使用 execnet 并行处理列表的模式。这是一种函数模式,用于将列表中的每个元素映射到一个新值,使用 execnet 并行进行映射。
在下面给出的代码中,整数被简单地加倍,可以执行任何纯计算。给出的是模块,它将由 execnet 执行。它接收 (i, arg) 的 2 元组,假设 arg 是一个数字并返回 (i, arg*2)。
代码 :
if __name__ == '__channelexec__':
for (i, arg) in channel:
channel.send((i, arg * 2))
要使用此模块将列表中的每个元素加倍,请导入plists模块并使用 remote_double 模块调用plists.map()以及要加倍的整数列表。
代码:使用 plist
import plists, remote_double
plists.map(remote_double, range(10))
输出 :
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
map()函数在 plists.py 中定义。它需要一个纯模块、一个参数列表和一个可选的由 (spec, count) 组成的 2 元组列表。默认规格为 [('popen', 2)],表示用户将打开两个本地网关和通道。一旦这些通道被打开,用户可以将它们放入一个 itertools 循环中,该循环创建一个无限迭代器,一旦它到达终点就会循环回到起点。
现在,每个参数都可以在 args 中发送到通道进行处理,并且由于通道是循环的,因此每个通道都可以得到几乎均匀分布的参数。这就是i进来的地方——结果返回的顺序是未知的,所以i作为列表中每个 arg 的索引,被传递到通道并返回,以便用户可以按原始顺序组合结果。然后使用 MultiChannel 接收队列等待结果,并将它们插入到与原始 args 长度相同的预填充列表中。获得所有预期结果后,退出网关并返回结果,如下面的代码所示 -
代码 :
import itertools, execnet
def map(mod, args, specs =[('popen', 2)]):
gateways = []
channels = []
for spec, count in specs:
for i in range(count):
gw = execnet.makegateway(spec)
gateways.append(gw)
channels.append(gw.remote_exec(mod))
cyc = itertools.cycle(channels)
for i, arg in enumerate(args):
channel = next(cyc)
channel.send((i, arg))
mch = execnet.MultiChannel(channels)
queue = mch.make_receive_queue()
l = len(args)
# creates a list of length l,
# where every element is None
results = [None] * l
for i in range(l):
channel, (i, result) = queue.get()
results[i] = result
for gw in gateways:
gw.exit()
return results
代码:通过修改规范增加并行化
plists.map(remote_double, range(10), [('popen', 4)])
输出 :
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
然而,更多的并行化并不一定意味着更快的处理。这取决于可用的资源,打开的网关和通道越多,需要的开销就越多。理想情况下,每个 CPU 内核应该有一个网关和通道,以获得最大的资源利用率。将plists.map()与任何纯模块一起使用,只要它接收并发送回 i 是第一个元素的 2 元组。当存在要尽快处理的一堆数字时,此模式最有用。