RxJava - Szybki przewodnik

RxJava to oparte na Javie rozszerzenie ReactiveX. Zapewnia implementację lub projekt ReactiveX w Javie. Poniżej przedstawiono kluczowe cechy RxJava.

  • Rozszerza wzorzec obserwatora.

  • Obsługa sekwencji danych / zdarzeń.

  • Zapewnia operatory do składania sekwencji razem w sposób deklaratywny.

  • Wewnętrznie obsługuje wątki, synchronizację, bezpieczeństwo wątków i współbieżne struktury danych.

Co to jest ReactiveX?

ReactiveX to projekt, którego celem jest dostarczenie koncepcji programowania reaktywnego do różnych języków programowania. Programowanie reaktywne odnosi się do scenariusza, w którym program reaguje, gdy pojawiają się dane. Jest to koncepcja programowania oparta na zdarzeniach i zdarzenia mogą być propagowane do rejestrów obserwatorów.

Zgodnie z Reactive, połączyli najlepsze wzorce obserwatorów, wzorce iteratorów i wzorce funkcjonalne.

Wzorzec Observer wykonany prawidłowo. ReactiveX to połączenie najlepszych pomysłów ze wzorca Observer, wzorca Iterator i programowania funkcjonalnego.

Programowanie funkcjonalne

Programowanie funkcjonalne polega na budowaniu oprogramowania przy użyciu czystych funkcji. Czysta funkcja nie zależy od poprzedniego stanu i zawsze zwraca ten sam wynik dla tych samych przekazanych parametrów. Czyste funkcje pomagają uniknąć problemów związanych ze współdzielonymi obiektami, zmiennymi danymi i efektami ubocznymi często występującymi w środowiskach wielowątkowych.

Programowanie reaktywne

Programowanie reaktywne odnosi się do programowania sterowanego zdarzeniami, w którym strumienie danych przychodzą w sposób asynchroniczny i są przetwarzane w momencie nadejścia.

Funkcjonalne programowanie reaktywne

RxJava realizuje obie koncepcje razem, gdzie dane strumieni zmieniają się w czasie, a funkcja konsumenta odpowiednio reaguje.

Manifest reaktywny

Reactive Manifesto to dokument on-line potwierdzający wysoki standard systemów oprogramowania użytkowego. Zgodnie z manifestem, poniżej przedstawiono kluczowe atrybuty oprogramowania reaktywnego -

  • Responsive - Powinien zawsze reagować w odpowiednim czasie.

  • Message Driven - Powinny używać asynchronicznego przesyłania komunikatów między komponentami, aby zachować luźne sprzężenie.

  • Elastic - Powinien pozostawać responsywny nawet przy dużym obciążeniu.

  • Resilient - Powinien pozostawać responsywny nawet w przypadku awarii któregokolwiek komponentu.

Kluczowe komponenty RxJava

RxJava ma dwa kluczowe komponenty: Observables i Observer.

  • Observable - Reprezentuje obiekt podobny do Stream, który może emitować zero lub więcej danych, może wysyłać komunikat o błędzie, którego prędkość można kontrolować podczas emisji zestawu danych, może przesyłać zarówno skończone, jak i nieskończone dane.

  • Observer- Subskrybuje dane sekwencji Observable i reaguje na pozycję obserwabli. Obserwatorzy są powiadamiani za każdym razem, gdy Observable emituje dane. Obserwator obsługuje dane jeden po drugim.

Obserwator nigdy nie jest powiadamiany, jeśli elementy nie są obecne lub nie zostanie zwrócone wywołanie zwrotne dla poprzedniego elementu.

Konfiguracja środowiska lokalnego

RxJava to biblioteka dla Javy, więc pierwszym wymaganiem jest zainstalowanie JDK na komputerze.

Wymagania systemowe

JDK 1.5 lub nowszy.
Pamięć Brak minimalnych wymagań.
Miejsca na dysku Brak minimalnych wymagań.
System operacyjny Brak minimalnych wymagań.

Krok 1 - Zweryfikuj instalację Java na swoim komputerze

Przede wszystkim otwórz konsolę i wykonaj polecenie java w oparciu o system operacyjny, na którym pracujesz.

OS Zadanie Komenda
Windows Otwórz konsolę poleceń c: \> java -version
Linux Otwórz terminal poleceń $ java -version
Prochowiec Otwórz Terminal maszyna: <joseph $ java -version

Sprawdźmy dane wyjściowe dla wszystkich systemów operacyjnych -

OS Wynik
Windows

wersja java „1.8.0_101”

Java (TM) SE Runtime Environment (kompilacja 1.8.0_101)

Linux

wersja java „1.8.0_101”

Java (TM) SE Runtime Environment (kompilacja 1.8.0_101)

Prochowiec

wersja java „1.8.0_101”

