RxJava应用

TODO 如果发生异常,OnError,Action1运行的线程所在????

TODO map 和 flatMap 的原理

TODO 与Retrofit的结合使用

TODO RxBinding,RxLifeCycle的应用

什么是响应式(Reactive):

应用开发中的例子:
搜索输入框,根据用户实时输入的数据,输入框下方会根据当前输入内容,实时刷新推荐信息,这种就叫做Reactive的能力。
类似这种能自动对外部环境的变化做出响应的系统,我们就称之为响应式系统(Reactive System),其中的外部环境,可以是输入信号的变化,事件的发生等等。通常这个响应式系统不但能响应外部环境的变化,还会根据自身内部状态通过某种方式反馈给外部观察者(推荐内容变化)。

什么是函数式编程(Functional Programming):

函数式编程介绍
介绍Callback形式和响应式区别
一篇不错的Java响应式导读
初级入门篇

有别于命令式编程,逻辑式编程的一种编程范式,是一种面向数学的抽象,将计算描述为一种表达式求值,而这个表达式就是函数,求值的过程称之为自变量的映射。也就是说一个函数的求值结果,仅取决于自变量和函数本身,不依赖其他状态变量(也就是闭包), 也不修改其他外部变量,没有副作用。自变量可以是常规的变量,也可以是另一个函数(高阶函数)。(当然还有一些其他属性和特征,不详细介绍了)

什么是响应式编程(Reactive Programming):

响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。

函数响应式编程(函数式+响应式 —> 函数响应式 Functional Reactive Programming)

编程范式关系:

编程范式关系

  • 利用函数式编程的思想和方法(闭包思想,函数、高阶函数),来为响应式:事件流,变化传播所用。

监听状态变化Callback方式的弊端:

  1. 状态的修改可能分布很多地方,针对状态而作响应动作的地方,也就线性增长;
  2. 状态之间可能相互组合,相互影响,不集中管理,有可能出bug,而且很有可能出现未考虑的状态组合;
  3. 多重回调,难于调试和跟踪,回调之间关系不清晰;
  4. 尤其是在Android中,有UI操作,非UI操作之分,一些必须在UI中,一些一定不能在UI线程中去做。使用Callback难以实现线程的轻松切换

而在Java,Android中使用的函数响应式编程框架就是:RxJava,RxAndroid


RxJava的GitHub:函数响应式编程的主框架

RxAndroid的GitHub:在RxJava基础上针对Android的扩展,用的最多的就是UI线程的切换

英格力士不好的,可以看看这个翻译文档

(1) 什么是RxJava:

摘自GitHub上的一句话介绍:RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.(RxJava 是针对JVM的响应式编程思想的实现,一个使用可观测序列来组成异步基于事件的library)。

RxJava的设计思想,就是观察者模式(Observer Pattern):

Obserable:序列化产生事件的对象;Observer:观察者、订阅者,监听到变化后做出相应的动作。

Obserable和Observer通过subscribe()方法实现订阅(观察,监听)关系,从而在Obserable可以在发生变化的时候通知Observer,并且在完成通知任务之后,又追加了几个方法调用 — error的通知,complete的通知。

示例1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("h");
subscriber.onNext("e");
subscriber.onNext("l");
subscriber.onNext("l");
subscriber.onCompleted();
}});//创造一个最基本的事件序列,在被订阅时,依次将预定事件发送给观察者。
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.e("liuyu", "Observer onCompleted"); }
@Override
public void onError(Throwable e) {
Log.e("liuyu", "Observer onError"); }
@Override
public void onNext(String o) {
Log.e("liuyu", "Observer onNext:"+o); }
};
observable.subscribe(observer);//发生订阅

说明:

  1. 只有订阅发生是(observable.subscribe(observer)),事件流才会启动

  2. 队列完结是,需要调用onCompleted(),作为事件完结的标志(只有在自定义事件源时才需要这么做)

  3. onComplete和onError是互斥事件

  4. onError():在事件队列发生异常时,会触发,同时队列自动终止,不允许再有事件发出

以上代码中,Observer定义了一套完整的监听事件,其中包括:每一次事件发生的监听onNext;发射源完成所有发射任务最后的事件:onComplete;如果在事件发射处理过程中有错误发生时:onError。

有时候订阅者(Observer)可能只关心以上三种事件中的几种,也可以定义一个不完整的监听,只定义自己感兴趣的事件,不关心的事件不需要定义。

