RxJava - Kurzanleitung

RxJava ist eine Java-basierte Erweiterung von ReactiveX. Es bietet eine Implementierung oder ein ReactiveX-Projekt in Java. Im Folgenden sind die wichtigsten Merkmale von RxJava aufgeführt.

  • Erweitert das Beobachtermuster.

  • Unterstützungssequenzen von Daten / Ereignissen.

  • Bietet Operatoren, um Sequenzen deklarativ zusammenzusetzen.

  • Behandelt Threading, Synchronisation, Thread-Sicherheit und gleichzeitige Datenstrukturen intern.

Was ist ReactiveX?

ReactiveX ist ein Projekt, das darauf abzielt, verschiedene Programmiersprachen mit einem reaktiven Programmierkonzept zu versehen. Reaktive Programmierung bezieht sich auf das Szenario, in dem das Programm reagiert, sobald Daten angezeigt werden. Es ist ein ereignisbasiertes Programmierkonzept und Ereignisse können sich an Registerbeobachter ausbreiten.

Nach dem ReactiveSie haben das Beste aus Observer-Muster, Iterator-Muster und Funktionsmuster kombiniert.

Das Beobachtermuster richtig gemacht. ReactiveX ist eine Kombination der besten Ideen aus dem Observer-Muster, dem Iterator-Muster und der funktionalen Programmierung.

Funktionale Programmierung

Bei der funktionalen Programmierung geht es darum, die Software mit reinen Funktionen zu erstellen. Eine reine Funktion hängt nicht vom vorherigen Status ab und gibt immer das gleiche Ergebnis für die gleichen übergebenen Parameter zurück. Reine Funktionen helfen dabei, Probleme zu vermeiden, die mit gemeinsam genutzten Objekten, veränderlichen Daten und Nebenwirkungen verbunden sind, die in Multithreading-Umgebungen häufig auftreten.

Reaktive Programmierung

Reaktive Programmierung bezieht sich auf ereignisgesteuerte Programmierung, bei der Datenströme asynchron eingehen und bei ihrem Eintreffen verarbeitet werden.

Funktionale reaktive Programmierung

RxJava implementiert beide Konzepte zusammen, wobei sich die Daten von Streams im Laufe der Zeit ändern und die Verbraucherfunktion entsprechend reagiert.

Das reaktive Manifest

Reactive Manifesto ist ein Online-Dokument, das den hohen Standard von Anwendungssoftwaresystemen beschreibt. Nach dem Manifest sind im Folgenden die Schlüsselattribute einer reaktiven Software aufgeführt:

  • Responsive - Sollte immer rechtzeitig reagieren.

  • Message Driven - Sollte eine asynchrone Nachrichtenübermittlung zwischen Komponenten verwenden, damit diese eine lose Kopplung beibehalten.

  • Elastic - Sollte auch unter hoher Last reaktionsschnell bleiben.

  • Resilient - Sollte auch dann reaktionsfähig bleiben, wenn eine oder mehrere Komponenten ausfallen.

Schlüsselkomponenten von RxJava

RxJava besteht aus zwei Hauptkomponenten: Observables und Observer.

  • Observable - Es stellt ein Objekt ähnlich wie Stream dar, das null oder mehr Daten ausgeben kann, eine Fehlermeldung senden kann, deren Geschwindigkeit während der Ausgabe eines Datensatzes gesteuert werden kann, endliche und unendliche Daten senden kann.

  • Observer- Es abonniert die Sequenzdaten von Observable und reagiert pro Element der Observablen. Beobachter werden benachrichtigt, wenn Observable Daten ausgibt. Ein Beobachter verarbeitet Daten einzeln.

Ein Beobachter wird niemals benachrichtigt, wenn Elemente nicht vorhanden sind oder ein Rückruf für ein vorheriges Element nicht zurückgegeben wird.

Einrichtung der lokalen Umgebung

RxJava ist eine Bibliothek für Java. Die allererste Voraussetzung ist daher, dass JDK auf Ihrem Computer installiert ist.

System Anforderungen

JDK 1,5 oder höher.
Erinnerung Keine Mindestanforderung.
Festplattenplatz Keine Mindestanforderung.
Betriebssystem Keine Mindestanforderung.

Schritt 1 - Überprüfen Sie die Java-Installation auf Ihrem Computer

Öffnen Sie zunächst die Konsole und führen Sie einen Java-Befehl aus, der auf dem Betriebssystem basiert, an dem Sie arbeiten.

Betriebssystem Aufgabe Befehl
Windows Öffnen Sie die Befehlskonsole c: \> Java-Version
Linux Öffnen Sie das Befehlsterminal $ java -version
Mac Terminal öffnen Maschine: <joseph $ java -version

Lassen Sie uns die Ausgabe für alle Betriebssysteme überprüfen -

Betriebssystem Ausgabe
Windows

Java-Version "1.8.0_101"

Java (TM) SE-Laufzeitumgebung (Build 1.8.0_101)

Linux

Java-Version "1.8.0_101"

Java (TM) SE-Laufzeitumgebung (Build 1.8.0_101)

Mac

Java-Version "1.8.0_101"

Java (TM) SE-Laufzeitumgebung (Build 1.8.0_101)

Wenn auf Ihrem System kein Java installiert ist, laden Sie das Java Software Development Kit (SDK) über den folgenden Link herunter https://www.oracle.com. Wir gehen davon aus, dass Java 1.8.0_101 die installierte Version für dieses Tutorial ist.

Schritt 2 - JAVA-Umgebung einstellen

Stellen Sie die JAVA_HOMEUmgebungsvariable, die auf den Speicherort des Basisverzeichnisses verweist, in dem Java auf Ihrem Computer installiert ist. Zum Beispiel.

