RxJava - Guide rapide

RxJava est une extension Java de ReactiveX. Il fournit une implémentation ou un projet ReactiveX en Java. Voici les principales caractéristiques de RxJava.

  • Étend le modèle d'observateur.

  • Soutenir les séquences de données / événements.

  • Fournit des opérateurs pour composer des séquences ensemble de manière déclarative.

  • Gère le threading, la synchronisation, la sécurité des threads et les structures de données simultanées en interne.

Qu'est-ce que ReactiveX?

ReactiveX est un projet qui vise à fournir un concept de programmation réactive à divers langages de programmation. La programmation réactive fait référence au scénario dans lequel le programme réagit au fur et à mesure que les données apparaissent. Il s'agit d'un concept de programmation basé sur les événements et les événements peuvent se propager aux observateurs de registres.

Selon le Reactive, ils ont combiné le meilleur du modèle Observer, du modèle Iterator et du modèle fonctionnel.

Le modèle Observer bien fait. ReactiveX est une combinaison des meilleures idées du modèle Observer, du modèle Iterator et de la programmation fonctionnelle.

Programmation fonctionnelle

La programmation fonctionnelle tourne autour de la construction du logiciel en utilisant des fonctions pures. Une fonction pure ne dépend pas de l'état précédent et renvoie toujours le même résultat pour les mêmes paramètres passés. Les fonctions pures permettent d'éviter les problèmes associés aux objets partagés, aux données mutables et aux effets secondaires souvent répandus dans les environnements multi-threading.

Programmation réactive

La programmation réactive fait référence à la programmation événementielle dans laquelle les flux de données arrivent de manière asynchrone et sont traités lorsqu'ils sont arrivés.

Programmation réactive fonctionnelle

RxJava implémente les deux concepts ensemble, où les données des flux changent au fil du temps et la fonction consommateur réagit en conséquence.

Le Manifeste Réactif

Reactive Manifesto est un document en ligne attestant le niveau élevé des systèmes logiciels d'application. Selon le manifeste, voici les attributs clés d'un logiciel réactif -

  • Responsive - Devrait toujours répondre en temps opportun.

  • Message Driven - Devrait utiliser le passage de messages asynchrone entre les composants afin qu'ils conservent un couplage lâche.

  • Elastic - Doit rester réactif même sous une charge élevée.

  • Resilient - Doit rester réactif même si un ou plusieurs composants échouent.

Composants clés de RxJava

RxJava a deux composants clés: Observables et Observer.

  • Observable - Il représente un objet similaire à Stream qui peut émettre zéro ou plusieurs données, peut envoyer un message d'erreur, dont la vitesse peut être contrôlée tout en émettant un ensemble de données, peut envoyer des données finies ou infinies.

  • Observer- Il s'abonne aux données de séquence d'Observable et réagit par item des observables. Les observateurs sont notifiés chaque fois qu'Observable émet une donnée. Un observateur gère les données une par une.

Un observateur n'est jamais averti si des éléments ne sont pas présents ou si un rappel n'est pas renvoyé pour un élément précédent.

Configuration de l'environnement local

RxJava est une bibliothèque pour Java, donc la toute première exigence est d'avoir JDK installé sur votre machine.

Exigence du système

JDK 1.5 ou supérieur.
Mémoire Aucune exigence minimale.
Espace disque Aucune exigence minimale.
Système opérateur Aucune exigence minimale.

Étape 1 - Vérifiez l'installation de Java sur votre machine

Tout d'abord, ouvrez la console et exécutez une commande java basée sur le système d'exploitation sur lequel vous travaillez.

OS Tâche Commander
les fenêtres Ouvrez la console de commande c: \> java -version
Linux Ouvrir le terminal de commande $ java -version
Mac Terminal ouvert machine: <joseph $ java -version

Vérifions la sortie pour tous les systèmes d'exploitation -

OS Production
les fenêtres

version java "1.8.0_101"

Environnement d'exécution Java (TM) SE (build 1.8.0_101)

Linux

version java "1.8.0_101"

Environnement d'exécution Java (TM) SE (build 1.8.0_101)

Mac

version java "1.8.0_101"

Environnement d'exécution Java (TM) SE (build 1.8.0_101)

Si Java n'est pas installé sur votre système, téléchargez le kit de développement logiciel Java (SDK) à partir du lien suivant https://www.oracle.com. Nous supposons que Java 1.8.0_101 est la version installée pour ce didacticiel.

Étape 2 - Définir l'environnement JAVA

Met le JAVA_HOMEvariable d'environnement pour pointer vers l'emplacement du répertoire de base où Java est installé sur votre machine. Par exemple.

OS Production
les fenêtres Définissez la variable d'environnement JAVA_HOME sur C: \ Program Files \ Java \ jdk1.8.0_101
Linux export JAVA_HOME = / usr / local / java-current
Mac export JAVA_HOME = / Bibliothèque / Java / Accueil

