学习RxJava源码关注:
http://www.cherylgood.cn/detail/5a6037029a7b56c5c24fbc69
学习Rxjava使用理念关注
https://www.jianshu.com/p/a75ecf461e02
什么是同步订阅?
上下游工作在同一个线程, 上游每发送一个事件必须等待下游处理完了才会继续发事件, 不可能出现上下游流速不均衡的问题呀.
RxJava实际应用案例讲解:使用RxJava的最佳开发场景
RxJava是什么?
一个可以在JVM上使用的,是由异步的基于事件编写的通过使用可观察序列构成的一个库。
创建到执行




CreateEmitter继承了AtomicReference提供了原子级的控制能力,目的是防止多线程操作出现的错误。
神秘的取消订阅流程

RxJava2.x 线程切换的实现原理
Observable.subscribeOn(Schedulers.io())返回的是一个Observable对象,这里面涉及的代码段如下:

备注:RxJavaPlugins.onAssembly,前面分析过,为hook服务,ObservableSubscribeOn是对Observble进行了一次wrapper操作。
代码接着往下看:

Observable是ObservableSource接口的实现类,通过source字段保存上游的Observable,

parent就是我们包装后的observer,其内部保存了下游的observer,source即通过ObservableSubscribeOnwrapper后存储我们上游的obserabler,所以run里面的source.subscribe(parent);即为wrapper的observer订阅了上游的observable,触发了上游observable的subscribeActual,开始执行数据的分发。
观察者线程切换原理
分析observeOn与subscribeOn的不同之处。

都是经过包装然后,执行 HandlerScheduler 的 schedule,用的是 Handler 机制来完成的。
操作符应用