例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//只对next事件感兴趣:
Observable.just("hello", "world").subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("liuyu", "Observer call:"+s); }
});
//只对next事件或者有可能发生的error感兴趣:
Observable.just("hello", "world").subscribe(
new Action1<String>() {//next
@Override
public void call(String s) {
Log.e("liuyu", "Observer call:"+s); }
},
new Action1<Throwable>() {//error
@Override
public void call(Throwable throwable) {
Log.e("liuyu", "Oops Error :"+throwable); }
});
//甚至于对什么都不感兴趣,只是想让被观察者的事件发生:
Observable.just("hello", "world").subscribe();

关于以上几种情况,源代码都有相应的重载方法原型:

1
2
3
4
5
6
7
public final Subscription subscribe()
public final Subscription subscribe(final Action1<? super T> onNext)
public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError)
public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onCompleted)

可以看到,从不监听任何事件到监听所有事件,都有方法原型声明。null —>(next) —> (next, error) —> (next, error, complete);但是不能打破顺序跳跃式监听:例如一下:Obserable.subscribe(new Action1) 或者 Obserable.subscribe(new Action0 onCompleted)。而且如果事件队列中途发生错误,而你又没有注册响应的监听事件,就会抛出异常:OnErrorNotImplementedException。因此,缺省监听,最保险的方式是一定要注册Next事件和Error事件。

(2) RxJava常用操作符介绍:官网所列所有操作符(非常庞大)

TODO 常用操作符图文介绍
  1. create
  2. just/from
  3. map
  4. flatMap
  5. timer
  6. interval
  7. range
  8. concat
  9. merge/mergeDelayError
  10. startWith
  11. zip
  12. combineLatest/combineLatestDelayError(结合Reader同步更新书架实例)
  13. filter
  14. take/takeLast/takeLastBuffer/takeFirst
  15. ofType
  16. skip/skipLast
  17. elementAt
  18. distinct
  19. throttleFirst/throttleLast
  20. reduce
  21. collect
  22. count
  23. buffer
  24. toList/toSortedList/toMap

(3) RxJava中的线程切换

以上栗子中,没有涉及到线程切换的问题,因此都是按照默认的线程在运行:事件的发出和事件的接收在同一个线程中完成,两者所在线程由订阅动作(subscribe)发生的线程所决定。
RxJava的主要优势:1,避免了回调地狱;2,便利的线程切换。


因此,接下来就要开始最主要的特征了,事件流中的线程切换:Scheduler

还是上面的打印字符的事件栗子,这次主要关注Obserable和Observer的所处线程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
final Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("hello word");
Log.e("liuyu", "Observable onCompleted in Thread :"+ Thread.currentThread().getName());
subscriber.onCompleted();
}
});
final Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.e("liuyu", "Observer receive onCompleted in Thread :"+ Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String o) {
Log.e("liuyu", "Observer receive onNext in Thread :"+ Thread.currentThread().getName());
}
@Override
public void onStart() {
//方便在开始一个订阅任务发生的开始时刻,添加一些额外的动作,比如开始一个网络请求之前,先显示一个loading的页面等等, 注意,在下面的subscribe代码分析中揭示了,此回调发生在subscribe调用所在的线程,因为要特别注意耗时与否
super.onStart();
}
};
observable
.subscribeOn(Schedulers.io()) //事件发出所在线程
.observeOn(AndroidSchedulers.mainThread()) //事件接收者所在线程
.subscribe(observer);

1
2
3
4
输出结果:
Observable onCompleted in Thread :Thread-2141
Observer receive onNext in Thread : main
Observer receive onCompleted in Thread :main

可以看到通过在订阅是通过方法:subscribeOn,observeOn指定了事件双方分属的线程。这里,我们主要关注的就是Schedulers(调度器),RxJava就是通过调度器来指定代码应该运行在什么样的线程中,RxJava中已经内置了几种常用的Scheduler:

调度器类型

  • AndroidSchedulers.mainThread():RxAndroid依赖库中使用最频繁的方法,指定运行在主线程(UI线程)中;

切记不能随意使用不正确的调度器,例如对响应敏感的操作,放在了io线程池中,如果当前io线程池中等待执行的任务较多时,造成该敏感任务无法得到及时响应。将本该放在io的耗时操作,放在了computation线程池中,造成固定size的computation线程池长时间被io操作所占用,无法响应其他事件。