Java (TM) SE Runtime Environment (kompilacja 1.8.0_101)

Jeśli nie masz zainstalowanej Java w swoim systemie, pobierz pakiet Java Software Development Kit (SDK) z poniższego łącza https://www.oracle.com. Zakładamy Java 1.8.0_101 jako zainstalowaną wersję tego samouczka.

Krok 2 - Ustaw środowisko JAVA

Ustaw JAVA_HOMEzmienna środowiskowa wskazująca lokalizację katalogu podstawowego, w którym na komputerze jest zainstalowana Java. Na przykład.

OS Wynik
Windows Ustaw zmienną środowiskową JAVA_HOME na C: \ Program Files \ Java \ jdk1.8.0_101
Linux eksportuj JAVA_HOME = / usr / local / java-current
Prochowiec eksportuj JAVA_HOME = / Library / Java / Home

Dołącz lokalizację kompilatora Java do ścieżki systemowej.

OS Wynik
Windows Dołącz ciąg C:\Program Files\Java\jdk1.8.0_101\bin na końcu zmiennej systemowej, Path.
Linux export PATH = $ PATH: $ JAVA_HOME / bin /
Prochowiec nie wymagane

Sprawdź instalację oprogramowania Java za pomocą polecenia java -version jak wyjaśniono powyżej.

Krok 3 - Pobierz archiwum RxJava2

Pobierz najnowszą wersję pliku jar RxJava z RxJava @ MVNRepository i jego zależności Reactive Streams @ MVNRepository . W chwili pisania tego samouczka pobraliśmy rxjava-2.2.4.jar, reactive-streams-1.0.2.jar i skopiowaliśmy go do folderu C: \> RxJava.

OS Nazwa archiwum
Windows rxjava-2.2.4.jar, reactive-streams-1.0.2.jar
Linux rxjava-2.2.4.jar, reactive-streams-1.0.2.jar
Prochowiec rxjava-2.2.4.jar, reactive-streams-1.0.2.jar

Krok 4 - Ustaw środowisko RxJava

Ustaw RX_JAVAzmienna środowiskowa, aby wskazać lokalizację katalogu podstawowego, w którym jest przechowywany plik jar RxJava na komputerze. Załóżmy, że zapisaliśmy rxjava-2.2.4.jar i reactive-streams-1.0.2.jar w folderze RxJava.

Sr.No System operacyjny i opis
1

Windows

Ustaw zmienną środowiskową RX_JAVA na C: \ RxJava

2

Linux

eksportuj RX_JAVA = / usr / local / RxJava

3

Mac

eksportuj RX_JAVA = / Library / RxJava

Krok 5 - Ustaw zmienną CLASSPATH

Ustaw CLASSPATH zmienna środowiskowa wskazująca lokalizację jar RxJava.

Sr.No System operacyjny i opis
1

Windows

Ustaw zmienną środowiskową CLASSPATH na% CLASSPATH%;% RX_JAVA% \ rxjava-2.2.4.jar;% RX_JAVA% \ reactive-streams-1.0.2.jar;.;

2

Linux

export CLASSPATH = $ CLASSPATH: $ RX_JAVA / rxjava-2.2.4.jar: reactive-streams-1.0.2.jar :.

3

Mac

export CLASSPATH = $ CLASSPATH: $ RX_JAVA / rxjava-2.2.4.jar: reactive-streams-1.0.2.jar :.

Krok 6 - Przetestuj konfigurację RxJava

Utwórz klasę TestRx.java, jak pokazano poniżej -

import io.reactivex.Flowable;
public class TestRx {
   public static void main(String[] args) {
      Flowable.just("Hello World!")
         .subscribe(System.out::println);
   }
}

Krok 7 - Sprawdź wynik

Skompiluj klasy przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac Tester.java

Sprawdź dane wyjściowe.

Hello World!

Observables reprezentuje źródła danych, gdzie as Observers (Subscribers)Słuchaj ich. W skrócie, Obserwowalny emituje przedmioty, a subskrybent następnie je zużywa.

Zauważalny

  • Observable udostępnia dane, gdy subskrybent zacznie nasłuchiwać.

  • Observable może wyemitować dowolną liczbę elementów.

  • Observable może emitować tylko sygnał zakończenia, a także bez elementu.

  • Observable może zakończyć się pomyślnie.

  • Observable może nigdy się nie zakończyć. np. przycisk można kliknąć dowolną liczbę razy.

  • Observable może zgłosić błąd w dowolnym momencie.

Abonent

  • Observable może mieć wielu subskrybentów.

  • Gdy Observable emituje element, wywoływana jest każda metoda onNext () subskrybenta.

  • Gdy Observable zakończy emitowanie elementów, wywoływana jest każda metoda onComplete () subskrybenta.

  • Jeśli Observable emituje błąd, wywoływana jest każda metoda onError () subskrybenta.

