RxJava - Szybki przewodnik
RxJava to oparte na Javie rozszerzenie ReactiveX. Zapewnia implementację lub projekt ReactiveX w Javie. Poniżej przedstawiono kluczowe cechy RxJava.
Rozszerza wzorzec obserwatora.
Obsługa sekwencji danych / zdarzeń.
Zapewnia operatory do składania sekwencji razem w sposób deklaratywny.
Wewnętrznie obsługuje wątki, synchronizację, bezpieczeństwo wątków i współbieżne struktury danych.
Co to jest ReactiveX?
ReactiveX to projekt, którego celem jest dostarczenie koncepcji programowania reaktywnego do różnych języków programowania. Programowanie reaktywne odnosi się do scenariusza, w którym program reaguje, gdy pojawiają się dane. Jest to koncepcja programowania oparta na zdarzeniach i zdarzenia mogą być propagowane do rejestrów obserwatorów.
Zgodnie z Reactive, połączyli najlepsze wzorce obserwatorów, wzorce iteratorów i wzorce funkcjonalne.
Wzorzec Observer wykonany prawidłowo. ReactiveX to połączenie najlepszych pomysłów ze wzorca Observer, wzorca Iterator i programowania funkcjonalnego.
Programowanie funkcjonalne
Programowanie funkcjonalne polega na budowaniu oprogramowania przy użyciu czystych funkcji. Czysta funkcja nie zależy od poprzedniego stanu i zawsze zwraca ten sam wynik dla tych samych przekazanych parametrów. Czyste funkcje pomagają uniknąć problemów związanych ze współdzielonymi obiektami, zmiennymi danymi i efektami ubocznymi często występującymi w środowiskach wielowątkowych.
Programowanie reaktywne
Programowanie reaktywne odnosi się do programowania sterowanego zdarzeniami, w którym strumienie danych przychodzą w sposób asynchroniczny i są przetwarzane w momencie nadejścia.
Funkcjonalne programowanie reaktywne
RxJava realizuje obie koncepcje razem, gdzie dane strumieni zmieniają się w czasie, a funkcja konsumenta odpowiednio reaguje.
Manifest reaktywny
Reactive Manifesto to dokument on-line potwierdzający wysoki standard systemów oprogramowania użytkowego. Zgodnie z manifestem, poniżej przedstawiono kluczowe atrybuty oprogramowania reaktywnego -
Responsive - Powinien zawsze reagować w odpowiednim czasie.
Message Driven - Powinny używać asynchronicznego przesyłania komunikatów między komponentami, aby zachować luźne sprzężenie.
Elastic - Powinien pozostawać responsywny nawet przy dużym obciążeniu.
Resilient - Powinien pozostawać responsywny nawet w przypadku awarii któregokolwiek komponentu.
Kluczowe komponenty RxJava
RxJava ma dwa kluczowe komponenty: Observables i Observer.
Observable - Reprezentuje obiekt podobny do Stream, który może emitować zero lub więcej danych, może wysyłać komunikat o błędzie, którego prędkość można kontrolować podczas emisji zestawu danych, może przesyłać zarówno skończone, jak i nieskończone dane.
Observer- Subskrybuje dane sekwencji Observable i reaguje na pozycję obserwabli. Obserwatorzy są powiadamiani za każdym razem, gdy Observable emituje dane. Obserwator obsługuje dane jeden po drugim.
Obserwator nigdy nie jest powiadamiany, jeśli elementy nie są obecne lub nie zostanie zwrócone wywołanie zwrotne dla poprzedniego elementu.
Konfiguracja środowiska lokalnego
RxJava to biblioteka dla Javy, więc pierwszym wymaganiem jest zainstalowanie JDK na komputerze.
Wymagania systemowe
JDK | 1.5 lub nowszy. |
---|---|
Pamięć | Brak minimalnych wymagań. |
Miejsca na dysku | Brak minimalnych wymagań. |
System operacyjny | Brak minimalnych wymagań. |
Krok 1 - Zweryfikuj instalację Java na swoim komputerze
Przede wszystkim otwórz konsolę i wykonaj polecenie java w oparciu o system operacyjny, na którym pracujesz.
OS | Zadanie | Komenda |
---|---|---|
Windows | Otwórz konsolę poleceń | c: \> java -version |
Linux | Otwórz terminal poleceń | $ java -version |
Prochowiec | Otwórz Terminal | maszyna: <joseph $ java -version |
Sprawdźmy dane wyjściowe dla wszystkich systemów operacyjnych -
OS | Wynik |
---|---|
Windows | wersja java „1.8.0_101” Java (TM) SE Runtime Environment (kompilacja 1.8.0_101) |
Linux | wersja java „1.8.0_101” Java (TM) SE Runtime Environment (kompilacja 1.8.0_101) |
Prochowiec | wersja java „1.8.0_101” Java (TM) SE Runtime Environment (kompilacja 1.8.0_101) |
Jeśli nie masz zainstalowanej Java w swoim systemie, pobierz pakiet Java Software Development Kit (SDK) z poniższego łącza https://www.oracle.com. Zakładamy Java 1.8.0_101 jako zainstalowaną wersję tego samouczka.
Krok 2 - Ustaw środowisko JAVA
Ustaw JAVA_HOMEzmienna środowiskowa wskazująca lokalizację katalogu podstawowego, w którym na komputerze jest zainstalowana Java. Na przykład.
OS | Wynik |
---|---|
Windows | Ustaw zmienną środowiskową JAVA_HOME na C: \ Program Files \ Java \ jdk1.8.0_101 |
Linux | eksportuj JAVA_HOME = / usr / local / java-current |
Prochowiec | eksportuj JAVA_HOME = / Library / Java / Home |
Dołącz lokalizację kompilatora Java do ścieżki systemowej.
OS | Wynik |
---|---|
Windows | Dołącz ciąg C:\Program Files\Java\jdk1.8.0_101\bin na końcu zmiennej systemowej, Path. |
Linux | export PATH = $ PATH: $ JAVA_HOME / bin / |
Prochowiec | nie wymagane |
Sprawdź instalację oprogramowania Java za pomocą polecenia java -version jak wyjaśniono powyżej.
Krok 3 - Pobierz archiwum RxJava2
Pobierz najnowszą wersję pliku jar RxJava z RxJava @ MVNRepository i jego zależności Reactive Streams @ MVNRepository . W chwili pisania tego samouczka pobraliśmy rxjava-2.2.4.jar, reactive-streams-1.0.2.jar i skopiowaliśmy go do folderu C: \> RxJava.
OS | Nazwa archiwum |
---|---|
Windows | rxjava-2.2.4.jar, reactive-streams-1.0.2.jar |
Linux | rxjava-2.2.4.jar, reactive-streams-1.0.2.jar |
Prochowiec | rxjava-2.2.4.jar, reactive-streams-1.0.2.jar |
Krok 4 - Ustaw środowisko RxJava
Ustaw RX_JAVAzmienna środowiskowa, aby wskazać lokalizację katalogu podstawowego, w którym jest przechowywany plik jar RxJava na komputerze. Załóżmy, że zapisaliśmy rxjava-2.2.4.jar i reactive-streams-1.0.2.jar w folderze RxJava.
Sr.No | System operacyjny i opis |
---|---|
1 | Windows Ustaw zmienną środowiskową RX_JAVA na C: \ RxJava |
2 | Linux eksportuj RX_JAVA = / usr / local / RxJava |
3 | Mac eksportuj RX_JAVA = / Library / RxJava |
Krok 5 - Ustaw zmienną CLASSPATH
Ustaw CLASSPATH zmienna środowiskowa wskazująca lokalizację jar RxJava.
Sr.No | System operacyjny i opis |
---|---|
1 | Windows Ustaw zmienną środowiskową CLASSPATH na% CLASSPATH%;% RX_JAVA% \ rxjava-2.2.4.jar;% RX_JAVA% \ reactive-streams-1.0.2.jar;.; |
2 | Linux export CLASSPATH = $ CLASSPATH: $ RX_JAVA / rxjava-2.2.4.jar: reactive-streams-1.0.2.jar :. |
3 | Mac export CLASSPATH = $ CLASSPATH: $ RX_JAVA / rxjava-2.2.4.jar: reactive-streams-1.0.2.jar :. |
Krok 6 - Przetestuj konfigurację RxJava
Utwórz klasę TestRx.java, jak pokazano poniżej -
import io.reactivex.Flowable;
public class TestRx {
public static void main(String[] args) {
Flowable.just("Hello World!")
.subscribe(System.out::println);
}
}
Krok 7 - Sprawdź wynik
Skompiluj klasy przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac Tester.java
Sprawdź dane wyjściowe.
Hello World!
Observables reprezentuje źródła danych, gdzie as Observers (Subscribers)Słuchaj ich. W skrócie, Obserwowalny emituje przedmioty, a subskrybent następnie je zużywa.
Zauważalny
Observable udostępnia dane, gdy subskrybent zacznie nasłuchiwać.
Observable może wyemitować dowolną liczbę elementów.
Observable może emitować tylko sygnał zakończenia, a także bez elementu.
Observable może zakończyć się pomyślnie.
Observable może nigdy się nie zakończyć. np. przycisk można kliknąć dowolną liczbę razy.
Observable może zgłosić błąd w dowolnym momencie.
Abonent
Observable może mieć wielu subskrybentów.
Gdy Observable emituje element, wywoływana jest każda metoda onNext () subskrybenta.
Gdy Observable zakończy emitowanie elementów, wywoływana jest każda metoda onComplete () subskrybenta.
Jeśli Observable emituje błąd, wywoływana jest każda metoda onError () subskrybenta.
Poniżej przedstawiono klasy bazowe do tworzenia obserwabli.
Flowable- Przepływy 0..N, Emituje 0 lub n elementów. Obsługuje strumienie reaktywne i ciśnienie wsteczne.
Observable - Przepływy 0..N, ale bez przeciwciśnienia.
Single- 1 przedmiot lub błąd. Może być traktowany jako reaktywna wersja wywołania metody.
Completable- Nie wyemitowano żadnego elementu. Używany jako sygnał zakończenia lub błędu. Może być traktowany jako reaktywna wersja Runnable.
MayBe- Nie wyemitowano żadnego elementu lub 1 element został wyemitowany. Może być traktowany jako reaktywna wersja Optional.
Poniżej przedstawiono wygodne metody tworzenia obserwabli w klasie Observable.
just(T item) - Zwraca Observable, który sygnalizuje podaną (stałą referencję) element, a następnie kończy.
fromIterable(Iterable source) - Konwertuje iterowalną sekwencję na ObservableSource, która emituje elementy w sekwencji.
fromArray(T... items) - Konwertuje Array na ObservableSource, który emituje elementy w Array.
fromCallable(Callable supplier) - Zwraca Observable, który, gdy obserwator zasubskrybuje ją, wywołuje określoną funkcję, a następnie emituje wartość zwróconą przez tę funkcję.
fromFuture(Future future) - Konwertuje przyszłość na ObservableSource.
interval(long initialDelay, long period, TimeUnit unit) - Zwraca Observable, który emituje 0L po początkowym opóźnieniu i coraz większe liczby po każdym następnym okresie.
Klasa Single reprezentuje pojedynczą wartość odpowiedzi. Pojedyncza obserwowalna może emitować tylko pojedynczą pomyślną wartość lub błąd. Nie emituje zdarzenia onComplete.
Deklaracja klasy
Poniżej znajduje się deklaracja dla io.reactivex.Single<T> klasa -
public abstract class Single<T>
extends Object
implements SingleSource<T>
Protokół
Poniżej znajduje się protokół sekwencyjny, na którym działa Single Observable -
onSubscribe (onSuccess | onError)?
Pojedynczy przykład
Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.
ObservableTester.java
import java.util.concurrent.TimeUnit;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
//Create the observable
Single<String> testSingle = Single.just("Hello World");
//Create an observer
Disposable disposable = testSingle
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(
new DisposableSingleObserver<String>() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onSuccess(String value) {
System.out.println(value);
}
});
Thread.sleep(3000);
//start observing
disposable.dispose();
}
}
Sprawdź wynik
Skompiluj klasę przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac ObservableTester.java
Teraz uruchom ObservableTester w następujący sposób -
C:\RxJava>java ObservableTester
Powinien dać następujący wynik -
Hello World
Klasa MayBe reprezentuje odroczoną odpowiedź. MayBe obserowalne może emitować pojedynczą pomyślną wartość lub nie emitować żadnej wartości.
Deklaracja klasy
Poniżej znajduje się deklaracja dla io.reactivex.Single<T> klasa -
public abstract class Maybe<T>
extends Object
implements MaybeSource<T>
Protokół
Poniżej znajduje się protokół sekwencyjny, na którym działa MayBe Observable -
onSubscribe (onSuccess | onError | OnComplete)?
Przykład MayBe
Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.
ObservableTester.java
import java.util.concurrent.TimeUnit;
import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
//Create an observer
Disposable disposable = Maybe.just("Hello World")
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(new DisposableMaybeObserver<String>() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onSuccess(String value) {
System.out.println(value);
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
Thread.sleep(3000);
//start observing
disposable.dispose();
}
}
Sprawdź wynik
Skompiluj klasę przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac ObservableTester.java
Teraz uruchom ObservableTester w następujący sposób -
C:\RxJava>java ObservableTester
Powinien dać następujący wynik -
Hello World
Klasa Completable reprezentuje odroczoną odpowiedź. Zaobserwowalne do ukończenia mogą wskazywać na pomyślne zakończenie lub błąd.
Deklaracja klasy
Poniżej znajduje się deklaracja dla io.reactivex.Completable klasa -
public abstract class Completable
extends Object
implements CompletableSource
Protokół
Poniżej znajduje się protokół sekwencyjny, w którym działa Completable Observable -
onSubscribe (onError | onComplete)?
Kompletny przykład
Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.
ObservableTester.java
import java.util.concurrent.TimeUnit;
import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableCompletableObserver;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
//Create an observer
Disposable disposable = Completable.complete()
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(new DisposableCompletableObserver() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onStart() {
System.out.println("Started!");
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
Thread.sleep(3000);
//start observing
disposable.dispose();
}
}
Sprawdź wynik
Skompiluj klasę przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac ObservableTester.java
Teraz uruchom ObservableTester w następujący sposób -
C:\RxJava>java ObservableTester
Powinien dać następujący wynik -
Started!
Done!
Klasa CompositeDisposable reprezentuje kontener, który może pomieścić wiele elementów jednorazowego użytku i oferuje złożoność O (1) dodawania i usuwania elementów jednorazowego użytku.
Deklaracja klasy
Poniżej znajduje się deklaracja dla io.reactivex.disposables.CompositeDisposable klasa -
public final class CompositeDisposable
extends Object
implements Disposable, io.reactivex.internal.disposables.DisposableContainer
Przykład CompositeDisposable
Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.
ObservableTester.java
import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
CompositeDisposable compositeDisposable = new CompositeDisposable();
//Create an Single observer
Disposable disposableSingle = Single.just("Hello World")
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(
new DisposableSingleObserver<String>() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onSuccess(String value) {
System.out.println(value);
}
});
//Create an observer
Disposable disposableMayBe = Maybe.just("Hi")
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(new DisposableMaybeObserver<String>() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onSuccess(String value) {
System.out.println(value);
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
Thread.sleep(3000);
compositeDisposable.add(disposableSingle);
compositeDisposable.add(disposableMayBe);
//start observing
compositeDisposable.dispose();
}
}
Sprawdź wynik
Skompiluj klasę przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac ObservableTester.java
Teraz uruchom ObservableTester w następujący sposób -
C:\RxJava>java ObservableTester
Powinien dać następujący wynik -
Hello World
Hi
Poniżej znajdują się operatory, które są używane do tworzenia Observable.
Sr.No. | Operator i opis |
---|---|
1 | Create Tworzy Observable od podstaw i umożliwia programowe wywoływanie metody obserwatora. |
2 | Defer Nie twórz Observable, dopóki obserwator nie zasubskrybuje. Tworzy nowe obserwowalne dla każdego obserwatora. |
3 | Empty/Never/Throw Tworzy Observable z ograniczonym zachowaniem. |
4 | From Konwertuje strukturę obiektu / danych na Observable. |
5 | Interval Tworzy obserwowalne emitujące liczby całkowite w kolejności z przerwą o określonym przedziale czasu. |
6 | Just Konwertuje strukturę obiektu / danych na Observable, aby emitować ten sam lub ten sam typ obiektów. |
7 | Range Tworzy Observable emitujące liczby całkowite w sekwencji podanego zakresu. |
8 | Repeat Tworzy Observable emitując liczby całkowite wielokrotnie. |
9 | Start Tworzy Observable, aby emitować wartość zwracaną przez funkcję. |
10 | Timer Tworzy Observable, aby wyemitować pojedynczy element po określonym opóźnieniu. |
Tworzenie przykładu operatora
Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable;
//Using fromArray operator to create an Observable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable = Observable.fromArray(letters);
observable
.map(String::toUpperCase)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
Sprawdź wynik
Skompiluj klasę przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac ObservableTester.java
Teraz uruchom ObservableTester w następujący sposób -
C:\RxJava>java ObservableTester
Powinien dać następujący wynik -
ABCDEFG
Poniżej znajdują się operatory, które są używane do przekształcania elementu wyemitowanego z Observable.
Sr.No. | Operator i opis |
---|---|
1 | Buffer Gromadzi przedmioty z Observable w pakietach okresowo, a następnie emituje pakiety, a nie przedmioty. |
2 | FlatMap Używane w zagnieżdżonych obserwabli. Przekształca przedmioty w Observables. Następnie spłaszcz elementy w jeden Observable. |
3 | GroupBy Podziel Observable na zbiór Observables zorganizowanych według klucza, aby wyemitować inną grupę elementów. |
4 | Map Zastosuj funkcję do każdego emitowanego elementu, aby go przekształcić. |
5 | Scan Zastosuj funkcję do każdego emitowanego elementu, sekwencyjnie, a następnie emituj kolejną wartość. |
6 | Window Zbiera elementy z okien Observable do Observable okresowo, a następnie emituje okna zamiast elementów. |
Przykład transformacji operatora
Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable;
//Using map operator to transform an Observable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable = Observable.fromArray(letters);
observable
.map(String::toUpperCase)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
Sprawdź wynik
Skompiluj klasę przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac ObservableTester.java
Teraz uruchom ObservableTester w następujący sposób -
C:\RxJava>java ObservableTester
Powinien dać następujący wynik -
ABCDEFG
Poniżej znajdują się operatory, które są używane do selektywnego emitowania elementu (ów) z Observable.
Sr.No. | Operator i opis |
---|---|
1 | Debounce Emituje elementy tylko po przekroczeniu limitu czasu bez emitowania innego elementu. |
2 | Distinct Emituje tylko unikalne przedmioty. |
3 | ElementAt emituje tylko element o indeksie n emitowanym przez Observable. |
4 | Filter Emituje tylko te elementy, które spełniają daną funkcję predykatu. |
5 | First Emituje pierwszy element lub pierwszy element, który spełnił podane kryteria. |
6 | IgnoreElements Nie emituje żadnych przedmiotów z Observable, ale zaznacza ukończenie. |
7 | Last Emituje ostatni element z Observable. |
8 | Sample Emituje najnowszy element w podanym przedziale czasu. |
9 | Skip Pomija pierwsze n elementów z Observable. |
10 | SkipLast Pomija ostatnie n elementów z Observable. |
11 | Take pobiera pierwsze n elementów z Observable. |
12 | TakeLast pobiera ostatnie n elementów z Observable. |
Przykład operatora filtrującego
Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable;
//Using take operator to filter an Observable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable = Observable.fromArray(letters);
observable
.take(2)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
Sprawdź wynik
Skompiluj klasę przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac ObservableTester.java
Teraz uruchom ObservableTester w następujący sposób -
C:\RxJava>java ObservableTester
Powinien dać następujący wynik -
ab
Poniżej znajdują się operatory, które są używane do tworzenia pojedynczego Observable z wielu Observable.
Sr.No. | Operator i opis |
---|---|
1 | And/Then/When Połącz zestawy przedmiotów za pomocą pośredników wzorca i planu. |
2 | CombineLatest Połącz najnowszy element wyemitowany przez każdy Observable za pośrednictwem określonej funkcji i wyemituj wynikowy element. |
3 | Join Połącz pozycje wyemitowane przez dwa Observables, jeśli wyemitowano w ramach czasowych drugiej Obserwowalnej emitowanej pozycji. |
4 | Merge Łączy wyemitowane elementy Observables. |
5 | StartWith Emituj określoną sekwencję elementów przed rozpoczęciem emitowania elementów ze źródła Observable |
6 | Switch Emituje najnowsze elementy emitowane przez Observables. |
7 | Zip Łączy elementy Observables na podstawie funkcji i emituje wynikowe elementy. |
Przykład łączenia operatorów
Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable;
//Using combineLatest operator to combine Observables
public class ObservableTester {
public static void main(String[] args) {
Integer[] numbers = { 1, 2, 3, 4, 5, 6};
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable1 = Observable.fromArray(letters);
Observable<Integer> observable2 = Observable.fromArray(numbers);
Observable.combineLatest(observable1, observable2, (a,b) -> a + b)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
Sprawdź wynik
Skompiluj klasę przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac ObservableTester.java
Teraz uruchom ObservableTester w następujący sposób -
C:\RxJava>java ObservableTester
Powinien dać następujący wynik -
g1g2g3g4g5g6
Poniżej znajdują się operatory, które są często przydatne w przypadku Observables.
Sr.No. | Operator i opis |
---|---|
1 | Delay Zarejestruj akcję, aby obsłużyć obserwowalne zdarzenia cyklu życia. |
2 | Materialize/Dematerialize Reprezentuje wysłany element i wysłane powiadomienie. |
3 | ObserveOn Określ planistę, który ma być obserwowany. |
4 | Serialize Wymuś obserwowalne, aby wykonywać szeregowane wywołania. |
5 | Subscribe Działaj na emisjach przedmiotów i powiadomień, takich jak kompletne z Obserwowalnego |
6 | SubscribeOn Określ harmonogram, który ma być używany przez Observable, gdy jest subskrybowany. |
7 | TimeInterval Konwertuj obserwowalne, aby emitować wskazania czasu, jaki upłynął między emisjami. |
8 | Timeout Wysyła powiadomienie o błędzie, jeśli określony czas wystąpi bez emisji żadnego elementu. |
9 | Timestamp Dołącz sygnaturę czasową do każdego emitowanego elementu. |
9 | Using Tworzy jednorazowy zasób lub taką samą żywotność, jak Observable. |
Przykład operatora mediów
Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable;
//Using subscribe operator to subscribe to an Observable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable = Observable.fromArray(letters);
observable.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
Sprawdź wynik
Skompiluj klasę przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac ObservableTester.java
Teraz uruchom ObservableTester w następujący sposób -
C:\RxJava>java ObservableTester
Powinien dać następujący wynik -
abcdefg
Poniżej znajdują się operatory, które oceniają jeden lub wiele wyemitowanych Observables lub elementów.
Sr.No. | Operator i opis |
---|---|
1 | All Ocenia wszystkie wyemitowane elementy spełniające podane kryteria. |
2 | Amb Emituje wszystkie przedmioty z pierwszego Observable tylko z wieloma Observable. |
3 | Contains Sprawdza, czy Observable emituje określony element, czy nie. |
4 | DefaultIfEmpty Emituje element domyślny, jeśli Observable niczego nie emituje. |
5 | SequenceEqual Sprawdza, czy dwa Observables emitują tę samą sekwencję elementów. |
6 | SkipUntil Odrzuca elementy emitowane przez pierwszy Observable, dopóki drugi Observable nie wyemituje elementu. |
7 | SkipWhile Odrzuć elementy emitowane przez Observable, dopóki dany warunek nie stanie się fałszywy. |
8 | TakeUntil Odrzuca elementy emitowane przez Observable po tym, jak druga Observable emituje element lub kończy działanie. |
9 | TakeWhile Odrzuć elementy emitowane przez Observable po tym, jak określony warunek stanie się fałszywy. |
Przykład operatora warunkowego
Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable;
//Using defaultIfEmpty operator to operate on an Observable
public class ObservableTester {
public static void main(String[] args) {
final StringBuilder result = new StringBuilder();
Observable.empty()
.defaultIfEmpty("No Data")
.subscribe(s -> result.append(s));
System.out.println(result);
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result1 = new StringBuilder();
Observable.fromArray(letters)
.firstElement()
.defaultIfEmpty("No data")
.subscribe(s -> result1.append(s));
System.out.println(result1);
}
}
Sprawdź wynik
Skompiluj klasę przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac ObservableTester.java
Teraz uruchom ObservableTester w następujący sposób -
C:\RxJava>java ObservableTester
Powinien dać następujący wynik -
No Data
a
Poniżej znajdują się operatory, które działają na całych elementach emitowanych przez Observable.
Sr.No. | Operator i opis |
---|---|
1 | Average Oblicza średnie ze wszystkich pozycji i emituje wynik. |
2 | Concat Emituje wszystkie elementy z wielu Observable bez przeplotu. |
3 | Count Zlicza wszystkie elementy i emituje wynik. |
4 | Max Ocenia pozycję o maksymalnej wartości ze wszystkich pozycji i emituje wynik. |
5 | Min Ocenia najmniej wartościowy element wszystkich elementów i emituje wynik. |
6 | Reduce Zastosuj funkcję do każdego elementu i zwróć wynik. |
7 | Sum Oblicza sumę wszystkich elementów i emituje wynik. |
Przykład operatora matematycznego
Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable;
//Using concat operator to operate on multiple Observables
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Integer[] numbers = { 1, 2, 3, 4, 5, 6};
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable1 = Observable.fromArray(letters);
Observable<Integer> observable2 = Observable.fromArray(numbers);
Observable.concat(observable1, observable2)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
Sprawdź wynik
Skompiluj klasę przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac ObservableTester.java
Teraz uruchom ObservableTester w następujący sposób -
C:\RxJava>java ObservableTester
Powinien dać następujący wynik -
abcdefg123456
Poniżej znajdują się operatorzy, którzy dokładniej kontrolują abonament.
Sr.No. | Operator i opis |
---|---|
1 | Connect Poinstruuj podłączalnego Observable, aby emitował elementy do swoich subskrybentów. |
2 | Publish Konwertuje Observable na Connectable Observable. |
3 | RefCount Konwertuje możliwe do podłączenia Observable na zwykłe Observable. |
4 | Replay Upewnij się, że ta sama sekwencja emitowanych elementów jest widoczna dla każdego subskrybenta, nawet po tym, jak Observable zaczął emitować elementy, a subskrybenci subskrybują później. |
Przykład operatora z możliwością podłączenia
Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
//Using connect operator on a ConnectableObservable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
ConnectableObservable<String> connectable = Observable.fromArray(letters).publish();
connectable.subscribe(letter -> result.append(letter));
System.out.println(result.length());
connectable.connect();
System.out.println(result.length());
System.out.println(result);
}
}
Sprawdź wynik
Skompiluj klasę przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac ObservableTester.java
Teraz uruchom ObservableTester w następujący sposób -
C:\RxJava>java ObservableTester
Powinien dać następujący wynik -
0
7
abcdefg
Zgodnie z Reactive, podmiot może działać zarówno jako obserwowalny, jak i obserwator.
Temat jest rodzajem mostu lub serwera proxy, który jest dostępny w niektórych implementacjach ReactiveX i działa zarówno jako obserwator, jak i jako obserwowalny. Ponieważ jest obserwatorem, może subskrybować jeden lub więcej Observable, a ponieważ jest Observable, może przechodzić przez obserwowane elementy, ponownie je emitując, a także może emitować nowe elementy.
Istnieją cztery rodzaje przedmiotów -
Sr.No. | Opis tematu |
---|---|
1 | Publish Subject Emituje tylko te elementy, które są emitowane po czasie subskrypcji. |
2 | Replay Subject Emituje wszystkie elementy emitowane przez źródło Observable, niezależnie od tego, kiedy subskrybuje Observable. |
3 | Behavior Subject Po subskrypcji emituje najnowszy element, a następnie kontynuuje emitowanie elementu emitowanego przez źródło Observable. |
4 | Async Subject Emituje ostatni element emitowany przez źródło Observable po zakończeniu emisji. |
PublishSubject wysyła elementy do aktualnie zasubskrybowanych obserwatorów i zdarzenia końcowe do obecnych lub późnych obserwatorów.
Deklaracja klasy
Poniżej znajduje się deklaracja dla io.reactivex.subjects.PublishSubject<T> klasa -
public final class PublishSubject<T>
extends Subject<T>
Przykład publikacji PublishSubject
Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.
ObservableTester.java
import io.reactivex.subjects.PublishSubject;
public class ObservableTester {
public static void main(String[] args) {
final StringBuilder result1 = new StringBuilder();
final StringBuilder result2 = new StringBuilder();
PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(value -> result1.append(value) );
subject.onNext("a");
subject.onNext("b");
subject.onNext("c");
subject.subscribe(value -> result2.append(value));
subject.onNext("d");
subject.onComplete();
//Output will be abcd
System.out.println(result1);
//Output will be d only
//as subscribed after c item emitted.
System.out.println(result2);
}
}
Sprawdź wynik
Skompiluj klasę przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac ObservableTester.java
Teraz uruchom ObservableTester w następujący sposób -
C:\RxJava>java ObservableTester
Powinien dać następujący wynik -
abcd
d
BehaviorSubject emituje najnowszą obserwowaną pozycję, a następnie wszystkie kolejne obserwowane pozycje do każdego subskrybowanego obserwatora.
Deklaracja klasy
Poniżej znajduje się deklaracja dla io.reactivex.subjects.BehaviorSubject<T> klasa -
public final class BehaviorSubject<T>
extends Subject<T>
Przykład BehaviorSubject
Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.
ObservableTester.java
import io.reactivex.subjects.BehaviorSubject;
public class ObservableTester {
public static void main(String[] args) {
final StringBuilder result1 = new StringBuilder();
final StringBuilder result2 = new StringBuilder();
BehaviorSubject<String> subject = BehaviorSubject.create();
subject.subscribe(value -> result1.append(value) );
subject.onNext("a");
subject.onNext("b");
subject.onNext("c");
subject.subscribe(value -> result2.append(value));
subject.onNext("d");
subject.onComplete();
//Output will be abcd
System.out.println(result1);
//Output will be cd being BehaviorSubject
//(c is last item emitted before subscribe)
System.out.println(result2);
}
}
Sprawdź wynik
Skompiluj klasę przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac ObservableTester.java
Teraz uruchom ObservableTester w następujący sposób -
C:\RxJava>java ObservableTester
Powinien dać następujący wynik -
abcd
cd
ReplaySubject odtwarza zdarzenia / elementy obecnym i późnym Obserwatorom.
Deklaracja klasy
Poniżej znajduje się deklaracja dla io.reactivex.subjects.ReplaySubject<T> klasa -
public final class ReplaySubject<T>
extends Subject<T>
Przykład ReplaySubject
Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.
ObservableTester.java
import io.reactivex.subjects.ReplaySubject;
public class ObservableTester {
public static void main(String[] args) {
final StringBuilder result1 = new StringBuilder();
final StringBuilder result2 = new StringBuilder();
ReplaySubject<String> subject = ReplaySubject.create();
subject.subscribe(value -> result1.append(value) );
subject.onNext("a");
subject.onNext("b");
subject.onNext("c");
subject.subscribe(value -> result2.append(value));
subject.onNext("d");
subject.onComplete();
//Output will be abcd
System.out.println(result1);
//Output will be abcd being ReplaySubject
//as ReplaySubject emits all the items
System.out.println(result2);
}
}
Sprawdź wynik
Skompiluj klasę przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac ObservableTester.java
Teraz uruchom ObservableTester w następujący sposób -
C:\RxJava>java ObservableTester
Powinien dać następujący wynik -
abcd
abcd
AsyncSubject emituje jedyną ostatnią wartość, po której następuje zdarzenie zakończenia lub odebrany błąd do obserwatorów.
Deklaracja klasy
Poniżej znajduje się deklaracja dla io.reactivex.subjects.AsyncSubject<T> klasa -
public final class AsyncSubject<T>
extends Subject<T>
Przykład AsyncSubject
Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.
ObservableTester.java
import io.reactivex.subjects. AsyncSubject;
public class ObservableTester {
public static void main(String[] args) {
final StringBuilder result1 = new StringBuilder();
final StringBuilder result2 = new StringBuilder();
AsyncSubject<String> subject = AsyncSubject.create();
subject.subscribe(value -> result1.append(value) );
subject.onNext("a");
subject.onNext("b");
subject.onNext("c");
subject.subscribe(value -> result2.append(value));
subject.onNext("d");
subject.onComplete();
//Output will be d being the last item emitted
System.out.println(result1);
//Output will be d being the last item emitted
System.out.println(result2);
}
}
Sprawdź wynik
Skompiluj klasę przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac ObservableTester.java
Teraz uruchom ObservableTester w następujący sposób -
C:\RxJava>java ObservableTester
Powinien dać następujący wynik -
d
d
Harmonogramy są używane w środowisku wielowątkowym do pracy z operatorami obserwowalnymi.
Zgodnie z Reactive, Harmonogram służy do planowania, w jaki sposób łańcuch operatorów będzie stosowany do różnych wątków.
Domyślnie Observable i łańcuch operatorów, które do niego zastosujesz, wykonają swoją pracę i powiadomią swoich obserwatorów w tym samym wątku, w którym wywoływana jest jego metoda Subscribe. Operator SubscribeOn zmienia to zachowanie, określając inny harmonogram, na którym powinien działać Observable. Operator ObserveOn określa inny harmonogram, którego Observable będzie używać do wysyłania powiadomień do swoich obserwatorów.
W RxJava dostępne są następujące typy programów planujących -
Sr.No. | Harmonogram i opis |
---|---|
1 | Schedulers.computation() Tworzy i zwraca harmonogram przeznaczony do pracy obliczeniowej. Liczba wątków do zaplanowania zależy od procesorów obecnych w systemie. Dozwolony jest jeden wątek na procesor. Najlepsze do pętli zdarzeń lub operacji wywołania zwrotnego. |
2 | Schedulers.io() Tworzy i zwraca harmonogram przeznaczony do pracy związanej z we / wy. Pula wątków może zostać rozszerzona w razie potrzeby. |
3 | Schedulers.newThread() Tworzy i zwraca harmonogram, który tworzy nowy wątek dla każdej jednostki pracy. |
4 | Schedulers.trampoline() Tworzy i zwraca harmonogram, który kolejkuje pracę w bieżącym wątku do wykonania po zakończeniu bieżącej pracy. |
4 | Schedulers.from(java.util.concurrent.Executor executor) Konwertuje moduł wykonawczy na nową instancję programu planującego. |
Schedulers.trampoline () tworzy i zwraca harmonogram, który kolejkuje pracę w bieżącym wątku do wykonania po zakończeniu bieżącej pracy.
Schedulers.trampoline () Przykład
Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.
ObservableTester.java
import java.util.Random;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just("A", "AB", "ABC")
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println("Processing Thread "
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.trampoline()))
.subscribe(length -> System.out.println("Receiver Thread "
+ Thread.currentThread().getName()
+ ", Item length " + length));
Thread.sleep(10000);
}
protected static Observable<Integer> getLengthWithDelay(String v) {
Random random = new Random();
try {
Thread.sleep(random.nextInt(3) * 1000);
return Observable.just(v.length());
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
Sprawdź wynik
Skompiluj klasę przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac ObservableTester.java
Teraz uruchom ObservableTester w następujący sposób -
C:\RxJava>java ObservableTester
Powinien dać następujący wynik -
Processing Thread main
Receiver Thread main, Item length 1
Processing Thread main
Receiver Thread main, Item length 2
Processing Thread main
Receiver Thread main, Item length 3
Schedulers.newThread () tworzy i zwraca harmonogram, który tworzy nowy wątek dla każdej jednostki pracy.
Schedulers.newThread () Przykład
Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.
ObservableTester.java
import java.util.Random;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just("A", "AB", "ABC")
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println("Processing Thread "
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.newThread()))
.subscribe(length -> System.out.println("Receiver Thread "
+ Thread.currentThread().getName()
+ ", Item length " + length));
Thread.sleep(10000);
}
protected static Observable<Integer> getLengthWithDelay(String v) {
Random random = new Random();
try {
Thread.sleep(random.nextInt(3) * 1000);
return Observable.just(v.length());
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
Sprawdź wynik
Skompiluj klasę przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac ObservableTester.java
Teraz uruchom ObservableTester w następujący sposób -
C:\RxJava>java ObservableTester
Powinien dać następujący wynik -
Processing Thread RxNewThreadScheduler-1
Receiver Thread RxNewThreadScheduler-1, Item length 1
Processing Thread RxNewThreadScheduler-2
Receiver Thread RxNewThreadScheduler-2, Item length 2
Processing Thread RxNewThreadScheduler-3
Receiver Thread RxNewThreadScheduler-3, Item length 3
Metoda Schedulers.computation () tworzy i zwraca Harmonogram przeznaczony do pracy obliczeniowej. Liczba wątków do zaplanowania zależy od procesorów obecnych w systemie. Dozwolony jest jeden wątek na procesor. Najlepsze do pętli zdarzeń lub operacji wywołania zwrotnego.
Schedulers.computation () Przykład
Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.
ObservableTester.java
import java.util.Random;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just("A", "AB", "ABC")
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println("Processing Thread "
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.computation()))
.subscribe(length -> System.out.println("Receiver Thread "
+ Thread.currentThread().getName()
+ ", Item length " + length));
Thread.sleep(10000);
}
protected static Observable<Integer> getLengthWithDelay(String v) {
Random random = new Random();
try {
Thread.sleep(random.nextInt(3) * 1000);
return Observable.just(v.length());
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
Sprawdź wynik
Skompiluj klasę przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac ObservableTester.java
Teraz uruchom ObservableTester w następujący sposób -
C:\RxJava>java ObservableTester
Powinien dać następujący wynik -
Processing Thread RxComputationThreadPool-1
Receiver Thread RxComputationThreadPool-1, Item length 1
Processing Thread RxComputationThreadPool-2
Receiver Thread RxComputationThreadPool-2, Item length 2
Processing Thread RxComputationThreadPool-3
Receiver Thread RxComputationThreadPool-3, Item length 3
Schedulers.io () tworzy i zwraca harmonogram przeznaczony do pracy związanej z we / wy. Pula wątków może zostać rozszerzona w razie potrzeby. Najlepsze do intensywnych operacji we / wy.
Schedulers.io () Przykład
Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.
ObservableTester.java
import java.util.Random;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just("A", "AB", "ABC")
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println("Processing Thread "
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.io()))
.subscribe(length -> System.out.println("Receiver Thread "
+ Thread.currentThread().getName()
+ ", Item length " + length));
Thread.sleep(10000);
}
protected static Observable<Integer> getLengthWithDelay(String v) {
Random random = new Random();
try {
Thread.sleep(random.nextInt(3) * 1000);
return Observable.just(v.length());
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
Sprawdź wynik
Skompiluj klasę przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac ObservableTester.java
Teraz uruchom ObservableTester w następujący sposób -
C:\RxJava>java ObservableTester
Powinien dać następujący wynik -
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 1
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 2
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 3
Schedulers.from (Executor) konwertuje Executor do nowej instancji programu planującego.
Schedulers.from (Executor) Przykład
Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.
ObservableTester.java
import java.util.Random;
import java.util.concurrent.Executors;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just("A", "AB", "ABC")
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println("Processing Thread "
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3))))
.subscribe(length -> System.out.println("Receiver Thread "
+ Thread.currentThread().getName()
+ ", Item length " + length));
Thread.sleep(10000);
}
protected static Observable<Integer> getLengthWithDelay(String v) {
Random random = new Random();
try {
Thread.sleep(random.nextInt(3) * 1000);
return Observable.just(v.length());
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
Sprawdź wynik
Skompiluj klasę przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac ObservableTester.java
Teraz uruchom ObservableTester w następujący sposób -
C:\RxJava>java ObservableTester
Powinien dać następujący wynik -
Processing Thread pool-1-thread-1
Processing Thread pool-3-thread-1
Receiver Thread pool-1-thread-1, Item length 1
Processing Thread pool-4-thread-1
Receiver Thread pool-4-thread-1, Item length 3
Receiver Thread pool-3-thread-1, Item length 2
Operator buforowania pozwala zebrać elementy wyemitowane przez Observable na listę lub paczki i wyemitować te paczki zamiast elementów. W poniższym przykładzie stworzyliśmy Observable, aby emitować 9 elementów i używając buforowania, 3 elementy zostaną wyemitowane razem.
Przykład buforowania
Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable<Integer> observable = Observable.just(1, 2, 3, 4,
5, 6, 7, 8, 9);
observable.subscribeOn(Schedulers.io())
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.buffer(3)
.subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("Subscribed");
}
@Override
public void onNext(List<Integer> integers) {
System.out.println("onNext: ");
for (Integer value : integers) {
System.out.println(value);
}
}
@Override
public void onError(Throwable e) {
System.out.println("Error");
}
@Override
public void onComplete() {
System.out.println("Done! ");
}
});
Thread.sleep(3000);
}
}
Sprawdź wynik
Skompiluj klasę przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac ObservableTester.java
Teraz uruchom ObservableTester w następujący sposób -
C:\RxJava>java ObservableTester
Powinien dać następujący wynik -
Subscribed
onNext:
1
2
3
onNext:
4
5
6
onNext:
7
8
9
Done!
Operator okienkowania działa podobnie do operatora bufora, ale pozwala zebrać elementy wyemitowane przez Observable do innego obserowalnego zamiast zbierania i emitować te Observables zamiast kolekcji. W poniższym przykładzie stworzyliśmy Observable do emitowania 9 elementów i używając operatora okna, 3 Observable zostaną wyemitowane razem.
Przykład okienkowania
Utwórz następujący program w języku Java, używając dowolnego wybranego edytora, na przykład w C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable<Integer> observable = Observable.just(1, 2, 3, 4,
5, 6, 7, 8, 9);
observable.subscribeOn(Schedulers.io())
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.window(3)
.subscribe(new Observer<Observable<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("Subscribed");
}
@Override
public void onNext(Observable<Integer> integers) {
System.out.println("onNext: ");
integers.subscribe(value -> System.out.println(value));
}
@Override
public void onError(Throwable e) {
System.out.println("Error");
}
@Override
public void onComplete() {
System.out.println("Done! ");
}
});
Thread.sleep(3000);
}
}
Sprawdź wynik
Skompiluj klasę przy użyciu javac kompilator w następujący sposób -
C:\RxJava>javac ObservableTester.java
Teraz uruchom ObservableTester w następujący sposób -
C:\RxJava>java ObservableTester
Powinien dać następujący wynik -
Subscribed
onNext:
1
2
3
onNext:
4
5
6
onNext:
7
8
9
Done!