美图欣赏 | 设为首页 | 加入收藏 | 网站地图

当前位置:电脑中国 > 编程 > 移动开发 >

Android 响应式编程 RxJava2 完全解析

2018-03-02 14:53|来源:未知 |作者:dnzg |点击:

使用了 RxJava2 有一段时间了,深深感受到了其“牛逼”之处。下面,就从 RxJava2 的基础开始,一步步与大家分享一下这个强大的异步库的用法!RxJava 是 一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库,也就是用于实现异步操作的库。

一、RxJava2 基础

RxJava可以浓缩为异步两个字,其核心的东西不外乎两个, Observables(被观察者) 和 Observable(观察者)。Observables可以发出一系列的 事件(例如网络请求、复杂计算、数据库操作、文件读取等),事件执行结束后交给Observable 的回调处理。

1.RxJava2 的观察者模式

观察者模式是对象的行为模式,也叫做发布-订阅(Publish/Subscribe)模式、模型-视图(Model/View)模式、源-监听器(Source/Listener)模式或从属者(Dependents)模式。

什么是观察者模式?举个栗子,Android中View的点击监听器的实现,View是被观察者,OnClickListener对象是观察者,Activity要如何知道View被点击了?那就是派一个OnClickListener对象,入驻View,与View达成一个订阅关系,一旦View被点击了,就通过OnClickListener对象的OnClick方法传达给Activity。采用观察者模式可以避免去轮询检查,节约有限的cpu资源。

RxJava 作为一个工具库,使用的便是通用形式的观察者模式:

?wx_fmt=png

普通事件:onNext(),相当于 onClick()、onEvent();特殊事件:onCompleted() 和 onError()

如图所示,RxJava 的基本概念分别为:Observable(被观察者,事件源),Observer(观察者,订阅者),subscribe (订阅)、事件;不同的是,RxJava 把多个事件看做一个队列,并对每个事件单独处理。在一个队列中 onCompleted() 和 onError(),只有一个会被调用。如果调用了 onCompleted() 就说明队列执行完毕,没有出现异常,否则调用 onError() 方法并终止队列。

2.RxJava2 响应式编程结构

什么是响应式编程?举个栗子,a = b + c; 这句代码将b+c的值赋给a,而之后如果b和c的值改变了不会影响到a,然而,对于响应式编程,之后b和c的值的改变也动态影响着a,意味着a会随着b和c的变化而变化。

响应式编程的组成为Observable/Operator/Subscriber,RxJava在响应式编程中的基本流程如下:

这个流程,可以简单的理解为:Observable -> Operator1 -> Operator2 -> Operator3 -> Subscriber

  1. Observable发出一系列事件,他是事件的产生者;

  2. Subscriber负责处理事件,他是事件的消费者;

  3. Operator是对Observable发出的事件进行修改和变换;

  4. 若事件从产生到消费不需要其他处理,则可以省略掉中间的Operator,从而流程变为Obsevable -> Subscriber;

  5. Subscriber通常在主线程执行,所以原则上不要去处理太多的事务,而这些复杂的处理则交给Operator;

3.创建一个完整的 RxJava2 调用

首先需要添加 RxJava2 在 Android 中的 Gradle 依赖:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile "io.reactivex.rxjava2:rxjava:2.0.8"

RxJava2 可以通过下面这几种方法创建被观察者:

// 发送对应的方法
Observable.create(new ObservableOnSubscribe<String>() {
    // 默认在主线程里执行该方法    
    @Override    
    public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
        e.onNext("Hello");        
        e.onNext("World");      
        // 结束标识        
        e.onComplete();    
     }
});
// 发送多个数据
Observable.just("Hello", "World");
// 发送数组
Observable.fromArray("Hello", "World");
// 发送一个数据
Observable.fromCallable(new Callable<String>() {
    @Override    
    public String call() throws Exception {
        return "Hello";    
    }
});

RxJava2 支持链式编程,下来我们创建被观察者,然后创建观察者并订阅:

// 创建被观察者
Observable.just("Hello", "World")
// 将被观察者切换到子线程
.subscribeOn(Schedulers.io())
// 将观察者切换到主线程
.observeOn(AndroidSchedulers.mainThread())
// 创建观察者并订阅
.subscribe(new Observer<String>() {
    // Disposable 相当于RxJava1.x中的 Subscription,用于解除订阅    
    private Disposable disposable;    
    @Override    
    public void onSubscribe(Disposable d) {  
        disposable = d;    
    }    
    @Override    
    public void onNext(String s) {  
        Log.i("JAVA", "被观察者向观察者发送的数据:" + s);
       if (s == "-1") {   // "-1" 时为异常数据,解除订阅   
            disposable.dispose();        
        }    
   }   
   @Override    
   public void onError(Throwable e) {    
   }    
   @Override    
   public void onComplete() {    
   }
});

一旦 Observer 订阅了 Observable,Observable 就会调用 Observer 的 onNext()、onCompleted()、onError() 等方法。至此一个完整的 RxJava 调用就完成了。看一下输出的Log:

I/JAVA: 被观察者向观察者发送的数据:Hello
I/JAVA: 被观察者向观察者发送的数据:World

若喜欢简洁、定制服务,那么可以实现的方法跟上面的实现方法是对应起来的,大家看参数就知道哪个对应哪个了,你可以通过new Consumer(不需要实现的方法你可以不写,看上去更简洁),Consumer就是消费者的意思,可以理解为消费了 onNext 等事件:

Observable.just("Hello", "World")
.subscribe(new Consumer<String>() {
    @Override    
    public void accept(@NonNull String s) throws Exception {
        Log.i("JAVA", "被观察者向观察者发送的数据:" + s);
    }
}, new Consumer<Throwable>() {
    @Override    
    public void accept(@NonNull Throwable throwable) throws Exception { 
    }
}, new Action() {
    @Override    
    public void run() throws Exception {    
    }
}, new Consumer<Disposable>() {
    @Override    
    public void accept(@NonNull Disposable disposable) throws Exception {
     }
});
					
(责任编辑:dnzg)