Poniżej przedstawiono klasy bazowe do tworzenia obserwabli.

  • Flowable- Przepływy 0..N, Emituje 0 lub n elementów. Obsługuje strumienie reaktywne i ciśnienie wsteczne.

  • Observable - Przepływy 0..N, ale bez przeciwciśnienia.

  • Single- 1 przedmiot lub błąd. Może być traktowany jako reaktywna wersja wywołania metody.

  • Completable- Nie wyemitowano żadnego elementu. Używany jako sygnał zakończenia lub błędu. Może być traktowany jako reaktywna wersja Runnable.

  • MayBe- Nie wyemitowano żadnego elementu lub 1 element został wyemitowany. Może być traktowany jako reaktywna wersja Optional.

Poniżej przedstawiono wygodne metody tworzenia obserwabli w klasie Observable.

  • just(T item) - Zwraca Observable, który sygnalizuje podaną (stałą referencję) element, a następnie kończy.

  • fromIterable(Iterable source) - Konwertuje iterowalną sekwencję na ObservableSource, która emituje elementy w sekwencji.

  • fromArray(T... items) - Konwertuje Array na ObservableSource, który emituje elementy w Array.

  • fromCallable(Callable supplier) - Zwraca Observable, który, gdy obserwator zasubskrybuje ją, wywołuje określoną funkcję, a następnie emituje wartość zwróconą przez tę funkcję.

  • fromFuture(Future future) - Konwertuje przyszłość na ObservableSource.

  • interval(long initialDelay, long period, TimeUnit unit) - Zwraca Observable, który emituje 0L po początkowym opóźnieniu i coraz większe liczby po każdym następnym okresie.

Klasa Single reprezentuje pojedynczą wartość odpowiedzi. Pojedyncza obserwowalna może emitować tylko pojedynczą pomyślną wartość lub błąd. Nie emituje zdarzenia onComplete.

Deklaracja klasy

Poniżej znajduje się deklaracja dla io.reactivex.Single<T> klasa -

public abstract class Single<T>
   extends Object
      implements SingleSource<T>

Protokół

Poniżej znajduje się protokół sekwencyjny, na którym działa Single Observable -

onSubscribe (onSuccess | onError)?

