Skip to content

Commit 4d8647c

Browse files
RxJava
1 parent 8472159 commit 4d8647c

15 files changed

Lines changed: 1379 additions & 0 deletions
Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
> 本文由 [简悦 SimpRead](http://ksria.com/simpread/) 转码, 原文地址 [mthli.xyz](https://mthli.xyz/rxjava-scheduler/)
2+
3+
> 通过设置不同的调度器,可以灵活地在不同线程间切换。
4+
5+
RxJava 在链式调用的设计基础上,通过设置不同的调度器,可以灵活地在不同线程间切换并执行对应的 Task。让我们一起来了解一下这种切换模式是如何实现的。
6+
7+
Scheduler
8+
---------
9+
10+
[Scheduler](http://reactivex.io/RxJava/javadoc/io/reactivex/Scheduler.html) 是所有 RxJava 调度器的抽象父类,子类需要复写其 `createWorker()` 返回一个 [Worker](http://reactivex.io/RxJava/javadoc/io/reactivex/Scheduler.Worker.html) 实例,用来接受并执行 Task;同时也可以复写其 `scheduleDirect()` 来决定如何将 Task 分配给不同的 Worker。一个缩略版的 Scheduler 源码如下:
11+
12+
```java
13+
public abstract class Scheduler {
14+
...
15+
16+
@NonNull
17+
public abstract Worker createWorker();
18+
19+
// 调度一次定时 Task,细节封装在传入的 Runnable 里
20+
@NonNull
21+
public Disposable scheduleDirect(
22+
@NonNull Runnable run, long delay, @NonNull TimeUnit unit
23+
) {
24+
// 新建一个 Worker
25+
final Worker w = createWorker();
26+
27+
// 静态代理并封装我们想要执行的 Runnable,具体实现可忽略
28+
final Runnable decoratedRun
29+
= RxJavaPlugins.onSchedule(run);
30+
DisposeTask task = new DisposeTask(decoratedRun, w);
31+
32+
// 将 Task 交给新建的 Worker 执行
33+
w.schedule(task, delay, unit);
34+
return task;
35+
}
36+
37+
// 同时 Worker 也是一个抽象类
38+
public abstract static class Worker implements Disposable {
39+
...
40+
41+
// 执行被分配的定时 Task;
42+
// 注意,Worker 内部也可以维护一个自己的 Task 调度策略
43+
@NonNull
44+
public abstract Disposable schedule(
45+
@NonNull Runnable run, long delay,
46+
@NonNull TimeUnit unit);
47+
}
48+
}
49+
```
50+
51+
总的来说,Scheduler 的默认实现为:只要有新 Task 到来,就新建一个 Worker 实例并将 Task 分配给它;同时 Worker 内部也可以维护一个自己的 Task 调度策略。
52+
53+
newThread
54+
---------
55+
56+
RxJava 的 [newThread](http://reactivex.io/RxJava/javadoc/io/reactivex/schedulers/Schedulers.html#newThread--) 调度器对每一个新 Task 都会新起一个线程去执行它。我们以 newThread 为例,看看一个最简单的 Scheduler 是怎样实现的。
57+
58+
我们平时使用的 `Schedulers.newThread()` 是一个返回 NewThreadScheduler 实例的单例模式。以下是 NewThreadScheduler 对应的源码:
59+
60+
```java
61+
public final class NewThreadScheduler extends Scheduler {
62+
final ThreadFactory threadFactory;
63+
64+
...
65+
66+
@NonNull
67+
@Override
68+
public Worker createWorker() {
69+
return new NewThreadWorker(threadFactory); }
70+
}
71+
```
72+
73+
可以看到,NewThreadScheduler 没有复写 `scheduleDirect()` 的默认行为,即「只要有新 Task 到来,就新建一个 Worker 实例并将 Task 分配给它」;它仅仅是复写了 `createWorker()` 返回了一个具体的 NewThreadWorker 实例。
74+
75+
我们再来看看 NewThreadWorker 对应的源码:
76+
77+
```java
78+
public class NewThreadWorker extends Scheduler.Worker
79+
implements Disposable {
80+
81+
private final ScheduledExecutorService executor;
82+
83+
public NewThreadWorker(ThreadFactory threadFactory) {
84+
// 最终被赋值为 Executors.newScheduledThreadPool(1) executor = SchedulerPoolFactory.create(threadFactory); }
85+
86+
@NonNull
87+
@Override
88+
public Disposable schedule(
89+
@NonNull final Runnable action, long delayTime,
90+
@NonNull TimeUnit unit
91+
) {
92+
...
93+
return scheduleActual(action, delayTime, unit, null);
94+
}
95+
96+
@NonNull
97+
public ScheduledRunnable scheduleActual(
98+
final Runnable run, long delayTime,
99+
@NonNull TimeUnit unit,
100+
@Nullable DisposableContainer parent
101+
) {
102+
// 静态代理并封装我们想要执行的 Runnable,具体实现可忽略
103+
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
104+
ScheduledRunnable sr =
105+
new ScheduledRunnable(decoratedRun, parent);
106+
107+
...
108+
109+
Future<?> f;
110+
try {
111+
if (delayTime <= 0) { // 立即执行 f = executor.submit((Callable<Object>) sr); } else { // 延时调度 f = executor.schedule( (Callable<Object>) sr, delayTime, unit); } sr.setFuture(f);
112+
} catch (RejectedExecutionException ex) {
113+
...
114+
}
115+
116+
return sr;
117+
}
118+
}
119+
```
120+
121+
结合 NewThreadSchedulerNewThreadWorker 的源码可以看到,每一个新的 Task 都会被一个新建的线程池容量为 1 的 [ScheduledExecutorService](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ScheduledExecutorService.html) 立即执行或延时调度,这是 JDK 原生提供的一个多线程调度器实现。其他的 RxJava 调度器实现在这里就不展开了,感兴趣的同学可以自行查阅对应的源码。
122+
123+
链式调用
124+
----
125+
126+
在了解了 Scheduler 的具体实现后,我们还需要知道 Scheduler 是如何在链式调用中工作的。关于 RxJava 的链式调用是如何工作的,建议先阅读笔者之前的文章 [RxJava 链式调用原理](https://mthli.xyz/rxjava-chain/),在此不予赘述。这里我们主要讲解用于线程切换的 [subscribeOn](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#subscribeOn-io.reactivex.Scheduler-) 和 [observeOn](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#observeOn-io.reactivex.Scheduler-) 两个操作符。注意,本文均以 [Observable](http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html) 的操作符实现作为讨论对象。
127+
128+
subscribeOn 用于设置 Observable 开始执行时所在的线程;observeOn 用于设置从该操作符调用处开始下游操作符所在的线程。一个典型的线程切换场景如下:
129+
130+
```
131+
Observable
132+
.create(...) // 在 io 调度器上执行
133+
.subscribeOn(Schedulers.io())
134+
.observeOn(AndroidSchedulers.mainThread())
135+
.subscribe(...) // 在 Android 主线程上执行
136+
```
137+
138+
我们先看看 subscribeOn 对应的源码:
139+
140+
```java
141+
public abstract class Observable<@NonNull T>
142+
implements ObservableSource<T> {
143+
...
144+
145+
@NonNull
146+
public final Observable<T> subscribeOn(
147+
@NonNull Scheduler scheduler
148+
) {
149+
...
150+
return RxJavaPlugins.onAssembly(
151+
new ObservableSubscribeOn<>(this, scheduler) );
152+
}
153+
}
154+
```
155+
156+
再来看看 ObservableSubscribeOn 对应的源码:
157+
158+
```java
159+
public final class ObservableSubscribeOn<T>
160+
extends AbstractObservableWithUpstream<T, T> {
161+
162+
final Scheduler scheduler;
163+
164+
public ObservableSubscribeOn(
165+
ObservableSource<T> source, Scheduler scheduler
166+
) {
167+
super(source);
168+
this.scheduler = scheduler;
169+
}
170+
171+
@Override
172+
public void subscribeActual(
173+
final Observer<? super T> observer
174+
) {
175+
// 静态代理传入的上游 Observer,具体实现可忽略
176+
final SubscribeOnObserver<T> parent =
177+
new SubscribeOnObserver<>(observer);
178+
179+
...
180+
181+
parent.setDisposable(
182+
scheduler.scheduleDirect(new SubscribeTask(parent)) );
183+
}
184+
185+
...
186+
187+
final class SubscribeTask implements Runnable {
188+
private final SubscribeOnObserver<T> parent;
189+
190+
SubscribeTask(SubscribeOnObserver<T> parent) {
191+
this.parent = parent;
192+
}
193+
194+
// 这是 Runnable 接口必须实现的方法, // 使得 subscribe() 可以运行在对应的 Scheduler @Override public void run() { // source 对象是上游的 Observable, // parent 对象是下游的 Observer source.subscribe(parent); } }
195+
}
196+
```
197+
198+
可以看到,subscribeOn 通过将 Observable 的 `subscribe()` 封装在 Task 中,并调用 Scheduler 的 `scheduleDirect()` 进行线程切换,从而达到「设置 Observable 开始执行时所在的线程」的目的。
199+
200+
接着我们看看 observeOn 对应的源码:
201+
202+
```java
203+
public abstract class Observable<@NonNull T>
204+
implements ObservableSource<T> {
205+
...
206+
207+
@NonNull
208+
public final Observable<T> observeOn(
209+
@NonNull Scheduler scheduler,
210+
boolean delayError, int bufferSize
211+
) {
212+
...
213+
return RxJavaPlugins.onAssembly(
214+
new ObservableObserveOn<>( this, scheduler, delayError, bufferSize) );
215+
}
216+
}
217+
```
218+
219+
再来看看 ObservableObserveOn 对应的源码:
220+
221+
```java
222+
public final class ObservableSubscribeOn<T>
223+
extends AbstractObservableWithUpstream<T, T> {
224+
225+
final Scheduler scheduler;
226+
227+
public ObservableObserveOn(
228+
ObservableSource<T> source, Scheduler scheduler,
229+
boolean delayError, int bufferSize
230+
) {
231+
super(source);
232+
this.scheduler = scheduler;
233+
...
234+
}
235+
236+
@Override
237+
public void subscribeActual(
238+
final Observer<? super T> observer
239+
) {
240+
if (scheduler instanceof TrampolineScheduler) {
241+
...
242+
} else {
243+
// 直接创建一个新的 Worker 实例 Scheduler.Worker w = scheduler.createWorker(); // source 对象是上游的 Observable, // observer 对象是下游的 Observer; // 此处通过创建一个 ObserveOnObserver 作为中间人角色, // 它订阅了 source 并在相关回调中调用 observer 的对应方法, // 仍然是静态代理模式的应用 source.subscribe(new ObserveOnObserver<>( observer, w, delayError, bufferSize)); }
244+
}
245+
246+
...
247+
248+
static final class ObserveOnObserver<T>
249+
extends BasicIntQueueDisposable<T>
250+
implements Observer<T>, Runnable {
251+
...
252+
253+
final Observer<? super T> downstream;
254+
final Scheduler.Worker worker;
255+
256+
ObserveOnObserver(
257+
Observer<? super T> actual, Scheduler.Worker worker,
258+
boolean delayError, int bufferSize
259+
) {
260+
this.downstream = actual; this.worker = worker;
261+
...
262+
}
263+
264+
// 和平时调用 subscribe() 时 new Observer 一样,
265+
// 复写以下四个方法;具体实现相对复杂,略去不表
266+
@Override
267+
public void onSubscribe(Disposable d) { ... }
268+
269+
@Override
270+
public void onNext(T t) { ... }
271+
272+
@Override
273+
public void onError(Throwable t) { ... }
274+
275+
@Override
276+
public void onComplete() { ... }
277+
278+
// 主要调用 downstream 的逻辑在这里; // 这是 Runnable 接口必须实现的方法, // 使得 downstream 可以运行在对应的 Scheduler @Override public void run() { ... }
279+
// 实际的逻辑跳转很多,但最终在这里切换线程
280+
void schedule() {
281+
if (getAndIncrement() == 0) {
282+
worker.schedule(this); }
283+
}
284+
285+
...
286+
}
287+
}
288+
```
289+
290+
可以看到,observeOn 通过将调用下游 Observer 的调用逻辑封装在 Task 中,由指定的 Worker 实例进行线程切换,从而达到了「设置从该操作符调用处开始下游操作符所在的线程」的目的。
291+
292+
看到这里,你可能也注意到了:从之前 Scheduler 的源码我们可知,默认情况下调用 `scheduleDirect()` 也是将 Task 交给 `createWorker()` 新建的 Worker 实例执行的;那为什么 observeOn 要采取和 subscribeOn 不同的实现方式呢?感兴趣的同学可以去看看 [single](http://reactivex.io/RxJava/javadoc/io/reactivex/schedulers/Schedulers.html#single--) 调度器的源码,分开两个方法可以更充分的自定义,且这两个方法也不一定是直接相关的。只要保证底层的调度逻辑是正确的就 OK 了。
293+
294+
总的来说,subscribeOn 和 observeOn 都是将逻辑封装到 Runnable 中交给对应的 Scheduler 执行,从而实现了线程切换。但受限于篇幅原因,其中仍然有非常多的细节被本文略去了,建议感兴趣的读者可自行查阅源码。
295+
296+
最后,即使在 2020 年的今天,同为 JVM 系语言的 [Kotlin](https://kotlinlang.org/) 已经支持协程的情况下,RxJava 仅仅使用 JDK 提供的多线程 API 就能将线程切换处理的如此优雅,仍然是十分值得学习和使用的库。笔者认为它并没有过时。
9.54 KB
Loading
9.69 KB
Loading
17.5 KB
Loading
49.4 KB
Loading
135 KB
Loading
63.8 KB
Loading
17.2 KB
Loading
42.7 KB
Loading
36.1 KB
Loading

0 commit comments

Comments
 (0)