📅  最后修改于: 2020-11-06 06:40:07             🧑  作者: Mango
主题既是可观察的序列,也是可以多播的观察者,即与已订阅的许多观察者交谈。
我们将讨论以下主题主题-
要处理主题,我们需要导入主题,如下所示:
from rx.subject import Subject
您可以按如下方式创建主题对象-
subject_test = Subject()
该对象是具有三种方法的观察者-
您可以在主题上创建多个订阅,如下所示:
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
您可以将数据传递给使用on_next(value)方法创建的主题,如下所示-
subject_test.on_next("A")
subject_test.on_next("B")
数据将传递给所有订阅,并添加到主题上。
这是该主题的一个实际例子。
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")
subject_test对象是通过调用Subject()创建的。 subject_test对象引用了on_next(value),on_error(error)和on_completed()方法。上面示例的输出如下所示-
E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B
我们可以使用on_completed()方法来停止主题执行,如下所示。
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")
一旦调用完成,就不会调用后面调用的下一个方法。
E:\pyrx>python testrx.py
The value is A
The value is A
现在让我们看看如何调用on_error(error)方法。
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))
E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!
当被调用时,BehaviorSubject将为您提供最新的值。您可以如下所示创建行为主题-
from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject
这是使用行为主题的有效示例
from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")
E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject
重播主题类似于行为主题,其中,它可以缓冲值并将其重播给新订户。这是重放主题的一个有效示例。
from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)
重播主题上使用的缓冲区值为2。因此,最后两个值将被缓冲并用于新的订户。
E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5
对于AsyncSubject,最后一次调用的值将传递给订阅服务器,并且只有在调用complete()方法之后才能完成。
from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.
E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2