Pojedynczy przykład

Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {
      //Create the observable
      Single<String> testSingle = Single.just("Hello World");

      //Create an observer
      Disposable disposable = testSingle
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(
         new DisposableSingleObserver<String>() {

         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

Sprawdź wynik

Skompiluj klasę przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac ObservableTester.java

Teraz uruchom ObservableTester w następujący sposób -

C:\RxJava>java ObservableTester

Powinien dać następujący wynik -

Hello World

Klasa MayBe reprezentuje odroczoną odpowiedź. MayBe obserowalne może emitować pojedynczą pomyślną wartość lub nie emitować żadnej wartości.

Deklaracja klasy

Poniżej znajduje się deklaracja dla io.reactivex.Single<T> klasa -

public abstract class Maybe<T>
   extends Object
      implements MaybeSource<T>

Protokół

Poniżej znajduje się protokół sekwencyjny, na którym działa MayBe Observable -

onSubscribe (onSuccess | onError | OnComplete)?

Przykład MayBe

Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      //Create an observer
      Disposable disposable = Maybe.just("Hello World")
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

Sprawdź wynik

Skompiluj klasę przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac ObservableTester.java

Teraz uruchom ObservableTester w następujący sposób -

C:\RxJava>java ObservableTester

Powinien dać następujący wynik -

Hello World

Klasa Completable reprezentuje odroczoną odpowiedź. Zaobserwowalne do ukończenia mogą wskazywać na pomyślne zakończenie lub błąd.

Deklaracja klasy

Poniżej znajduje się deklaracja dla io.reactivex.Completable klasa -

public abstract class Completable
extends Object
implements CompletableSource

Protokół

Poniżej znajduje się protokół sekwencyjny, w którym działa Completable Observable -

onSubscribe (onError | onComplete)?

Kompletny przykład

Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableCompletableObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {

      //Create an observer
      Disposable disposable = Completable.complete()
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableCompletableObserver() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }
         @Override
         public void onStart() {
            System.out.println("Started!");
         }
         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

Sprawdź wynik

Skompiluj klasę przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac ObservableTester.java

Teraz uruchom ObservableTester w następujący sposób -

C:\RxJava>java ObservableTester

Powinien dać następujący wynik -

Started!
Done!

Klasa CompositeDisposable reprezentuje kontener, który może pomieścić wiele elementów jednorazowego użytku i oferuje złożoność O (1) dodawania i usuwania elementów jednorazowego użytku.

Deklaracja klasy

Poniżej znajduje się deklaracja dla io.reactivex.disposables.CompositeDisposable klasa -

public final class CompositeDisposable
extends Object
implements Disposable, io.reactivex.internal.disposables.DisposableContainer

Przykład CompositeDisposable

Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.

ObservableTester.java

import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {    
      CompositeDisposable compositeDisposable = new CompositeDisposable();

      //Create an Single observer 
      Disposable disposableSingle = Single.just("Hello World")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(
      new DisposableSingleObserver<String>() {
         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      }); 

      //Create an observer
      Disposable disposableMayBe = Maybe.just("Hi")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 

      Thread.sleep(3000);

      compositeDisposable.add(disposableSingle);
      compositeDisposable.add(disposableMayBe);

      //start observing
      compositeDisposable.dispose();
   }
}

Sprawdź wynik

Skompiluj klasę przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac ObservableTester.java

Teraz uruchom ObservableTester w następujący sposób -

C:\RxJava>java ObservableTester

Powinien dać następujący wynik -

Hello World
Hi

Poniżej znajdują się operatory, które są używane do tworzenia Observable.

Sr.No. Operator i opis
1

Create

Tworzy Observable od podstaw i umożliwia programowe wywoływanie metody obserwatora.

2

Defer

Nie twórz Observable, dopóki obserwator nie zasubskrybuje. Tworzy nowe obserwowalne dla każdego obserwatora.

3

Empty/Never/Throw

Tworzy Observable z ograniczonym zachowaniem.

4

From

Konwertuje strukturę obiektu / danych na Observable.

5

Interval

Tworzy obserwowalne emitujące liczby całkowite w kolejności z przerwą o określonym przedziale czasu.

6

Just

Konwertuje strukturę obiektu / danych na Observable, aby emitować ten sam lub ten sam typ obiektów.

7

Range

Tworzy Observable emitujące liczby całkowite w sekwencji podanego zakresu.

8

Repeat

Tworzy Observable emitując liczby całkowite wielokrotnie.

9

Start

Tworzy Observable, aby emitować wartość zwracaną przez funkcję.

10

Timer

Tworzy Observable, aby wyemitować pojedynczy element po określonym opóźnieniu.

Tworzenie przykładu operatora

Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using fromArray operator to create an Observable
public class ObservableTester  {
   public static void main(String[] args) { 
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Sprawdź wynik

Skompiluj klasę przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac ObservableTester.java

Teraz uruchom ObservableTester w następujący sposób -

C:\RxJava>java ObservableTester

Powinien dać następujący wynik -

ABCDEFG

Poniżej znajdują się operatory, które są używane do przekształcania elementu wyemitowanego z Observable.

Sr.No. Operator i opis
1

Buffer

Gromadzi przedmioty z Observable w pakietach okresowo, a następnie emituje pakiety, a nie przedmioty.

2

FlatMap

Używane w zagnieżdżonych obserwabli. Przekształca przedmioty w Observables. Następnie spłaszcz elementy w jeden Observable.

3

GroupBy

Podziel Observable na zbiór Observables zorganizowanych według klucza, aby wyemitować inną grupę elementów.

4

Map

Zastosuj funkcję do każdego emitowanego elementu, aby go przekształcić.

5

Scan

Zastosuj funkcję do każdego emitowanego elementu, sekwencyjnie, a następnie emituj kolejną wartość.

6

Window

Zbiera elementy z okien Observable do Observable okresowo, a następnie emituje okna zamiast elementów.

Przykład transformacji operatora

Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using map operator to transform an Observable
public class ObservableTester  { 
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Sprawdź wynik

Skompiluj klasę przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac ObservableTester.java

Teraz uruchom ObservableTester w następujący sposób -

C:\RxJava>java ObservableTester

Powinien dać następujący wynik -

ABCDEFG

Poniżej znajdują się operatory, które są używane do selektywnego emitowania elementu (ów) z Observable.

Sr.No. Operator i opis
1

Debounce

Emituje elementy tylko po przekroczeniu limitu czasu bez emitowania innego elementu.

2

Distinct

Emituje tylko unikalne przedmioty.

3

ElementAt

emituje tylko element o indeksie n emitowanym przez Observable.

4

Filter

Emituje tylko te elementy, które spełniają daną funkcję predykatu.

5

First

Emituje pierwszy element lub pierwszy element, który spełnił podane kryteria.

6

IgnoreElements

Nie emituje żadnych przedmiotów z Observable, ale zaznacza ukończenie.

7

Last

Emituje ostatni element z Observable.

8

Sample

Emituje najnowszy element w podanym przedziale czasu.

9

Skip

Pomija pierwsze n elementów z Observable.

10

SkipLast

Pomija ostatnie n elementów z Observable.

11

Take

pobiera pierwsze n elementów z Observable.

12

TakeLast

pobiera ostatnie n elementów z Observable.

Przykład operatora filtrującego

Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using take operator to filter an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .take(2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Sprawdź wynik

Skompiluj klasę przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac ObservableTester.java

Teraz uruchom ObservableTester w następujący sposób -

C:\RxJava>java ObservableTester

Powinien dać następujący wynik -

ab

Poniżej znajdują się operatory, które są używane do tworzenia pojedynczego Observable z wielu Observable.

Sr.No. Operator i opis
1 And/Then/When

Połącz zestawy przedmiotów za pomocą pośredników wzorca i planu.

2 CombineLatest

Połącz najnowszy element wyemitowany przez każdy Observable za pośrednictwem określonej funkcji i wyemituj wynikowy element.

3 Join

Połącz pozycje wyemitowane przez dwa Observables, jeśli wyemitowano w ramach czasowych drugiej Obserwowalnej emitowanej pozycji.

4 Merge

Łączy wyemitowane elementy Observables.

5 StartWith

Emituj określoną sekwencję elementów przed rozpoczęciem emitowania elementów ze źródła Observable

6 Switch

Emituje najnowsze elementy emitowane przez Observables.

7 Zip

Łączy elementy Observables na podstawie funkcji i emituje wynikowe elementy.

Przykład łączenia operatorów

Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using combineLatest operator to combine Observables
public class ObservableTester {
   public static void main(String[] args) {    
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.combineLatest(observable1, observable2, (a,b) -> a + b)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Sprawdź wynik

Skompiluj klasę przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac ObservableTester.java

Teraz uruchom ObservableTester w następujący sposób -

C:\RxJava>java ObservableTester

Powinien dać następujący wynik -

g1g2g3g4g5g6

Poniżej znajdują się operatory, które są często przydatne w przypadku Observables.

Sr.No. Operator i opis
1

Delay

Zarejestruj akcję, aby obsłużyć obserwowalne zdarzenia cyklu życia.

2

Materialize/Dematerialize

Reprezentuje wysłany element i wysłane powiadomienie.

3

ObserveOn

Określ planistę, który ma być obserwowany.

4

Serialize

Wymuś obserwowalne, aby wykonywać szeregowane wywołania.

5

Subscribe

Działaj na emisjach przedmiotów i powiadomień, takich jak kompletne z Obserwowalnego

6

SubscribeOn

Określ harmonogram, który ma być używany przez Observable, gdy jest subskrybowany.

7

TimeInterval

Konwertuj obserwowalne, aby emitować wskazania czasu, jaki upłynął między emisjami.

8

Timeout

Wysyła powiadomienie o błędzie, jeśli określony czas wystąpi bez emisji żadnego elementu.

9

Timestamp

Dołącz sygnaturę czasową do każdego emitowanego elementu.

9

Using

Tworzy jednorazowy zasób lub taką samą żywotność, jak Observable.

Przykład operatora mediów

Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using subscribe operator to subscribe to an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable.subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Sprawdź wynik

Skompiluj klasę przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac ObservableTester.java

Teraz uruchom ObservableTester w następujący sposób -

C:\RxJava>java ObservableTester

Powinien dać następujący wynik -

abcdefg

Poniżej znajdują się operatory, które oceniają jeden lub wiele wyemitowanych Observables lub elementów.

Sr.No. Operator i opis
1

All

Ocenia wszystkie wyemitowane elementy spełniające podane kryteria.

2

Amb

Emituje wszystkie przedmioty z pierwszego Observable tylko z wieloma Observable.

3

Contains

Sprawdza, czy Observable emituje określony element, czy nie.

4

DefaultIfEmpty

Emituje element domyślny, jeśli Observable niczego nie emituje.

5

SequenceEqual

Sprawdza, czy dwa Observables emitują tę samą sekwencję elementów.

6

SkipUntil

Odrzuca elementy emitowane przez pierwszy Observable, dopóki drugi Observable nie wyemituje elementu.

7

SkipWhile

Odrzuć elementy emitowane przez Observable, dopóki dany warunek nie stanie się fałszywy.

8

TakeUntil

Odrzuca elementy emitowane przez Observable po tym, jak druga Observable emituje element lub kończy działanie.

9

TakeWhile

Odrzuć elementy emitowane przez Observable po tym, jak określony warunek stanie się fałszywy.

Przykład operatora warunkowego

Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using defaultIfEmpty operator to operate on an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      final StringBuilder result = new StringBuilder();
      Observable.empty()
      .defaultIfEmpty("No Data")
      .subscribe(s -> result.append(s));
      System.out.println(result);
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result1 = new StringBuilder();
      Observable.fromArray(letters)
      .firstElement()
      .defaultIfEmpty("No data")   
      .subscribe(s -> result1.append(s));
      System.out.println(result1);
   }
}

Sprawdź wynik

Skompiluj klasę przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac ObservableTester.java

Teraz uruchom ObservableTester w następujący sposób -

C:\RxJava>java ObservableTester

Powinien dać następujący wynik -

No Data
a

Poniżej znajdują się operatory, które działają na całych elementach emitowanych przez Observable.

Sr.No. Operator i opis
1

Average

Oblicza średnie ze wszystkich pozycji i emituje wynik.

2

Concat

Emituje wszystkie elementy z wielu Observable bez przeplotu.

3

Count

Zlicza wszystkie elementy i emituje wynik.

4

Max

Ocenia pozycję o maksymalnej wartości ze wszystkich pozycji i emituje wynik.

5

Min

Ocenia najmniej wartościowy element wszystkich elementów i emituje wynik.

6

Reduce

Zastosuj funkcję do każdego elementu i zwróć wynik.

7

Sum

Oblicza sumę wszystkich elementów i emituje wynik.

Przykład operatora matematycznego

Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using concat operator to operate on multiple Observables
public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {    
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.concat(observable1, observable2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Sprawdź wynik

Skompiluj klasę przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac ObservableTester.java

Teraz uruchom ObservableTester w następujący sposób -

C:\RxJava>java ObservableTester

Powinien dać następujący wynik -

abcdefg123456

Poniżej znajdują się operatorzy, którzy dokładniej kontrolują abonament.

Sr.No. Operator i opis
1

Connect

Poinstruuj podłączalnego Observable, aby emitował elementy do swoich subskrybentów.

2

Publish

Konwertuje Observable na Connectable Observable.

3

RefCount

Konwertuje możliwe do podłączenia Observable na zwykłe Observable.

4

Replay

Upewnij się, że ta sama sekwencja emitowanych elementów jest widoczna dla każdego subskrybenta, nawet po tym, jak Observable zaczął emitować elementy, a subskrybenci subskrybują później.

Przykład operatora z możliwością podłączenia

Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
//Using connect operator on a ConnectableObservable
public class ObservableTester {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      ConnectableObservable<String> connectable = Observable.fromArray(letters).publish();      
      connectable.subscribe(letter -> result.append(letter));
      System.out.println(result.length());
      connectable.connect();
      System.out.println(result.length());
      System.out.println(result);
   }
}

Sprawdź wynik

Skompiluj klasę przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac ObservableTester.java

Teraz uruchom ObservableTester w następujący sposób -

C:\RxJava>java ObservableTester

Powinien dać następujący wynik -

0
7
abcdefg

Zgodnie z Reactive, podmiot może działać zarówno jako obserwowalny, jak i obserwator.

Temat jest rodzajem mostu lub serwera proxy, który jest dostępny w niektórych implementacjach ReactiveX i działa zarówno jako obserwator, jak i jako obserwowalny. Ponieważ jest obserwatorem, może subskrybować jeden lub więcej Observable, a ponieważ jest Observable, może przechodzić przez obserwowane elementy, ponownie je emitując, a także może emitować nowe elementy.

Istnieją cztery rodzaje przedmiotów -

Sr.No. Opis tematu
1

Publish Subject

Emituje tylko te elementy, które są emitowane po czasie subskrypcji.

2 Replay Subject

Emituje wszystkie elementy emitowane przez źródło Observable, niezależnie od tego, kiedy subskrybuje Observable.

3

Behavior Subject

Po subskrypcji emituje najnowszy element, a następnie kontynuuje emitowanie elementu emitowanego przez źródło Observable.

4

Async Subject

Emituje ostatni element emitowany przez źródło Observable po zakończeniu emisji.

PublishSubject wysyła elementy do aktualnie zasubskrybowanych obserwatorów i zdarzenia końcowe do obecnych lub późnych obserwatorów.

Deklaracja klasy

Poniżej znajduje się deklaracja dla io.reactivex.subjects.PublishSubject<T> klasa -

public final class PublishSubject<T>
extends Subject<T>

Przykład publikacji PublishSubject

Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.

ObservableTester.java

import io.reactivex.subjects.PublishSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      PublishSubject<String> subject = PublishSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be abcd 
      System.out.println(result1);
      //Output will be d only
      //as subscribed after c item emitted.
      System.out.println(result2);
   }
}

Sprawdź wynik

Skompiluj klasę przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac ObservableTester.java

Teraz uruchom ObservableTester w następujący sposób -

C:\RxJava>java ObservableTester

Powinien dać następujący wynik -

abcd
d

BehaviorSubject emituje najnowszą obserwowaną pozycję, a następnie wszystkie kolejne obserwowane pozycje do każdego subskrybowanego obserwatora.

Deklaracja klasy

Poniżej znajduje się deklaracja dla io.reactivex.subjects.BehaviorSubject<T> klasa -

public final class BehaviorSubject<T>
extends Subject<T>

Przykład BehaviorSubject

Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.

ObservableTester.java

import io.reactivex.subjects.BehaviorSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         
      BehaviorSubject<String> subject =  BehaviorSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();
      //Output will be abcd
      System.out.println(result1);
      //Output will be cd being BehaviorSubject 
      //(c is last item emitted before subscribe)
      System.out.println(result2);
   }
}

Sprawdź wynik

Skompiluj klasę przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac ObservableTester.java

Teraz uruchom ObservableTester w następujący sposób -

C:\RxJava>java ObservableTester

Powinien dać następujący wynik -

abcd
cd

ReplaySubject odtwarza zdarzenia / elementy obecnym i późnym Obserwatorom.

Deklaracja klasy

Poniżej znajduje się deklaracja dla io.reactivex.subjects.ReplaySubject<T> klasa -

public final class ReplaySubject<T>
extends Subject<T>

Przykład ReplaySubject

Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.

ObservableTester.java

import io.reactivex.subjects.ReplaySubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      ReplaySubject<String> subject = ReplaySubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be abcd
      System.out.println(result1);
      //Output will be abcd being ReplaySubject
      //as ReplaySubject emits all the items
      System.out.println(result2);
   }
}

Sprawdź wynik

Skompiluj klasę przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac ObservableTester.java

Teraz uruchom ObservableTester w następujący sposób -

C:\RxJava>java ObservableTester

Powinien dać następujący wynik -

abcd
abcd

AsyncSubject emituje jedyną ostatnią wartość, po której następuje zdarzenie zakończenia lub odebrany błąd do obserwatorów.

Deklaracja klasy

Poniżej znajduje się deklaracja dla io.reactivex.subjects.AsyncSubject<T> klasa -

public final class  AsyncSubject<T>
extends Subject<T>

Przykład AsyncSubject

Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.

ObservableTester.java

import io.reactivex.subjects. AsyncSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      AsyncSubject<String> subject =  AsyncSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be d being the last item emitted
      System.out.println(result1);
      //Output will be d being the last item emitted     
      System.out.println(result2);
   }
}