Ajoutez l'emplacement du compilateur Java au chemin système.

OS Production
les fenêtres Ajouter la chaîne C:\Program Files\Java\jdk1.8.0_101\bin à la fin de la variable système, Path.
Linux export PATH = $ PATH: $ JAVA_HOME / bin /
Mac non requis

Vérifiez l'installation de Java à l'aide de la commande java -version comme expliqué ci-dessus.

Étape 3 - Téléchargez l'archive RxJava2

Téléchargez la dernière version du fichier jar RxJava à partir de RxJava @ MVNRepository et de sa dépendance Reactive Streams @ MVNRepository . Au moment de la rédaction de ce tutoriel, nous avons téléchargé rxjava-2.2.4.jar, reactive-streams-1.0.2.jar et l'avons copié dans le dossier C: \> RxJava.

OS Nom de l'archive
les fenêtres rxjava-2.2.4.jar, flux-réactifs-1.0.2.jar
Linux rxjava-2.2.4.jar, flux-réactifs-1.0.2.jar
Mac rxjava-2.2.4.jar, flux-réactifs-1.0.2.jar

Étape 4 - Définir l'environnement RxJava

Met le RX_JAVAvariable d'environnement pour pointer vers l'emplacement du répertoire de base où le fichier JAR RxJava est stocké sur votre machine. Supposons que nous ayons stocké rxjava-2.2.4.jar et reactive-streams-1.0.2.jar dans le dossier RxJava.

Sr. Non OS et description
1

Windows

Définissez la variable d'environnement RX_JAVA sur C: \ RxJava

2

Linux

export RX_JAVA = / usr / local / RxJava

3

Mac

export RX_JAVA = / Bibliothèque / RxJava

Étape 5 - Définir la variable CLASSPATH

Met le CLASSPATH variable d'environnement pour pointer vers l'emplacement du fichier JAR RxJava.

Sr. Non OS et description
1

Windows

Définissez la variable d'environnement CLASSPATH sur% CLASSPATH%;% RX_JAVA% \ rxjava-2.2.4.jar;% RX_JAVA% \ reactive-streams-1.0.2.jar;.;

2

Linux

export CLASSPATH = $ CLASSPATH: $ RX_JAVA / rxjava-2.2.4.jar: reactive-streams-1.0.2.jar :.

3

Mac

export CLASSPATH = $ CLASSPATH: $ RX_JAVA / rxjava-2.2.4.jar: reactive-streams-1.0.2.jar :.

Étape 6 - Tester la configuration de RxJava

Créez une classe TestRx.java comme indiqué ci-dessous -

import io.reactivex.Flowable;
public class TestRx {
   public static void main(String[] args) {
      Flowable.just("Hello World!")
         .subscribe(System.out::println);
   }
}

Étape 7 - Vérifiez le résultat

Compilez les classes en utilisant javac compilateur comme suit -

C:\RxJava>javac Tester.java

Vérifiez la sortie.

Hello World!

Observables représente les sources de données où comme Observers (Subscribers)écoute-les. En bref, un observable émet des éléments et un abonné consomme ensuite ces éléments.

Observable

  • Observable fournit des données une fois que l'abonné commence à écouter.

  • Observable peut émettre n'importe quel nombre d'éléments.

  • Observable ne peut également émettre qu'un signal d'achèvement sans élément.

  • Observable peut se terminer avec succès.

  • Observable ne peut jamais se terminer. par exemple, un bouton peut être cliqué un nombre illimité de fois.

  • Observable peut générer une erreur à tout moment.

Abonné

  • Observable peut avoir plusieurs abonnés.

  • Lorsqu'un Observable émet un élément, chaque méthode onNext () de l'abonné est appelée.

  • Lorsqu'un Observable a fini d'émettre des éléments, chaque méthode onComplete () de l'abonné est appelée.

  • Si un Observable émet une erreur, chaque méthode onError () de l'abonné est appelée.

Voici les classes de base pour créer des observables.

  • Flowable- 0..N flux, émet 0 ou n éléments. Prend en charge les flux réactifs et la contre-pression.

  • Observable - 0..N s'écoule, mais pas de contre-pression.

  • Single- 1 élément ou erreur. Peut être traité comme une version réactive de l'appel de méthode.

  • Completable- Aucun élément émis. Utilisé comme signal d'achèvement ou d'erreur. Peut être traité comme une version réactive de Runnable.

  • MayBe- Soit aucun élément, soit 1 élément émis. Peut être traité comme une version réactive d'Optionnel.

Voici les méthodes pratiques pour créer des observables dans la classe Observable.

  • just(T item) - Renvoie un observable qui signale l'élément donné (référence constante), puis se termine.

  • fromIterable(Iterable source) - Convertit une séquence Iterable en une ObservableSource qui émet les éléments de la séquence.

  • fromArray(T... items) - Convertit un tableau en un ObservableSource qui émet les éléments du tableau.

  • fromCallable(Callable supplier) - Renvoie un Observable qui, lorsqu'un observateur s'y abonne, appelle une fonction que vous spécifiez puis émet la valeur renvoyée par cette fonction.

  • fromFuture(Future future) - Convertit un futur en une ObservableSource.

  • interval(long initialDelay, long period, TimeUnit unit) - Renvoie un Observable qui émet un 0L après le initialDelay et des nombres toujours croissants après chaque période de temps par la suite.