Betriebssystem Ausgabe
Windows Setzen Sie die Umgebungsvariable JAVA_HOME auf C: \ Programme \ Java \ jdk1.8.0_101
Linux export JAVA_HOME = / usr / local / java-current
Mac export JAVA_HOME = / Library / Java / Home

Hängen Sie den Java-Compiler-Speicherort an den Systempfad an.

Betriebssystem Ausgabe
Windows Hängen Sie die Zeichenfolge an C:\Program Files\Java\jdk1.8.0_101\bin am Ende der Systemvariablen, Path.
Linux export PATH = $ PATH: $ JAVA_HOME / bin /
Mac nicht benötigt

Überprüfen Sie die Java-Installation mit dem Befehl java -version wie oben erklärt.

Schritt 3 - Laden Sie das RxJava2-Archiv herunter

Laden Sie die neueste Version der RxJava-JAR-Datei von RxJava @ MVNRepository und ihrer Abhängigkeit Reactive Streams @ MVNRepository herunter . Zum Zeitpunkt des Schreibens dieses Tutorials haben wir rxjava-2.2.4.jar, reactive -streams-1.0.2.jar heruntergeladen und in den Ordner C: \> RxJava kopiert.

Betriebssystem Archivname
Windows rxjava-2.2.4.jar, reaktive-Streams-1.0.2.jar
Linux rxjava-2.2.4.jar, reaktive-Streams-1.0.2.jar
Mac rxjava-2.2.4.jar, reaktive-Streams-1.0.2.jar

Schritt 4 - RxJava-Umgebung festlegen

Stellen Sie die RX_JAVAUmgebungsvariable, die auf den Speicherort des Basisverzeichnisses verweist, in dem das RxJava-JAR auf Ihrem Computer gespeichert ist. Nehmen wir an, wir haben rxjava-2.2.4.jar und reaktive-Streams-1.0.2.jar im Ordner RxJava gespeichert.

Sr.Nr. Betriebssystem & Beschreibung
1

Windows

Setzen Sie die Umgebungsvariable RX_JAVA auf C: \ RxJava

2

Linux

exportiere RX_JAVA = / usr / local / RxJava

3

Mac

export RX_JAVA = / Library / RxJava

Schritt 5 - Legen Sie die Variable CLASSPATH fest

Stellen Sie die CLASSPATH Umgebungsvariable, die auf den Speicherort des RxJava-JARs verweist.

Sr.Nr. Betriebssystem & Beschreibung
1

Windows

Setzen Sie die Umgebungsvariable CLASSPATH auf% CLASSPATH%;% RX_JAVA% \ rxjava-2.2.4.jar;% RX_JAVA% \ reaktive-Streams-1.0.2.jar;.;

2

Linux

export CLASSPATH = $ CLASSPATH: $ RX_JAVA / rxjava-2.2.4.jar: reaktive Streams-1.0.2.jar:.

3

Mac

export CLASSPATH = $ CLASSPATH: $ RX_JAVA / rxjava-2.2.4.jar: reaktive Streams-1.0.2.jar:.

Schritt 6 - Testen Sie das RxJava-Setup

Erstellen Sie eine Klasse TestRx.java wie unten gezeigt -

import io.reactivex.Flowable;
public class TestRx {
   public static void main(String[] args) {
      Flowable.just("Hello World!")
         .subscribe(System.out::println);
   }
}

Schritt 7 - Überprüfen Sie das Ergebnis

Kompilieren Sie die Klassen mit javac Compiler wie folgt -

C:\RxJava>javac Tester.java

Überprüfen Sie die Ausgabe.

Hello World!

Observables stellt die Datenquellen dar, wobei as Observers (Subscribers)höre ihnen zu. Kurz gesagt, ein Observable sendet Elemente aus und ein Abonnent verbraucht diese Elemente.

Beobachtbar

  • Observable liefert Daten, sobald der Teilnehmer zuhört.

  • Observable kann eine beliebige Anzahl von Elementen ausgeben.

  • Observable kann auch nur ein Abschlusssignal ohne Gegenstand ausgeben.

  • Observable kann erfolgreich beendet werden.

  • Observable kann niemals enden. Beispielsweise kann eine Schaltfläche beliebig oft angeklickt werden.

  • Observable kann zu jedem Zeitpunkt einen Fehler auslösen.

Teilnehmer

  • Observable kann mehrere Teilnehmer haben.

  • Wenn ein Observable ein Element ausgibt, wird jeder Abonnent der onNext () -Methode aufgerufen.

  • Wenn ein Observable die Ausgabe von Elementen beendet hat, wird jeder Abonnent der Methode onComplete () aufgerufen.

  • Wenn ein Observable einen Fehler ausgibt, wird jede onError () -Methode des Abonnenten aufgerufen.

Im Folgenden finden Sie die Basisklassen zum Erstellen von Observablen.

  • Flowable- 0..N fließt, gibt 0 oder n Elemente aus. Unterstützt Reactive-Streams und Gegendruck.

  • Observable - 0..N fließt, aber kein Gegendruck.

  • Single- 1 Artikel oder Fehler. Kann als reaktive Version des Methodenaufrufs behandelt werden.

  • Completable- Kein Artikel ausgegeben. Wird als Signal für Abschluss oder Fehler verwendet. Kann als reaktive Version von Runnable behandelt werden.

  • MayBe- Entweder kein Artikel oder 1 Artikel ausgegeben. Kann als reaktive Version von Optional behandelt werden.