Sprawdź wynik

Skompiluj klasę przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac ObservableTester.java

Teraz uruchom ObservableTester w następujący sposób -

C:\RxJava>java ObservableTester

Powinien dać następujący wynik -

d
d

Harmonogramy są używane w środowisku wielowątkowym do pracy z operatorami obserwowalnymi.

Zgodnie z Reactive, Harmonogram służy do planowania, w jaki sposób łańcuch operatorów będzie stosowany do różnych wątków.

Domyślnie Observable i łańcuch operatorów, które do niego zastosujesz, wykonają swoją pracę i powiadomią swoich obserwatorów w tym samym wątku, w którym wywoływana jest jego metoda Subscribe. Operator SubscribeOn zmienia to zachowanie, określając inny harmonogram, na którym powinien działać Observable. Operator ObserveOn określa inny harmonogram, którego Observable będzie używać do wysyłania powiadomień do swoich obserwatorów.

W RxJava dostępne są następujące typy programów planujących -

Sr.No. Harmonogram i opis
1

Schedulers.computation()

Tworzy i zwraca harmonogram przeznaczony do pracy obliczeniowej. Liczba wątków do zaplanowania zależy od procesorów obecnych w systemie. Dozwolony jest jeden wątek na procesor. Najlepsze do pętli zdarzeń lub operacji wywołania zwrotnego.