Questions: The first 3 schedulers are pretty self explanatory; however, I’m a little confused about computation and io.
What exactly is “IO-bound work”? Is it used for dealing with streams (java.io) and files (java.nio.files)? Is it used for database queries? Is it used for downloading files or accessing REST APIs?
How is computation() different from newThread()? Is it that all computation() calls are on a single (background) thread instead of a new (background) thread each time?
Why is it bad to call computation() when doing IO work?
Why is it bad to call io() when doing computational work?

Great questions, I think the documentation could do with some more detail.
io() is backed by an unbounded thread-pool and is the sort of thing you’d use for non-computationally intensive tasks, that is stuff that doesn’t put much load on the CPU. So yep interaction with the file system, interaction with databases or services on a different host are good examples.
computation() is backed by a bounded thread-pool with size equal to the number of available processors. If you tried to schedule cpu intensive work in parallel across more than the available processors (say using newThread()) then you are up for thread creation overhead and context switching overhead as threads vie for a processor and it’s potentially a big performance hit.
It’s best to leave computation() for CPU intensive work only otherwise you won’t get good CPU utilization.
It’s bad to call io() for computational work for the reason discussed in 2. io() is unbounded and if you schedule a thousand computational tasks on io() in parallel then each of those thousand tasks will each have their own thread and be competing for CPU incurring context switching costs.

(3.1) 不包含线程切换的订阅发生逻辑:

在分析订阅者,被订阅者线程切换的逻辑之前,先将没有任何线程切换的原始版本订阅发生过程,撸一遍,在搞明白这个后,就可以循循渐进的搞明白,订阅者,被订阅者分别运行的线程是怎么切换的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//以下代码非完整版本,只保留主干。
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// new Subscriber so onStart it
subscriber.onStart();
try {
// allow the hook to intercept and/or decorate,RxJavaHooks是用于hook 各种Observable生命周期行为的工具类,此处将其替换成真实的运行步骤
//RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
//return RxJavaHooks.onObservableReturn(subscriber);
observable.onSubscribe.call(subscriber);①
return (Subscription)subscriber;
} catch (Throwable e) {
if (subscriber.isUnsubscribed()) {
//RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
observable.onSubscribe.callError(subscriber);//杜撰的方法调用
}
//return Subscriptions.unsubscribed();
return (Subscription)subscriber;
}
}

抛除各种关于Observer,Observable的包装,上述代码就是订阅发生之后,Observable emits items 的步骤,对照如下应用示例,大概就可以撸顺整个订阅发生的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {②
subscriber.onNext("hello word");
subscriber.onCompleted();
}
});
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {④
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String o) {③
}
};
observable.subscribe(observer);

在Observable.onSubscribe的call回调中,参数为Subscriber,而我们定义的类型为Observer,这两者是什么关系呢?

Subscriber是Observer接口的实现类,除了实现Observer接口定义的onNext,onError,onComplete方法之外,提供额外的回调(例如onStart)和取消订阅(unsubscribe(),isUnsubscribed())等方法,
如果上述示例中,直接使用了Observer,而不是Subscriber,在底层,是会自动将其包装成Subscriber的:

1
2
3
4
5
6
7
8
9
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
if (observer == null) {
throw new NullPointerException("observer is null");
}
return subscribe(new ObserverSubscriber<T>(observer));
}

