📜  适用于 Android 的 RxJava(1)

📅  最后修改于: 2023-12-03 14:57:59.078000             🧑  作者: Mango

适用于 Android 的 RxJava

简介

RxJava是一个基于事件流和数据流的异步编程库,有助于处理异步请求和数据流。它提供了一种轻松的方式来组合,过滤和变换不同的事件,并使异步操作更加容易管理。

在Android开发中,RxJava广泛应用于网络请求、数据库操作、UI线程异步操作等场景。

RxJava基础
Observable

Observable代表一个事件流,它可以产生多个事件并且会通知相关观察者。通常情况下,开发者会订阅这些事件并根据事件类型执行不同的操作。

Observable.just("Hello World!")
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d("RxJava", s);
        }
    });

上面的代码创建了一个发射"Hello World!"字符串的Observable,并且定义了一个匿名订阅者。当Observable发射事件时,它会通过调用accept()方法将事件传递给观察者。

Observer

Observer代表一个观察者,它负责订阅Observable并且处理其中的事件。Observer通常由三个方法组成:onNext()表示处理Observable发射的事件,onError()表示处理任何错误事件,onComplete()表示Observable事件流的结束。

Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d("RxJava", "onSubscribe");
    }

    @Override
    public void onNext(String s) {
        Log.d("RxJava", "onNext: " + s);
    }

    @Override
    public void onError(Throwable e) {
        Log.d("RxJava", "onError: " + e.getMessage());
    }

    @Override
    public void onComplete() {
        Log.d("RxJava", "onComplete");
    }
};

Observable.just("Hello World!")
    .subscribe(observer);

上面的代码定义了一个观察者,并且通过Observable的subscribe()方法订阅了这个观察者。当Observable发射事件时,它会调用相应的方法通知观察者事件的发生。

Disposable

Disposable是一个可用于取消订阅的对象。当一个Observable被订阅时,它会返回一个Disposable对象。用户可以通过Dispose()方法手动取消订阅或者在观察者结束任务后自动取消订阅。

Disposable disposable = Observable.just("Hello World!")
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d("RxJava", s);
        }
    });

// 手动取消订阅
disposable.dispose();
RxJava操作符

除了基本的Observable和Observer操作外,RxJava还提供了许多强大的操作符,用于过滤、变换和组合不同的Observable事件流。下面列举了一些常用的操作符:

map()

map()操作符用于将输入的事件类型转换为输出的事件类型。例如将一个字符串转换成其中字符的长度:

Observable.just("Hello World!")
    .map(new Function<String, Integer>() {
        @Override
        public Integer apply(String s) throws Exception {
            return s.length();
        }
    })
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d("RxJava", "Length: " + integer);
        }
    });
filter()

filter()操作符用于只选出符合某种条件的事件。例如只选出长度大于5的字符串:

Observable.just("Hello World!")
    .filter(new Predicate<String>() {
        @Override
        public boolean test(String s) throws Exception {
            return s.length() > 5;
        }
    })
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d("RxJava", s);
        }
    });
flatMap()

flatMap()操作符用于将一个Observable事件流中的每一个事件拆分成多个事件,并将这些多个事件合并为一个新的Observable事件流。

Observable.just("Hello World!")
    .flatMap(new Function<String, ObservableSource<Character>>() {
        @Override
        public ObservableSource<Character> apply(String s) throws Exception {
            return Observable.fromIterable(s.toCharArray());
        }
    })
    .subscribe(new Consumer<Character>() {
        @Override
        public void accept(Character character) throws Exception {
            Log.d("RxJava", String.valueOf(character));
        }
    });

上面的代码将"Hello World!"字符串中的每一个字符拆分成多个事件,并将这些事件发送到一个新的Observable事件流中。

zip()

zip()操作符用于将多个Observable事件流中的事件进行压缩或合并。压缩时,按顺序从每个Observable中取出一个事件,并将这些事件压缩为一个新的事件;合并时,则将包含相同索引位置的事件进行合并。

Observable<String> observable1 = Observable.just("A", "B", "C");
Observable<String> observable2 = Observable.just("1", "2", "3");

Observable.zip(observable1, observable2, new BiFunction<String, String, String>() {
    @Override
    public String apply(String s, String s2) throws Exception {
        return s + s2;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Log.d("RxJava", s);
    }
});

上面的代码将observable1和observable2中的事件按索引位置进行了合并。

RxJava在Android开发中的应用
网络请求

RxJava可以与Retrofit结合使用以简化网络请求操作。Retrofit中提供了一个Observable接口来表示一个网络请求,我们可以通过订阅这个接口返回的Observable来进行网络请求并监听请求结果。

interface ApiService {
    @GET("/user")
    Observable<User> getUser(@Query("id") int id);
}

ApiService apiService = Retrofit.Builder()
        .baseUrl("https://www.example.com")
        .addConverterFactory(GsonConverterFactory.create())
        .build()
        .create(ApiService.class);

apiService.getUser(1)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<User>() {
            @Override
            public void accept(User user) throws Exception {
                Log.d("RxJava", user.toString());
            }
        });

上面的代码中,我们定义了一个ApiService接口,并且在使用Retrofit时通过create()方法将其实例化。当我们调用getUser(1)时,便会得到一个Observable接口,并且通过subscribeOn()和observeOn()方法指定了在哪些线程上执行网络请求和接受请求结果。最后我们通过subscribe()方法订阅了这个接口并处理结果。

数据库操作

RxJava可以与Room结合使用以简化数据库操作。Room提供了一个Flowable接口来表示一个数据库查询操作,我们可以通过订阅这个接口返回的Flowable来进行数据库查询并监听查询结果。

@Dao
interface UserDao {
    @Query("SELECT * FROM user WHERE id = :id")
    Flowable<User> getUserById(int id);
}

UserDao userDao = Room.databaseBuilder(context, AppDatabase.class, "app-database")
        .build()
        .userDao();

userDao.getUserById(1)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<User>() {
            @Override
            public void accept(User user) throws Exception {
                Log.d("RxJava", user.toString());
            }
        });

上面的代码中,我们定义了一个UserDao接口,并且在使用Room时通过databaseBuilder()方法将其实例化。当我们调用getUserById(1)时,便会得到一个Flowable接口,并且通过subscribeOn()和observeOn()方法指定了在哪些线程上执行数据库查询和接受查询结果。最后我们通过subscribe()方法订阅了这个接口并处理结果。

UI线程异步操作

在Android开发中,我们经常需要在UI线程上进行耗时的异步操作,例如加载图片或者处理耗时任务。RxJava可以通过AndroidSchedulers提供的方法将操作切换到UI线程上运行。

Observable.just("https://www.example.com/image.jpg")
    .map(new Function<String, Bitmap>() {
        @Override
        public Bitmap apply(String s) throws Exception {
            URL url = new URL(s);
            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
            InputStream inputStream = connection.getInputStream();
            Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
            return bitmap;
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Consumer<Bitmap>() {
        @Override
        public void accept(Bitmap bitmap) throws Exception {
            imageView.setImageBitmap(bitmap);
        }
    });

上面的代码中,我们先从网络上获取一张图片,然后通过subscribeOn()方法指定网络请求在非UI线程上进行,最后通过observeOn()方法指定图片设置在UI线程上进行。

总结

RxJava是一个强大的异步编程库,在Android开发中有着广泛的应用。通过使用RxJava,我们可以简化网络请求、数据库操作和UI线程异步操作等工作,并提高代码的简洁性和可读性。