如果要在事件源符的级联中引入多线程,可以通过指定这些操作符(或特定的事件源)在特定的调度器上运行来实现。
一些ReactiveX事件源操作符具有将调度器作为参数的变体。 这些表示操作符在特定的调度器上完成部分或全部工作。
默认情况下,事件源和应用于它的操作符链将在同一线程上完成其工作,并将在调用其订阅方法通知其观察者。 SubscribeOn操作符指定不同的调度器来改变事件源上应操作的行为。ObserveOn操作符指定不同的调度器用来处理事件源要发给观察者的通知。
如图所示,SubscribeOn操作符指明了事件源开始操作的线程,无论操作符在链上的哪个位置被调用。在另一方面ObserveOn是影响事件源将在该操作符出现的位置下方使用的线程。因此,可以调用ObserveOn 多次在事件源操作符链上的不同位置为了改变某些操作符运行的线程。
待定
待定
可以从Schedulers类的描述中的工厂方法获得调度器。下表显示了RxGroovy中通过这些方法可用的调度器的种类:
| 调度器 | 作用 |
|---|---|
Schedulers.computation( ) | 用于计算工作,如事件循环和回调处理;不能在I/O工作使用此调度器(用Schedulers.io( )代替); 默认情况下,线程数等于处理器数 |
Schedulers.from(executor) | 使用指定的Executor作为一个调度器 |
Schedulers.immediate( ) | 调度工作立即在当前线程中运行 |
Schedulers.io( ) | 用于I/O密集型工作如异步执行阻塞I/O,此调度程序由一个将根据需要增长的线程池支持;用于普通的计算工作转为Schedulers.computation( );Schedulers.io( ) 默认情况下是CachedThreadScheduler, 它是类似于带有线程缓存的新线程调度器 |
Schedulers.newThread( ) | 为每个工作单元创建一个新线程 |
Schedulers.trampoline( ) | 队列工作在其他已经排队的工作之后在当前线程上运行 |
RxGroovy中的一些事件源操作符具有变体,允许设置操作符用于(至少某些部分)其操作的调度器。其他不在任何特定的调度器上运行,或在指定的默认调度器上运行。具有特定默认调度器的那些包括:
TestScheduler允许对调度器的时钟行为进行手动微调。这对于测试依赖于明确的时间安排的动作是非常有用的。此调度器还有三种方法:
advanceTimeTo(time,unit)advanceTimeBy(time,unit)triggerActions( )
可以从Schedulers类的描述中的工厂方法获得调度器。下表显示了RxJava中通过这些方法可用的调度器的种类:
| 调度器 | 作用 |
|---|---|
Schedulers.computation( ) | 用于计算工作,如事件循环和回调处理;不能在I/O工作使用此调度器(用Schedulers.io( )代替); 默认情况下,线程数等于处理器数 |
Schedulers.from(executor) | 使用指定的Executor作为一个调度器 |
Schedulers.immediate( ) | 调度工作立即在当前线程中运行 |
Schedulers.io( ) | 用于I/O密集型工作如异步执行阻塞I/O,此调度程序由一个将根据需要增长的线程池支持;用于普通的计算工作转为Schedulers.computation( );Schedulers.io( ) 默认情况下是CachedThreadScheduler, 它是类似于带有线程缓存的新线程调度器 |
Schedulers.newThread( ) | 为每个工作单元创建一个新线程 |
Schedulers.trampoline( ) | 队列工作在其他已经排队的工作之后在当前线程上运行 |
RxJava 1.x中的一些事件源操作符具有变体,允许设置操作符用于(至少某些部分)其操作的调度器。其他不在任何特定的调度器上运行,或在指定的默认调度器上运行。具有特定默认调度器的那些包括:
不仅可以将这些调度器传递给RxJava事件源操作符,还可以使用它们来安排自己的订阅工作。以下示例使用Scheduler.Worker类的调度方法让其运行在newThread的调度器上:
worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {
@Override
public void call() {
yourWork();
}
});
// some time later...
worker.unsubscribe();
在Worker对象使用schedule然后使用schedule(this)递归的调用调度:
worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {
@Override
public void call() {
yourWork();
// recurse until unsubscribed (schedule will do nothing if unsubscribed)
worker.schedule(this);
}
});
// some time later...
worker.unsubscribe();
Worker对象的类实现了Subscription接口, 使用isUnsubscribed和unsubscribe
方法, 可以在被订阅后停止运行或者从调度任务中取消订阅:
Worker worker = Schedulers.newThread().createWorker();
Subscription mySubscription = worker.schedule(new Action0() {
@Override
public void call() {
while(!worker.isUnsubscribed()) {
status = yourWork();
if(QUIT == status) { worker.unsubscribe(); }
}
}
});
当Worker已经变成Subscription,就可以(并应该、最终)调用它的unsubscribe方法来表示它可以停止工作并释放资源:
worker.unsubscribe();
可以延迟对给定计划程序的操作,直到超过某个时间跨度通过使用schedule一个实现。以下示例调度someAction在
someScheduler调度器的时钟经过500ms后运行:
someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);
另一种Scheduler方法允许安排定期执行操作。以下示例调度someAction在someScheduler调度器时钟经过500ms后运行,然后每隔250ms也都运行:
someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);
TestScheduler允许对调度器的时钟行为进行手动微调。这对于测试依赖于明确的时间安排的动作是非常有用的。此调度器还有三种方法:
advanceTimeTo(time,unit)advanceTimeBy(time,unit)triggerActions( )
在RxJS可以从Rx.Scheduler对象或独立实现的对象获取调度器,下表显示了RxJS中可用的各种调度器:
| 调度器 | 作用 |
|---|---|
Rx.Scheduler.currentThread | 调度工作立即在当前线程中运行 |
Rx.HistoricalScheduler | 调度工作就像在指定的之前时间发生一样 |
Rx.Scheduler.immediate | 调度工作立即在当前线程中运行 |
Rx.TestScheduler | 用于单元测试;允许手动操纵时间移动 |
Rx.Scheduler.timeout | 调度工作通过定时器回调 |
待定
待定
待定
待定
待定