📅  最后修改于: 2023-12-03 14:56:15.609000             🧑  作者: Mango
Observable.create()
是 RxJava 中的一个静态方法,用来创建一个自定义的 Observable 对象。它可以让程序员完全掌控如何订阅数据流和处理事件。
Observable.create(ObservableOnSubscribe<T> source)
ObservableOnSubscribe<T>
是一个函数接口,用来定义观察者(Subscriber)如何订阅数据流。
下面是一个使用 Observable.create()
创建 Observable 对象的示例:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// 在这里定义如何发送事件
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
// 订阅开始时的处理
}
@Override
public void onNext(Integer integer) {
// 接收到新的事件时的处理
}
@Override
public void onError(Throwable e) {
// 发生错误时的处理
}
@Override
public void onComplete() {
// 订阅完成时的处理
}
});
Observable.create()
方法中的参数 source
是一个 ObservableOnSubscribe 的实例,可以理解为观察者的回调函数,它定义了观察者如何订阅和响应数据。在 subscribe()
方法中,我们可以手动调用 emitter.onNext()
发送新的事件,调用 emitter.onComplete()
表示数据流结束。
上述示例创建了一个发送整型数据的 Observable,订阅时会依次收到 1 和 2,然后完成订阅。在 subscribe()
方法中,可以通过实现 Observer 接口的回调方法来处理不同的事件,如 onNext()
等。
Observable.create()
主要用于创建一些不规则的数据流,或者需要手动控制事件发送的场景。例如,可以使用 Observable.create()
创建一个监听器,并在事件发生时手动发送事件。
Observable.create(new ObservableOnSubscribe<Event>() {
@Override
public void subscribe(ObservableEmitter<Event> emitter) throws Exception {
final MyListener listener = new MyListener() {
@Override
public void onEvent(Event event) {
emitter.onNext(event);
}
};
// 注册监听器
MyManager.getInstance().addListener(listener);
// 在取消订阅时,移除监听器
emitter.setCancellable(() -> MyManager.getInstance().removeListener(listener));
}
})
.subscribe(new Observer<Event>() {
// ...
});
在上述示例中,我们创建了一个 Observable 对象,监听 MyManager 中的事件,并手动发送该事件。在取消订阅时,我们还可以通过 emitter.setCancellable()
方法来清理资源,例如移除监听器。