Im Folgenden finden Sie die praktischen Methoden zum Erstellen von Observablen in der Observable-Klasse.

  • just(T item) - Gibt ein Observable zurück, das das angegebene Element (konstante Referenz) signalisiert und dann vervollständigt.

  • fromIterable(Iterable source) - Konvertiert eine Iterable-Sequenz in eine ObservableSource, die die Elemente in der Sequenz ausgibt.

  • fromArray(T... items) - Konvertiert ein Array in eine ObservableSource, die die Elemente im Array ausgibt.

  • fromCallable(Callable supplier) - Gibt ein Observable zurück, das, wenn ein Beobachter es abonniert, eine von Ihnen angegebene Funktion aufruft und dann den von dieser Funktion zurückgegebenen Wert ausgibt.

  • fromFuture(Future future) - Wandelt eine Zukunft in eine ObservableSource um.

  • interval(long initialDelay, long period, TimeUnit unit) - Gibt ein Observable zurück, das nach der anfänglichen Verzögerung eine 0L ausgibt und nach jedem Zeitraum danach immer mehr Zahlen.

Die Einzelklasse repräsentiert die Einzelwertantwort. Single Observable kann nur einen einzelnen erfolgreichen Wert oder einen Fehler ausgeben. Das Ereignis onComplete wird nicht ausgegeben.

Klassenerklärung

Es folgt die Erklärung für io.reactivex.Single<T> Klasse -

public abstract class Single<T>
   extends Object
      implements SingleSource<T>

Protokoll

Es folgt das sequentielle Protokoll, das Single Observable ausführt:

onSubscribe (onSuccess | onError)?

Einzelbeispiel

