📅  最后修改于: 2023-12-03 15:34:46.972000             🧑  作者: Mango
在 RxJava 中,Observable 是其中一个最常用的类,它被用来表示一个异步操作的结果,可以被观察者(Observer)订阅来获取相关数据。
我们可以使用 create 方法来手动创建一个 Observable:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Hello");
emitter.onNext("RxJava");
emitter.onNext("Observable");
emitter.onComplete();
}
});
在 create 方法中,我们需要实现一个 subscribe 方法,该方法会接收一个 ObservableEmitter 对象作为参数。我们可以通过调用 ObservableEmitter 的 onNext 方法来发送数据,调用 onComplete 方法来表示数据已经发送完毕。
我们也可以使用 just 方法来创建一个 Observable,该方法会直接将传入的参数作为数据发送到观察者端:
Observable<String> observable = Observable.just("Hello", "RxJava", "Observable");
它等价于下面的代码:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("Hello");
emitter.onNext("RxJava");
emitter.onNext("Observable");
emitter.onComplete();
}
});
我们还可以使用 fromArray 或 fromIterable 方法来创建一个 Observable,该方法会将传入的数组或 Iterable 对象中的数据发送出去:
String[] arr = {"Hello", "RxJava", "Observable"};
Observable<String> observable = Observable.fromArray(arr);
List<String> list = new ArrayList<>();
list.add("Hello");
list.add("RxJava");
list.add("Observable");
Observable<String> observable = Observable.fromIterable(list);
我们创建好 Observable 后,就需要将它与观察者进行订阅:
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
// 订阅时的操作,通常为空
}
@Override
public void onNext(String s) {
// 当 Observable 发送数据时,该方法会被调用
System.out.println(s);
}
@Override
public void onError(Throwable e) {
// 当 Observable 发生错误时,该方法会被调用
}
@Override
public void onComplete() {
// 当 Observable 发送完所有数据时,该方法会被调用
}
});
在订阅 Observable 时,我们需要实现一个 Observer 对象,该对象需要实现四个方法,分别对应订阅时的初始化操作、当 Observable 发送数据时的回调、当 Observable 发生错误时的回调以及当 Observable 发送完所有数据后的回调。
当我们不再需要接收 Observable 发送的数据时,需要及时取消订阅,以免造成资源浪费:
Disposable disposable = observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
// 订阅时的操作,通常为空
}
@Override
public void onNext(String s) {
// 当 Observable 发送数据时,该方法会被调用
System.out.println(s);
}
@Override
public void onError(Throwable e) {
// 当 Observable 发生错误时,该方法会被调用
}
@Override
public void onComplete() {
// 当 Observable 发送完所有数据时,该方法会被调用
}
});
disposable.dispose();
我们可以调用 Disposable 对象的 dispose 方法来取消订阅,以释放相关资源。
在实际使用中,我们通常需要控制 Observable 发送数据的线程以及观察者接收数据的线程。在 RxJava 中,有以下几种方式来控制线程:
我们可以使用 subscribeOn 方法来指定 Observable 发送数据的线程,使用 observeOn 方法来指定观察者接收数据的线程:
Observable.just(1, 2, 3)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
// 省略部分代码
});
在上面的代码中,我们使用 subscribeOn 方法将 Observable 发送数据的线程切换到 IO 线程,使用 observeOn 方法将观察者接收数据的线程切换到主线程。
除了通过 subscribeOn 和 observeOn 方法来控制线程外,RxJava 还提供了 Schedulers 类,用于控制 Observable 和观察者所使用的线程:
Observable.just(1, 2, 3)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
// 省略部分代码
});
在上面的代码中,我们使用 Schedulers.io() 方法将 Observable 发送数据的线程切换到 IO 线程,使用 AndroidSchedulers.mainThread() 方法将观察者接收数据的线程切换到主线程。
Observable 是 RxJava 中的一个核心类,它被用来表示一个异步操作的结果,并通过 Observer 观察者进行订阅来获取相关数据。我们可以手动创建 Observable,也可以利用 just、fromArray、fromIterable 等方法来快速创建。订阅 Observable 后,我们需要实现 Observer 对象,该对象用于接收 Observable 发送的数据。最后,我们可以通过 subscribeOn 和 observeOn 方法或者 Schedulers 类来控制 Observable 和观察者所使用的线程。