RxJava2 - intervalo e agendadores
Digamos que eu tenha um intervalo e atribuí a ele um computationScheduler. Como isso:
Observable
.interval(0, 1, TimeUnit.SECONDS, computationScheduler)
.flatMap { ... }
Então, tudo o que acontece no mapa plano {...} também será programado em um thread de computação?
Nas fontes de Observable.interval (long initialDelay, long period, unidade TimeUnit, Scheduler scheduler), ele diz:
* @param scheduler
* the Scheduler on which the waiting happens and items are emitted
Como um iniciante no RxJava, estou tendo dificuldade em entender este comentário. Eu entendo que o temporizador de intervalo / lógica de espera ocorre no thread de computação. Mas, a última parte, sobre itens sendo emitidos, também significa que os itens emitidos serão consumidos no mesmo thread? Ou um observeOn é necessário para isso? Como isso:
Observable
.interval(0, 1, TimeUnit.SECONDS, computationScheduler)
.observeOn(computationScheduler)
.flatMap { ... }
Esse observeOn seria necessário se eu quiser que as emissões sejam processadas no thread de computação?
Respostas
Isso é simples de verificar: basta imprimir o thread atual para ver em qual thread o operador é executado:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
Isso sempre imprimirá:
on subscribe: main
--> 1
on flatmap: main
on subscribe: main
--> 2
on flatmap: main
on subscribe: main
--> 3
on flatmap: main
on subscribe: main
--> 4
on flatmap: main
on subscribe: main
--> 5
on flatmap: main
on subscribe: main
--> 6
on flatmap: main
on subscribe: main
--> 7
on flatmap: main
on subscribe: main
--> 8
on flatmap: main
on subscribe: main
--> 9
Processado sequencialmente porque tudo acontece em um único thread -> main.
observeOn mudará o thread de execução downstream:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.observeOn(Schedulers.computation())
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.observeOn(Schedulers.io())
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
O resultado desta vez vai ser diferente para cada execução, mas flatmape subscribeserão processados em fios de diffrent:
on flatmap: RxComputationThreadPool-1
on subscribe: RxCachedThreadScheduler-1
intervalatuará como observeOne mudará o thread de execução downstream (agendador):
Observable.interval(0, 1, TimeUnit.SECONDS, Schedulers.computation())
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
Desta vez, a execução é sequencial dentro de um thread do agendador de computação:
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 0
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 1
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 2
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 3
...
intervalirá por padrão usar o agendador de computação, você não precisa passá-lo como um argumento e observeOnnão é necessário