Erstellen Sie das folgende Java-Programm mit einem beliebigen Editor Ihrer Wahl, z. B. in C: \> RxJava.

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {
      //Create the observable
      Single<String> testSingle = Single.just("Hello World");

      //Create an observer
      Disposable disposable = testSingle
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(
         new DisposableSingleObserver<String>() {

         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

Überprüfen Sie das Ergebnis

Kompilieren Sie die Klasse mit javac Compiler wie folgt -

C:\RxJava>javac ObservableTester.java

Führen Sie nun den ObservableTester wie folgt aus:

C:\RxJava>java ObservableTester

Es sollte die folgende Ausgabe erzeugen -

Hello World

Die MayBe-Klasse repräsentiert eine verzögerte Antwort. MayBe Observable kann entweder einen einzelnen erfolgreichen Wert oder keinen Wert ausgeben.

Klassenerklärung

Es folgt die Erklärung für io.reactivex.Single<T> Klasse -

public abstract class Maybe<T>
   extends Object
      implements MaybeSource<T>

Protokoll

Es folgt das sequentielle Protokoll, das MayBe Observable ausführt:

onSubscribe (onSuccess | onError | OnComplete)?

MayBe Beispiel

Erstellen Sie das folgende Java-Programm mit einem beliebigen Editor Ihrer Wahl, z. B. in C: \> RxJava.

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      //Create an observer
      Disposable disposable = Maybe.just("Hello World")
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

Überprüfen Sie das Ergebnis

Kompilieren Sie die Klasse mit javac Compiler wie folgt -

C:\RxJava>javac ObservableTester.java

Führen Sie nun den ObservableTester wie folgt aus:

C:\RxJava>java ObservableTester

Es sollte die folgende Ausgabe erzeugen -

Hello World

Die Completable-Klasse repräsentiert die verzögerte Antwort. Completable Observable kann entweder einen erfolgreichen Abschluss oder einen Fehler anzeigen.

Klassenerklärung

Es folgt die Erklärung für io.reactivex.Completable Klasse -

public abstract class Completable
extends Object
implements CompletableSource

Protokoll

Es folgt das sequentielle Protokoll, das Completable Observable ausführt:

onSubscribe (onError | onComplete)?

Vervollständigbares Beispiel

Erstellen Sie das folgende Java-Programm mit einem beliebigen Editor Ihrer Wahl, z. B. in C: \> RxJava.

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableCompletableObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {

      //Create an observer
      Disposable disposable = Completable.complete()
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableCompletableObserver() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }
         @Override
         public void onStart() {
            System.out.println("Started!");
         }
         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

Überprüfen Sie das Ergebnis

Kompilieren Sie die Klasse mit javac Compiler wie folgt -

C:\RxJava>javac ObservableTester.java

Führen Sie nun den ObservableTester wie folgt aus:

C:\RxJava>java ObservableTester

Es sollte die folgende Ausgabe erzeugen -

Started!
Done!

Die CompositeDisposable-Klasse stellt einen Container dar, der mehrere Einwegartikel aufnehmen kann und eine O (1) -Komplexität beim Hinzufügen und Entfernen von Einwegartikeln bietet.

Klassenerklärung

Es folgt die Erklärung für io.reactivex.disposables.CompositeDisposable Klasse -

public final class CompositeDisposable
extends Object
implements Disposable, io.reactivex.internal.disposables.DisposableContainer

CompositeDisposable-Beispiel

Erstellen Sie das folgende Java-Programm mit einem beliebigen Editor Ihrer Wahl, z. B. in C: \> RxJava.

ObservableTester.java

import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {    
      CompositeDisposable compositeDisposable = new CompositeDisposable();

      //Create an Single observer 
      Disposable disposableSingle = Single.just("Hello World")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(
      new DisposableSingleObserver<String>() {
         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      }); 

      //Create an observer
      Disposable disposableMayBe = Maybe.just("Hi")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 

      Thread.sleep(3000);

      compositeDisposable.add(disposableSingle);
      compositeDisposable.add(disposableMayBe);

      //start observing
      compositeDisposable.dispose();
   }
}

Überprüfen Sie das Ergebnis

Kompilieren Sie die Klasse mit javac Compiler wie folgt -

C:\RxJava>javac ObservableTester.java

Führen Sie nun den ObservableTester wie folgt aus:

C:\RxJava>java ObservableTester

Es sollte die folgende Ausgabe erzeugen -

Hello World
Hi

Im Folgenden sind die Operatoren aufgeführt, mit denen ein Observable erstellt wird.

Sr.Nr. Betreiber & Beschreibung
1

Create

Erstellt ein Observable von Grund auf neu und ermöglicht es der Beobachtermethode, programmgesteuert aufzurufen.

2

Defer

Erstellen Sie kein Observable, bis ein Beobachter es abonniert hat. Erstellt für jeden Beobachter ein neues Observable.

3

Empty/Never/Throw

Erstellt ein Observable mit eingeschränktem Verhalten.

4

From

Konvertiert eine Objekt- / Datenstruktur in eine Observable.

5

Interval

Erstellt eine beobachtbare emittierende Ganzzahl nacheinander mit einer Lücke des angegebenen Zeitintervalls.

6

Just

Konvertiert eine Objekt- / Datenstruktur in eine Observable, um denselben oder denselben Objekttyp auszugeben.

7

Range

Erstellt eine beobachtbare emittierende Ganzzahl in der Reihenfolge des angegebenen Bereichs.

8

Repeat

Erstellt wiederholt eine beobachtbare emittierende Ganzzahl nacheinander.

9

Start

Erstellt ein Observable, um den Rückgabewert einer Funktion auszugeben.

10

Timer

Erstellt ein Observable, um nach einer bestimmten Verzögerung ein einzelnes Element auszugeben.

Operator-Beispiel erstellen

Erstellen Sie das folgende Java-Programm mit einem beliebigen Editor Ihrer Wahl, z. B. in C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using fromArray operator to create an Observable
public class ObservableTester  {
   public static void main(String[] args) { 
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Überprüfen Sie das Ergebnis

Kompilieren Sie die Klasse mit javac Compiler wie folgt -

C:\RxJava>javac ObservableTester.java

Führen Sie nun den ObservableTester wie folgt aus:

C:\RxJava>java ObservableTester

Es sollte die folgende Ausgabe erzeugen -

ABCDEFG

Im Folgenden sind die Operatoren aufgeführt, mit denen ein von einem Observable ausgegebenes Element transformiert wird.

Sr.Nr. Betreiber & Beschreibung
1

Buffer

Sammelt regelmäßig Elemente von Observable in Bündel und gibt dann die Bündel anstelle von Elementen aus.

2

FlatMap

Wird in verschachtelten Observablen verwendet. Wandelt Elemente in Observables um. Reduzieren Sie dann die Elemente zu einem einzigen Observable.

3

GroupBy

Teilen Sie eine Observable in eine Reihe von Observables ein, die nach Schlüsseln organisiert sind, um verschiedene Gruppen von Elementen auszugeben.

4

Map

Wenden Sie auf jedes ausgegebene Element eine Funktion an, um es zu transformieren.

5

Scan

Wenden Sie nacheinander eine Funktion auf jedes ausgegebene Element an und geben Sie dann den nachfolgenden Wert aus.

6

Window

Sammelt regelmäßig Elemente aus Observable in Observable-Fenster und gibt dann die Fenster anstelle von Elementen aus.

Beispiel für einen transformierenden Operator

Erstellen Sie das folgende Java-Programm mit einem beliebigen Editor Ihrer Wahl, z. B. in C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using map operator to transform an Observable
public class ObservableTester  { 
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Überprüfen Sie das Ergebnis

Kompilieren Sie die Klasse mit javac Compiler wie folgt -

C:\RxJava>javac ObservableTester.java

Führen Sie nun den ObservableTester wie folgt aus:

C:\RxJava>java ObservableTester

Es sollte die folgende Ausgabe erzeugen -

ABCDEFG

Im Folgenden sind die Operatoren aufgeführt, mit denen Elemente von einem Observable selektiv ausgegeben werden.

Sr.Nr. Betreiber & Beschreibung
1

Debounce

Gibt Elemente nur aus, wenn eine Zeitüberschreitung auftritt, ohne dass ein anderes Element ausgegeben wird.

2

Distinct

Gibt nur eindeutige Artikel aus.

3

ElementAt

emittiere nur Elemente mit n Index, die von einem Observable ausgegeben werden.

4

Filter

Gibt nur die Elemente aus, die die angegebene Prädikatfunktion erfüllen.

5

First

Gibt das erste Element oder das erste Element aus, das die angegebenen Kriterien erfüllt hat.

6

IgnoreElements

Gibt keine Elemente von Observable aus, sondern markiert die Fertigstellung.

7

Last

Gibt das letzte Element von Observable aus.

8

Sample

Gibt das aktuellste Element mit einem bestimmten Zeitintervall aus.

9

Skip

Überspringt die ersten n Elemente eines Observable.

10

SkipLast

Überspringt die letzten n Elemente eines Observable.

11

Take

Nimmt die ersten n Elemente aus einem Observable.

12

TakeLast

Nimmt die letzten n Elemente aus einem Observable.

Beispiel für einen Filteroperator

Erstellen Sie das folgende Java-Programm mit einem beliebigen Editor Ihrer Wahl, z. B. in C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using take operator to filter an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .take(2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Überprüfen Sie das Ergebnis

Kompilieren Sie die Klasse mit javac Compiler wie folgt -

C:\RxJava>javac ObservableTester.java

Führen Sie nun den ObservableTester wie folgt aus:

C:\RxJava>java ObservableTester

Es sollte die folgende Ausgabe erzeugen -

ab

Im Folgenden sind die Operatoren aufgeführt, mit denen eine einzelne Observable aus mehreren Observables erstellt wird.

Sr.Nr. Betreiber & Beschreibung
1 And/Then/When

Kombinieren Sie Objektgruppen mithilfe von Muster- und Planvermittlern.

2 CombineLatest

Kombinieren Sie das neueste Element, das von jedem Observable über eine bestimmte Funktion ausgegeben wird, und senden Sie das resultierende Element aus.

3 Join

Kombinieren Sie von zwei Observables emittierte Elemente, wenn diese während des Zeitrahmens des zweiten von Observable emittierten Elements ausgegeben werden.

4 Merge

Kombiniert die von Observables ausgegebenen Elemente.

5 StartWith

Geben Sie eine bestimmte Folge von Elementen aus, bevor Sie mit dem Ausgeben der Elemente aus der Quelle Observable beginnen

6 Switch

Gibt die neuesten von Observables ausgegebenen Elemente aus.

7 Zip

Kombiniert Elemente von Observables basierend auf der Funktion und gibt die resultierenden Elemente aus.

Kombinierendes Operatorbeispiel

Erstellen Sie das folgende Java-Programm mit einem beliebigen Editor Ihrer Wahl, z. B. in C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using combineLatest operator to combine Observables
public class ObservableTester {
   public static void main(String[] args) {    
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.combineLatest(observable1, observable2, (a,b) -> a + b)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Überprüfen Sie das Ergebnis

Kompilieren Sie die Klasse mit javac Compiler wie folgt -

C:\RxJava>javac ObservableTester.java

Führen Sie nun den ObservableTester wie folgt aus:

C:\RxJava>java ObservableTester

Es sollte die folgende Ausgabe erzeugen -

g1g2g3g4g5g6

Im Folgenden sind die Operatoren aufgeführt, die bei Observables häufig hilfreich sind.

Sr.Nr. Betreiber & Beschreibung
1

Delay

Registrieren Sie die Aktion, um beobachtbare Lebenszyklusereignisse zu verarbeiten.

2

Materialize/Dematerialize

Stellt das ausgegebene Element und die gesendete Benachrichtigung dar.

3

ObserveOn

Geben Sie den zu beobachtenden Scheduler an.

4

Serialize

Erzwinge Observable, um serialisierte Anrufe zu tätigen.

5

Subscribe

Arbeiten Sie mit den Emissionen von Gegenständen und Benachrichtigungen wie vollständig von einem Observable

6

SubscribeOn

Geben Sie den Scheduler an, der von einem Observable verwendet werden soll, wenn es abonniert ist.

7

TimeInterval

Konvertieren Sie ein Observable, um Angaben zur Zeitspanne zwischen den Emissionen zu machen.

8

Timeout

Gibt eine Fehlerbenachrichtigung aus, wenn die angegebene Zeit auftritt, ohne dass ein Element ausgegeben wird.

9

Timestamp

Fügen Sie jedem ausgegebenen Objekt einen Zeitstempel hinzu.

9

Using

Erstellt eine verfügbare Ressource oder dieselbe Lebensdauer wie Observable.

Beispiel für einen Dienstprogrammbetreiber

Erstellen Sie das folgende Java-Programm mit einem beliebigen Editor Ihrer Wahl, z. B. in C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using subscribe operator to subscribe to an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable.subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Überprüfen Sie das Ergebnis

Kompilieren Sie die Klasse mit javac Compiler wie folgt -

C:\RxJava>javac ObservableTester.java

Führen Sie nun den ObservableTester wie folgt aus:

C:\RxJava>java ObservableTester

Es sollte die folgende Ausgabe erzeugen -

abcdefg

Im Folgenden sind die Operatoren aufgeführt, die eine oder mehrere Observables oder emittierte Elemente auswerten.

Sr.Nr. Betreiber & Beschreibung
1

All

Wertet alle ausgegebenen Elemente aus, um bestimmte Kriterien zu erfüllen.

2

Amb

Gibt alle Elemente aus dem ersten Observable nur bei mehreren Observables aus.

3

Contains

Überprüft, ob ein Observable einen bestimmten Gegenstand ausgibt oder nicht.

4

DefaultIfEmpty

Gibt das Standardelement aus, wenn Observable nichts ausgibt.

5

SequenceEqual

Überprüft, ob zwei Observables dieselbe Folge von Elementen ausgeben.

6

SkipUntil

Wirft vom ersten Observable ausgegebene Elemente ab, bis ein zweites Observable ein Element ausgibt.

7

SkipWhile

Verwerfen Sie von einem Observable ausgegebene Elemente, bis eine bestimmte Bedingung falsch wird.

8

TakeUntil

Wirft von einem Observable ausgegebene Elemente ab, nachdem ein zweites Observable ein Element ausgegeben oder beendet hat.

9

TakeWhile

Von einem Observable ausgegebene Elemente verwerfen, nachdem eine bestimmte Bedingung falsch geworden ist.

Beispiel für einen bedingten Operator

Erstellen Sie das folgende Java-Programm mit einem beliebigen Editor Ihrer Wahl, z. B. in C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using defaultIfEmpty operator to operate on an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      final StringBuilder result = new StringBuilder();
      Observable.empty()
      .defaultIfEmpty("No Data")
      .subscribe(s -> result.append(s));
      System.out.println(result);
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result1 = new StringBuilder();
      Observable.fromArray(letters)
      .firstElement()
      .defaultIfEmpty("No data")   
      .subscribe(s -> result1.append(s));
      System.out.println(result1);
   }
}

Überprüfen Sie das Ergebnis

Kompilieren Sie die Klasse mit javac Compiler wie folgt -

C:\RxJava>javac ObservableTester.java

Führen Sie nun den ObservableTester wie folgt aus:

C:\RxJava>java ObservableTester

Es sollte die folgende Ausgabe erzeugen -

No Data
a

Im Folgenden sind die Bediener aufgeführt, die ganze Elemente bearbeiten, die von einem Observable ausgegeben werden.

Sr.Nr. Betreiber & Beschreibung
1

Average

Wertet die Durchschnittswerte aller Elemente aus und gibt das Ergebnis aus.

2

Concat

Gibt alle Elemente aus mehreren Observable ohne Verschachtelung aus.

3

Count

Zählt alle Elemente und gibt das Ergebnis aus.

4

Max

Wertet den maximal bewerteten Artikel aller Artikel aus und gibt das Ergebnis aus.

5

Min

Wertet den min-wertigen Artikel aller Artikel aus und gibt das Ergebnis aus.

6

Reduce

Wenden Sie auf jedes Element eine Funktion an und geben Sie das Ergebnis zurück.

7

Sum

Wertet die Summe aller Elemente aus und gibt das Ergebnis aus.

Beispiel für einen mathematischen Operator

Erstellen Sie das folgende Java-Programm mit einem beliebigen Editor Ihrer Wahl, z. B. in C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using concat operator to operate on multiple Observables
public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {    
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.concat(observable1, observable2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Überprüfen Sie das Ergebnis

Kompilieren Sie die Klasse mit javac Compiler wie folgt -

C:\RxJava>javac ObservableTester.java

Führen Sie nun den ObservableTester wie folgt aus:

C:\RxJava>java ObservableTester

Es sollte die folgende Ausgabe erzeugen -

abcdefg123456

Im Folgenden sind die Betreiber aufgeführt, die eine genauere Kontrolle über das Abonnement haben.

Sr.Nr. Betreiber & Beschreibung
1

Connect

Weisen Sie ein verbindbares Observable an, Elemente an seine Abonnenten zu senden.

2

Publish

Konvertiert ein Observable in ein verbindbares Observable.

3

RefCount

Konvertiert eine anschließbare Observable in eine normale Observable.

4

Replay

Stellen Sie sicher, dass jeder Abonnent dieselbe Reihenfolge der ausgegebenen Elemente sieht, auch nachdem das Observable mit dem Senden von Elementen begonnen hat und die Abonnenten später abonnieren.

Beispiel für einen anschließbaren Bediener

Erstellen Sie das folgende Java-Programm mit einem beliebigen Editor Ihrer Wahl, z. B. in C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
//Using connect operator on a ConnectableObservable
public class ObservableTester {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      ConnectableObservable<String> connectable = Observable.fromArray(letters).publish();      
      connectable.subscribe(letter -> result.append(letter));
      System.out.println(result.length());
      connectable.connect();
      System.out.println(result.length());
      System.out.println(result);
   }
}

Überprüfen Sie das Ergebnis

Kompilieren Sie die Klasse mit javac Compiler wie folgt -

C:\RxJava>javac ObservableTester.java

Führen Sie nun den ObservableTester wie folgt aus:

C:\RxJava>java ObservableTester

Es sollte die folgende Ausgabe erzeugen -

0
7
abcdefg

Nach dem Reactivekann ein Subjekt sowohl als beobachtbar als auch als Beobachter fungieren.

Ein Subjekt ist eine Art Brücke oder Proxy, die in einigen Implementierungen von ReactiveX verfügbar ist und sowohl als Beobachter als auch als Observable fungiert. Da es sich um einen Beobachter handelt, kann er eine oder mehrere Observables abonnieren. Da es sich um eine Observable handelt, kann er die beobachteten Elemente durch erneutes Ausgeben durchlaufen und neue Elemente ausgeben.

Es gibt vier Arten von Themen -

Sr.Nr. Thema Beschreibung
1

Publish Subject

Gibt nur die Artikel aus, die nach dem Zeitpunkt des Abonnements ausgegeben werden.

2 Replay Subject

Gibt alle von source Observable ausgegebenen Elemente aus, unabhängig davon, wann das Observable abonniert wurde.

3

Behavior Subject

Gibt beim Abonnement das neueste Element aus und gibt dann weiterhin das von der Quelle Observable ausgegebene Element aus.

4

Async Subject

Gibt das letzte von der Quelle emittierbare Element aus, das nach Abschluss der Emission beobachtet werden kann.

PublishSubject sendet Elemente an aktuell abonnierte Beobachter und Terminalereignisse an aktuelle oder späte Beobachter.

Klassenerklärung

Es folgt die Erklärung für io.reactivex.subjects.PublishSubject<T> Klasse -

public final class PublishSubject<T>
extends Subject<T>

PublishSubject-Beispiel

Erstellen Sie das folgende Java-Programm mit einem beliebigen Editor Ihrer Wahl, z. B. in C: \> RxJava.

ObservableTester.java

import io.reactivex.subjects.PublishSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      PublishSubject<String> subject = PublishSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be abcd 
      System.out.println(result1);
      //Output will be d only
      //as subscribed after c item emitted.
      System.out.println(result2);
   }
}

Überprüfen Sie das Ergebnis

Kompilieren Sie die Klasse mit javac Compiler wie folgt -

C:\RxJava>javac ObservableTester.java

Führen Sie nun den ObservableTester wie folgt aus:

C:\RxJava>java ObservableTester

Es sollte die folgende Ausgabe erzeugen -

abcd
d

BehaviorSubject gibt das zuletzt beobachtete Element und anschließend alle nachfolgenden beobachteten Elemente an jeden abonnierten Beobachter aus.

Klassenerklärung

Es folgt die Erklärung für io.reactivex.subjects.BehaviorSubject<T> Klasse -

public final class BehaviorSubject<T>
extends Subject<T>

Beispiel für ein Verhaltenssubjekt

Erstellen Sie das folgende Java-Programm mit einem beliebigen Editor Ihrer Wahl, z. B. in C: \> RxJava.

ObservableTester.java

import io.reactivex.subjects.BehaviorSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         
      BehaviorSubject<String> subject =  BehaviorSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();
      //Output will be abcd
      System.out.println(result1);
      //Output will be cd being BehaviorSubject 
      //(c is last item emitted before subscribe)
      System.out.println(result2);
   }
}

Überprüfen Sie das Ergebnis

Kompilieren Sie die Klasse mit javac Compiler wie folgt -

C:\RxJava>javac ObservableTester.java

Führen Sie nun den ObservableTester wie folgt aus:

C:\RxJava>java ObservableTester

Es sollte die folgende Ausgabe erzeugen -

abcd
cd

ReplaySubject gibt Ereignisse / Elemente an aktuelle und späte Beobachter weiter.

Klassenerklärung

Es folgt die Erklärung für io.reactivex.subjects.ReplaySubject<T> Klasse -

public final class ReplaySubject<T>
extends Subject<T>

ReplaySubject Beispiel

Erstellen Sie das folgende Java-Programm mit einem beliebigen Editor Ihrer Wahl, z. B. in C: \> RxJava.

ObservableTester.java

import io.reactivex.subjects.ReplaySubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      ReplaySubject<String> subject = ReplaySubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be abcd
      System.out.println(result1);
      //Output will be abcd being ReplaySubject
      //as ReplaySubject emits all the items
      System.out.println(result2);
   }
}

Überprüfen Sie das Ergebnis

Kompilieren Sie die Klasse mit javac Compiler wie folgt -

C:\RxJava>javac ObservableTester.java

Führen Sie nun den ObservableTester wie folgt aus:

C:\RxJava>java ObservableTester

Es sollte die folgende Ausgabe erzeugen -

abcd
abcd

AsyncSubject gibt den einzigen letzten Wert, gefolgt von einem Abschlussereignis oder dem empfangenen Fehler, an Observers aus.

Klassenerklärung

Es folgt die Erklärung für io.reactivex.subjects.AsyncSubject<T> Klasse -

public final class  AsyncSubject<T>
extends Subject<T>

AsyncSubject-Beispiel

Erstellen Sie das folgende Java-Programm mit einem beliebigen Editor Ihrer Wahl, z. B. in C: \> RxJava.

ObservableTester.java

import io.reactivex.subjects. AsyncSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      AsyncSubject<String> subject =  AsyncSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be d being the last item emitted
      System.out.println(result1);
      //Output will be d being the last item emitted     
      System.out.println(result2);
   }
}

