RxJava2 - aralık ve zamanlayıcılar

Aug 24 2020

Diyelim ki bir aralığım var ve ona bir computationScheduler verdim. Böyle:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .flatMap { ... }

O halde, flatmap'te {...} olan her şey bir hesaplama iş parçacığına da programlanacak mı?

Observable.interval (long initialDelay, long period, TimeUnit unit, Scheduler scheduler) kaynaklarında şöyle diyor:

 * @param scheduler
 * the Scheduler on which the waiting happens and items are emitted

RxJava'ya yeni başlayan biri olarak, bu yorumu anlamakta zorlanıyorum. Zamanlayıcı / bekleme mantığının hesaplama iş parçacığında gerçekleştiğini anlıyorum. Ancak, yayımlanan öğelerle ilgili son kısım, aynı zamanda gönderilen öğelerin aynı iş parçacığı üzerinde tüketileceği anlamına mı geliyor? Yoksa bunun için bir gözlem gerekli mi? Böyle:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .observeOn(computationScheduler)
    .flatMap { ... }

Emitlerin hesaplama iş parçacığında işlenmesini istiyorsam bu observeOn gerekli olur mu?

Yanıtlar

1 bubbles Aug 24 2020 at 21:34

Bunu doğrulamak basittir: operatörün hangi iş parçacığı üzerinde çalıştırıldığını görmek için sadece mevcut iş parçacığını yazdırın:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

Bu her zaman yazdıracaktır:

on subscribe: main
--> 1
on flatmap: main
on subscribe: main
--> 2
on flatmap: main
on subscribe: main
--> 3
on flatmap: main
on subscribe: main
--> 4
on flatmap: main
on subscribe: main
--> 5
on flatmap: main
on subscribe: main
--> 6
on flatmap: main
on subscribe: main
--> 7
on flatmap: main
on subscribe: main
--> 8
on flatmap: main
on subscribe: main
--> 9

Sırayla işlendi çünkü hepsi tek bir iş parçacığında -> main.

observeOn aşağı akış yürütme iş parçacığını değiştirecek:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .observeOn(Schedulers.computation())
    .flatMap(e -> {
         System.out.println("on flatmap: " + Thread.currentThread().getName());
         return Observable.just(e).map(x -> "--> " + x);
     })
     .observeOn(Schedulers.io())
     .subscribe(s -> {
         System.out.println("on subscribe: " + Thread.currentThread().getName());
         System.out.println(s);
      });

Bu kez, her çalıştırma için farklı olacaktır, ancak sonuç flatmapve subscribediffrent parçacığı işlenecektir:

on flatmap: RxComputationThreadPool-1
on subscribe: RxCachedThreadScheduler-1

intervalobserveOnaşağı akış yürütme iş parçacığı olarak hareket edecek ve onu değiştirecektir (zamanlayıcı):

Observable.interval(0, 1, TimeUnit.SECONDS, Schedulers.computation())
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

Bu sefer yürütme, bir hesaplama zamanlayıcı dizisi içinde sıralıdır:

on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 0
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 1
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 2
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 3
...

intervalvarsayılan olarak hesaplama planlayıcısını kullanır, bunu bir argüman olarak iletmeniz observeOngerekmez ve gerekli değildir