RxJava2 - intervalle et planificateurs

Aug 24 2020

Disons que j'ai un intervalle, et que je lui ai donné un computationScheduler. Comme ça:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .flatMap { ... }

Alors, tout ce qui se passe dans flatmap {...} sera-t-il également planifié sur un thread de calcul?

Dans les sources de Observable.interval (long initialDelay, longue période, unité TimeUnit, planificateur de planificateur), il est dit:

 * @param scheduler
 * the Scheduler on which the waiting happens and items are emitted

En tant que débutant avec RxJava, j'ai du mal à comprendre ce commentaire. Je comprends que la logique de temporisation / d'attente d'intervalle se produit sur le thread de calcul. Mais, est-ce que la dernière partie, sur les éléments émis, signifie également que les éléments émis seront consommés sur le même thread? Ou est-ce qu'un observerOn est nécessaire pour cela? Comme ça:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .observeOn(computationScheduler)
    .flatMap { ... }

Est-ce que observerOn serait nécessaire si je veux que les émissions soient traitées sur le thread de calcul?

Réponses

1 bubbles Aug 24 2020 at 21:34

C'est simple à vérifier: il suffit d'imprimer le thread courant pour voir sur quel thread l'opérateur est exécuté:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

Cela imprimera toujours:

on subscribe: main
--> 1
on flatmap: main
on subscribe: main
--> 2
on flatmap: main
on subscribe: main
--> 3
on flatmap: main
on subscribe: main
--> 4
on flatmap: main
on subscribe: main
--> 5
on flatmap: main
on subscribe: main
--> 6
on flatmap: main
on subscribe: main
--> 7
on flatmap: main
on subscribe: main
--> 8
on flatmap: main
on subscribe: main
--> 9

Traités séquentiellement car tout se passe dans un seul thread -> main.

observeOn changera le thread d'exécution en aval:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .observeOn(Schedulers.computation())
    .flatMap(e -> {
         System.out.println("on flatmap: " + Thread.currentThread().getName());
         return Observable.just(e).map(x -> "--> " + x);
     })
     .observeOn(Schedulers.io())
     .subscribe(s -> {
         System.out.println("on subscribe: " + Thread.currentThread().getName());
         System.out.println(s);
      });

Le résultat cette fois sera différente pour chaque exécution mais flatmapet subscribesera traité dans les discussions diffrent:

on flatmap: RxComputationThreadPool-1
on subscribe: RxCachedThreadScheduler-1

intervalagira comme observeOnet changera le thread d'exécution en aval (ordonnanceur):

Observable.interval(0, 1, TimeUnit.SECONDS, Schedulers.computation())
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

Cette fois, l'exécution est séquentielle à l'intérieur d'un thread du planificateur de calcul:

on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 0
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 1
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 2
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 3
...

intervalutilisera par défaut le planificateur de calcul, vous n'avez pas besoin de le passer en argument et observeOnn'est pas nécessaire