RxJava2 - intervalo e agendadores

Aug 24 2020

Digamos que eu tenha um intervalo e atribuí a ele um computationScheduler. Como isso:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .flatMap { ... }

Então, tudo o que acontece no mapa plano {...} também será programado em um thread de computação?

Nas fontes de Observable.interval (long initialDelay, long period, unidade TimeUnit, Scheduler scheduler), ele diz:

 * @param scheduler
 * the Scheduler on which the waiting happens and items are emitted

Como um iniciante no RxJava, estou tendo dificuldade em entender este comentário. Eu entendo que o temporizador de intervalo / lógica de espera ocorre no thread de computação. Mas, a última parte, sobre itens sendo emitidos, também significa que os itens emitidos serão consumidos no mesmo thread? Ou um observeOn é necessário para isso? Como isso:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .observeOn(computationScheduler)
    .flatMap { ... }

Esse observeOn seria necessário se eu quiser que as emissões sejam processadas no thread de computação?

Respostas

1 bubbles Aug 24 2020 at 21:34

Isso é simples de verificar: basta imprimir o thread atual para ver em qual thread o operador é executado:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

Isso sempre imprimirá:

on subscribe: main
--> 1
on flatmap: main
on subscribe: main
--> 2
on flatmap: main
on subscribe: main
--> 3
on flatmap: main
on subscribe: main
--> 4
on flatmap: main
on subscribe: main
--> 5
on flatmap: main
on subscribe: main
--> 6
on flatmap: main
on subscribe: main
--> 7
on flatmap: main
on subscribe: main
--> 8
on flatmap: main
on subscribe: main
--> 9

Processado sequencialmente porque tudo acontece em um único thread -> main.

observeOn mudará o thread de execução downstream:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .observeOn(Schedulers.computation())
    .flatMap(e -> {
         System.out.println("on flatmap: " + Thread.currentThread().getName());
         return Observable.just(e).map(x -> "--> " + x);
     })
     .observeOn(Schedulers.io())
     .subscribe(s -> {
         System.out.println("on subscribe: " + Thread.currentThread().getName());
         System.out.println(s);
      });

O resultado desta vez vai ser diferente para cada execução, mas flatmape subscribeserão processados em fios de diffrent:

on flatmap: RxComputationThreadPool-1
on subscribe: RxCachedThreadScheduler-1

intervalatuará como observeOne mudará o thread de execução downstream (agendador):

Observable.interval(0, 1, TimeUnit.SECONDS, Schedulers.computation())
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

Desta vez, a execução é sequencial dentro de um thread do agendador de computação:

on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 0
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 1
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 2
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 3
...

intervalirá por padrão usar o agendador de computação, você não precisa passá-lo como um argumento e observeOnnão é necessário