Scheduler

Jika anda ingin menerapkan multithreading pada operator-operator Obsevable anda, anda bisa melakukannya dengan cara menginstruksikan operator-operator tersebut (atau Observable tertentu) untuk beroperasi pada Scheduler tertentu.

Beberapa operator Observable ReactiveX mempunyai beberapa tipe operator yang memerlukan sebuah Scheduler sebagai paramter. Hal tersebut menginstruksikan operator untuk melakukan beberapa atau semua pekerjaannya dalam Scheduler tertentu.

Standarnya, sebuah Observable dan rantai operator yang anda aplikasikan kepadanya akan melakukan pekerjaannya, dan akan mengirim notifikasi kepada observernya, dalam thread yang sama dengan tempat dia melakukan Subscribe. Operator SubscribeOn mengubah perilaku ini dengan menspesifikasikan sebuah Scheduler yang berbeda sebagai tempat Obervabler beroperasi. Operator ObserveOn operator menspesifikasikan sebuah Scheduler yang berbeda yang akan digunakan observable untuk mengirimkan notifikasi kepada observernya.

Seperti ditunjukkan pada ilustrasi berikut, operator SubscribeOn menentukan thread tempat Observable tersebut akan berporeasi, tidak peduli di titik rantai operator yang mana operator ini dipanggil. Pada sisi lainnya, ObserveOn, mempengaruhi thread yang akan dipakai Observable dibawah dimana operator-nya muncul. Untuk alasan ini, anda bisa memanggil ObserveOn beberapa kali pada titik yang berbeda-beda dalam rantai operator Observable untuk mengubah thread mana yang akan dipakai operator untuk beroperasi.

ObserveOn and SubscribeOn

Lihat Juga

Informasi Terkait masing-masing bahasa:

Jenis-jenis Scheduler

Anda mendapatkan sebuah Scheduler dari sebuah factory method yang dijelaskan di class Schedulers. Table berikut menunjukkan berbagai jenis Scheduler yang bisa anda pakai di RxGroovy:

