调度器(Scheduler)

如果要在事件源符的级联中引入多线程,可以通过指定这些操作符(或特定的事件源)在特定的调度器上运行来实现。

一些ReactiveX事件源操作符具有将调度器作为参数的变体。 这些表示操作符在特定的调度器上完成部分或全部工作。

默认情况下,事件源和应用于它的操作符链将在同一线程上完成其工作,并将在调用其订阅方法通知其观察者。 SubscribeOn操作符指定不同的调度器来改变事件源上应操作的行为。ObserveOn操作符指定不同的调度器用来处理事件源要发给观察者的通知。

如图所示,SubscribeOn操作符指明了事件源开始操作的线程,无论操作符在链上的哪个位置被调用。在另一方面ObserveOn是影响事件源将在该操作符出现的位置下方使用的线程。因此,可以调用ObserveOn 多次在事件源操作符链上的不同位置为了改变某些操作符运行的线程。

ObserveOn and SubscribeOn

参考

Language-Specific Information:

待定

调度器的种类

可以从Schedulers的描述中的工厂方法获得调度器。下表显示了RxGroovy中通过这些方法可用的调度器的种类:

调度器作用
Schedulers.computation( )用于计算工作,如事件循环和回调处理;不能在I/O工作使用此调度器(用Schedulers.io( )代替); 默认情况下,线程数等于处理器数
Schedulers.from(executor)使用指定的Executor作为一个调度器
Schedulers.immediate( )调度工作立即在当前线程中运行
Schedulers.io( )用于I/O密集型工作如异步执行阻塞I/O,此调度程序由一个将根据需要增长的线程池支持;用于普通的计算工作转为Schedulers.computation( )Schedulers.io(&#8239) 默认情况下是CachedThreadScheduler, 它是类似于带有线程缓存的新线程调度器
Schedulers.newThread( )为每个工作单元创建一个新线程
Schedulers.trampoline( )队列工作在其他已经排队的工作之后在当前线程上运行

RxGroovy事件源操作符的默认调度器

RxGroovy中的一些事件源操作符具有变体,允许设置操作符用于(至少某些部分)其操作的调度器。其他不在任何特定的调度器上运行,或在指定的默认调度器上运行。具有特定默认调度器的那些包括:

操作符调度器
buffer(timespan)computation
buffer(timespan, count)computation
buffer(timespan, timeshift)computation
debounce(timeout, unit)computation
delay(delay, unit)computation
delaySubscription(delay, unit)computation
intervalcomputation
repeattrampoline
replay(time, unit)computation
replay(buffersize, time, unit)computation
replay(selector, time, unit)computation
replay(selector, buffersize, time, unit)computation
retrytrampoline
sample(period, unit)computation
skip(time, unit)computation
skipLast(time, unit)computation
take(time, unit)computation
takeLast(time, unit)computation
takeLast(count, time, unit)computation
takeLastBuffer(time, unit)computation
takeLastBuffer(count, time, unit)computation
throttleFirstcomputation
throttleLastcomputation
throttleWithTimeoutcomputation
timeIntervalimmediate
timeout(timeoutSelector)immediate
timeout(firstTimeoutSelector, timeoutSelector)immediate
timeout(timeoutSelector, other)immediate
timeout(timeout, timeUnit)computation
timeout(firstTimeoutSelector, timeoutSelector, other)immediate
timeout(timeout, timeUnit, other)computation
timercomputation
timestampimmediate
window(timespan)computation
window(timespan, count)computation
window(timespan, timeshift)computation

测试调度器

TestScheduler允许对调度器的时钟行为进行手动微调。这对于测试依赖于明确的时间安排的动作是非常有用的。此调度器还有三种方法:

advanceTimeTo(time,unit)
将调度器的时钟修改到特定时间点
advanceTimeBy(time,unit)
将调度器的时钟向前推进一段特定的时间
triggerActions( )
根据调度器的时钟启动任何已安排的时间等于或早于当前时间的未启动操作

参考

调度器的种类

可以从Schedulers的描述中的工厂方法获得调度器。下表显示了RxJava中通过这些方法可用的调度器的种类:

调度器作用
Schedulers.computation( )用于计算工作,如事件循环和回调处理;不能在I/O工作使用此调度器(用Schedulers.io( )代替); 默认情况下,线程数等于处理器数
Schedulers.from(executor)使用指定的Executor作为一个调度器
Schedulers.immediate( )调度工作立即在当前线程中运行
Schedulers.io( )用于I/O密集型工作如异步执行阻塞I/O,此调度程序由一个将根据需要增长的线程池支持;用于普通的计算工作转为Schedulers.computation( )Schedulers.io(&#8239) 默认情况下是CachedThreadScheduler, 它是类似于带有线程缓存的新线程调度器
Schedulers.newThread( )为每个工作单元创建一个新线程
Schedulers.trampoline( )队列工作在其他已经排队的工作之后在当前线程上运行

RxJava 1.x事件源操作符的默认调度器

RxJava 1.x中的一些事件源操作符具有变体,允许设置操作符用于(至少某些部分)其操作的调度器。其他不在任何特定的调度器上运行,或在指定的默认调度器上运行。具有特定默认调度器的那些包括:

操作符调度器
buffer(timespan)computation
buffer(timespan, count)computation
buffer(timespan, timeshift)computation
debounce(timeout, unit)computation
delay(delay, unit)computation
delaySubscription(delay, unit)computation
intervalcomputation
repeattrampoline
replay(time, unit)computation
replay(buffersize, time, unit)computation
replay(selector, time, unit)computation
replay(selector, buffersize, time, unit)computation
retrytrampoline
sample(period, unit)computation
skip(time, unit)computation
skipLast(time, unit)computation
take(time, unit)computation
takeLast(time, unit)computation
takeLast(count, time, unit)computation
takeLastBuffer(time, unit)computation
takeLastBuffer(count, time, unit)computation
throttleFirstcomputation
throttleLastcomputation
throttleWithTimeoutcomputation
timeIntervalimmediate
timeout(timeoutSelector)immediate
timeout(firstTimeoutSelector, timeoutSelector)immediate
timeout(timeoutSelector, other)immediate
timeout(timeout, timeUnit)computation
timeout(firstTimeoutSelector, timeoutSelector, other)immediate
timeout(timeout, timeUnit, other)computation
timercomputation
timestampimmediate
window(timespan)computation
window(timespan, count)computation
window(timespan, timeshift)computation

使用调度器

不仅可以将这些调度器传递给RxJava事件源操作符,还可以使用它们来安排自己的订阅工作。以下示例使用Scheduler.Worker调度方法让其运行在newThread的调度器上:

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

    @Override
    public void call() {
        yourWork();
    }

});
// some time later...
worker.unsubscribe();

