📜  RxJava 中的 Observable 类型(1)

📅  最后修改于: 2023-12-03 15:34:46.972000             🧑  作者: Mango

RxJava 中的 Observable 类型

在 RxJava 中,Observable 是其中一个最常用的类,它被用来表示一个异步操作的结果,可以被观察者(Observer)订阅来获取相关数据。

创建 Observable

1. 使用 create 方法

我们可以使用 create 方法来手动创建一个 Observable:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("Hello");
                emitter.onNext("RxJava");
                emitter.onNext("Observable");

                emitter.onComplete();
            }
        });

在 create 方法中,我们需要实现一个 subscribe 方法,该方法会接收一个 ObservableEmitter 对象作为参数。我们可以通过调用 ObservableEmitter 的 onNext 方法来发送数据,调用 onComplete 方法来表示数据已经发送完毕。

2. 使用 just 方法

我们也可以使用 just 方法来创建一个 Observable,该方法会直接将传入的参数作为数据发送到观察者端:

Observable<String> observable = Observable.just("Hello", "RxJava", "Observable");

它等价于下面的代码:

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("Hello");
                emitter.onNext("RxJava");
                emitter.onNext("Observable");

                emitter.onComplete();
            }
        });

3. 使用 fromArray 或 fromIterable 方法

我们还可以使用 fromArray 或 fromIterable 方法来创建一个 Observable,该方法会将传入的数组或 Iterable 对象中的数据发送出去:

String[] arr = {"Hello", "RxJava", "Observable"};
Observable<String> observable = Observable.fromArray(arr);

List<String> list = new ArrayList<>();
list.add("Hello");
list.add("RxJava");
list.add("Observable");

Observable<String> observable = Observable.fromIterable(list);
订阅 Observable

我们创建好 Observable 后,就需要将它与观察者进行订阅:

observable.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                // 订阅时的操作,通常为空
            }

            @Override
            public void onNext(String s) {
                // 当 Observable 发送数据时,该方法会被调用
                System.out.println(s);
            }

            @Override
            public void onError(Throwable e) {
                // 当 Observable 发生错误时,该方法会被调用
            }

            @Override
            public void onComplete() {
                // 当 Observable 发送完所有数据时,该方法会被调用
            }
        });

在订阅 Observable 时,我们需要实现一个 Observer 对象,该对象需要实现四个方法,分别对应订阅时的初始化操作、当 Observable 发送数据时的回调、当 Observable 发生错误时的回调以及当 Observable 发送完所有数据后的回调。

取消订阅

当我们不再需要接收 Observable 发送的数据时,需要及时取消订阅,以免造成资源浪费:

Disposable disposable = observable.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                // 订阅时的操作,通常为空
            }

            @Override
            public void onNext(String s) {
                // 当 Observable 发送数据时,该方法会被调用
                System.out.println(s);
            }

            @Override
            public void onError(Throwable e) {
                // 当 Observable 发生错误时,该方法会被调用
            }

            @Override
            public void onComplete() {
                // 当 Observable 发送完所有数据时,该方法会被调用
            }
        });

disposable.dispose();

我们可以调用 Disposable 对象的 dispose 方法来取消订阅,以释放相关资源。

线程控制

在实际使用中,我们通常需要控制 Observable 发送数据的线程以及观察者接收数据的线程。在 RxJava 中,有以下几种方式来控制线程:

1. subscribeOn 和 observeOn 方法

我们可以使用 subscribeOn 方法来指定 Observable 发送数据的线程,使用 observeOn 方法来指定观察者接收数据的线程:

Observable.just(1, 2, 3)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Integer>() { 
                    // 省略部分代码
                });

在上面的代码中,我们使用 subscribeOn 方法将 Observable 发送数据的线程切换到 IO 线程,使用 observeOn 方法将观察者接收数据的线程切换到主线程。

2. Schedulers 类

除了通过 subscribeOn 和 observeOn 方法来控制线程外,RxJava 还提供了 Schedulers 类,用于控制 Observable 和观察者所使用的线程:

Observable.just(1, 2, 3)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Integer>() { 
                    // 省略部分代码
                });

在上面的代码中,我们使用 Schedulers.io() 方法将 Observable 发送数据的线程切换到 IO 线程,使用 AndroidSchedulers.mainThread() 方法将观察者接收数据的线程切换到主线程。

总结

Observable 是 RxJava 中的一个核心类,它被用来表示一个异步操作的结果,并通过 Observer 观察者进行订阅来获取相关数据。我们可以手动创建 Observable,也可以利用 just、fromArray、fromIterable 等方法来快速创建。订阅 Observable 后,我们需要实现 Observer 对象,该对象用于接收 Observable 发送的数据。最后,我们可以通过 subscribeOn 和 observeOn 方法或者 Schedulers 类来控制 Observable 和观察者所使用的线程。