📅  最后修改于: 2023-12-03 15:34:47.124000             🧑  作者: Mango
RxPY是RxJS的Python版本,提供了像RxJS一样的一系列操作符。RxPY运算符是用于处理Observable序列的函数,可以帮助处理数据流。RxPY运算符可以分为以下几类:
创建操作符用于从现有数据结构中构建Observable序列,提供了从旧数据转换为新数据的方法。以下是一些创建操作符:
from rx import of
observable = of(1, 2, 3)
observable.subscribe(on_next=lambda i: print(i))
from rx import from_iterable
observable = from_iterable([1, 2, 3])
observable.subscribe(on_next=lambda i: print(i))
转换操作符用于将Observable序列转换为不同的形式,这些操作符通常用于变换Observable序列中的物品,从而使其更易于处理。以下是一些转换操作符:
from rx import from_iterable
observable = from_iterable([1, 2, 3])
observable.pipe(
map(lambda i: i * 2)
).subscribe(on_next=lambda i: print(i))
from rx import from_iterable
observable = from_iterable([1, 2, 3])
observable.pipe(
filter(lambda i: i % 2 == 0)
).subscribe(on_next=lambda i: print(i))
组合操作符用于将多个Observable序列组合在一起以创建新的Observable序列。以下是一些组合操作符:
from rx import of, concat
observable1 = of(1, 2, 3)
observable2 = of(4, 5, 6)
observable = concat(observable1, observable2)
observable.subscribe(on_next=lambda i: print(i))
from rx import from_iterable, zip
observable1 = from_iterable([1, 2, 3])
observable2 = from_iterable(['a', 'b', 'c'])
observable = zip(observable1, observable2)
observable.subscribe(on_next=lambda i: print(i))
错误处理运算符用于处理Observable序列中发生的错误或异常情况。以下是一些错误处理运算符:
from rx import of, throwError, catch
def bad_function():
raise Exception('Bad function!')
observable1 = of('A', 'B')
observable2 = throwError(Exception('Oops!'))
observable = catch(observable2, observable1)
observable.subscribe(on_next=lambda i: print(i))
import random
from rx import of, throwError, retry
def bad_function():
if random.randint(0, 1) == 1:
raise Exception('Bad function!')
return 'Good function!'
observable = of(bad_function()).pipe(
retry(3)
)
observable.subscribe(on_next=lambda i: print(i))
这里列出的只是一小部分RxPY运算符,RxPY提供了更多的运算符,可以在https://rxjs.dev/api/中找到。