Überprüfen Sie das Ergebnis

Kompilieren Sie die Klasse mit javac Compiler wie folgt -

C:\RxJava>javac ObservableTester.java

Führen Sie nun den ObservableTester wie folgt aus:

C:\RxJava>java ObservableTester

Es sollte die folgende Ausgabe erzeugen -

d
d

Scheduler werden in Multithreading-Umgebungen verwendet, um mit Observable-Operatoren zu arbeiten.

Nach dem Reactive, Scheduler werden verwendet, um zu planen, wie die Operator-Kette auf verschiedene Threads angewendet wird.

Standardmäßig erledigen ein Observable und die Kette von Operatoren, die Sie auf es anwenden, seine Arbeit und benachrichtigen seine Beobachter in demselben Thread, in dem seine Subscribe-Methode aufgerufen wird. Der SubscribeOn-Operator ändert dieses Verhalten, indem er einen anderen Scheduler angibt, auf dem das Observable ausgeführt werden soll. Der ObserveOn-Operator gibt einen anderen Scheduler an, mit dem der Observable Benachrichtigungen an seine Beobachter sendet.

In RxJava sind folgende Arten von Schedulern verfügbar:

Sr.Nr. Scheduler & Beschreibung
1

Schedulers.computation()

Erstellt einen Scheduler für Computerarbeiten und gibt ihn zurück. Die Anzahl der zu planenden Threads hängt von den im System vorhandenen CPUs ab. Pro CPU ist ein Thread zulässig. Am besten für Event-Loops oder Callback-Operationen.

