RxJava2-間隔とスケジューラ

Aug 24 2020

間隔があり、computationSchedulerを指定したとします。このような:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .flatMap { ... }

次に、フラットマップ{...}で発生するすべてのことは、計算スレッドでもスケジュールされますか?

Observable.interval(long initialDelay、long period、TimeUnit unit、Scheduler Scheduler)のソースでは、次のように述べています。

 * @param scheduler
 * the Scheduler on which the waiting happens and items are emitted

RxJavaの初心者として、私はこのコメントを理解するのに苦労しています。インターバルタイマー/待機ロジックが計算スレッドで発生することを理解しています。しかし、放出されるアイテムに関する最後の部分は、放出されたアイテムが同じスレッドで消費されることも意味しますか?または、そのためにobserveOnが必要ですか?このような:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .observeOn(computationScheduler)
    .flatMap { ... }

エミットを計算スレッドで処理したい場合、そのobserveOnは必要ですか?

回答

1 bubbles Aug 24 2020 at 21:34

これは簡単に確認できます。現在のスレッドを出力して、オペレーターが実行されているスレッドを確認するだけです。

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

これは常に印刷されます:

on subscribe: main
--> 1
on flatmap: main
on subscribe: main
--> 2
on flatmap: main
on subscribe: main
--> 3
on flatmap: main
on subscribe: main
--> 4
on flatmap: main
on subscribe: main
--> 5
on flatmap: main
on subscribe: main
--> 6
on flatmap: main
on subscribe: main
--> 7
on flatmap: main
on subscribe: main
--> 8
on flatmap: main
on subscribe: main
--> 9

すべてが単一のスレッドで発生するため、順次処理されます-> main

observeOn ダウンストリーム実行スレッドを変更します:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .observeOn(Schedulers.computation())
    .flatMap(e -> {
         System.out.println("on flatmap: " + Thread.currentThread().getName());
         return Observable.just(e).map(x -> "--> " + x);
     })
     .observeOn(Schedulers.io())
     .subscribe(s -> {
         System.out.println("on subscribe: " + Thread.currentThread().getName());
         System.out.println(s);
      });

その結果、今回は実行ごとに異なるだろうが、flatmapsubscribe切り抜いたスレッドで処理されます。

on flatmap: RxComputationThreadPool-1
on subscribe: RxCachedThreadScheduler-1

intervalobserveOnダウンストリーム実行スレッド(スケジューラー)として機能し、変更します。

Observable.interval(0, 1, TimeUnit.SECONDS, Schedulers.computation())
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

今回の実行は、計算スケジューラの1つのスレッド内で順次実行されます。

on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 0
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 1
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 2
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 3
...

intervalデフォルトでは計算スケジューラを使用します。引数として渡すobserveOn必要はなく、必要ありません。