RxJava2 - intervalle et planificateurs
Disons que j'ai un intervalle, et que je lui ai donné un computationScheduler. Comme ça:
Observable
.interval(0, 1, TimeUnit.SECONDS, computationScheduler)
.flatMap { ... }
Alors, tout ce qui se passe dans flatmap {...} sera-t-il également planifié sur un thread de calcul?
Dans les sources de Observable.interval (long initialDelay, longue période, unité TimeUnit, planificateur de planificateur), il est dit:
* @param scheduler
* the Scheduler on which the waiting happens and items are emitted
En tant que débutant avec RxJava, j'ai du mal à comprendre ce commentaire. Je comprends que la logique de temporisation / d'attente d'intervalle se produit sur le thread de calcul. Mais, est-ce que la dernière partie, sur les éléments émis, signifie également que les éléments émis seront consommés sur le même thread? Ou est-ce qu'un observerOn est nécessaire pour cela? Comme ça:
Observable
.interval(0, 1, TimeUnit.SECONDS, computationScheduler)
.observeOn(computationScheduler)
.flatMap { ... }
Est-ce que observerOn serait nécessaire si je veux que les émissions soient traitées sur le thread de calcul?
Réponses
C'est simple à vérifier: il suffit d'imprimer le thread courant pour voir sur quel thread l'opérateur est exécuté:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
Cela imprimera toujours:
on subscribe: main
--> 1
on flatmap: main
on subscribe: main
--> 2
on flatmap: main
on subscribe: main
--> 3
on flatmap: main
on subscribe: main
--> 4
on flatmap: main
on subscribe: main
--> 5
on flatmap: main
on subscribe: main
--> 6
on flatmap: main
on subscribe: main
--> 7
on flatmap: main
on subscribe: main
--> 8
on flatmap: main
on subscribe: main
--> 9
Traités séquentiellement car tout se passe dans un seul thread -> main.
observeOn changera le thread d'exécution en aval:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.observeOn(Schedulers.computation())
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.observeOn(Schedulers.io())
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
Le résultat cette fois sera différente pour chaque exécution mais flatmapet subscribesera traité dans les discussions diffrent:
on flatmap: RxComputationThreadPool-1
on subscribe: RxCachedThreadScheduler-1
intervalagira comme observeOnet changera le thread d'exécution en aval (ordonnanceur):
Observable.interval(0, 1, TimeUnit.SECONDS, Schedulers.computation())
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
Cette fois, l'exécution est séquentielle à l'intérieur d'un thread du planificateur de calcul:
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 0
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 1
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 2
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 3
...
intervalutilisera par défaut le planificateur de calcul, vous n'avez pas besoin de le passer en argument et observeOnn'est pas nécessaire