La classe Single représente la réponse à valeur unique. Une seule observable ne peut émettre qu'une seule valeur réussie ou une erreur. Il n'émet pas d'événement onComplete.

Déclaration de classe

Voici la déclaration pour io.reactivex.Single<T> classe -

public abstract class Single<T>
   extends Object
      implements SingleSource<T>

Protocole

Voici le protocole séquentiel utilisé par Single Observable -

onSubscribe (onSuccess | onError)?

Exemple unique

Créez le programme Java suivant en utilisant n'importe quel éditeur de votre choix dans, par exemple, C: \> RxJava.

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {
      //Create the observable
      Single<String> testSingle = Single.just("Hello World");

      //Create an observer
      Disposable disposable = testSingle
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(
         new DisposableSingleObserver<String>() {

         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

Vérifiez le résultat

Compilez la classe en utilisant javac compilateur comme suit -

C:\RxJava>javac ObservableTester.java

Exécutez maintenant l'ObservableTester comme suit -

C:\RxJava>java ObservableTester

Il devrait produire la sortie suivante -

Hello World

La classe MayBe représente une réponse différée. MayBe observable peut émettre une seule valeur réussie ou aucune valeur.

Déclaration de classe

Voici la déclaration pour io.reactivex.Single<T> classe -

public abstract class Maybe<T>
   extends Object
      implements MaybeSource<T>

Protocole

Voici le protocole séquentiel que MayBe Observable exploite -

onSubscribe (onSuccess | onError | OnComplete)?

Peut-être un exemple

Créez le programme Java suivant en utilisant n'importe quel éditeur de votre choix dans, par exemple, C: \> RxJava.

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      //Create an observer
      Disposable disposable = Maybe.just("Hello World")
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

Vérifiez le résultat

Compilez la classe en utilisant javac compilateur comme suit -

C:\RxJava>javac ObservableTester.java

Exécutez maintenant l'ObservableTester comme suit -

C:\RxJava>java ObservableTester

Il devrait produire la sortie suivante -

Hello World

La classe Completable représente une réponse différée. L'observable complétable peut indiquer une réussite ou une erreur.

Déclaration de classe

Voici la déclaration pour io.reactivex.Completable classe -

public abstract class Completable
extends Object
implements CompletableSource

Protocole

Voici le protocole séquentiel utilisé par Completable Observable -

onSubscribe (onError | onComplete)?

Exemple complet

Créez le programme Java suivant en utilisant n'importe quel éditeur de votre choix dans, par exemple, C: \> RxJava.

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableCompletableObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {

      //Create an observer
      Disposable disposable = Completable.complete()
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableCompletableObserver() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }
         @Override
         public void onStart() {
            System.out.println("Started!");
         }
         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

Vérifiez le résultat

Compilez la classe en utilisant javac compilateur comme suit -

C:\RxJava>javac ObservableTester.java

Exécutez maintenant l'ObservableTester comme suit -

C:\RxJava>java ObservableTester

Il devrait produire la sortie suivante -

Started!
Done!

La classe CompositeDisposable représente un conteneur qui peut contenir plusieurs jetables et offre une complexité O (1) pour l'ajout et la suppression de jetables.

Déclaration de classe

Voici la déclaration pour io.reactivex.disposables.CompositeDisposable classe -

public final class CompositeDisposable
extends Object
implements Disposable, io.reactivex.internal.disposables.DisposableContainer

Exemple CompositeDisposable

Créez le programme Java suivant en utilisant n'importe quel éditeur de votre choix dans, par exemple, C: \> RxJava.

ObservableTester.java

import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {    
      CompositeDisposable compositeDisposable = new CompositeDisposable();

      //Create an Single observer 
      Disposable disposableSingle = Single.just("Hello World")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(
      new DisposableSingleObserver<String>() {
         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      }); 

      //Create an observer
      Disposable disposableMayBe = Maybe.just("Hi")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 

      Thread.sleep(3000);

      compositeDisposable.add(disposableSingle);
      compositeDisposable.add(disposableMayBe);

      //start observing
      compositeDisposable.dispose();
   }
}

Vérifiez le résultat

Compilez la classe en utilisant javac compilateur comme suit -

C:\RxJava>javac ObservableTester.java

Exécutez maintenant l'ObservableTester comme suit -

C:\RxJava>java ObservableTester

Il devrait produire la sortie suivante -

Hello World
Hi

Voici les opérateurs qui sont utilisés pour créer un observable.

N ° Sr. Opérateur et description
1

Create

Crée un Observable à partir de zéro et permet à la méthode d'observateur d'appeler par programme.

2

Defer

Ne créez pas d'observable tant qu'un observateur n'est pas abonné. Crée une nouvelle observable pour chaque observateur.

3

Empty/Never/Throw

Crée un observable avec un comportement limité.

4

From

Convertit un objet / structure de données en observable.

5

Interval

Crée un observable émettant des entiers en séquence avec un intervalle de temps spécifié.

6

Just

Convertit un objet / structure de données en Observable pour émettre le même ou le même type d'objets.

sept

Range

Crée un observable émettant des entiers dans l'ordre d'une plage donnée.

8

Repeat

Crée un observable émettant des entiers en séquence à plusieurs reprises.

9

Start

Crée un Observable pour émettre la valeur de retour d'une fonction.

dix

Timer

Crée un observable pour émettre un seul élément après un délai donné.

Création d'un exemple d'opérateur

Créez le programme Java suivant en utilisant n'importe quel éditeur de votre choix dans, par exemple, C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using fromArray operator to create an Observable
public class ObservableTester  {
   public static void main(String[] args) { 
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Vérifiez le résultat

Compilez la classe en utilisant javac compilateur comme suit -

C:\RxJava>javac ObservableTester.java

Exécutez maintenant l'ObservableTester comme suit -

C:\RxJava>java ObservableTester

Il devrait produire la sortie suivante -

ABCDEFG

Voici les opérateurs qui sont utilisés pour transformer un élément émis par un observable.

N ° Sr. Opérateur et description
1

Buffer

Rassemble périodiquement les éléments d'Observable dans des lots, puis émet les lots plutôt que les éléments.

2

FlatMap

Utilisé dans les observables imbriqués. Transforme les éléments en observables. Ensuite, aplatissez les éléments en un seul observable.

3

GroupBy

Divisez un observable en ensemble d'observables organisés par clé pour émettre différents groupes d'éléments.

4

Map

Appliquez une fonction à chaque élément émis pour le transformer.

5

Scan

Appliquez une fonction à chaque élément émis, séquentiellement puis émettez la valeur successive.

6

Window

Rassemble périodiquement les éléments de Observable dans les fenêtres Observable, puis émet les fenêtres plutôt que les éléments.

Exemple d'opérateur de transformation

Créez le programme Java suivant en utilisant n'importe quel éditeur de votre choix dans, par exemple, C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using map operator to transform an Observable
public class ObservableTester  { 
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Vérifiez le résultat

Compilez la classe en utilisant javac compilateur comme suit -

C:\RxJava>javac ObservableTester.java

Exécutez maintenant l'ObservableTester comme suit -

C:\RxJava>java ObservableTester

Il devrait produire la sortie suivante -

ABCDEFG

Voici les opérateurs qui sont utilisés pour émettre sélectivement des éléments à partir d'un observable.

N ° Sr. Opérateur et description
1

Debounce

Émet des éléments uniquement lorsque le délai expire sans émettre un autre élément.

2

Distinct

Émet uniquement les éléments uniques.

3

ElementAt

émettre uniquement un élément à n index émis par un observable.

4

Filter

Émet uniquement les éléments qui passent la fonction de prédicat donnée.

5

First

Émet le premier élément ou le premier élément qui a passé les critères donnés.

6

IgnoreElements

N'émet aucun élément d'Observable mais marque la fin.

sept

Last

Émet le dernier élément d'Observable.

8

Sample

Émet l'élément le plus récent avec un intervalle de temps donné.

9

Skip

Ignore les n premiers éléments d'un observable.

dix

SkipLast

Ignore les n derniers éléments d'un observable.

11

Take

prend les n premiers éléments d'un observable.

12

TakeLast

prend les n derniers éléments d'un observable.

Exemple d'opérateur de filtrage

Créez le programme Java suivant en utilisant n'importe quel éditeur de votre choix dans, par exemple, C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using take operator to filter an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .take(2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Vérifiez le résultat

Compilez la classe en utilisant javac compilateur comme suit -

C:\RxJava>javac ObservableTester.java

Exécutez maintenant l'ObservableTester comme suit -

C:\RxJava>java ObservableTester

Il devrait produire la sortie suivante -

ab

Voici les opérateurs qui sont utilisés pour créer un seul observable à partir de plusieurs observables.

N ° Sr. Opérateur et description
1 And/Then/When

Combinez des ensembles d'articles à l'aide des intermédiaires Pattern et Plan.

2 CombineLatest

Combinez le dernier élément émis par chaque observable via une fonction spécifiée et émettez l'élément résultant.

3 Join

Combinez les éléments émis par deux observables s'ils sont émis pendant la période de temps du deuxième élément émis par observable.

4 Merge

Combine les éléments émis par les observables.

5 StartWith

Émettre une séquence d'éléments spécifiée avant de commencer à émettre les éléments de la source Observable

6 Switch

Émet les éléments les plus récents émis par les Observables.

sept Zip

Combine les éléments d'observables en fonction de la fonction et émet les éléments résultants.

Exemple d'opérateur combiné

Créez le programme Java suivant en utilisant n'importe quel éditeur de votre choix dans, par exemple, C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using combineLatest operator to combine Observables
public class ObservableTester {
   public static void main(String[] args) {    
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.combineLatest(observable1, observable2, (a,b) -> a + b)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Vérifiez le résultat

Compilez la classe en utilisant javac compilateur comme suit -

C:\RxJava>javac ObservableTester.java

Exécutez maintenant l'ObservableTester comme suit -

C:\RxJava>java ObservableTester

Il devrait produire la sortie suivante -

g1g2g3g4g5g6

Voici les opérateurs qui sont souvent utiles avec Observables.

N ° Sr. Opérateur et description
1

Delay

Enregistrez l'action pour gérer les événements observables du cycle de vie.

2

Materialize/Dematerialize

Représente l'élément émis et la notification envoyée.

3

ObserveOn

Spécifiez le planificateur à observer.

4

Serialize

Force Observable à effectuer des appels sérialisés.

5

Subscribe

Opérer sur les émissions d'articles et de notifications comme complet à partir d'un observable

6

SubscribeOn

Spécifiez le planificateur à utiliser par un observable lorsqu'il est abonné.

sept

TimeInterval

Convertissez un observable pour émettre des indications sur le temps écoulé entre les émissions.

8

Timeout

Emet une notification d'erreur si l'heure spécifiée se produit sans émettre aucun élément.

9

Timestamp

Attachez l'horodatage à chaque élément émis.

9

Using

Crée une ressource jetable ou la même durée de vie que celle d'Observable.

Exemple d'opérateur utilitaire

Créez le programme Java suivant en utilisant n'importe quel éditeur de votre choix dans, par exemple, C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using subscribe operator to subscribe to an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable.subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Vérifiez le résultat

Compilez la classe en utilisant javac compilateur comme suit -

C:\RxJava>javac ObservableTester.java

Exécutez maintenant l'ObservableTester comme suit -

C:\RxJava>java ObservableTester

Il devrait produire la sortie suivante -

abcdefg

Voici les opérateurs qui évaluent un ou plusieurs observables ou éléments émis.

N ° Sr. Opérateur et description
1

All

Évalue tous les éléments émis pour répondre à des critères donnés.

2

Amb

Émet tous les éléments du premier observable uniquement avec plusieurs observables.

3

Contains

Vérifie si un observable émet un élément particulier ou non.

4

DefaultIfEmpty

Émet l'élément par défaut si Observable n'émet rien.

5

SequenceEqual

Vérifie si deux observables émettent la même séquence d'éléments.

6

SkipUntil

Ignore les éléments émis par le premier observable jusqu'à ce qu'un deuxième observable émette un élément.

sept

SkipWhile

Ignorez les éléments émis par un observable jusqu'à ce qu'une condition donnée devienne fausse.

8

TakeUntil

Ignore les éléments émis par un observable après qu'un deuxième observable émet un élément ou se termine.

9

TakeWhile

Ignore les éléments émis par un observable après qu'une condition spécifiée devient fausse.

Exemple d'opérateur conditionnel

Créez le programme Java suivant en utilisant n'importe quel éditeur de votre choix dans, par exemple, C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using defaultIfEmpty operator to operate on an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      final StringBuilder result = new StringBuilder();
      Observable.empty()
      .defaultIfEmpty("No Data")
      .subscribe(s -> result.append(s));
      System.out.println(result);
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result1 = new StringBuilder();
      Observable.fromArray(letters)
      .firstElement()
      .defaultIfEmpty("No data")   
      .subscribe(s -> result1.append(s));
      System.out.println(result1);
   }
}

Vérifiez le résultat

Compilez la classe en utilisant javac compilateur comme suit -

C:\RxJava>javac ObservableTester.java

Exécutez maintenant l'ObservableTester comme suit -

C:\RxJava>java ObservableTester

Il devrait produire la sortie suivante -

No Data
a

Voici les opérateurs qui opèrent sur des éléments entiers émis par un observable.

N ° Sr. Opérateur et description
1

Average

Évalue les moyennes de tous les éléments et émet le résultat.

2

Concat

Émet tous les éléments de plusieurs observables sans entrelacement.

3

Count

Compte tous les éléments et émet le résultat.

4

Max

Évalue l'élément valorisé maximum de tous les éléments et émet le résultat.

5

Min

Évalue l'élément de valeur minimale de tous les éléments et émet le résultat.

6

Reduce

Appliquez une fonction sur chaque élément et renvoyez le résultat.

sept

Sum

Évalue la somme de tous les éléments et émet le résultat.

Exemple d'opérateur mathématique

Créez le programme Java suivant en utilisant n'importe quel éditeur de votre choix dans, par exemple, C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
//Using concat operator to operate on multiple Observables
public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {    
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.concat(observable1, observable2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Vérifiez le résultat

Compilez la classe en utilisant javac compilateur comme suit -

C:\RxJava>javac ObservableTester.java

Exécutez maintenant l'ObservableTester comme suit -

C:\RxJava>java ObservableTester

Il devrait produire la sortie suivante -

abcdefg123456

Voici les opérateurs qui contrôlent plus précisément l'abonnement.

N ° Sr. Opérateur et description
1

Connect

Demandez à un observable connectable d'émettre des éléments à ses abonnés.

2

Publish

Convertit un observable en observable connectable.

3

RefCount

Convertit un observable connectable en observable ordinaire.

4

Replay

Assurez-vous que la même séquence d'éléments émis soit visible par chaque abonné, même après que l'Observable a commencé à émettre des éléments et que les abonnés s'abonnent plus tard.

Exemple d'opérateur connectable

Créez le programme Java suivant en utilisant n'importe quel éditeur de votre choix dans, par exemple, C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
//Using connect operator on a ConnectableObservable
public class ObservableTester {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      ConnectableObservable<String> connectable = Observable.fromArray(letters).publish();      
      connectable.subscribe(letter -> result.append(letter));
      System.out.println(result.length());
      connectable.connect();
      System.out.println(result.length());
      System.out.println(result);
   }
}

Vérifiez le résultat

Compilez la classe en utilisant javac compilateur comme suit -

C:\RxJava>javac ObservableTester.java

Exécutez maintenant l'ObservableTester comme suit -

C:\RxJava>java ObservableTester

Il devrait produire la sortie suivante -

0
7
abcdefg

Selon le Reactive, un sujet peut agir à la fois comme observable et comme observateur.

Un sujet est une sorte de pont ou proxy disponible dans certaines implémentations de ReactiveX qui agit à la fois en tant qu'observateur et en tant qu'observable. Comme il s'agit d'un observateur, il peut s'abonner à un ou plusieurs observables, et comme il s'agit d'un observable, il peut passer par les éléments qu'il observe en les réémettant, et il peut également émettre de nouveaux éléments.

Il existe quatre types de sujets -

N ° Sr. Description du sujet
1

Publish Subject

Émet uniquement les éléments qui sont émis après la date d'abonnement.

2 Replay Subject

Émet tous les éléments émis par la source Observable indépendamment du moment où elle a souscrit à l'Observable.

3

Behavior Subject

Lors de l'abonnement, émet l'élément le plus récent puis continue à émettre l'élément émis par la source Observable.

4

Async Subject

Émet le dernier élément émis par la source Observable une fois l'émission terminée.

PublishSubject émet des éléments aux observateurs actuellement abonnés et des événements terminaux aux observateurs actuels ou en retard.

Déclaration de classe

Voici la déclaration pour io.reactivex.subjects.PublishSubject<T> classe -

public final class PublishSubject<T>
extends Subject<T>

Exemple de PublishSubject

Créez le programme Java suivant en utilisant n'importe quel éditeur de votre choix dans, par exemple, C: \> RxJava.

ObservableTester.java

import io.reactivex.subjects.PublishSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      PublishSubject<String> subject = PublishSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be abcd 
      System.out.println(result1);
      //Output will be d only
      //as subscribed after c item emitted.
      System.out.println(result2);
   }
}

Vérifiez le résultat

Compilez la classe en utilisant javac compilateur comme suit -

C:\RxJava>javac ObservableTester.java

Exécutez maintenant l'ObservableTester comme suit -

C:\RxJava>java ObservableTester

Il devrait produire la sortie suivante -

abcd
d

BehaviorSubject émet l'élément le plus récent qu'il a observé, puis tous les éléments observés suivants à chaque observateur abonné.

Déclaration de classe

Voici la déclaration pour io.reactivex.subjects.BehaviorSubject<T> classe -

public final class BehaviorSubject<T>
extends Subject<T>

Exemple de BehaviorSubject

Créez le programme Java suivant en utilisant n'importe quel éditeur de votre choix dans, par exemple, C: \> RxJava.

ObservableTester.java

import io.reactivex.subjects.BehaviorSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         
      BehaviorSubject<String> subject =  BehaviorSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();
      //Output will be abcd
      System.out.println(result1);
      //Output will be cd being BehaviorSubject 
      //(c is last item emitted before subscribe)
      System.out.println(result2);
   }
}

Vérifiez le résultat

Compilez la classe en utilisant javac compilateur comme suit -

C:\RxJava>javac ObservableTester.java

Exécutez maintenant l'ObservableTester comme suit -

C:\RxJava>java ObservableTester

Il devrait produire la sortie suivante -

abcd
cd

ReplaySubject rejoue les événements / éléments aux observateurs actuels et tardifs.

Déclaration de classe

Voici la déclaration pour io.reactivex.subjects.ReplaySubject<T> classe -

public final class ReplaySubject<T>
extends Subject<T>

Exemple de ReplaySubject

Créez le programme Java suivant en utilisant n'importe quel éditeur de votre choix dans, par exemple, C: \> RxJava.

ObservableTester.java

import io.reactivex.subjects.ReplaySubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      ReplaySubject<String> subject = ReplaySubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be abcd
      System.out.println(result1);
      //Output will be abcd being ReplaySubject
      //as ReplaySubject emits all the items
      System.out.println(result2);
   }
}

