RxJava2 - interval dan penjadwal

Aug 24 2020

Katakanlah saya memiliki interval, dan saya telah memberinya computationScheduler. Seperti ini:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .flatMap { ... }

Lalu, apakah semua yang terjadi di flatmap {...} juga akan dijadwalkan pada thread komputasi?

Di sumber untuk Observable.interval (long initialDelay, long period, unit TimeUnit, Scheduler scheduler), dikatakan:

 * @param scheduler
 * the Scheduler on which the waiting happens and items are emitted

Sebagai pemula di RxJava, saya kesulitan memahami komentar ini. Saya memahami bahwa timer interval / logika menunggu terjadi pada thread komputasi. Tapi, apakah bagian terakhir tentang item yang di-emit juga berarti item yang dipancarkan akan dikonsumsi di thread yang sama? Atau apakah suatu observOn diperlukan untuk itu? Seperti ini:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .observeOn(computationScheduler)
    .flatMap { ... }

Apakah observOn itu diperlukan jika saya ingin emit diproses pada utas komputasi?

Jawaban

1 bubbles Aug 24 2020 at 21:34

Ini mudah untuk diverifikasi: cukup cetak utas saat ini untuk melihat utas mana operator dijalankan:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

Ini akan selalu mencetak:

on subscribe: main
--> 1
on flatmap: main
on subscribe: main
--> 2
on flatmap: main
on subscribe: main
--> 3
on flatmap: main
on subscribe: main
--> 4
on flatmap: main
on subscribe: main
--> 5
on flatmap: main
on subscribe: main
--> 6
on flatmap: main
on subscribe: main
--> 7
on flatmap: main
on subscribe: main
--> 8
on flatmap: main
on subscribe: main
--> 9

Diproses secara berurutan karena semua terjadi dalam satu utas -> main.

observeOn akan mengubah thread eksekusi downstream:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .observeOn(Schedulers.computation())
    .flatMap(e -> {
         System.out.println("on flatmap: " + Thread.currentThread().getName());
         return Observable.just(e).map(x -> "--> " + x);
     })
     .observeOn(Schedulers.io())
     .subscribe(s -> {
         System.out.println("on subscribe: " + Thread.currentThread().getName());
         System.out.println(s);
      });

Hasil kali ini akan berbeda untuk setiap eksekusi tetapi flatmapdan subscribeakan diproses dalam rangkaian berbeda:

on flatmap: RxComputationThreadPool-1
on subscribe: RxCachedThreadScheduler-1

intervalakan bertindak sebagai observeOndan mengubah utas eksekusi hilir (penjadwal):

Observable.interval(0, 1, TimeUnit.SECONDS, Schedulers.computation())
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

Kali ini eksekusinya berurutan di dalam satu utas penjadwal komputasi:

on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 0
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 1
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 2
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 3
...

intervalakan secara default menggunakan penjadwal komputasi, Anda tidak perlu meneruskannya sebagai argumen dan observeOntidak diperlukan