2

Schedulers.io()

Erstellt einen Scheduler für E / A-gebundene Arbeiten und gibt ihn zurück. Der Thread-Pool kann nach Bedarf erweitert werden.

3

Schedulers.newThread()

Erstellt einen Scheduler und gibt ihn zurück, der für jede Arbeitseinheit einen neuen Thread erstellt.

4

Schedulers.trampoline()

Erstellt einen Scheduler und gibt ihn zurück, der die Arbeit an dem aktuellen Thread in die Warteschlange stellt, der nach Abschluss der aktuellen Arbeit ausgeführt werden soll.

4

Schedulers.from(java.util.concurrent.Executor executor)

Konvertiert einen Executor in eine neue Scheduler-Instanz.

Die Schedulers.trampoline () -Methode erstellt einen Scheduler und gibt ihn zurück, der die Arbeit an dem aktuellen Thread in die Warteschlange stellt, der nach Abschluss der aktuellen Arbeit ausgeführt werden soll.

Schedulers.trampoline () Beispiel

Erstellen Sie das folgende Java-Programm mit einem beliebigen Editor Ihrer Wahl, z. B. in C: \> RxJava.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.trampoline()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Überprüfen Sie das Ergebnis

Kompilieren Sie die Klasse mit javac Compiler wie folgt -

