TODO 如果发生异常,OnError,Action1运行的线程所在????
TODO map 和 flatMap 的原理
TODO 与Retrofit的结合使用
TODO RxBinding,RxLifeCycle的应用
什么是响应式(Reactive):
应用开发中的例子:
搜索输入框,根据用户实时输入的数据,输入框下方会根据当前输入内容,实时刷新推荐信息,这种就叫做Reactive的能力。
类似这种能自动对外部环境的变化做出响应的系统,我们就称之为响应式系统(Reactive System),其中的外部环境,可以是输入信号的变化,事件的发生等等。通常这个响应式系统不但能响应外部环境的变化,还会根据自身内部状态通过某种方式反馈给外部观察者(推荐内容变化)。
什么是函数式编程(Functional Programming):
函数式编程介绍
介绍Callback形式和响应式区别
一篇不错的Java响应式导读
初级入门篇
有别于命令式编程,逻辑式编程的一种编程范式,是一种面向数学的抽象,将计算描述为一种表达式求值,而这个表达式就是函数,求值的过程称之为自变量的映射。也就是说一个函数的求值结果,仅取决于自变量和函数本身,不依赖其他状态变量(也就是闭包), 也不修改其他外部变量,没有副作用。自变量可以是常规的变量,也可以是另一个函数(高阶函数)。(当然还有一些其他属性和特征,不详细介绍了)
什么是响应式编程(Reactive Programming):
响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。
函数响应式编程(函数式+响应式 —> 函数响应式 Functional Reactive Programming)
编程范式关系:
- 利用函数式编程的思想和方法(闭包思想,函数、高阶函数),来为响应式:事件流,变化传播所用。
监听状态变化Callback方式的弊端:
- 状态的修改可能分布很多地方,针对状态而作响应动作的地方,也就线性增长;
- 状态之间可能相互组合,相互影响,不集中管理,有可能出bug,而且很有可能出现未考虑的状态组合;
- 多重回调,难于调试和跟踪,回调之间关系不清晰;
- 尤其是在Android中,有UI操作,非UI操作之分,一些必须在UI中,一些一定不能在UI线程中去做。使用Callback难以实现线程的轻松切换
而在Java,Android中使用的函数响应式编程框架就是:RxJava,RxAndroid
RxJava的GitHub:函数响应式编程的主框架
RxAndroid的GitHub:在RxJava基础上针对Android的扩展,用的最多的就是UI线程的切换
(1) 什么是RxJava:
摘自GitHub上的一句话介绍:RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.(RxJava 是针对JVM的响应式编程思想的实现,一个使用可观测序列来组成异步和基于事件的library)。
RxJava的设计思想,就是观察者模式(Observer Pattern):
Obserable:序列化产生事件的对象;Observer:观察者、订阅者,监听到变化后做出相应的动作。
Obserable和Observer通过subscribe()方法实现订阅(观察,监听)关系,从而在Obserable可以在发生变化的时候通知Observer,并且在完成通知任务之后,又追加了几个方法调用 — error的通知,complete的通知。
示例1:
|
|
说明:
只有订阅发生是(observable.subscribe(observer)),事件流才会启动
队列完结是,需要调用onCompleted(),作为事件完结的标志(只有在自定义事件源时才需要这么做)
onComplete和onError是互斥事件
onError():在事件队列发生异常时,会触发,同时队列自动终止,不允许再有事件发出
以上代码中,Observer定义了一套完整的监听事件,其中包括:每一次事件发生的监听onNext;发射源完成所有发射任务最后的事件:onComplete;如果在事件发射处理过程中有错误发生时:onError。
有时候订阅者(Observer)可能只关心以上三种事件中的几种,也可以定义一个不完整的监听,只定义自己感兴趣的事件,不关心的事件不需要定义。
例如:
关于以上几种情况,源代码都有相应的重载方法原型:
可以看到,从不监听任何事件到监听所有事件,都有方法原型声明。null —>(next) —> (next, error) —> (next, error, complete);但是不能打破顺序跳跃式监听:例如一下:Obserable.subscribe(new Action1
) 或者 Obserable.subscribe(new Action0 onCompleted)。而且如果事件队列中途发生错误,而你又没有注册响应的监听事件,就会抛出异常:OnErrorNotImplementedException。因此,缺省监听,最保险的方式是一定要注册Next事件和Error事件。
(2) RxJava常用操作符介绍:官网所列所有操作符(非常庞大)
TODO 常用操作符图文介绍
- create
- just/from
- map
- flatMap
- timer
- interval
- range
- concat
- merge/mergeDelayError
- startWith
- zip
- combineLatest/combineLatestDelayError(结合Reader同步更新书架实例)
- filter
- take/takeLast/takeLastBuffer/takeFirst
- ofType
- skip/skipLast
- elementAt
- distinct
- throttleFirst/throttleLast
- reduce
- collect
- count
- buffer
- toList/toSortedList/toMap
(3) RxJava中的线程切换
以上栗子中,没有涉及到线程切换的问题,因此都是按照默认的线程在运行:事件的发出和事件的接收在同一个线程中完成,两者所在线程由订阅动作(subscribe)发生的线程所决定。
RxJava的主要优势:1,避免了回调地狱;2,便利的线程切换。
因此,接下来就要开始最主要的特征了,事件流中的线程切换:Scheduler
还是上面的打印字符的事件栗子,这次主要关注Obserable和Observer的所处线程:
|
|
可以看到通过在订阅是通过方法:subscribeOn,observeOn指定了事件双方分属的线程。这里,我们主要关注的就是Schedulers(调度器),RxJava就是通过调度器来指定代码应该运行在什么样的线程中,RxJava中已经内置了几种常用的Scheduler:
- AndroidSchedulers.mainThread():RxAndroid依赖库中使用最频繁的方法,指定运行在主线程(UI线程)中;
切记不能随意使用不正确的调度器,例如对响应敏感的操作,放在了io线程池中,如果当前io线程池中等待执行的任务较多时,造成该敏感任务无法得到及时响应。将本该放在io的耗时操作,放在了computation线程池中,造成固定size的computation线程池长时间被io操作所占用,无法响应其他事件。
Questions: The first 3 schedulers are pretty self explanatory; however, I’m a little confused about computation and io.
What exactly is “IO-bound work”? Is it used for dealing with streams (java.io) and files (java.nio.files)? Is it used for database queries? Is it used for downloading files or accessing REST APIs?
How is computation() different from newThread()? Is it that all computation() calls are on a single (background) thread instead of a new (background) thread each time?
Why is it bad to call computation() when doing IO work?
Why is it bad to call io() when doing computational work?Great questions, I think the documentation could do with some more detail.
io() is backed by an unbounded thread-pool and is the sort of thing you’d use for non-computationally intensive tasks, that is stuff that doesn’t put much load on the CPU. So yep interaction with the file system, interaction with databases or services on a different host are good examples.
computation() is backed by a bounded thread-pool with size equal to the number of available processors. If you tried to schedule cpu intensive work in parallel across more than the available processors (say using newThread()) then you are up for thread creation overhead and context switching overhead as threads vie for a processor and it’s potentially a big performance hit.
It’s best to leave computation() for CPU intensive work only otherwise you won’t get good CPU utilization.
It’s bad to call io() for computational work for the reason discussed in 2. io() is unbounded and if you schedule a thousand computational tasks on io() in parallel then each of those thousand tasks will each have their own thread and be competing for CPU incurring context switching costs.
(3.1) 不包含线程切换的订阅发生逻辑:
在分析订阅者,被订阅者线程切换的逻辑之前,先将没有任何线程切换的原始版本订阅发生过程,撸一遍,在搞明白这个后,就可以循循渐进的搞明白,订阅者,被订阅者分别运行的线程是怎么切换的。
|
|
抛除各种关于Observer,Observable的包装,上述代码就是订阅发生之后,Observable emits items 的步骤,对照如下应用示例,大概就可以撸顺整个订阅发生的过程:
在Observable.onSubscribe的call回调中,参数为Subscriber,而我们定义的类型为Observer,这两者是什么关系呢?
Subscriber是Observer接口的实现类,除了实现Observer接口定义的onNext,onError,onComplete方法之外,提供额外的回调(例如onStart)和取消订阅(unsubscribe(),isUnsubscribed())等方法,
如果上述示例中,直接使用了Observer,而不是Subscriber,在底层,是会自动将其包装成Subscriber的:
(3.2) 带线程切换的订阅动作:
- Observable.subscribeOn():123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161public class Obserable{}public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {//TODO ScalarSynchronousObservable是什么???if (this instanceof ScalarSynchronousObservable) {return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);}//因为OperatorSubscribeOn 实现了OnSubscribe接口,所以在原Observable被订阅,开始发射事件时,调用的就是OperatorSubscribeOn.call();return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));}public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {return new Observable<T>(RxJavaHooks.onCreate(f));}public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {try {......RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);return RxJavaHooks.onObservableReturn(subscriber);} catch (Throwable e) {......subscriber.onError(RxJavaHooks.onObservableError(e));......return Subscriptions.unsubscribed();}}}public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {final Scheduler scheduler;final Observable<T> source;final boolean requestOn;public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) {this.scheduler = scheduler;this.source = source;this.requestOn = requestOn;}public void call(final Subscriber<? super T> subscriber) {final Worker inner = scheduler.createWorker();SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);//便于订阅事件的跟踪,取消等。可暂时不考虑逻辑subscriber.add(parent);subscriber.add(inner);//根据Schedulers产生不同的线程切换动作inner.schedule(parent);++++++++++++++++++++++++++++++++++摘选一个Schedulers.newThread()的具体实现来看一下调用逻辑:CachedThreadSchedluer............public Subscription schedule(Action0 action) {return schedule(action, 0, null);}public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) {Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);//对Action0的包装,在executor执行线程时,调用Action0的call方法ScheduledAction run = new ScheduledAction(decoratedAction, parent);parent.add(run);Future<?> f;if (delayTime <= 0) {//实际调用为SubscribeOnSubscriber.call(),此时已经完成线程切换f = executor.submit(run);} else {f = executor.schedule(run, delayTime, unit);}run.add(f);return run;}............public final class ScheduledAction extends AtomicReference<Thread> implements Runnable, Subscription {final SubscriptionList cancel;final Action0 action;public ScheduledAction(Action0 action, SubscriptionList parent) {this.action = action;this.cancel = new SubscriptionList(new Remover2(this, parent));}public void run() {try {......action.call();} catch (Throwable e) {......} finally {unsubscribe();}}}}++++++++++++++++++++++++++++++++++}static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0 {final Subscriber<? super T> actual;final boolean requestOn;final Worker worker;Observable<T> source;Thread t;SubscribeOnSubscriber(Subscriber<? super T> actual, boolean requestOn, Worker worker, Observable<T> source) {this.actual = actual;this.requestOn = requestOn;this.worker = worker;this.source = source;}public void onNext(T t) {actual.onNext(t);}public void onError(Throwable e) {try {actual.onError(e);} finally {worker.unsubscribe();}}public void onCompleted() {try {actual.onCompleted();} finally {worker.unsubscribe();}}//inner.schedule(parent)切换之后的回调方法public void call() {//注意:此时已经切换为Worker指定的线程了Observable<T> src = source;source = null;t = Thread.currentThread();//原Observable将items发射给当前这个Subscriber(SubscribeOnSubscriber),然后actual(Subscriber)再去将事件进行下一步的分发。src.unsafeSubscribe(this);}。。。。。。}}如同上述例子中的Observable.create(OnSubscribe), unsafeCreate方法有产生了一个Observable,并且订阅的新事件为OnSubscribe( = OperatorSubscribeOn)
至此,调用链为:
- Observable.create():创建了Observable-1和OnSubscribe-1;
- subscribeOn():创建了Observable-2和OperatorSubscribeOn(OnSubscribe-2),同时,OnSubscribe-2中保存了Observable-1的引用。
- Obserable-2.subscribe(Observer):调用了OnSubscribe-2的call方法,在call方法中,通过Schedulers发生了线程切换,线程完成切换之后,调用了Observale-1.unsafeSubscribe(Obserable-2内部实现Subscriber),此时OnSubscribe-1的call方法被调用,并且发生在Obsubscribe-2.SubscribeOnSubscriber.call所在的线程,此时就完成了源Observable.OnSubscribe发射事件的线程切换。
如果多个subscribeOn链接,那么此时,Obserable.OnSubscribe究竟发生在哪个线程呢???
|
|
下面来分析一下这个例子中的线程切换流程:
1,Obserable.just() : 产生Observable-1, OnSubscribe-1
2,Observable-1.subscribeOn(Schedulers.io()) : 产生了Obserable-2, OperatorSubscribeOn-2,并在OperatorSubscribeOn-2中保存了Obserable-1的引用。
3,了Obserable-2..subscribeOn(Schedulers.newThread()):创建Observable-3, OperatorSubscribeOn-3并在OperatorSubscribeOn-3保存Observable-2的引用。
4,Obserable-3.subscribe():
调用OperatorSubscribeOn_3.call(),改变线程为Schedulers.newThread()。
调用OperatorSubscribeOn_2.call(),改变线程为Schedulers.io()。
调用OnSubscribe_1.call(),此时call()运行在Schedulers.io()。
根据以上逻辑分析,会按照thread-1的线程进行执行。
总结:subscribeOn的调用,会改变调用前事件运行的线程,多次调用subscribeOn,只有头部的subscribeOn起作用。
- Observable.observeOn():
|
|
接下开始分析observeOn
至此,调用链为:
- Obserable.create() : 创建了Observable-1 ,OnSubscriber-1;
- observeOn : 创建了Obserable-2,OnSubscriber-2(OnSubcribeLift)持有OnSubscriber-1的引用,和一个线程变换动作OperatorObserverOn;
subscribe(Subscriber-final) :
① OperatorObserveOn完成线程切换的准备工作,并持有了最终的Suscriber-final(缓存起来),并以此构造了一个Obserable-1需要使用的Subscriber-temp;
② Obserable-1调用call(Subscriber)方法,实际是调用了①中创建的Subscriber-temp,如上,Subsriber-temp中的onNext,onComplete,onError,都没有直接调用真实的Subsriber-final对应的方法回调(废话,这样就相当于倒手传递了一下发射的事件,什么都没有做),而是使用调度方法schedule,触发线程切换动作。
③ Subscriber-temp 在线程切换之后,回调call方法中,取出①中缓存的Suscriber-final,调用Subscriber-final的onNext,onComplete,onError。
subscribeOn() 和 observeOn() 都做了线程切换的工作(图中的 “schedule…” 部位)。不同的是, subscribeOn() 的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响;而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此 observeOn() 控制的是它后面的线程。
TODO observeOn的流程图
懒得画了。。。
(3.3)关于调度器线程切换的实现
(3.4)subscribeOn、observeOn
综上,subscribeOn关注的是OnSubscriber,observeOn关注的是subscriber.onNext()。
subcribeOn和observeOn 对比分析
有如上逻辑,则我们对其运行进行分析。
首先,我们需要先明白其内部执行的逻辑。
在调用subscribe之后,逻辑开始运行。分别调用每一步OnSubscribe.call(),注意:自下往上。当运行到最上,即Observable.create()后,我们在其中调用了subscriber.onNext(),于是程序开始自上往下执行每一个对象的subscriber.onNext()方法。最终,直到subscribe()中的回调。
其次,从上面对subscribeOn()和observeOn()的分析中可以明白,subscribeOn()是在call()方法中起作用,而observeOn()是在onNext()中作用。
那么对于以上的逻辑,我们可以得出如下结论:
操作1,2,3,4在io线程中,因为在如果没有observeOn()影响,他们的回调操作默认在订阅的线程中。而我们的订阅线程在subscribeOn(io)发生了改变。注意他们执行的先后顺序。
操作5,6在main线程中运行。因为observeOn()改变了onNext().
特别注意那一个逻辑没起到作用
再简单点总结就是
subscribeOn的调用切换之前的线程。
observeOn的调用切换之后的线程。
observeOn之后,不可再调用subscribeOn 切换线程
复杂情况
我们经常多次使用subscribeOn切换线程,那么以后是否可以组合observeOn和subscribeOn达到自由切换的目的呢?
组合是可以的,但是他们的执行顺序是有条件的,如果仔细分析的话,可以知道observeOn调用之后,再调用subscribeOn是无效的,原因是什么?
因为subscribeOn改变的是subscribe这句调用所在的线程,大多数情况,产生内容和消费内容是在同一线程的,所以改变了产生内容所在的线程,就改变了消费内容所在的线程。
经过上面的阐述,我们知道,observeOn的工作原理是把消费结果先缓存,再切换到新线程上让原始消费者消费,它和生产者是没有一点关系的,就算subscribeOn调用了,也只是改变observeOn这个消费者所在的线程,和OperatorObserveOn中存储的原始消费者一点关系都没有,它还是由observeOn控制。
下面提到的“操作”包括产生事件、用操作符操作事件以及最终的通过 subscriber 消费事件;
- 只有第一subscribeOn() 起作用(所以多个 subscribeOn() 无意义;
- 这个 subscribeOn() 控制从流程开始的第一个操作,直到遇到第一个 observeOn();
- observeOn() 可以使用多次,每个 observeOn() 将导致一次线程切换(),这次切换开始于这次 observeOn() 的下一个操作;
不论是 subscribeOn() 还是 observeOn(),每次线程切换如果不受到下一个 observeOn() 的干预,线程将不再改变,不会自动切换到其他线程。
3. 发生错误时的逻辑处理
略