2

Schedulers.io()

Tworzy i zwraca harmonogram przeznaczony do pracy związanej z we / wy. Pula wątków może zostać rozszerzona w razie potrzeby.

3

Schedulers.newThread()

Tworzy i zwraca harmonogram, który tworzy nowy wątek dla każdej jednostki pracy.

4

Schedulers.trampoline()

Tworzy i zwraca harmonogram, który kolejkuje pracę w bieżącym wątku do wykonania po zakończeniu bieżącej pracy.

4

Schedulers.from(java.util.concurrent.Executor executor)

Konwertuje moduł wykonawczy na nową instancję programu planującego.

Schedulers.trampoline () tworzy i zwraca harmonogram, który kolejkuje pracę w bieżącym wątku do wykonania po zakończeniu bieżącej pracy.

Schedulers.trampoline () Przykład

Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.trampoline()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Sprawdź wynik

Skompiluj klasę przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac ObservableTester.java

Teraz uruchom ObservableTester w następujący sposób -

C:\RxJava>java ObservableTester

Powinien dać następujący wynik -

Processing Thread main
Receiver Thread main, Item length 1
Processing Thread main
Receiver Thread main, Item length 2
Processing Thread main
Receiver Thread main, Item length 3

Schedulers.newThread () tworzy i zwraca harmonogram, który tworzy nowy wątek dla każdej jednostki pracy.