C:\RxJava>javac ObservableTester.java

Führen Sie nun den ObservableTester wie folgt aus:

C:\RxJava>java ObservableTester

Es sollte die folgende Ausgabe erzeugen -

Processing Thread main
Receiver Thread main, Item length 1
Processing Thread main
Receiver Thread main, Item length 2
Processing Thread main
Receiver Thread main, Item length 3

Die Schedulers.newThread () -Methode erstellt einen Scheduler und gibt ihn zurück, der für jede Arbeitseinheit einen neuen Thread erstellt.

Schedulers.newThread () Beispiel

Erstellen Sie das folgende Java-Programm mit einem beliebigen Editor Ihrer Wahl, z. B. in C: \> RxJava.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.newThread()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Überprüfen Sie das Ergebnis

Kompilieren Sie die Klasse mit javac Compiler wie folgt -

C:\RxJava>javac ObservableTester.java

Führen Sie nun den ObservableTester wie folgt aus:

C:\RxJava>java ObservableTester

Es sollte die folgende Ausgabe erzeugen -

Processing Thread RxNewThreadScheduler-1
Receiver Thread RxNewThreadScheduler-1, Item length 1
Processing Thread RxNewThreadScheduler-2
Receiver Thread RxNewThreadScheduler-2, Item length 2
Processing Thread RxNewThreadScheduler-3
Receiver Thread RxNewThreadScheduler-3, Item length 3