Vérifiez le résultat

Compilez la classe en utilisant javac compilateur comme suit -

C:\RxJava>javac ObservableTester.java

Exécutez maintenant l'ObservableTester comme suit -

C:\RxJava>java ObservableTester

Il devrait produire la sortie suivante -

abcd
abcd

AsyncSubject émet la seule dernière valeur suivie d'un événement d'achèvement ou de l'erreur reçue aux observateurs.

Déclaration de classe

Voici la déclaration pour io.reactivex.subjects.AsyncSubject<T> classe -

public final class  AsyncSubject<T>
extends Subject<T>

Exemple AsyncSubject

Créez le programme Java suivant en utilisant n'importe quel éditeur de votre choix dans, par exemple, C: \> RxJava.

ObservableTester.java

import io.reactivex.subjects. AsyncSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      AsyncSubject<String> subject =  AsyncSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be d being the last item emitted
      System.out.println(result1);
      //Output will be d being the last item emitted     
      System.out.println(result2);
   }
}

Vérifiez le résultat

Compilez la classe en utilisant javac compilateur comme suit -

C:\RxJava>javac ObservableTester.java

Exécutez maintenant l'ObservableTester comme suit -

C:\RxJava>java ObservableTester

Il devrait produire la sortie suivante -

d
d

Les planificateurs sont utilisés dans un environnement multi-thread pour travailler avec les opérateurs Observable.