Schedulers.newThread () Przykład

Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.newThread()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Sprawdź wynik

Skompiluj klasę przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac ObservableTester.java

Teraz uruchom ObservableTester w następujący sposób -

C:\RxJava>java ObservableTester

Powinien dać następujący wynik -

Processing Thread RxNewThreadScheduler-1
Receiver Thread RxNewThreadScheduler-1, Item length 1
Processing Thread RxNewThreadScheduler-2
Receiver Thread RxNewThreadScheduler-2, Item length 2
Processing Thread RxNewThreadScheduler-3
Receiver Thread RxNewThreadScheduler-3, Item length 3

Metoda Schedulers.computation () tworzy i zwraca Harmonogram przeznaczony do pracy obliczeniowej. Liczba wątków do zaplanowania zależy od procesorów obecnych w systemie. Dozwolony jest jeden wątek na procesor. Najlepsze do pętli zdarzeń lub operacji wywołania zwrotnego.

Schedulers.computation () Przykład

Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.computation()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Sprawdź wynik

Skompiluj klasę przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac ObservableTester.java

Teraz uruchom ObservableTester w następujący sposób -

C:\RxJava>java ObservableTester

Powinien dać następujący wynik -

Processing Thread RxComputationThreadPool-1
Receiver Thread RxComputationThreadPool-1, Item length 1
Processing Thread RxComputationThreadPool-2
Receiver Thread RxComputationThreadPool-2, Item length 2
Processing Thread RxComputationThreadPool-3
Receiver Thread RxComputationThreadPool-3, Item length 3

