RxJava2 - intervalo y programadores
Digamos que tengo un intervalo y que le he dado un programador de cálculo. Me gusta esto:
Observable
.interval(0, 1, TimeUnit.SECONDS, computationScheduler)
.flatMap { ... }
Entonces, ¿todo lo que sucede en el mapa plano {...} también se programará en un hilo de cálculo?
En las fuentes de Observable.interval (retraso inicial largo, período largo, unidad TimeUnit, planificador del programador), dice:
* @param scheduler
* the Scheduler on which the waiting happens and items are emitted
Como principiante en RxJava, me cuesta entender este comentario. Entiendo que el temporizador de intervalo / lógica de espera se produce en el hilo de cálculo. Pero, ¿la última parte, sobre los elementos que se emiten, también significa que los elementos emitidos se consumirán en el mismo hilo? ¿O se requiere un observeOn para eso? Me gusta esto:
Observable
.interval(0, 1, TimeUnit.SECONDS, computationScheduler)
.observeOn(computationScheduler)
.flatMap { ... }
¿Sería necesario observarOn si quiero que las emisiones se procesen en el hilo de cálculo?
Respuestas
Esto es fácil de verificar: simplemente imprima el hilo actual para ver en qué hilo se ejecuta el operador:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
Esto siempre imprimirá:
on subscribe: main
--> 1
on flatmap: main
on subscribe: main
--> 2
on flatmap: main
on subscribe: main
--> 3
on flatmap: main
on subscribe: main
--> 4
on flatmap: main
on subscribe: main
--> 5
on flatmap: main
on subscribe: main
--> 6
on flatmap: main
on subscribe: main
--> 7
on flatmap: main
on subscribe: main
--> 8
on flatmap: main
on subscribe: main
--> 9
Procesado secuencialmente porque todo sucede en un solo hilo -> main.
observeOn cambiará el hilo de ejecución descendente:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.observeOn(Schedulers.computation())
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.observeOn(Schedulers.io())
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
El resultado esta vez será diferente para cada ejecución pero flatmapy subscribeserá procesada en hilos diffrent:
on flatmap: RxComputationThreadPool-1
on subscribe: RxCachedThreadScheduler-1
intervalactuará como observeOny cambiará el hilo de ejecución descendente (programador):
Observable.interval(0, 1, TimeUnit.SECONDS, Schedulers.computation())
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
Esta vez, la ejecución es secuencial dentro de un hilo del programador de cálculo:
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 0
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 1
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 2
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 3
...
intervalutilizará de forma predeterminada el programador de cálculo, no es necesario que lo pase como un argumento y observeOnno es necesario