Selon le Reactive, Scheduler est utilisé pour planifier la manière dont la chaîne d'opérateurs s'appliquera à différents threads.

Par défaut, un Observable et la chaîne d'opérateurs que vous lui appliquez feront son travail, et en informeront ses observateurs, sur le même thread sur lequel sa méthode Subscribe est appelée. L'opérateur SubscribeOn modifie ce comportement en spécifiant un autre Scheduler sur lequel l'Observable doit fonctionner. L'opérateur ObserveOn spécifie un autre Scheduler que l'Observable utilisera pour envoyer des notifications à ses observateurs.

Les types d'ordonnanceurs suivants sont disponibles dans RxJava -

N ° Sr. Planificateur et description
1

Schedulers.computation()

Crée et renvoie un planificateur destiné au travail de calcul. Le nombre de threads à planifier dépend des processeurs présents dans le système. Un thread est autorisé par CPU. Idéal pour les boucles d'événements ou les opérations de rappel.

2

Schedulers.io()

Crée et renvoie un planificateur destiné au travail lié aux E / S. Le pool de threads peut s'étendre si nécessaire.

3

Schedulers.newThread()

Crée et renvoie un planificateur qui crée un nouveau fil pour chaque unité de travail.

4

Schedulers.trampoline()

Crée et renvoie un planificateur qui met en file d'attente le travail sur le thread actuel à exécuter une fois le travail en cours terminé.

