RxJava2 - Intervall und Scheduler

Aug 24 2020

Angenommen, ich habe ein Intervall und habe ihm einen computationScheduler gegeben. So was:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .flatMap { ... }

Wird dann alles, was in flatmap {...} passiert, auch in einem Berechnungsthread geplant?

In den Quellen für Observable.interval (lange Anfangsverzögerung, langer Zeitraum, TimeUnit-Einheit, Scheduler-Scheduler) heißt es:

 * @param scheduler
 * the Scheduler on which the waiting happens and items are emitted

Als Anfänger von RxJava fällt es mir schwer, diesen Kommentar zu verstehen. Ich verstehe, dass die Intervall-Timer / Wartelogik auf dem Berechnungsthread auftritt. Bedeutet der letzte Teil über die Ausgabe von Elementen auch, dass die ausgegebenen Elemente im selben Thread verbraucht werden ? Oder ist dafür eine Beobachtung erforderlich? So was:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .observeOn(computationScheduler)
    .flatMap { ... }

Wäre diese Beobachtung erforderlich, wenn die Emits im Berechnungsthread verarbeitet werden sollen?

Antworten

1 bubbles Aug 24 2020 at 21:34

Dies ist einfach zu überprüfen: Drucken Sie einfach den aktuellen Thread aus, um zu sehen, auf welchem ​​Thread der Operator ausgeführt wird:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

Dies wird immer gedruckt:

on subscribe: main
--> 1
on flatmap: main
on subscribe: main
--> 2
on flatmap: main
on subscribe: main
--> 3
on flatmap: main
on subscribe: main
--> 4
on flatmap: main
on subscribe: main
--> 5
on flatmap: main
on subscribe: main
--> 6
on flatmap: main
on subscribe: main
--> 7
on flatmap: main
on subscribe: main
--> 8
on flatmap: main
on subscribe: main
--> 9

Nacheinander verarbeitet, da alle in einem einzigen Thread ablaufen -> main.

observeOn ändert den Downstream-Ausführungsthread:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .observeOn(Schedulers.computation())
    .flatMap(e -> {
         System.out.println("on flatmap: " + Thread.currentThread().getName());
         return Observable.just(e).map(x -> "--> " + x);
     })
     .observeOn(Schedulers.io())
     .subscribe(s -> {
         System.out.println("on subscribe: " + Thread.currentThread().getName());
         System.out.println(s);
      });

Das Ergebnis dieses Mal für jede Ausführung wird anders sein , aber flatmapund subscribewird in diffrent Fäden verarbeitet werden:

on flatmap: RxComputationThreadPool-1
on subscribe: RxCachedThreadScheduler-1

intervalfungiert als observeOnund ändert den Downstream-Ausführungsthread (Scheduler):

Observable.interval(0, 1, TimeUnit.SECONDS, Schedulers.computation())
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

Diesmal erfolgt die Ausführung sequentiell in einem Thread des Berechnungsplaners:

on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 0
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 1
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 2
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 3
...

intervalWird standardmäßig der Berechnungsplaner verwendet, müssen Sie ihn nicht als Argument übergeben und observeOnwerden nicht benötigt