SchedulerTujuan
Schedulers.computation( )digunakan untuk komputasi seperti event-loops dan callback; jangan memakai scheduler ini untuk I/O (pakai Schedulers.io( )); jumlah thread, standarnya, sama dengan jumlah processor
Schedulers.from(executor)menggunakan Executor yang dispesifikasi sebagai sebuah Scheduler
Schedulers.immediate( )scheduler bekerja secara langsung di thread itu juga
Schedulers.io( )digunakan untuk pekerjaan yang berhubungan dengan I/O seperti asynchronous perfomance pada I/0 blocking, scheduler ini dijaga oleh sebuah thread-pool yang akan meningkatkan kapasitasnya jika diperlukan; untuk pekerjaan komputasi biasa, pakailah Schedulers.computation( ); Schedulers.io(&#8239) secara standar merupakan sebuah CachedThreadScheduler, dimana seperti sebuah thread scheduler baru dengan thread caching
Schedulers.newThread( )membuat sebuah thread baru untuk tiap unit kerja
Schedulers.trampoline( )memasukkan pekerjaan yang perlu dilakukan dalam sebuah antrian pada thread saat itu juga

Scheduler default untuk Operator Observable RxGroovy

Beberapa operator Observable di RxGroovy memiliki beberapa bentuk lain yang memungkinkan anda untuk mengatur Scheduler tempat operator tersebut akan bekerja. Yang lainnya tidak akan beroperasi pada Scheduler tertentu, atau beroperasi pada Scheduler default tertentu. Untuk yang memiliki Scheduler default tertentu termasuk:

operatorScheduler
buffer(timespan)computation
buffer(timespan, count)computation
buffer(timespan, timeshift)computation
debounce(timeout, unit)computation
delay(delay, unit)computation
delaySubscription(delay, unit)computation
intervalcomputation
repeattrampoline
replay(time, unit)computation
replay(buffersize, time, unit)computation
replay(selector, time, unit)computation
replay(selector, buffersize, time, unit)computation
retrytrampoline
sample(period, unit)computation
skip(time, unit)computation
skipLast(time, unit)computation
take(time, unit)computation
takeLast(time, unit)computation
takeLast(count, time, unit)computation
takeLastBuffer(time, unit)computation
takeLastBuffer(count, time, unit)computation
throttleFirstcomputation
throttleLastcomputation
throttleWithTimeoutcomputation
timeIntervalimmediate
timeout(timeoutSelector)immediate
timeout(firstTimeoutSelector, timeoutSelector)immediate
timeout(timeoutSelector, other)immediate
timeout(timeout, timeUnit)computation
timeout(firstTimeoutSelector, timeoutSelector, other)immediate
timeout(timeout, timeUnit, other)computation
timercomputation
timestampimmediate
window(timespan)computation
window(timespan, count)computation
window(timespan, timeshift)computation

Test Scheduler

TestScheduler memungkinkan anda untuk melakukan kontrol manual yang dapat disesuaikan dengan bagaimana waktu Scheduler bekerja. Ini dapat berguna untuk menguji interaksi yang bergantung pada penyusunan aksi sesuai waktunya. Scheduler ini mempunyai tiga method tambahan:

advanceTimeTo(time,unit)
memajukan waktu scheduler ke suatu titik waktu tertentu
advanceTimeBy(time,unit)
memajukan waktu scheduler ke depan dengan suatu unit waktu tertentu
triggerActions( )
memulai aksi yang belum dimulai yang sudah dijadwalkan pada suatu waktu pada saat sekarang ataupun di masa lalu sesuai waktu Scheduler

Lihat Juga

Jenis-jenis Scheduler

Anda mendapatkan sebuah Scheduler dari factory method yang dideskripsikan di class Schedulers. Table berikut menunjukkan jenis Scheduler yang dapat dipakai di RxJava::

SchedulerTujuan
Schedulers.computation( )digunakan untuk komputasi seperti event-loops dan callback; jangan memakai scheduler ini untuk I/O (pakai Schedulers.io( )); jumlah thread, standarnya, sama dengan jumlah processor
Schedulers.from(executor)menggunakan Executor yang dispesifikasi sebagai sebuah Scheduler
Schedulers.immediate( )scheduler bekerja secara langsung di thread itu juga
Schedulers.io( )digunakan untuk pekerjaan yang berhubungan dengan I/O seperti asynchronous perfomance pada I/0 blocking, scheduler ini dijaga oleh sebuah thread-pool yang akan meningkatkan kapasitasnya jika diperlukan; untuk pekerjaan komputasi biasa, pakailah Schedulers.computation( ); Schedulers.io( ) secara standar merupakan sebuah CachedThreadScheduler, dimana seperti sebuah thread scheduler baru dengan thread caching
Schedulers.newThread( )membuat sebuah thread baru untuk tiap unit kerja
Schedulers.trampoline( )memasukkan pekerjaan yang perlu dilakukan dalam sebuah antrian pada thread saat itu juga

Scheduler Default untuk Operator Observable RxJava 1.x

Beberapa operator Observable di RxJava memiliki bentuk lain yang memungkinkan anda untuk mengatur Scheduler yang akan dipakai operator untuk (setidaknya beberapa bagian) dari operasinya. Yang lainnya tidak beroperasi pada Scheduler tertentu, atau beroperasi pada Scheduler default tertentu. Operator yang memiliki Scheduler default tertentu termasuk:

operatorScheduler
buffer(timespan)computation
buffer(timespan, count)computation
buffer(timespan, timeshift)computation
debounce(timeout, unit)computation
delay(delay, unit)computation
delaySubscription(delay, unit)computation
intervalcomputation
repeattrampoline
replay(time, unit)computation
replay(buffersize, time, unit)computation
replay(selector, time, unit)computation
replay(selector, buffersize, time, unit)computation
retrytrampoline
sample(period, unit)computation
skip(time, unit)computation
skipLast(time, unit)computation
take(time, unit)computation
takeLast(time, unit)computation
takeLast(count, time, unit)computation
takeLastBuffer(time, unit)computation
takeLastBuffer(count, time, unit)computation
throttleFirstcomputation
throttleLastcomputation
throttleWithTimeoutcomputation
timeIntervalimmediate
timeout(timeoutSelector)immediate
timeout(firstTimeoutSelector, timeoutSelector)immediate
timeout(timeoutSelector, other)immediate
timeout(timeout, timeUnit)computation
timeout(firstTimeoutSelector, timeoutSelector, other)immediate
timeout(timeout, timeUnit, other)computation
timercomputation
timestampimmediate
window(timespan)computation
window(timespan, count)computation
window(timespan, timeshift)computation

Memakai Scheduler

Selain dari meneruskan Scheduler tersebut kedalam operator Observable RxJava, anda juga bisa menggunakan mereka untuk menjadwalkan pekerjaan anda pada Subscription. Contoh berikut menggunakan method schedule dari class Scheduler.Worker untuk menjadwalka pekerjaan pada scheduler newThread:

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

    @Override
    public void call() {
        yourWork();
    }

});
// some time later...
worker.unsubscribe();

