Jika anda ingin menerapkan multithreading pada operator-operator Obsevable anda, anda bisa melakukannya dengan cara menginstruksikan operator-operator tersebut (atau Observable tertentu) untuk beroperasi pada Scheduler tertentu.
Beberapa operator Observable ReactiveX mempunyai beberapa tipe operator yang memerlukan sebuah Scheduler sebagai paramter. Hal tersebut menginstruksikan operator untuk melakukan beberapa atau semua pekerjaannya dalam Scheduler tertentu.
TBD
TBD
Anda mendapatkan sebuah Scheduler dari sebuah factory method yang dijelaskan di class
Schedulers
. Table berikut menunjukkan berbagai jenis Scheduler yang bisa anda pakai di RxGroovy:
Scheduler | Tujuan |
---|---|
Schedulers.computation( ) | digunakan untuk komputasi seperti event-loops dan callback; jangan memakai scheduler ini untuk I/O (pakai Schedulers.io( ) ); jumlah thread, standarnya, sama dengan jumlah processor |
Schedulers.from(executor) | menggunakan Executor yang dispesifikasi sebagai sebuah Scheduler |
Schedulers.immediate( ) | scheduler bekerja secara langsung di thread itu juga |
Schedulers.io( ) | digunakan untuk pekerjaan yang berhubungan dengan I/O seperti asynchronous perfomance pada I/0 blocking, scheduler ini dijaga oleh sebuah thread-pool yang akan meningkatkan kapasitasnya jika diperlukan; untuk pekerjaan komputasi biasa, pakailah Schedulers.computation( ) ; Schedulers.io( ) secara standar merupakan sebuah CachedThreadScheduler , dimana seperti sebuah thread scheduler baru dengan thread caching |
Schedulers.newThread( ) | membuat sebuah thread baru untuk tiap unit kerja |
Schedulers.trampoline( ) | memasukkan pekerjaan yang perlu dilakukan dalam sebuah antrian pada thread saat itu juga |
Beberapa operator Observable di RxGroovy memiliki beberapa bentuk lain yang memungkinkan anda untuk mengatur Scheduler tempat operator tersebut akan bekerja. Yang lainnya tidak akan beroperasi pada Scheduler tertentu, atau beroperasi pada Scheduler default tertentu. Untuk yang memiliki Scheduler default tertentu termasuk:
TestScheduler
memungkinkan anda untuk melakukan kontrol manual yang dapat disesuaikan dengan bagaimana waktu Scheduler bekerja. Ini dapat berguna untuk menguji interaksi yang bergantung pada penyusunan aksi sesuai waktunya. Scheduler ini mempunyai tiga method tambahan:
advanceTimeTo(time,unit)
advanceTimeBy(time,unit)
triggerActions( )
Anda mendapatkan sebuah Scheduler dari factory method yang dideskripsikan di class
Schedulers
. Table berikut menunjukkan jenis Scheduler yang dapat dipakai di RxJava::
Scheduler | Tujuan |
---|---|
Schedulers.computation( ) | digunakan untuk komputasi seperti event-loops dan callback; jangan memakai scheduler ini untuk I/O (pakai Schedulers.io( ) ); jumlah thread, standarnya, sama dengan jumlah processor |
Schedulers.from(executor) | menggunakan Executor yang dispesifikasi sebagai sebuah Scheduler |
Schedulers.immediate( ) | scheduler bekerja secara langsung di thread itu juga |
Schedulers.io( ) | digunakan untuk pekerjaan yang berhubungan dengan I/O seperti asynchronous perfomance pada I/0 blocking, scheduler ini dijaga oleh sebuah thread-pool yang akan meningkatkan kapasitasnya jika diperlukan; untuk pekerjaan komputasi biasa, pakailah Schedulers.computation( ); Schedulers.io( ) secara standar merupakan sebuah CachedThreadScheduler, dimana seperti sebuah thread scheduler baru dengan thread caching |
Schedulers.newThread( ) | membuat sebuah thread baru untuk tiap unit kerja |
Schedulers.trampoline( ) | memasukkan pekerjaan yang perlu dilakukan dalam sebuah antrian pada thread saat itu juga |
Beberapa operator Observable di RxJava memiliki bentuk lain yang memungkinkan anda untuk mengatur Scheduler yang akan dipakai operator untuk (setidaknya beberapa bagian) dari operasinya. Yang lainnya tidak beroperasi pada Scheduler tertentu, atau beroperasi pada Scheduler default tertentu. Operator yang memiliki Scheduler default tertentu termasuk:
Selain dari meneruskan Scheduler tersebut kedalam operator Observable RxJava, anda juga bisa menggunakan mereka untuk menjadwalkan pekerjaan anda pada Subscription. Contoh berikut menggunakan method schedule
dari class Scheduler.Worker
untuk menjadwalka pekerjaan pada scheduler newThread
:
worker = Schedulers.newThread().createWorker(); worker.schedule(new Action0() { @Override public void call() { yourWork(); } }); // some time later... worker.unsubscribe();
Untuk menjadwalkan pemanggilan schedule rekursif, anda bisa menggunakan schedule
dan kemudian schedule(this)
pada object Worker:
worker = Schedulers.newThread().createWorker(); worker.schedule(new Action0() { @Override public void call() { yourWork(); // recurse until unsubscribed (schedule will do nothing if unsubscribed) worker.schedule(this); } }); // some time later... worker.unsubscribe();
Obyek-obyek dari class Worker
class mengimplementasi
inteface Subscription
, dengan method isUnsubscribed
dan unsubscribe
, sehingga anda bisa berhenti bekerja ketika sebuah subscription dibatalkan, atau anda juga bisa membatalkan subscription tersebut dari dalam task yang sudah dijadwalkan:
Worker worker = Schedulers.newThread().createWorker(); Subscription mySubscription = worker.schedule(new Action0() { @Override public void call() { while(!worker.isUnsubscribed()) { status = yourWork(); if(QUIT == status) { worker.unsubscribe(); } } } });
Worker
juga merupakan sebuah Subscription
sehingga anda juga bisa (dan harus, pada akhirnya) memanggil method unsubscribe
nya untuk memberitahukan bahwa itu bisa menghentikan pekerjaannya dan melepaskan sumber dayanya:
worker.unsubscribe();
Anda juga bisa menggunakan sebuah versi dari schedule
yang menunda aksi dari Scheduler sampai melewati jangka waktu tertentu. Contoh berikut menjadwalkan someAction
untuk dijalankan pada someScheduler
setelah 500ms telah berlalu pada waktu Scheduler tersebut:
someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);
Method Another Scheduler
memungkinkan anda untuk menjadwalkan sebuah aksi untuk berjalan pada sebuah interval yang teratur. Contoh berikut menjadwalkan someAction
untuk dikerjakan pada someScheduler
setelah 500ms telah berlalu, dan kemudian setiap 250ms setelahnya:
someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);
TestScheduler
memungkinkan anda untuk melakukan kontrol manual yang dapat disesuaikan dengan bagaimana waktu Scheduler bekerja. Ini dapat berguna untuk menguji interaksi yang bergantung pada penyusunan aksi sesuai waktunya. Scheduler ini mempunyai tiga method tambahan:
advanceTimeTo(time,unit)
advanceTimeBy(time,unit)
triggerActions( )
Dalam RxJS anda mendapatkan Scheduler dari obyek Rx.Scheduler
atau anda juga bisa mengimplementasinya sendiri.
Table berikut menunjukkan berbagai jenis Scheduler yang bisa anda pakai di RxJS:
Scheduler | Tujuan |
---|---|
Rx.Scheduler.currentThread | menjadwalkan pekerjaan sesegera mungkin pada thread itu juga |
Rx.HistoricalScheduler | schedules bekerja seakan-akan telah terjadi pada saat waktu yang tidak tetap |
Rx.Scheduler.immediate | schedules yang bekerja langsung pada thread saat itu juga |
Rx.TestScheduler | untuk unit testing; mengizinkan anda untuk secara manual memanipulasi pergerakan waktu |
Rx.Scheduler.timeout | schedules bekerja dengan sebuah callback yang sudah dijadwalkan |
TBD
TBD
TBD
TBD
TBD