递归调度器

在Worker对象使用schedule然后使用schedule(this)递归的调用调度:

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

    @Override
    public void call() {
        yourWork();
        // recurse until unsubscribed (schedule will do nothing if unsubscribed)
        worker.schedule(this);
    }

});
// some time later...
worker.unsubscribe();

检查或设置取消订阅状态

Worker对象的类实现了Subscription接口, 使用isUnsubscribedunsubscribe 方法, 可以在被订阅后停止运行或者从调度任务中取消订阅:

Worker worker = Schedulers.newThread().createWorker();
Subscription mySubscription = worker.schedule(new Action0() {

    @Override
    public void call() {
        while(!worker.isUnsubscribed()) {
            status = yourWork();
            if(QUIT == status) { worker.unsubscribe(); }
        }
    }

});

Worker已经变成Subscription,就可以(并应该、最终)调用它的unsubscribe方法来表示它可以停止工作并释放资源:

worker.unsubscribe();

延迟和定期调度器

可以延迟对给定计划程序的操作,直到超过某个时间跨度通过使用schedule一个实现。以下示例调度someActionsomeScheduler调度器的时钟经过500ms后运行:

someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);

另一种Scheduler方法允许安排定期执行操作。以下示例调度someActionsomeScheduler调度器时钟经过500ms后运行,然后每隔250ms也都运行:

someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);

测试调度器

TestScheduler允许对调度器的时钟行为进行手动微调。这对于测试依赖于明确的时间安排的动作是非常有用的。此调度器还有三种方法:

advanceTimeTo(time,unit)
将调度器的时钟修改到特定时间点
advanceTimeBy(time,unit)
将调度器的时钟向前推进一段特定的时间
triggerActions( )
根据调度器的时钟启动任何已安排的时间等于或早于当前时间的未启动操作

参考

</div> </div> </div>

在RxJS可以从Rx.Scheduler对象或独立实现的对象获取调度器,下表显示了RxJS中可用的各种调度器:

调度器作用
Rx.Scheduler.currentThread调度工作立即在当前线程中运行
Rx.HistoricalScheduler调度工作就像在指定的之前时间发生一样
Rx.Scheduler.immediate调度工作立即在当前线程中运行
Rx.TestScheduler用于单元测试;允许手动操纵时间移动
Rx.Scheduler.timeout调度工作通过定时器回调

参考

待定

待定

待定

待定

待定

</div>