(3.2) 带线程切换的订阅动作:

  1. Observable.subscribeOn():
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    public class Obserable{}
    public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
    //TODO ScalarSynchronousObservable是什么???
    if (this instanceof ScalarSynchronousObservable) {
    return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    //因为OperatorSubscribeOn 实现了OnSubscribe接口,所以在原Observable被订阅,开始发射事件时,调用的就是OperatorSubscribeOn.call();
    return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
    }
    public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
    return new Observable<T>(RxJavaHooks.onCreate(f));
    }
    public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
    try {
    ......
    RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
    return RxJavaHooks.onObservableReturn(subscriber);
    } catch (Throwable e) {
    ......
    subscriber.onError(RxJavaHooks.onObservableError(e));
    ......
    return Subscriptions.unsubscribed();
    }
    }
    }
    public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
    final Scheduler scheduler;
    final Observable<T> source;
    final boolean requestOn;
    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) {
    this.scheduler = scheduler;
    this.source = source;
    this.requestOn = requestOn;
    }
    @Override
    public void call(final Subscriber<? super T> subscriber) {
    final Worker inner = scheduler.createWorker();
    SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
    //便于订阅事件的跟踪,取消等。可暂时不考虑逻辑
    subscriber.add(parent);
    subscriber.add(inner);
    //根据Schedulers产生不同的线程切换动作
    inner.schedule(parent);
    ++++++++++++++++++++++++++++++++++
    摘选一个Schedulers.newThread()的具体实现来看一下调用逻辑:CachedThreadSchedluer
    ......
    ......
    @Override
    public Subscription schedule(Action0 action) {
    return schedule(action, 0, null);
    }
    public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) {
    Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
    //对Action0的包装,在executor执行线程时,调用Action0的call方法
    ScheduledAction run = new ScheduledAction(decoratedAction, parent);
    parent.add(run);
    Future<?> f;
    if (delayTime <= 0) {
    //实际调用为SubscribeOnSubscriber.call(),此时已经完成线程切换
    f = executor.submit(run);
    } else {
    f = executor.schedule(run, delayTime, unit);
    }
    run.add(f);
    return run;
    }
    ......
    ......
    public final class ScheduledAction extends AtomicReference<Thread> implements Runnable, Subscription {
    final SubscriptionList cancel;
    final Action0 action;
    public ScheduledAction(Action0 action, SubscriptionList parent) {
    this.action = action;
    this.cancel = new SubscriptionList(new Remover2(this, parent));
    }
    @Override
    public void run() {
    try {
    ...
    ...
    action.call();
    } catch (Throwable e) {
    ...
    ...
    } finally {
    unsubscribe();
    }
    }
    }
    }
    ++++++++++++++++++++++++++++++++++
    }
    static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0 {
    final Subscriber<? super T> actual;
    final boolean requestOn;
    final Worker worker;
    Observable<T> source;
    Thread t;
    SubscribeOnSubscriber(Subscriber<? super T> actual, boolean requestOn, Worker worker, Observable<T> source) {
    this.actual = actual;
    this.requestOn = requestOn;
    this.worker = worker;
    this.source = source;
    }
    @Override
    public void onNext(T t) {
    actual.onNext(t);
    }
    @Override
    public void onError(Throwable e) {
    try {
    actual.onError(e);
    } finally {
    worker.unsubscribe();
    }
    }
    @Override
    public void onCompleted() {
    try {
    actual.onCompleted();
    } finally {
    worker.unsubscribe();
    }
    }
    //inner.schedule(parent)切换之后的回调方法
    @Override
    public void call() {//注意:此时已经切换为Worker指定的线程了
    Observable<T> src = source;
    source = null;
    t = Thread.currentThread();
    //原Observable将items发射给当前这个Subscriber(SubscribeOnSubscriber),然后actual(Subscriber)再去将事件进行下一步的分发。
    src.unsafeSubscribe(this);
    }
    。。。。。。
    }
    }
    如同上述例子中的Observable.create(OnSubscribe), unsafeCreate方法有产生了一个Observable,并且订阅的新事件为OnSubscribe( = OperatorSubscribeOn)

至此,调用链为:

  1. Observable.create():创建了Observable-1和OnSubscribe-1;
  2. subscribeOn():创建了Observable-2和OperatorSubscribeOn(OnSubscribe-2),同时,OnSubscribe-2中保存了Observable-1的引用。
  3. Obserable-2.subscribe(Observer):调用了OnSubscribe-2的call方法,在call方法中,通过Schedulers发生了线程切换,线程完成切换之后,调用了Observale-1.unsafeSubscribe(Obserable-2内部实现Subscriber),此时OnSubscribe-1的call方法被调用,并且发生在Obsubscribe-2.SubscribeOnSubscriber.call所在的线程,此时就完成了源Observable.OnSubscribe发射事件的线程切换。

如果多个subscribeOn链接,那么此时,Obserable.OnSubscribe究竟发生在哪个线程呢???

1
2
3
4
5
6
7
8
Observable.just("ss")
.subscribeOn(Schedulers.io()) // --thread-1---
.subscribeOn(Schedulers.newThread()) //--thread-2----
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
}
});

下面来分析一下这个例子中的线程切换流程:

1,Obserable.just() : 产生Observable-1, OnSubscribe-1