Schedulers.io () tworzy i zwraca harmonogram przeznaczony do pracy związanej z we / wy. Pula wątków może zostać rozszerzona w razie potrzeby. Najlepsze do intensywnych operacji we / wy.

Schedulers.io () Przykład

Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.io()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Sprawdź wynik

Skompiluj klasę przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac ObservableTester.java

Teraz uruchom ObservableTester w następujący sposób -

C:\RxJava>java ObservableTester

Powinien dać następujący wynik -

Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 1
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 2
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 3

Schedulers.from (Executor) konwertuje Executor do nowej instancji programu planującego.

Schedulers.from (Executor) Przykład

Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.

ObservableTester.java

import java.util.Random;
import java.util.concurrent.Executors;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3))))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Sprawdź wynik

Skompiluj klasę przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac ObservableTester.java

Teraz uruchom ObservableTester w następujący sposób -

C:\RxJava>java ObservableTester

Powinien dać następujący wynik -

Processing Thread pool-1-thread-1
Processing Thread pool-3-thread-1
Receiver Thread pool-1-thread-1, Item length 1
Processing Thread pool-4-thread-1
Receiver Thread pool-4-thread-1, Item length 3
Receiver Thread pool-3-thread-1, Item length 2

Operator buforowania pozwala zebrać elementy wyemitowane przez Observable na listę lub paczki i wyemitować te paczki zamiast elementów. W poniższym przykładzie stworzyliśmy Observable, aby emitować 9 elementów i używając buforowania, 3 elementy zostaną wyemitowane razem.

Przykład buforowania

Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .buffer(3)
         .subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(List<Integer> integers) {
               System.out.println("onNext: ");
               for (Integer value : integers) {
                  System.out.println(value);
               }
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

Sprawdź wynik

Skompiluj klasę przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac ObservableTester.java

Teraz uruchom ObservableTester w następujący sposób -

C:\RxJava>java ObservableTester

Powinien dać następujący wynik -

Subscribed
onNext: 
1
2
3
onNext: 
4
5
6
onNext: 
7
8
9
Done!

Operator okienkowania działa podobnie do operatora bufora, ale pozwala zebrać elementy wyemitowane przez Observable do innego obserowalnego zamiast zbierania i emitować te Observables zamiast kolekcji. W poniższym przykładzie stworzyliśmy Observable do emitowania 9 elementów i używając operatora okna, 3 Observable zostaną wyemitowane razem.

Przykład okienkowania

Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .window(3)
         .subscribe(new Observer<Observable<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(Observable<Integer> integers) {
               System.out.println("onNext: ");
               integers.subscribe(value -> System.out.println(value));
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

Sprawdź wynik

Skompiluj klasę przy użyciu javac kompilator w następujący sposób -

C:\RxJava>javac ObservableTester.java

Teraz uruchom ObservableTester w następujący sposób -

C:\RxJava>java ObservableTester

Powinien dać następujący wynik -

Subscribed
onNext: 
1
2
3
onNext: 
4
5
6
onNext: 
7
8
9
Done!