📜  理解 Observable.create( (1)

📅  最后修改于: 2023-12-03 14:56:15.609000             🧑  作者: Mango

Observable.create()

Observable.create() 是 RxJava 中的一个静态方法,用来创建一个自定义的 Observable 对象。它可以让程序员完全掌控如何订阅数据流和处理事件。

语法
Observable.create(ObservableOnSubscribe<T> source)

ObservableOnSubscribe<T> 是一个函数接口,用来定义观察者(Subscriber)如何订阅数据流。

使用示例

下面是一个使用 Observable.create() 创建 Observable 对象的示例:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        // 在这里定义如何发送事件
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
    }
})
.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        // 订阅开始时的处理
    }

    @Override
    public void onNext(Integer integer) {
        // 接收到新的事件时的处理
    }

    @Override
    public void onError(Throwable e) {
        // 发生错误时的处理
    }

    @Override
    public void onComplete() {
        // 订阅完成时的处理
    }
});
解析

Observable.create() 方法中的参数 source 是一个 ObservableOnSubscribe 的实例,可以理解为观察者的回调函数,它定义了观察者如何订阅和响应数据。在 subscribe() 方法中,我们可以手动调用 emitter.onNext() 发送新的事件,调用 emitter.onComplete() 表示数据流结束。

上述示例创建了一个发送整型数据的 Observable,订阅时会依次收到 1 和 2,然后完成订阅。在 subscribe() 方法中,可以通过实现 Observer 接口的回调方法来处理不同的事件,如 onNext() 等。

适用场景

Observable.create() 主要用于创建一些不规则的数据流,或者需要手动控制事件发送的场景。例如,可以使用 Observable.create() 创建一个监听器,并在事件发生时手动发送事件。

Observable.create(new ObservableOnSubscribe<Event>() {
    @Override
    public void subscribe(ObservableEmitter<Event> emitter) throws Exception {
        final MyListener listener = new MyListener() {
            @Override
            public void onEvent(Event event) {
                emitter.onNext(event);
            }
        };

        // 注册监听器
        MyManager.getInstance().addListener(listener);

        // 在取消订阅时,移除监听器
        emitter.setCancellable(() -> MyManager.getInstance().removeListener(listener));
    }
})
.subscribe(new Observer<Event>() {
    // ...
});

在上述示例中,我们创建了一个 Observable 对象,监听 MyManager 中的事件,并手动发送该事件。在取消订阅时,我们还可以通过 emitter.setCancellable() 方法来清理资源,例如移除监听器。