RxJava2 - interval dan penjadwal
Katakanlah saya memiliki interval, dan saya telah memberinya computationScheduler. Seperti ini:
Observable
.interval(0, 1, TimeUnit.SECONDS, computationScheduler)
.flatMap { ... }
Lalu, apakah semua yang terjadi di flatmap {...} juga akan dijadwalkan pada thread komputasi?
Di sumber untuk Observable.interval (long initialDelay, long period, unit TimeUnit, Scheduler scheduler), dikatakan:
* @param scheduler
* the Scheduler on which the waiting happens and items are emitted
Sebagai pemula di RxJava, saya kesulitan memahami komentar ini. Saya memahami bahwa timer interval / logika menunggu terjadi pada thread komputasi. Tapi, apakah bagian terakhir tentang item yang di-emit juga berarti item yang dipancarkan akan dikonsumsi di thread yang sama? Atau apakah suatu observOn diperlukan untuk itu? Seperti ini:
Observable
.interval(0, 1, TimeUnit.SECONDS, computationScheduler)
.observeOn(computationScheduler)
.flatMap { ... }
Apakah observOn itu diperlukan jika saya ingin emit diproses pada utas komputasi?
Jawaban
Ini mudah untuk diverifikasi: cukup cetak utas saat ini untuk melihat utas mana operator dijalankan:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
Ini akan selalu mencetak:
on subscribe: main
--> 1
on flatmap: main
on subscribe: main
--> 2
on flatmap: main
on subscribe: main
--> 3
on flatmap: main
on subscribe: main
--> 4
on flatmap: main
on subscribe: main
--> 5
on flatmap: main
on subscribe: main
--> 6
on flatmap: main
on subscribe: main
--> 7
on flatmap: main
on subscribe: main
--> 8
on flatmap: main
on subscribe: main
--> 9
Diproses secara berurutan karena semua terjadi dalam satu utas -> main.
observeOn akan mengubah thread eksekusi downstream:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.observeOn(Schedulers.computation())
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.observeOn(Schedulers.io())
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
Hasil kali ini akan berbeda untuk setiap eksekusi tetapi flatmapdan subscribeakan diproses dalam rangkaian berbeda:
on flatmap: RxComputationThreadPool-1
on subscribe: RxCachedThreadScheduler-1
intervalakan bertindak sebagai observeOndan mengubah utas eksekusi hilir (penjadwal):
Observable.interval(0, 1, TimeUnit.SECONDS, Schedulers.computation())
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
Kali ini eksekusinya berurutan di dalam satu utas penjadwal komputasi:
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 0
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 1
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 2
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 3
...
intervalakan secara default menggunakan penjadwal komputasi, Anda tidak perlu meneruskannya sebagai argumen dan observeOntidak diperlukan