📅  最后修改于: 2020-11-06 06:41:08             🧑  作者: Mango
在本章中,我们将详细讨论以下主题-
下面给出的是一个基本示例,该示例显示了Observable, 运算符和订阅观察者的工作。
test.py
import requests
import rx
import json
from rx import operators as ops
def filternames(x):
if (x["name"].startswith("C")):
return x["name"]
else :
return ""
content = requests.get('https://jsonplaceholder.typicode.com/users')
y = json.loads(content.text)
source = rx.from_(y)
case1 = source.pipe(
ops.filter(lambda c: filternames(c)),
ops.map(lambda a:a["name"])
)
case1.subscribe(
on_next = lambda i: print("Got - {0}".format(i)), 8. RxPy — Examples
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!"),
)
这是一个非常简单的示例,其中,我从该URL获取用户数据-
https://jsonplaceholder.typicode.com/users。
过滤数据,以“ C”开头的名称,然后使用映射仅返回名称。这是相同的输出-
E:\pyrx\examples>python test.py
Got - Clementine Bauch
Got - Chelsey Dietrich
Got - Clementina DuBuque
Job Done!
在此示例中,我们将看到可观察对象和主题之间的区别。
from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))
E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821
在上面的示例中,每次订阅可观察对象时,它将为您提供新的值。
from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)
E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065
如果您看到值是共享的,则使用该主题在两个订户之间共享。
可观察到的分类为
当多个订户订阅时,可观察到的差异将被注意到。
冷可观察的,可观察的,已执行的并在每次订阅时呈现数据。订阅后,将执行observable并给出新值。
下面的示例使人们对感冒可以理解。
from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))
E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821
在上面的示例中,每次订阅可观察对象时,它将执行可观察对象并发出值。如上例所示,每个订户的值也可能不同。
在可观察的情况下,它们将在准备就绪时发出值,而不会总是等待订阅。发出值时,所有订户将获得相同的值。
当您希望在可观察对象就绪时发出值时,或者您希望与所有订户共享相同的值时,可以使用热可观察对象。
热可观察的一个例子是Subject和connectable运算符。
from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)
E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065
如果看到,则订阅者之间共享相同的值。您可以使用publish()可连接的observable运算符来实现相同目的。