RxJava2 - intervallo e pianificatori
Diciamo che ho un intervallo e che gli ho assegnato un computo Scheduler. Come questo:
Observable
.interval(0, 1, TimeUnit.SECONDS, computationScheduler)
.flatMap { ... }
Quindi, tutto ciò che accade in flatmap {...} sarà anche programmato su un thread di calcolo?
Nelle fonti per Observable.interval (long initialDelay, long period, TimeUnit unit, Scheduler scheduler), dice:
* @param scheduler
* the Scheduler on which the waiting happens and items are emitted
Come principiante di RxJava, ho difficoltà a capire questo commento. Capisco che la logica del timer di intervallo / attesa si verifica sul thread di calcolo. Ma l'ultima parte, sugli elementi emessi, significa anche che gli elementi emessi verranno consumati sullo stesso thread? O è richiesto un ObservOn per questo? Come questo:
Observable
.interval(0, 1, TimeUnit.SECONDS, computationScheduler)
.observeOn(computationScheduler)
.flatMap { ... }
Sarebbe necessario osservareOn se desidero che le emissioni vengano elaborate sul thread di calcolo?
Risposte
Questo è semplice da verificare: basta stampare il thread corrente per vedere su quale thread viene eseguito l'operatore:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
Questo stamperà sempre:
on subscribe: main
--> 1
on flatmap: main
on subscribe: main
--> 2
on flatmap: main
on subscribe: main
--> 3
on flatmap: main
on subscribe: main
--> 4
on flatmap: main
on subscribe: main
--> 5
on flatmap: main
on subscribe: main
--> 6
on flatmap: main
on subscribe: main
--> 7
on flatmap: main
on subscribe: main
--> 8
on flatmap: main
on subscribe: main
--> 9
Elaborato sequenzialmente perché tutto avviene in un unico thread -> main.
observeOn cambierà il thread di esecuzione a valle:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.observeOn(Schedulers.computation())
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.observeOn(Schedulers.io())
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
Il risultato questa volta sarà diverso per ogni esecuzione ma flatmape subscribesarà trasformato in fili diffrent:
on flatmap: RxComputationThreadPool-1
on subscribe: RxCachedThreadScheduler-1
intervalagirà come observeOne cambierà il thread di esecuzione a valle (scheduler):
Observable.interval(0, 1, TimeUnit.SECONDS, Schedulers.computation())
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
Questa volta l'esecuzione è sequenziale all'interno di un thread dello scheduler di calcolo:
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 0
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 1
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 2
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 3
...
intervalper impostazione predefinita utilizzerà lo scheduler di calcolo, non è necessario passarlo come argomento e observeOnnon è necessario