4

Schedulers.from(java.util.concurrent.Executor executor)

Convertit un exécuteur en une nouvelle instance de Scheduler.

La méthode Schedulers.trampoline () crée et renvoie un planificateur qui met en file d'attente le travail sur le thread actuel à exécuter une fois le travail en cours terminé.

Exemple de Schedulers.trampoline ()

Créez le programme Java suivant en utilisant n'importe quel éditeur de votre choix dans, par exemple, C: \> RxJava.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.trampoline()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Vérifiez le résultat

Compilez la classe en utilisant javac compilateur comme suit -

C:\RxJava>javac ObservableTester.java

Exécutez maintenant l'ObservableTester comme suit -

C:\RxJava>java ObservableTester

Il devrait produire la sortie suivante -

Processing Thread main
Receiver Thread main, Item length 1
Processing Thread main
Receiver Thread main, Item length 2
Processing Thread main
Receiver Thread main, Item length 3

La méthode Schedulers.newThread () crée et retourne un Scheduler qui crée un nouveau Thread pour chaque unité de travail.

Exemple de Schedulers.newThread ()

Créez le programme Java suivant en utilisant n'importe quel éditeur de votre choix dans, par exemple, C: \> RxJava.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.newThread()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Vérifiez le résultat

Compilez la classe en utilisant javac compilateur comme suit -

