RxJava2 - intervallo e pianificatori

Aug 24 2020

Diciamo che ho un intervallo e che gli ho assegnato un computo Scheduler. Come questo:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .flatMap { ... }

Quindi, tutto ciò che accade in flatmap {...} sarà anche programmato su un thread di calcolo?

Nelle fonti per Observable.interval (long initialDelay, long period, TimeUnit unit, Scheduler scheduler), dice:

 * @param scheduler
 * the Scheduler on which the waiting happens and items are emitted

Come principiante di RxJava, ho difficoltà a capire questo commento. Capisco che la logica del timer di intervallo / attesa si verifica sul thread di calcolo. Ma l'ultima parte, sugli elementi emessi, significa anche che gli elementi emessi verranno consumati sullo stesso thread? O è richiesto un ObservOn per questo? Come questo:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .observeOn(computationScheduler)
    .flatMap { ... }

Sarebbe necessario osservareOn se desidero che le emissioni vengano elaborate sul thread di calcolo?

Risposte

1 bubbles Aug 24 2020 at 21:34

Questo è semplice da verificare: basta stampare il thread corrente per vedere su quale thread viene eseguito l'operatore:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

Questo stamperà sempre:

on subscribe: main
--> 1
on flatmap: main
on subscribe: main
--> 2
on flatmap: main
on subscribe: main
--> 3
on flatmap: main
on subscribe: main
--> 4
on flatmap: main
on subscribe: main
--> 5
on flatmap: main
on subscribe: main
--> 6
on flatmap: main
on subscribe: main
--> 7
on flatmap: main
on subscribe: main
--> 8
on flatmap: main
on subscribe: main
--> 9

Elaborato sequenzialmente perché tutto avviene in un unico thread -> main.

observeOn cambierà il thread di esecuzione a valle:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .observeOn(Schedulers.computation())
    .flatMap(e -> {
         System.out.println("on flatmap: " + Thread.currentThread().getName());
         return Observable.just(e).map(x -> "--> " + x);
     })
     .observeOn(Schedulers.io())
     .subscribe(s -> {
         System.out.println("on subscribe: " + Thread.currentThread().getName());
         System.out.println(s);
      });

Il risultato questa volta sarà diverso per ogni esecuzione ma flatmape subscribesarà trasformato in fili diffrent:

on flatmap: RxComputationThreadPool-1
on subscribe: RxCachedThreadScheduler-1

intervalagirà come observeOne cambierà il thread di esecuzione a valle (scheduler):

Observable.interval(0, 1, TimeUnit.SECONDS, Schedulers.computation())
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

Questa volta l'esecuzione è sequenziale all'interno di un thread dello scheduler di calcolo:

on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 0
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 1
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 2
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 3
...

intervalper impostazione predefinita utilizzerà lo scheduler di calcolo, non è necessario passarlo come argomento e observeOnnon è necessario