📜  RxPY-使用调度程序的并发性(1)

📅  最后修改于: 2023-12-03 15:19:53.768000             🧑  作者: Mango

RxPY-使用调度程序的并发性

RxPY是Python中响应式编程的最佳选择,同时它也是Rx.NET的Python版本。它提供了一种非常强大的编程模型,可以对异步事件进行库级别的处理。通过使用RxPY,我们可以简单快捷地实现从底层事件中构建数据流管道,并且在RxPY的处理下,可以支持多并发执行。

响应式编程

响应式编程的一个重要原则就是通过利用异步序列来进行计算并且尽可能支持并发。RxPY就是一个典型的响应式编程库。通过使用RxPY,我们可以轻松构建起像linspace和rand之类的底层API产生的事件和数据流,并使用一系列支持响应式编程的函数对其进行处理。

让我们先来看一个简单的例子:

from rx import Observable

xs = Observable.from_iterable(range(10))
ys = xs.map(lambda x : x * 2)
zs = ys.filter(lambda x : x % 3 == 0)

def on_next(i):
    print(i)

zs.subscribe(on_next)

这个例子演示了一个从0到9的数字产生的事件流。我们使用mapping操作将每个数乘以2,并使用filter操作选择能被3整除的数字。最后这个事件流通过subscribe订阅器的形式输出。

使用Scheduler应对并发

产生了这么多事件和数据流,如何平衡好它们之间的并发呢?RxPY为我们提供了Scheduler的概念,它能够帮助我们控制事件流和订阅器的执行方式。

默认情况下,RxPY执行的线程和时间是来源于我们当前所在的线程和时间,而Scheduler则提供了一个外部线程和时间的执行环境,这样我们就能够方便地控制事件流和订阅器执行的环境。RxPY提供了一个默认的schedule器,在后台使用线程池的方式执行订阅器以实现非阻塞并发。

现在我们来看一个使用Scheduler的例子:

from rx.scheduler import ThreadPoolScheduler
import multiprocessing
import time
from rx import Observable

def intense_calculation(value):
    # 做一些耗时计算
    time.sleep(1)
    return value

# CPU计算数量
processes = multiprocessing.cpu_count()
print("执行任务需要的CPU数量为:{0}".format(processes))
# 线程池
pool_scheduler = ThreadPoolScheduler(processes)
print(type(pool_scheduler))

# 发射30个事件,每个事件同步计算值,然后异步输出
Observable.range(1, 30) \
    .map(lambda s: intense_calculation(s)) \
    .subscribe_on(pool_scheduler) \
    .subscribe(lambda i: print("调度输出:{}".format(i)))

该程序使用10个线程来处理计算密集型的任务,我们可以根据我们的需要,进行调度器的设置,以满足程序的需要。在上述程序里面,每个事件都包含一个同步值的计算,然后我们使用了subscribe-on方法,来指定所有的输出都使用异步的方式进行。

理解调度器

完全理解RxPY的调度器并不是一件容易的事情,这里我们简单介绍几个重要的调度器:ThreadPoolScheduler, ImmediateScheduler, CurrentThreadScheduler。这几个调度器我们可以通过直接使用RxPY的Scheduler类来获取。

ThreadPoolScheduler是RxPY的默认调度器,它基于Python的线程池实现。使用ThreadPoolScheduler调度器的OMG可以实现在局部使用非阻塞的线程池环境,避免阻塞等待事件或数据的继续或者开销。

ImmediateScheduler常常用在测试中。ImmediateScheduler调度器会立刻执行当前事件或数据的任务,如果存在并发任务,则在时间上保持先后顺序,保障数据的同步性。

其中CurrentThreadScheduler调度器可以把任务在当前线程中进行执行。这个调度器可以用于实现一些需要在不同线程之间共享某些值的场景,它也可以用于实现不同线程之间的协作处理,包括IO的调度和内存共享等。

总结

本文介绍了RxPY的一些基本概念和使用方法,特别介绍了如何使用Scheduler来实现RxPY支持并发IO的能力。本文还介绍了RxPY中的一些重要调度器,并结合不同的场景和应用给出了使用方法和注意事项,可以帮助程序员对RxPY进行更好的理解和应用。