C:\RxJava>javac ObservableTester.java

Exécutez maintenant l'ObservableTester comme suit -

C:\RxJava>java ObservableTester

Il devrait produire la sortie suivante -

Processing Thread RxNewThreadScheduler-1
Receiver Thread RxNewThreadScheduler-1, Item length 1
Processing Thread RxNewThreadScheduler-2
Receiver Thread RxNewThreadScheduler-2, Item length 2
Processing Thread RxNewThreadScheduler-3
Receiver Thread RxNewThreadScheduler-3, Item length 3

La méthode Schedulers.computation () crée et retourne un Scheduler destiné au travail de calcul. Le nombre de threads à planifier dépend des processeurs présents dans le système. Un thread est autorisé par CPU. Idéal pour les boucles d'événements ou les opérations de rappel.

Exemple de Schedulers.computation ()

Créez le programme Java suivant en utilisant n'importe quel éditeur de votre choix dans, par exemple, C: \> RxJava.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.computation()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Vérifiez le résultat

Compilez la classe en utilisant javac compilateur comme suit -

C:\RxJava>javac ObservableTester.java

Exécutez maintenant l'ObservableTester comme suit -

C:\RxJava>java ObservableTester

Il devrait produire la sortie suivante -

Processing Thread RxComputationThreadPool-1
Receiver Thread RxComputationThreadPool-1, Item length 1
Processing Thread RxComputationThreadPool-2
Receiver Thread RxComputationThreadPool-2, Item length 2
Processing Thread RxComputationThreadPool-3
Receiver Thread RxComputationThreadPool-3, Item length 3