Rxjava系列读他的博客:
https://www.jianshu.com/u/c50b715ccaeb
总结:
简单的来说, subscribeOn() 指定的是上游发送事件的线程, observeOn() 指定的是下游接收事件的线程。
论切换线程次数的有效性
多次指定上游的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略。
多次指定下游的线程是可以的, 也就是说每调用一次observeOn() , 下游的线程就会切换一次。
情境分析
1,如果在请求的过程中Activity已经退出了, 这个时候如果回到主线程去更新UI, 那么APP肯定就崩溃了?
我们可以在Activity中将这个Disposable 保存起来, 当Activity退出时, 切断它即可,那如果有多个Disposable 该怎么办呢, RxJava中已经内置了一个容器CompositeDisposable,每当我们得到一个Disposable时就调用CompositeDisposable.add()将它添加到容器中, 在退出的时候, 调用CompositeDisposable.clear() 即可切断所有的水管。
读写数据库
public Observable<List<Record>> readAllRecords() {
return Observable.create(new ObservableOnSubscribe<List<Record>>() {
@Override
public void subscribe(ObservableEmitter<List<Record>> emitter) throws Exception {
Cursor cursor = null;
try {
cursor = getReadableDatabase().rawQuery("select * from " + TABLE_NAME, new String[]{});
List<Record> result = new ArrayList<>();
while (cursor.moveToNext()) {
result.add(Db.Record.read(cursor));
}
emitter.onNext(result);
emitter.onComplete();
} finally {
if (cursor != null) {
cursor.close();
}
}
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());}
上游在子线程,下游在主线程
当上下游工作在同一个线程中时, 这时候是一个同步的订阅关系, 也就是说上游每发送一个事件必须等到下游接收处理完了以后才能接着发送下一个事件。
当上下游工作在不同的线程中时, 这时候是一个异步的订阅关系, 这个时候上游发送数据不需要等待下游接收, 为什么呢, 因为两个线程并不能直接进行通信, 因此上游发送的事件并不能直接到下游里去, 这个时候就需要一个田螺姑娘来帮助它们俩, 这个田螺姑娘就是我们刚才说的水缸 ! 上游把事件发送到水缸里去, 下游从水缸里取出事件来处理, 因此, 当上游发事件的速度太快, 下游取事件的速度太慢, 水缸就会迅速装满, 然后溢出来, 最后就OOM了。
Flowable在设计的时候采用了一种新的思路也就是响应式拉取的方式来更好的解决上下游流速不均衡的问题。
下游没有调用request, 上游就认为下游没有处理事件的能力, 而这又是一个同步的订阅, 既然下游处理不了, 那上游不可能一直等待吧, 如果是这样, 万一这两根水管工作在主线程里, 界面不就卡死了吗, 因此只能抛个异常来提醒我们. 那如何解决这种情况呢, 很简单啦, 下游直接调用request(Long.MAX_VALUE)就行了, 或者根据上游发送事件的数量来request就行了, 比如这里request(3)就可以了。
在Flowable里默认有一个大小为128的水缸, 当上下游工作在不同的线程中时, 上游就会先把事件发送到这个水缸中, 因此, 下游虽然没有调用request, 但是上游在水缸中保存着这些事件, 只有当下游调用request时, 才从水缸里取出事件发给下游。
Observaable -> Flowable
Observer -> Subscriber
Flowable的存在就是解决背压问题的
Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit complete");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR); //增加了一个参数
Subscriber<Integer> downstream = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(Long.MAX_VALUE); //注意这句代码
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
upstream.subscribe(downstream);
备注:
当上下游流速不均匀的时候,就会报出 io.reactivex.exceptions.MissingBackpressureException: 所以上下游发生这种 发送 -> 接收不对等的时候,就要考虑背压策略,我们的Flowable就要登场了。


通过本节的学习,大家应该知道如何正确的去实现一个完整的响应式拉取了,在某一些场景下,可以在发送事件前先判断当前的requested的值是否大于0,若等于0则说明下游处理不过来了,则需要等待,例如下面这个例子。
RxBinding是一个将Android UI控件事件转换为Rx事件的Java库
Button button = ...;
RxView.clickEvents(button)
.subscribe(new Action1<ViewClickEvent>() {
@Override
public void call(ViewClickEvent event) {
// Click handling
}
});
RxBus
RxBus是一个通过RxJava来实现时间总线的开源库,使用方式与大多是EventBus库类似,只是内部是通过RxJava来处理事件,这里就不做篇幅讲解了,有兴趣的同学可以参考https://github.com/AndroidKnife/RxBus
- Schedulers.from(Executor executor)
我们可以使用它创建自定义的Scheduler,它是由我们自己的Executor作为支撑的。在有些场景下,我们希望创建自定义的Scheduler为App执行特定的任务,这些任务可能需要自定义的线程逻辑。
假设,我们想要限制App中并行网络请求的数量,那么我们就可以创建一个自定义的Scheduler,使其具有一个固定线程池大小的Executor:Scheduler.from(Executors.newFixedThreadPool(n)),然后将其应用到代码中所有网络相关的Observable上。
- Schedulers.io()
这是由无边界线程池作为支撑的一个Scheduler,它适用于非CPU密集的I/O工作,比如访问文件系统、执行网络调用、访问数据库等等。这个Scheduler是没有限制的,它的线程池可以按需一直增长。
注意:在使用无边界线程池支撑的Scheduler时,比如Schedulers.io(),我们要特别小心,因为它有可能会导致线程池无限增长,使系统中出现大量的线程。
- Schedulers.computation()
这个Scheduler用于执行CPU密集的工作,比如处理大规模的数据集、图像处理等等。它由一个有界的线程池作为支撑,线程的最大数量就是可用的处理器数量。
因为这个Scheduler只适用于CPU密集的任务,我们希望限制线程的数量,这样的话,它们不会彼此抢占CPU时间或出现线程饿死的现象。
- Schedulers.newThread()
**这个Scheduler 每次都会创建一个全新的线程来完成一组工作。它不会从任何线程池中受益,线程的创建和销毁都是很昂贵的,所以你需要非常小心,不要衍生出太多的线程,导致服务器系统变慢或出现内存溢出的错误。
理想情况下,你应该很少使用这个Scheduler,它大多用于在一个完全分离的线程中开始一项长时间运行、隔离的一组任务。**
- Schedulers.single()
这个Scheduler是RxJava 2新引入的,它的背后只有一个线程作为支撑,只能按照有序的方式执行任务。如果你有一组后台任务要在App的不同地方执行,但是同时只能承受一个任务执行的话,那么这个Scheduler就可以派上用场了。
关于轮询
- interval:创建一个按固定时间间隔发射整数序列的Observable,它是按照周期执行的
1,interval(0,5, TimeUnit.SECONDS)
第一个参数表示初始化延时多久开始请求,这里用0表示不延时直接请求
第二个参数表示间隔多久轮询一次,这里表示间隔5s
2,interval(5, TimeUnit.SECONDS)
其中的这个5就表示,初始延时5秒开始执行请求,轮询也是5秒
take:表示只取前n项。这里用take和interval操作符联合使用,限定次数轮询
条件轮询:利用RxJava的takeUntil操作符,当满足了什么条件就终止轮询。
嵌套请求:利用RxJava的flatMap操作符。往后面一个请求的参数是前面一个请求的结果,于是经常需要在前面一个请求的响应中去发送第二个请求,这就是我们所说的"请求嵌套"问题。
merge:将多个Observalbe发射的数据项,合并到一个Observable中再发射出去,可能会让合并的Observable发射的数据交错(concat操作符是连接不会出现交错),如果在合并的途中出现错误,就会立即将错误提交给订阅者,将终止合并后的Observable。
避免重复请求:利用Rxjavad的throttleFirst操作符。 会定期发射这个时间段里源Observable发射的第一个数据。
减少频繁的网络请求:利用Rxjavad的debounce操作符。
简单的理解就是:当N个结点发生的时间太靠近(即发生的时间差小于设定的值T),debounce就会自动过滤掉前N-1个结点。 例如:即时搜索,避免每输入(删除)一个字就做一次请求,500毫秒才让它去请求一次网络,这样可以避免数据混乱,也优了app性能。
merge和zip的区别到底是什么呢?
merge和zip都是将多个Observalbe发射的数据项,合并到一个Observable中再发射出去。只是在发射的结果上有所不同。例如有3个网络请求的Observalbe,zip是等待这3个请求都完成后才一起返回,既onNext调用1次。merge是3个Observalbe分别返回,而且无序,既onNext调用3次,相当于把3个本来分散的网络请求。写在同一个地方合并起来执行。
merge、mergeDelayError都是合并,但是需要注意二者区别。
merge合并的请求,如果有一个接口报错了,就立马报错,会终止整个流,另外的接口也不会请求。
mergeDelayError合并的请求,如果有一个接口报错了,会延迟错误处理,后面的接口会继续执行没有被中断。
Observable.flatMap() which does not preserve the order of the elements.


