📜  RxPY-运算符(1)

📅  最后修改于: 2023-12-03 15:34:47.124000             🧑  作者: Mango

RxPY运算符

RxPY是RxJS的Python版本,提供了像RxJS一样的一系列操作符。RxPY运算符是用于处理Observable序列的函数,可以帮助处理数据流。RxPY运算符可以分为以下几类:

创建操作符

创建操作符用于从现有数据结构中构建Observable序列,提供了从旧数据转换为新数据的方法。以下是一些创建操作符:

  • of:创建一个发出给定项的Observable序列。
from rx import of

observable = of(1, 2, 3)
observable.subscribe(on_next=lambda i: print(i))
  • from_iterable:创建一个发出Iterable中的所有项的Observable序列。
from rx import from_iterable

observable = from_iterable([1, 2, 3])
observable.subscribe(on_next=lambda i: print(i))
转换操作符

转换操作符用于将Observable序列转换为不同的形式,这些操作符通常用于变换Observable序列中的物品,从而使其更易于处理。以下是一些转换操作符:

  • map:将Observable序列中的每个物品映射到另一个物品序列。
from rx import from_iterable

observable = from_iterable([1, 2, 3])
observable.pipe(
    map(lambda i: i * 2)
).subscribe(on_next=lambda i: print(i))
  • filter:根据谓词函数过滤Observable序列中的物品并只发出符合条件的物品。
from rx import from_iterable

observable = from_iterable([1, 2, 3])
observable.pipe(
    filter(lambda i: i % 2 == 0)
).subscribe(on_next=lambda i: print(i))
组合操作符

组合操作符用于将多个Observable序列组合在一起以创建新的Observable序列。以下是一些组合操作符:

  • concat:按顺序将多个Observable序列连接在一起。
from rx import of, concat

observable1 = of(1, 2, 3)
observable2 = of(4, 5, 6)

observable = concat(observable1, observable2)
observable.subscribe(on_next=lambda i: print(i))
  • zip:将多个Observable序列相互配对并发出打包后的物品。
from rx import from_iterable, zip

observable1 = from_iterable([1, 2, 3])
observable2 = from_iterable(['a', 'b', 'c'])

observable = zip(observable1, observable2)
observable.subscribe(on_next=lambda i: print(i))
错误处理运算符

错误处理运算符用于处理Observable序列中发生的错误或异常情况。以下是一些错误处理运算符:

  • catch:捕捉Observable序列中的异常并使用备用Observable序列替换它。
from rx import of, throwError, catch

def bad_function():
    raise Exception('Bad function!')

observable1 = of('A', 'B')
observable2 = throwError(Exception('Oops!'))
observable = catch(observable2, observable1)
observable.subscribe(on_next=lambda i: print(i))
  • retry:重试Observable序列,如果它失败了,则重新订阅。
import random
from rx import of, throwError, retry

def bad_function():
    if random.randint(0, 1) == 1:
        raise Exception('Bad function!')
    return 'Good function!'

observable = of(bad_function()).pipe(
    retry(3)
)
observable.subscribe(on_next=lambda i: print(i))

这里列出的只是一小部分RxPY运算符,RxPY提供了更多的运算符,可以在https://rxjs.dev/api/中找到。