La méthode Schedulers.io () crée et retourne un Scheduler destiné au travail lié aux E / S. Le pool de threads peut s'étendre si nécessaire. Idéal pour les opérations intensives d'E / S.

Exemple de Schedulers.io ()

Créez le programme Java suivant en utilisant n'importe quel éditeur de votre choix dans, par exemple, C: \> RxJava.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.io()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Vérifiez le résultat

Compilez la classe en utilisant javac compilateur comme suit -

C:\RxJava>javac ObservableTester.java

Exécutez maintenant l'ObservableTester comme suit -

C:\RxJava>java ObservableTester

Il devrait produire la sortie suivante -

Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 1
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 2
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 3

La méthode Schedulers.from (Executor) convertit un Executor en une nouvelle instance de Scheduler.

Schedulers.from (Executor) Exemple

Créez le programme Java suivant en utilisant n'importe quel éditeur de votre choix dans, par exemple, C: \> RxJava.

ObservableTester.java

import java.util.Random;
import java.util.concurrent.Executors;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3))))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Vérifiez le résultat

Compilez la classe en utilisant javac compilateur comme suit -

C:\RxJava>javac ObservableTester.java

Exécutez maintenant l'ObservableTester comme suit -

C:\RxJava>java ObservableTester

Il devrait produire la sortie suivante -

Processing Thread pool-1-thread-1
Processing Thread pool-3-thread-1
Receiver Thread pool-1-thread-1, Item length 1
Processing Thread pool-4-thread-1
Receiver Thread pool-4-thread-1, Item length 3
Receiver Thread pool-3-thread-1, Item length 2

L'opérateur de mise en mémoire tampon permet de regrouper les éléments émis par un observable dans une liste ou des bundles et d'émettre ces bundles au lieu d'éléments. Dans l'exemple ci-dessous, nous avons créé un Observable pour émettre 9 éléments et en utilisant la mise en mémoire tampon, 3 éléments seront émis ensemble.

Exemple de mise en mémoire tampon

Créez le programme Java suivant en utilisant n'importe quel éditeur de votre choix dans, par exemple, C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .buffer(3)
         .subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(List<Integer> integers) {
               System.out.println("onNext: ");
               for (Integer value : integers) {
                  System.out.println(value);
               }
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

Vérifiez le résultat

Compilez la classe en utilisant javac compilateur comme suit -

C:\RxJava>javac ObservableTester.java

Exécutez maintenant l'ObservableTester comme suit -

C:\RxJava>java ObservableTester

Il devrait produire la sortie suivante -

Subscribed
onNext: 
1
2
3
onNext: 
4
5
6
onNext: 
7
8
9
Done!

L'opérateur de fenêtrage fonctionne de manière similaire à l'opérateur de tampon, mais il permet de rassembler les éléments émis par un observable dans une autre observable au lieu d'une collection et d'émettre ces observables au lieu de collections. Dans l'exemple ci-dessous, nous avons créé un observable pour émettre 9 éléments et en utilisant l'opérateur de fenêtre, 3 observables seront émis ensemble.

Exemple de fenêtrage

Créez le programme Java suivant en utilisant n'importe quel éditeur de votre choix dans, par exemple, C: \> RxJava.

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .window(3)
         .subscribe(new Observer<Observable<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(Observable<Integer> integers) {
               System.out.println("onNext: ");
               integers.subscribe(value -> System.out.println(value));
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

Vérifiez le résultat

Compilez la classe en utilisant javac compilateur comme suit -

C:\RxJava>javac ObservableTester.java

Exécutez maintenant l'ObservableTester comme suit -

C:\RxJava>java ObservableTester

Il devrait produire la sortie suivante -

Subscribed
onNext: 
1
2
3
onNext: 
4
5
6
onNext: 
7
8
9
Done!