RxJava2 - интервал и планировщики

Aug 24 2020

Скажем, у меня есть интервал, и я дал ему computationScheduler. Как это:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .flatMap { ... }

Тогда будет ли все, что происходит в плоской карте {...}, также быть запланировано в вычислительном потоке?

В источниках для Observable.interval (long initialDelay, long period, TimeUnit unit, Scheduler scheduler) говорится:

 * @param scheduler
 * the Scheduler on which the waiting happens and items are emitted

Как новичок в RxJava, мне трудно понять этот комментарий. Я понимаю, что логика интервального таймера / ожидания выполняется в вычислительном потоке. Но, не последняя часть, о предметах , которые испускаются, также означает , что испускаемые элементы будут потребляться в том же потоке? Или для этого требуется наблюдение? Как это:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .observeOn(computationScheduler)
    .flatMap { ... }

Будет ли необходимо наблюдениеOn, если я хочу, чтобы излучение обрабатывалось в вычислительном потоке?

Ответы

1 bubbles Aug 24 2020 at 21:34

Проверить это просто: просто распечатайте текущий поток, чтобы увидеть, в каком потоке выполняется оператор:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

Это всегда будет печатать:

on subscribe: main
--> 1
on flatmap: main
on subscribe: main
--> 2
on flatmap: main
on subscribe: main
--> 3
on flatmap: main
on subscribe: main
--> 4
on flatmap: main
on subscribe: main
--> 5
on flatmap: main
on subscribe: main
--> 6
on flatmap: main
on subscribe: main
--> 7
on flatmap: main
on subscribe: main
--> 8
on flatmap: main
on subscribe: main
--> 9

Обработка выполняется последовательно, потому что все происходит в одном потоке -> main.

observeOn изменит нисходящий поток выполнения:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .observeOn(Schedulers.computation())
    .flatMap(e -> {
         System.out.println("on flatmap: " + Thread.currentThread().getName());
         return Observable.just(e).map(x -> "--> " + x);
     })
     .observeOn(Schedulers.io())
     .subscribe(s -> {
         System.out.println("on subscribe: " + Thread.currentThread().getName());
         System.out.println(s);
      });

Результат на этот раз будет отличаться для каждого исполнения , но flatmapи subscribeбудет обработан в diffrent нитей:

on flatmap: RxComputationThreadPool-1
on subscribe: RxCachedThreadScheduler-1

intervalбудет действовать как observeOnи изменять нисходящий поток выполнения (планировщик):

Observable.interval(0, 1, TimeUnit.SECONDS, Schedulers.computation())
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

На этот раз выполнение происходит последовательно внутри одного потока планировщика вычислений:

on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 0
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 1
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 2
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 3
...

intervalпо умолчанию будет использовать планировщик вычислений, вам не нужно передавать его в качестве аргумента и observeOnне требуется