2,Observable-1.subscribeOn(Schedulers.io()) : 产生了Obserable-2, OperatorSubscribeOn-2,并在OperatorSubscribeOn-2中保存了Obserable-1的引用。

3,了Obserable-2..subscribeOn(Schedulers.newThread()):创建Observable-3, OperatorSubscribeOn-3并在OperatorSubscribeOn-3保存Observable-2的引用。

4,Obserable-3.subscribe():
    调用OperatorSubscribeOn_3.call(),改变线程为Schedulers.newThread()。
    调用OperatorSubscribeOn_2.call(),改变线程为Schedulers.io()。
    调用OnSubscribe_1.call(),此时call()运行在Schedulers.io()。

根据以上逻辑分析,会按照thread-1的线程进行执行。

总结:subscribeOn的调用,会改变调用前事件运行的线程,多次调用subscribeOn,只有头部的subscribeOn起作用。
RxJava.subscribeOn线程切换逻辑

  1. Observable.observeOn():
1
2
3
4
5
6
7
8
9
示例:
Observable.just("ss")
.observeOn(Schedulers.io())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
}
});

接下开始分析observeOn

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
public class Obserable{
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
}
//首先介绍一下一个关键的操作符,作为事件流转换的基石,由此可以延伸出好多功能的操作符
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
final OnSubscribe<T> parent;
final Operator<? extends R, ? super T> operator;
public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}
@Override
public void call(Subscriber<? super R> o) {
try {
Subscriber<? super T> st = operator.call(o);
...
st.onStart();
parent.call(st);
} catch (Throwable e) {
...
...
}
}
//懒得自己写了,摘抄一段别人的吧
/**它生成了一个新的 Observable 并返回,而且创建新 Observable 所用的参数 OnSubscribe 的回调方法 call() 中的实现竟然看起来和前面讲过的 Observable.subscribe() 一样!然而它们并不一样哟~不一样的地方关键就在于第二行 onSubscribe.call(subscriber) 中的 onSubscribe 所指代的对象不同
//subscribe() 中这句话的 onSubscribe 指的是 Observable 中的 onSubscribe 对象,这个没有问题,但是 lift() 之后的情况就复杂了点。
//当含有 lift() 时:
1. lift() 创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了;
2. 而同样地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe;
3. 当用户调用经过 lift() 后的 Observable 的 subscribe() 的时候,使用的是 lift() 所返回的新的 Observable ,于是它所触发的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那个 OnSubscribe;
4. 而这个新 OnSubscribe 的 call() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在这个 call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一个新的 Subscriber(Operator 就是在这里,通过自己的 call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的『变换』代码以实现变换),然后利用这个新 Subscriber 向原始 Observable 进行订阅。
这样就实现了 lift() 过程,有点像一种代理机制,通过事件拦截和处理实现事件序列的变换。
精简掉细节的话,也可以这么说:在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber。
**/
}
public final class OperatorObserveOn<T> implements Operator<T, T> {
...
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
...
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
...
}
static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
...
void init() {
Subscriber<? super T> localChild = child;
localChild.setProducer(new Producer() {
@Override
public void request(long n) {
if (n > 0L) {
BackpressureUtils.getAndAddRequest(requested, n);
schedule();
}
}
});
localChild.add(recursiveScheduler);
localChild.add(this);
}
@Override
public void onNext(final T t) {
...
schedule();
}
@Override
public void onCompleted() {
...
finished = true;
schedule();
}
@Override
public void onError(final Throwable e) {
...
finished = true;
schedule();
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
// only execute this from schedule()
@Override
public void call() {
final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;
for (;;) {
long requestAmount = requested.get();
while (requestAmount != currentEmission) {
boolean done = finished;
Object v = q.poll();
boolean empty = v == null;
if (checkTerminated(done, empty, localChild, q)) {
return;
}
if (empty) {
break;
}
localChild.onNext(NotificationLite.<T>getValue(v));
...
}
}
//完成或者异常时的退出
boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {
if (a.isUnsubscribed()) {
q.clear();
return true;
}
if (done) {
...
Throwable e = error;
try {
if (e != null) {
a.onError(e);
} else {
a.onCompleted();
}
} finally {
recursiveScheduler.unsubscribe();
}
...
}
return false;
}
}
}

