如果要在事件源符的级联中引入多线程,可以通过指定这些操作符(或特定的事件源)在特定的调度器上运行来实现。
一些ReactiveX事件源操作符具有将调度器作为参数的变体。 这些表示操作符在特定的调度器上完成部分或全部工作。
待定
待定
可以从Schedulers
类的描述中的工厂方法获得调度器。下表显示了RxGroovy中通过这些方法可用的调度器的种类:
调度器 | 作用 |
---|---|
Schedulers.computation( ) | 用于计算工作,如事件循环和回调处理;不能在I/O工作使用此调度器(用Schedulers.io( ) 代替); 默认情况下,线程数等于处理器数 |
Schedulers.from(executor) | 使用指定的Executor 作为一个调度器 |
Schedulers.immediate( ) | 调度工作立即在当前线程中运行 |
Schedulers.io( ) | 用于I/O密集型工作如异步执行阻塞I/O,此调度程序由一个将根据需要增长的线程池支持;用于普通的计算工作转为Schedulers.computation( ) ;Schedulers.io( ) 默认情况下是CachedThreadScheduler , 它是类似于带有线程缓存的新线程调度器 |
Schedulers.newThread( ) | 为每个工作单元创建一个新线程 |
Schedulers.trampoline( ) | 队列工作在其他已经排队的工作之后在当前线程上运行 |
RxGroovy中的一些事件源操作符具有变体,允许设置操作符用于(至少某些部分)其操作的调度器。其他不在任何特定的调度器上运行,或在指定的默认调度器上运行。具有特定默认调度器的那些包括:
TestScheduler
允许对调度器的时钟行为进行手动微调。这对于测试依赖于明确的时间安排的动作是非常有用的。此调度器还有三种方法:
advanceTimeTo(time,unit)
advanceTimeBy(time,unit)
triggerActions( )
可以从Schedulers
类的描述中的工厂方法获得调度器。下表显示了RxJava中通过这些方法可用的调度器的种类:
调度器 | 作用 |
---|---|
Schedulers.computation( ) | 用于计算工作,如事件循环和回调处理;不能在I/O工作使用此调度器(用Schedulers.io( ) 代替); 默认情况下,线程数等于处理器数 |
Schedulers.from(executor) | 使用指定的Executor 作为一个调度器 |
Schedulers.immediate( ) | 调度工作立即在当前线程中运行 |
Schedulers.io( ) | 用于I/O密集型工作如异步执行阻塞I/O,此调度程序由一个将根据需要增长的线程池支持;用于普通的计算工作转为Schedulers.computation( ) ;Schedulers.io( ) 默认情况下是CachedThreadScheduler , 它是类似于带有线程缓存的新线程调度器 |
Schedulers.newThread( ) | 为每个工作单元创建一个新线程 |
Schedulers.trampoline( ) | 队列工作在其他已经排队的工作之后在当前线程上运行 |
RxJava 1.x中的一些事件源操作符具有变体,允许设置操作符用于(至少某些部分)其操作的调度器。其他不在任何特定的调度器上运行,或在指定的默认调度器上运行。具有特定默认调度器的那些包括:
不仅可以将这些调度器传递给RxJava事件源操作符,还可以使用它们来安排自己的订阅工作。以下示例使用Scheduler.Worker
类的调度
方法让其运行在newThread
的调度器上:
worker = Schedulers.newThread().createWorker(); worker.schedule(new Action0() { @Override public void call() { yourWork(); } }); // some time later... worker.unsubscribe();
在Worker对象使用schedule
然后使用schedule(this)
递归的调用调度:
worker = Schedulers.newThread().createWorker(); worker.schedule(new Action0() { @Override public void call() { yourWork(); // recurse until unsubscribed (schedule will do nothing if unsubscribed) worker.schedule(this); } }); // some time later... worker.unsubscribe();
Worker
对象的类实现了Subscription
接口, 使用isUnsubscribed
和unsubscribe
方法, 可以在被订阅后停止运行或者从调度任务中取消订阅:
Worker worker = Schedulers.newThread().createWorker(); Subscription mySubscription = worker.schedule(new Action0() { @Override public void call() { while(!worker.isUnsubscribed()) { status = yourWork(); if(QUIT == status) { worker.unsubscribe(); } } } });
当Worker
已经变成Subscription
,就可以(并应该、最终)调用它的unsubscribe
方法来表示它可以停止工作并释放资源:
worker.unsubscribe();
可以延迟对给定计划程序的操作,直到超过某个时间跨度通过使用schedule
一个实现。以下示例调度someAction
在
someScheduler
调度器的时钟经过500ms后运行:
someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);
另一种Scheduler
方法允许安排定期执行操作。以下示例调度someAction
在someScheduler
调度器时钟经过500ms后运行,然后每隔250ms也都运行:
someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);
TestScheduler
允许对调度器的时钟行为进行手动微调。这对于测试依赖于明确的时间安排的动作是非常有用的。此调度器还有三种方法:
advanceTimeTo(time,unit)
advanceTimeBy(time,unit)
triggerActions( )
在RxJS可以从Rx.Scheduler
对象或独立实现的对象获取调度器,下表显示了RxJS中可用的各种调度器:
调度器 | 作用 |
---|---|
Rx.Scheduler.currentThread | 调度工作立即在当前线程中运行 |
Rx.HistoricalScheduler | 调度工作就像在指定的之前时间发生一样 |
Rx.Scheduler.immediate | 调度工作立即在当前线程中运行 |
Rx.TestScheduler | 用于单元测试;允许手动操纵时间移动 |
Rx.Scheduler.timeout | 调度工作通过定时器回调 |
待定
待定
待定
待定
待定