Die Schedulers.computation () -Methode erstellt einen Scheduler und gibt ihn für die Rechenarbeit zurück. Die Anzahl der zu planenden Threads hängt von den im System vorhandenen CPUs ab. Pro CPU ist ein Thread zulässig. Am besten für Event-Loops oder Callback-Operationen.

Schedulers.computation () Beispiel

Erstellen Sie das folgende Java-Programm mit einem beliebigen Editor Ihrer Wahl, z. B. in C: \> RxJava.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.computation()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Überprüfen Sie das Ergebnis

Kompilieren Sie die Klasse mit javac Compiler wie folgt -

C:\RxJava>javac ObservableTester.java

Führen Sie nun den ObservableTester wie folgt aus:

C:\RxJava>java ObservableTester

Es sollte die folgende Ausgabe erzeugen -

Processing Thread RxComputationThreadPool-1
Receiver Thread RxComputationThreadPool-1, Item length 1
Processing Thread RxComputationThreadPool-2
Receiver Thread RxComputationThreadPool-2, Item length 2
Processing Thread RxComputationThreadPool-3
Receiver Thread RxComputationThreadPool-3, Item length 3

Die Schedulers.io () -Methode erstellt einen Scheduler und gibt ihn für E / A-gebundene Arbeiten zurück. Der Thread-Pool kann nach Bedarf erweitert werden. Am besten für E / A-intensive Operationen.

Schedulers.io () Beispiel

Erstellen Sie das folgende Java-Programm mit einem beliebigen Editor Ihrer Wahl, z. B. in C: \> RxJava.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.io()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Überprüfen Sie das Ergebnis

Kompilieren Sie die Klasse mit javac Compiler wie folgt -

C:\RxJava>javac ObservableTester.java

Führen Sie nun den ObservableTester wie folgt aus:

C:\RxJava>java ObservableTester

Es sollte die folgende Ausgabe erzeugen -

Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 1
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 2
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 3

Die Schedulers.from (Executor) -Methode konvertiert einen Executor in eine neue Scheduler-Instanz.

Schedulers.from (Executor) Beispiel

Erstellen Sie das folgende Java-Programm mit einem beliebigen Editor Ihrer Wahl, z. B. in C: \> RxJava.

ObservableTester.java

import java.util.Random;
import java.util.concurrent.Executors;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3))))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Überprüfen Sie das Ergebnis

Kompilieren Sie die Klasse mit javac Compiler wie folgt -

C:\RxJava>javac ObservableTester.java

Führen Sie nun den ObservableTester wie folgt aus:

C:\RxJava>java ObservableTester

Es sollte die folgende Ausgabe erzeugen -

Processing Thread pool-1-thread-1
Processing Thread pool-3-thread-1
Receiver Thread pool-1-thread-1, Item length 1
Processing Thread pool-4-thread-1
Receiver Thread pool-4-thread-1, Item length 3
Receiver Thread pool-3-thread-1, Item length 2

Der Pufferoperator ermöglicht das Sammeln von von einem Observable ausgegebenen Elementen in einer Liste oder in Bündeln und das Ausgeben dieser Bündel anstelle von Elementen. Im folgenden Beispiel haben wir ein Observable erstellt, um 9 Elemente auszugeben. Mithilfe der Pufferung werden 3 Elemente zusammen ausgegeben.

Pufferbeispiel

Erstellen Sie das folgende Java-Programm mit einem beliebigen Editor Ihrer Wahl, z. B. in C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .buffer(3)
         .subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(List<Integer> integers) {
               System.out.println("onNext: ");
               for (Integer value : integers) {
                  System.out.println(value);
               }
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

Überprüfen Sie das Ergebnis

Kompilieren Sie die Klasse mit javac Compiler wie folgt -

C:\RxJava>javac ObservableTester.java

Führen Sie nun den ObservableTester wie folgt aus:

C:\RxJava>java ObservableTester

Es sollte die folgende Ausgabe erzeugen -

Subscribed
onNext: 
1
2
3
onNext: 
4
5
6
onNext: 
7
8
9
Done!

Der Fensteroperator funktioniert ähnlich wie der Pufferoperator, ermöglicht jedoch das Sammeln von von einem Observable ausgegebenen Elementen in einem anderen Observable anstelle einer Sammlung und das Ausgeben dieser Observables anstelle von Sammlungen. Im folgenden Beispiel haben wir ein Observable erstellt, um 9 Elemente auszugeben. Mit dem Fensteroperator werden 3 Observable zusammen ausgegeben.

Fensterbeispiel

Erstellen Sie das folgende Java-Programm mit einem beliebigen Editor Ihrer Wahl, z. B. in C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .window(3)
         .subscribe(new Observer<Observable<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(Observable<Integer> integers) {
               System.out.println("onNext: ");
               integers.subscribe(value -> System.out.println(value));
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

Überprüfen Sie das Ergebnis

Kompilieren Sie die Klasse mit javac Compiler wie folgt -

C:\RxJava>javac ObservableTester.java

Führen Sie nun den ObservableTester wie folgt aus:

C:\RxJava>java ObservableTester

Es sollte die folgende Ausgabe erzeugen -

Subscribed
onNext: 
1
2
3
onNext: 
4
5
6
onNext: 
7
8
9
Done!