至此,调用链为:

  1. Obserable.create() : 创建了Observable-1 ,OnSubscriber-1;
  2. observeOn : 创建了Obserable-2,OnSubscriber-2(OnSubcribeLift)持有OnSubscriber-1的引用,和一个线程变换动作OperatorObserverOn;
  3. subscribe(Subscriber-final) :

    ① OperatorObserveOn完成线程切换的准备工作,并持有了最终的Suscriber-final(缓存起来),并以此构造了一个Obserable-1需要使用的Subscriber-temp;

    ② Obserable-1调用call(Subscriber)方法,实际是调用了①中创建的Subscriber-temp,如上,Subsriber-temp中的onNext,onComplete,onError,都没有直接调用真实的Subsriber-final对应的方法回调(废话,这样就相当于倒手传递了一下发射的事件,什么都没有做),而是使用调度方法schedule,触发线程切换动作。

    ③ Subscriber-temp 在线程切换之后,回调call方法中,取出①中缓存的Suscriber-final,调用Subscriber-final的onNext,onComplete,onError。

subscribeOn() 和 observeOn() 都做了线程切换的工作(图中的 “schedule…” 部位)。不同的是, subscribeOn() 的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响;而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此 observeOn() 控制的是它后面的线程。

TODO observeOn的流程图

懒得画了。。。

(3.3)关于调度器线程切换的实现

(3.4)subscribeOn、observeOn

综上,subscribeOn关注的是OnSubscriber,observeOn关注的是subscriber.onNext()。

subcribeOn和observeOn 对比分析

1
2
3
4
5
6
7
8
9
10
11
Observable
.map // 操作1
.flatMap // 操作2
.subscribeOn(io)
.map //操作3
.flatMap //操作4
.observeOn(main)
.map //操作5
.flatMap //操作6
.subscribeOn(io) //!!特别注意
.subscribe(handleData)

有如上逻辑,则我们对其运行进行分析。

首先,我们需要先明白其内部执行的逻辑。

在调用subscribe之后,逻辑开始运行。分别调用每一步OnSubscribe.call(),注意:自下往上。当运行到最上,即Observable.create()后,我们在其中调用了subscriber.onNext(),于是程序开始自上往下执行每一个对象的subscriber.onNext()方法。最终,直到subscribe()中的回调。

其次,从上面对subscribeOn()和observeOn()的分析中可以明白,subscribeOn()是在call()方法中起作用,而observeOn()是在onNext()中作用。

那么对于以上的逻辑,我们可以得出如下结论:

操作1,2,3,4在io线程中,因为在如果没有observeOn()影响,他们的回调操作默认在订阅的线程中。而我们的订阅线程在subscribeOn(io)发生了改变。注意他们执行的先后顺序。
操作5,6在main线程中运行。因为observeOn()改变了onNext().
特别注意那一个逻辑没起到作用
再简单点总结就是

subscribeOn的调用切换之前的线程。
observeOn的调用切换之后的线程。
observeOn之后,不可再调用subscribeOn 切换线程
复杂情况

我们经常多次使用subscribeOn切换线程,那么以后是否可以组合observeOn和subscribeOn达到自由切换的目的呢?

组合是可以的,但是他们的执行顺序是有条件的,如果仔细分析的话,可以知道observeOn调用之后,再调用subscribeOn是无效的,原因是什么?

因为subscribeOn改变的是subscribe这句调用所在的线程,大多数情况,产生内容和消费内容是在同一线程的,所以改变了产生内容所在的线程,就改变了消费内容所在的线程。

经过上面的阐述,我们知道,observeOn的工作原理是把消费结果先缓存,再切换到新线程上让原始消费者消费,它和生产者是没有一点关系的,就算subscribeOn调用了,也只是改变observeOn这个消费者所在的线程,和OperatorObserveOn中存储的原始消费者一点关系都没有,它还是由observeOn控制。

下面提到的“操作”包括产生事件、用操作符操作事件以及最终的通过 subscriber 消费事件;

  1. 只有第一subscribeOn() 起作用(所以多个 subscribeOn() 无意义;
  2. 这个 subscribeOn() 控制从流程开始的第一个操作,直到遇到第一个 observeOn();
  3. observeOn() 可以使用多次,每个 observeOn() 将导致一次线程切换(),这次切换开始于这次 observeOn() 的下一个操作;
    不论是 subscribeOn() 还是 observeOn(),每次线程切换如果不受到下一个 observeOn() 的干预,线程将不再改变,不会自动切换到其他线程。

3. 发生错误时的逻辑处理