RxJava2 - aralık ve zamanlayıcılar
Diyelim ki bir aralığım var ve ona bir computationScheduler verdim. Böyle:
Observable
.interval(0, 1, TimeUnit.SECONDS, computationScheduler)
.flatMap { ... }
O halde, flatmap'te {...} olan her şey bir hesaplama iş parçacığına da programlanacak mı?
Observable.interval (long initialDelay, long period, TimeUnit unit, Scheduler scheduler) kaynaklarında şöyle diyor:
* @param scheduler
* the Scheduler on which the waiting happens and items are emitted
RxJava'ya yeni başlayan biri olarak, bu yorumu anlamakta zorlanıyorum. Zamanlayıcı / bekleme mantığının hesaplama iş parçacığında gerçekleştiğini anlıyorum. Ancak, yayımlanan öğelerle ilgili son kısım, aynı zamanda gönderilen öğelerin aynı iş parçacığı üzerinde tüketileceği anlamına mı geliyor? Yoksa bunun için bir gözlem gerekli mi? Böyle:
Observable
.interval(0, 1, TimeUnit.SECONDS, computationScheduler)
.observeOn(computationScheduler)
.flatMap { ... }
Emitlerin hesaplama iş parçacığında işlenmesini istiyorsam bu observeOn gerekli olur mu?
Yanıtlar
Bunu doğrulamak basittir: operatörün hangi iş parçacığı üzerinde çalıştırıldığını görmek için sadece mevcut iş parçacığını yazdırın:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
Bu her zaman yazdıracaktır:
on subscribe: main
--> 1
on flatmap: main
on subscribe: main
--> 2
on flatmap: main
on subscribe: main
--> 3
on flatmap: main
on subscribe: main
--> 4
on flatmap: main
on subscribe: main
--> 5
on flatmap: main
on subscribe: main
--> 6
on flatmap: main
on subscribe: main
--> 7
on flatmap: main
on subscribe: main
--> 8
on flatmap: main
on subscribe: main
--> 9
Sırayla işlendi çünkü hepsi tek bir iş parçacığında -> main.
observeOn aşağı akış yürütme iş parçacığını değiştirecek:
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.observeOn(Schedulers.computation())
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.observeOn(Schedulers.io())
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
Bu kez, her çalıştırma için farklı olacaktır, ancak sonuç flatmapve subscribediffrent parçacığı işlenecektir:
on flatmap: RxComputationThreadPool-1
on subscribe: RxCachedThreadScheduler-1
intervalobserveOnaşağı akış yürütme iş parçacığı olarak hareket edecek ve onu değiştirecektir (zamanlayıcı):
Observable.interval(0, 1, TimeUnit.SECONDS, Schedulers.computation())
.flatMap(e -> {
System.out.println("on flatmap: " + Thread.currentThread().getName());
return Observable.just(e).map(x -> "--> " + x);
})
.subscribe(s -> {
System.out.println("on subscribe: " + Thread.currentThread().getName());
System.out.println(s);
});
Bu sefer yürütme, bir hesaplama zamanlayıcı dizisi içinde sıralıdır:
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 0
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 1
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 2
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 3
...
intervalvarsayılan olarak hesaplama planlayıcısını kullanır, bunu bir argüman olarak iletmeniz observeOngerekmez ve gerekli değildir