Scheduler Rekursif

Untuk menjadwalkan pemanggilan schedule rekursif, anda bisa menggunakan schedule dan kemudian schedule(this) pada object Worker:

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {

    @Override
    public void call() {
        yourWork();
        // recurse until unsubscribed (schedule will do nothing if unsubscribed)
        worker.schedule(this);
    }

});
// some time later...
worker.unsubscribe();

Mengecek atau Menyetel Status Unsubscribed

Obyek-obyek dari class Worker class mengimplementasi inteface Subscription, dengan method isUnsubscribed dan unsubscribe, sehingga anda bisa berhenti bekerja ketika sebuah subscription dibatalkan, atau anda juga bisa membatalkan subscription tersebut dari dalam task yang sudah dijadwalkan:

Worker worker = Schedulers.newThread().createWorker();
Subscription mySubscription = worker.schedule(new Action0() {

    @Override
    public void call() {
        while(!worker.isUnsubscribed()) {
            status = yourWork();
            if(QUIT == status) { worker.unsubscribe(); }
        }
    }

});

Worker juga merupakan sebuah Subscription sehingga anda juga bisa (dan harus, pada akhirnya) memanggil method unsubscribe nya untuk memberitahukan bahwa itu bisa menghentikan pekerjaannya dan melepaskan sumber dayanya:

worker.unsubscribe();

Scheduler Delayed dan Periodic

Anda juga bisa menggunakan sebuah versi dari schedule yang menunda aksi dari Scheduler sampai melewati jangka waktu tertentu. Contoh berikut menjadwalkan someAction untuk dijalankan pada someScheduler setelah 500ms telah berlalu pada waktu Scheduler tersebut:

someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);

Method Another Scheduler memungkinkan anda untuk menjadwalkan sebuah aksi untuk berjalan pada sebuah interval yang teratur. Contoh berikut menjadwalkan someAction untuk dikerjakan pada someScheduler setelah 500ms telah berlalu, dan kemudian setiap 250ms setelahnya:

someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);

Test Scheduler

TestScheduler memungkinkan anda untuk melakukan kontrol manual yang dapat disesuaikan dengan bagaimana waktu Scheduler bekerja. Ini dapat berguna untuk menguji interaksi yang bergantung pada penyusunan aksi sesuai waktunya. Scheduler ini mempunyai tiga method tambahan:

advanceTimeTo(time,unit)
memajukan waktu scheduler ke suatu titik waktu tertentu
advanceTimeBy(time,unit)
advances the Scheduler’s clock forward by a particular amount of time
triggerActions( )
memulai aksi yang belum dimulai yang sudah dijadwalkan pada suatu waktu pada saat sekarang ataupun di masa lalu sesuai waktu Scheduler

Lihat Juga

</div> </div> </div>

Dalam RxJS anda mendapatkan Scheduler dari obyek Rx.Scheduler atau anda juga bisa mengimplementasinya sendiri. Table berikut menunjukkan berbagai jenis Scheduler yang bisa anda pakai di RxJS:

SchedulerTujuan
Rx.Scheduler.currentThreadmenjadwalkan pekerjaan sesegera mungkin pada thread itu juga
Rx.HistoricalSchedulerschedules bekerja seakan-akan telah terjadi pada saat waktu yang tidak tetap
Rx.Scheduler.immediateschedules yang bekerja langsung pada thread saat itu juga
Rx.TestScheduleruntuk unit testing; mengizinkan anda untuk secara manual memanipulasi pergerakan waktu
Rx.Scheduler.timeoutschedules bekerja dengan sebuah callback yang sudah dijadwalkan

Lihat Juga

TBD

TBD

</div>