RxJava - त्वरित गाइड

RxJava ReactiveX का जावा आधारित एक्सटेंशन है। यह जावा में कार्यान्वयन या रिएक्टिवएक्स परियोजना प्रदान करता है। RxJava की प्रमुख विशेषताएं निम्नलिखित हैं।

  • पर्यवेक्षक पैटर्न का विस्तार करता है।

  • डेटा / घटनाओं के समर्थन दृश्यों।

  • संचालकों को क्रमबद्ध रूप से एक साथ क्रमबद्ध करने के लिए प्रदान करता है।

  • आंतरिक रूप से थ्रेडिंग, सिंक्रोनाइज़ेशन, थ्रेड-सेफ्टी और समवर्ती डेटा संरचनाओं को संभालता है।

ReactiveX क्या है?

ReactiveX एक परियोजना है जिसका उद्देश्य विभिन्न प्रोग्रामिंग भाषाओं को प्रतिक्रियाशील प्रोग्रामिंग अवधारणा प्रदान करना है। रिएक्टिव प्रोग्रामिंग उस परिदृश्य को संदर्भित करता है, जहां प्रोग्राम डेटा के प्रकट होने पर प्रतिक्रिया करता है। यह एक घटना आधारित प्रोग्रामिंग अवधारणा है और घटनाएँ पर्यवेक्षकों को पंजीकृत करने के लिए प्रचारित कर सकती हैं।

के अनुसार Reactive, उन्होंने ऑब्जर्वर पैटर्न, Iterator पैटर्न और कार्यात्मक पैटर्न का सर्वोत्तम संयोजन किया है।

ऑब्जर्वर पैटर्न सही किया। ReactiveX ऑब्जर्वर पैटर्न, Iterator पैटर्न और कार्यात्मक प्रोग्रामिंग से सर्वश्रेष्ठ विचारों का एक संयोजन है।

कार्यात्मक प्रोग्रामिंग

फ़ंक्शनल प्रोग्रामिंग शुद्ध कार्यों का उपयोग करके सॉफ़्टवेयर के निर्माण के चारों ओर घूमती है। एक शुद्ध कार्य पिछली स्थिति पर निर्भर नहीं करता है और हमेशा पारित किए गए समान मापदंडों के लिए एक ही परिणाम देता है। शुद्ध कार्य साझा वस्तुओं से संबंधित समस्याओं से बचने में मदद करते हैं, पारस्परिक डेटा और साइड इफेक्ट्स अक्सर बहु-थ्रेडिंग वातावरण में प्रचलित होते हैं।

प्रतिक्रियाशील प्रोग्रामिंग

रिएक्टिव प्रोग्रामिंग इवेंट संचालित प्रोग्रामिंग को संदर्भित करता है जहां डेटा धाराएं अतुल्यकालिक फैशन में आती हैं और जब वे आती हैं तो संसाधित हो जाती हैं।

कार्यात्मक प्रतिक्रियाशील प्रोग्रामिंग

RxJava दोनों अवधारणाओं को एक साथ लागू करता है, जहां धाराओं का डेटा समय के साथ बदलता है और उपभोक्ता फ़ंक्शन तदनुसार प्रतिक्रिया करता है।

प्रतिक्रियाशील घोषणापत्र

रिएक्टिव मैनिफेस्टो एक ऑन-लाइन दस्तावेज़ है, जो एप्लिकेशन सॉफ़्टवेयर सिस्टम के उच्च मानक को बताता है। घोषणापत्र के अनुसार, निम्नलिखित प्रतिक्रियाशील सॉफ़्टवेयर की मुख्य विशेषताएं हैं -

  • Responsive - हमेशा समय पर जवाब देना चाहिए।

  • Message Driven - घटकों के बीच अतुल्यकालिक संदेश-गुजर का उपयोग करना चाहिए ताकि वे ढीली युग्मन बनाए रखें।

  • Elastic - उच्च भार के तहत भी उत्तरदायी रहना चाहिए।

  • Resilient - किसी भी घटक के विफल होने पर भी उत्तरदायी रहना चाहिए।

RxJava के प्रमुख घटक

RxJava के दो प्रमुख घटक हैं: वेधशाला और प्रेक्षक।

  • Observable - यह स्ट्रीम के समान एक वस्तु का प्रतिनिधित्व करता है जो शून्य या अधिक डेटा का उत्सर्जन कर सकता है, त्रुटि संदेश भेज सकता है, जिसकी गति को डेटा का एक सेट उत्सर्जित करते समय नियंत्रित किया जा सकता है, परिमित और अनंत डेटा भेज सकता है।

  • Observer- यह ऑब्जर्वबल के अनुक्रम के डेटा की सदस्यता लेता है और वेधशालाओं के प्रति आइटम पर प्रतिक्रिया करता है। जब भी ऑब्जर्वेबल कोई डेटा उत्सर्जित करता है तो पर्यवेक्षकों को सूचित किया जाता है। एक ऑब्जर्वर एक-एक करके डाटा संभालता है।

यदि आइटम मौजूद नहीं है या एक कॉलबैक पिछले आइटम के लिए वापस नहीं है, तो एक पर्यवेक्षक को कभी भी सूचित नहीं किया जाता है।

स्थानीय पर्यावरण सेटअप

RxJava जावा के लिए एक पुस्तकालय है, इसलिए सबसे पहली आवश्यकता आपके मशीन में JDK स्थापित करने की है।

व्यवस्था की आवश्यकता

JDK 1.5 या ऊपर।
याद कोई न्यूनतम आवश्यकता नहीं।
डिस्क में जगह कोई न्यूनतम आवश्यकता नहीं।
ऑपरेटिंग सिस्टम कोई न्यूनतम आवश्यकता नहीं।

चरण 1 - अपनी मशीन में जावा इंस्टॉलेशन को सत्यापित करें

सबसे पहले, कंसोल खोलें और आप जिस ऑपरेटिंग सिस्टम पर काम कर रहे हैं, उसके आधार पर एक जावा कमांड निष्पादित करें।

ओएस टास्क आदेश
खिड़कियाँ ओपन कमांड कंसोल c: \> java -version
लिनक्स कमांड टर्मिनल खोलें $ जावा-विचलन
मैक टर्मिनल खोलें मशीन: <joseph $ java -version

आइए सभी ऑपरेटिंग सिस्टम के लिए आउटपुट को सत्यापित करें -

ओएस उत्पादन
खिड़कियाँ

जावा संस्करण "1.8.0_101"

जावा (TM) एसई रनटाइम एनवायरनमेंट (बिल्ड 1.8.0_101)

लिनक्स

जावा संस्करण "1.8.0_101"

जावा (TM) एसई रनटाइम एनवायरनमेंट (बिल्ड 1.8.0_101)

मैक

जावा संस्करण "1.8.0_101"

जावा (TM) एसई रनटाइम एनवायरनमेंट (बिल्ड 1.8.0_101)

यदि आपके पास अपने सिस्टम पर जावा इंस्टॉल नहीं है, तो निम्न लिंक से जावा सॉफ्टवेयर डेवलपमेंट किट (एसडीके) डाउनलोड करें https://www.oracle.com। हम इस ट्यूटोरियल के लिए जावा 1.8.0_101 को स्थापित संस्करण मान रहे हैं।

चरण 2 - जावा पर्यावरण सेट करें

ठीक JAVA_HOMEवातावरण चर आधार निर्देशिका स्थान पर इंगित करने के लिए जहां जावा आपकी मशीन पर स्थापित है। उदाहरण के लिए।

ओएस उत्पादन
खिड़कियाँ पर्यावरण चर JAVA_HOME को C: \ Program Files \ Java \ jdk1.8.0_101 पर सेट करें
लिनक्स निर्यात JAVA_HOME = / usr / स्थानीय / जावा-वर्तमान
मैक निर्यात JAVA_HOME = / लाइब्रेरी / जावा / होम

सिस्टम पथ में जावा कंपाइलर स्थान को जोड़ें।

ओएस उत्पादन
खिड़कियाँ तार लगाओ C:\Program Files\Java\jdk1.8.0_101\bin सिस्टम चर के अंत में, Path
लिनक्स निर्यात पथ = $PATH:$JAVA_HOME / bin /
मैक आवश्यक नहीं

कमांड का उपयोग करके जावा इंस्टॉलेशन को सत्यापित करें java -version जैसा कि ऊपर बताया गया है।

चरण 3 - RxJava2 पुरालेख डाउनलोड करें

RxJava जार फ़ाइल का नवीनतम संस्करण RxJava @ MVNRepository और उसकी निर्भरता प्रतिक्रियाशील धाराओं @ MVNRepository से डाउनलोड करें । इस ट्यूटोरियल को लिखने के समय, हमने rxjava-2.2.4.jar, रिएक्टिव-स्ट्रीम-1.0.2.jar डाउनलोड किया है और इसे C: \> RxJava फ़ोल्डर में कॉपी किया है।

ओएस संग्रह का नाम
खिड़कियाँ rxjava-2.2.4.jar, प्रतिक्रियाशील-धाराएँ 1.0.2.jar
लिनक्स rxjava-2.2.4.jar, प्रतिक्रियाशील-धाराएँ 1.0.2.jar
मैक rxjava-2.2.4.jar, प्रतिक्रियाशील-धाराएँ 1.0.2.jar

चरण 4 - RxJava पर्यावरण सेट करें

ठीक RX_JAVAपर्यावरण चर को आधार निर्देशिका स्थान पर इंगित करने के लिए जहां RxJava जार आपकी मशीन पर संग्रहीत है। मान लेते हैं कि हमने RxJava फ़ोल्डर में rxjava-2.2.4.jar और प्रतिक्रियाशील-धाराएँ 1.0.2.jar संग्रहीत किया है।

अनु क्रमांक ओएस और विवरण
1

Windows

पर्यावरण चर RX_JAVA को C: \ RxJava पर सेट करें

2

Linux

निर्यात RX_JAVA = / usr / स्थानीय / RxJava

3

Mac

निर्यात RX_JAVA = / लाइब्रेरी / RxJava

स्टेप 5 - CLASSPATH वैरिएबल सेट करें

ठीक CLASSPATH पर्यावरण चर RxJava जार स्थान को इंगित करने के लिए।

अनु क्रमांक ओएस और विवरण
1

Windows

पर्यावरण चर कक्षा को% CLASSPATH% पर सेट करें;% RX_JAVA% \ rxjava-2.2.4.jar;% RX_JAVA% \ reactive-streams-1.0.2.jar;

2

Linux

निर्यात CLASSPATH = $CLASSPATH:$RX_JAVA / rxjava-2.2.4.jar: प्रतिक्रियाशील धाराओं-1.0.2.jar :.

3

Mac

निर्यात CLASSPATH = $CLASSPATH:$RX_JAVA / rxjava-2.2.4.jar: प्रतिक्रियाशील धाराओं-1.0.2.jar :.

चरण 6 - टेस्ट RxJava सेटअप

नीचे दिखाए अनुसार एक क्लास TestRx.java बनाएं -

import io.reactivex.Flowable;
public class TestRx {
   public static void main(String[] args) {
      Flowable.just("Hello World!")
         .subscribe(System.out::println);
   }
}

चरण 7 - परिणाम सत्यापित करें

उपयोग करने वाली कक्षाओं को संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac Tester.java

आउटपुट सत्यापित करें।

Hello World!

Observables जहाँ के रूप में डेटा के स्रोतों का प्रतिनिधित्व करता है Observers (Subscribers)उनकी बात सुनो। संक्षेप में, एक ऑब्जर्वेबल वस्तुओं का उत्सर्जन करता है और एक सब्सक्राइबर तब इन वस्तुओं का उपभोग करता है।

नमूदार

  • एक बार सब्सक्राइबर सुनना शुरू कर देता है तो ऑब्जर्वेबल डेटा प्रदान करता है।

  • अवलोकनीय किसी भी संख्या में वस्तुओं का उत्सर्जन कर सकता है।

  • अवलोकनीय केवल किसी भी वस्तु के साथ पूर्ण होने के संकेत का उत्सर्जन कर सकता है।

  • अवलोकनीय सफलतापूर्वक समाप्त कर सकता है।

  • अवलोकनीय कभी समाप्त नहीं हो सकता। उदाहरण के लिए किसी भी बटन को कई बार क्लिक किया जा सकता है।

  • अवलोकनीय त्रुटि किसी भी समय हो सकती है।

ग्राहक

  • अवलोकन योग्य कई ग्राहक हो सकते हैं।

  • जब एक ऑब्जर्वेबल एक आइटम का उत्सर्जन करता है, तो प्रत्येक ग्राहक onNext () विधि को लागू किया जाता है।

  • जब एक अवलोकन योग्य उत्सर्जक आइटम समाप्त हो जाता है, तो प्रत्येक ग्राहक पूर्ण () विधि से लागू हो जाता है।

  • यदि कोई अवलोकन योग्य त्रुटि आती है, तो प्रत्येक ग्राहक ऑनर्र () विधि लागू हो जाती है।

वेधशालाएँ बनाने के लिए आधार कक्षाएं निम्नलिखित हैं।

  • Flowable- 0..N प्रवाह, 0 या n आइटम का उत्सर्जन करता है। प्रतिक्रियाशील-धाराओं और पीठ-दबाव का समर्थन करता है।

  • Observable - 0..N प्रवाह, लेकिन कोई बैक-प्रेशर नहीं।

  • Single- 1 आइटम या त्रुटि। विधि कॉल के प्रतिक्रियाशील संस्करण के रूप में माना जा सकता है।

  • Completable- कोई वस्तु उत्सर्जित नहीं हुई। पूर्ण या त्रुटि के लिए एक संकेत के रूप में उपयोग किया जाता है। Runnable के प्रतिक्रियाशील संस्करण के रूप में माना जा सकता है।

  • MayBe- या तो कोई आइटम या 1 आइटम उत्सर्जित नहीं। वैकल्पिक के एक प्रतिक्रियाशील संस्करण के रूप में माना जा सकता है।

वेधशाला वर्ग में वेधशाला बनाने के लिए सुविधाजनक तरीके निम्नलिखित हैं।

  • just(T item) - एक ऑब्जर्वेबल लौटाता है जो दिए गए (निरंतर संदर्भ) आइटम को इंगित करता है और फिर पूरा होता है।

  • fromIterable(Iterable source) - एक ऑब्ज़र्वेबल स्रोत में एक Iterable अनुक्रम को परिवर्तित करता है जो अनुक्रम में आइटम का उत्सर्जन करता है।

  • fromArray(T... items) - एक ऐरे को एक ऑब्जर्वेबल सोर्स में परिवर्तित करता है जो एरे में आइटम का उत्सर्जन करता है।

  • fromCallable(Callable supplier) - एक ऑब्जर्वेबल लौटाता है, जब एक पर्यवेक्षक इसे सब्सक्राइब करता है, तो आपके द्वारा निर्दिष्ट एक फ़ंक्शन को आमंत्रित करता है और फिर उस फ़ंक्शन से लौटाए गए मान का उत्सर्जन करता है।

  • fromFuture(Future future) - एक ऑब्जर्वेबल स्रोत में एक भविष्य को रूपांतरित करता है।

  • interval(long initialDelay, long period, TimeUnit unit) - एक ऑब्जर्वेबल को लौटाता है जो प्रारंभिक अवधि के बाद 0 एल का उत्सर्जन करता है और उसके बाद प्रत्येक अवधि के बाद बढ़ती संख्या।

एकल वर्ग एकल मान प्रतिक्रिया का प्रतिनिधित्व करता है। एकल अवलोकन योग्य केवल एकल सफल मान या त्रुटि का उत्सर्जन कर सकता है। यह onComplete ईवेंट का उत्सर्जन नहीं करता है।

वर्ग घोषणा

निम्नलिखित के लिए घोषणा है io.reactivex.Single<T> वर्ग -

public abstract class Single<T>
   extends Object
      implements SingleSource<T>

मसविदा बनाना

निम्नलिखित अनुक्रमिक प्रोटोकॉल है जो सिंगल ऑब्जर्वेबल संचालित होता है -

onSubscribe (onSuccess | onError)?

एकल उदाहरण

C: \> RxJava, अपनी पसंद के किसी भी संपादक का उपयोग करके निम्नलिखित जावा प्रोग्राम बनाएं।

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {
      //Create the observable
      Single<String> testSingle = Single.just("Hello World");

      //Create an observer
      Disposable disposable = testSingle
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(
         new DisposableSingleObserver<String>() {

         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

परिणाम सत्यापित करें

का उपयोग कर वर्ग संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac ObservableTester.java

अब वेधशाला चलाने के लिए निम्नानुसार है -

C:\RxJava>java ObservableTester

यह निम्नलिखित उत्पादन का उत्पादन करना चाहिए -

Hello World

MayBe वर्ग आस्थगित प्रतिक्रिया का प्रतिनिधित्व करता है। MayBe अवलोकनीय या तो एकल सफल मान या कोई मान नहीं छोड़ सकता है।

वर्ग घोषणा

निम्नलिखित के लिए घोषणा है io.reactivex.Single<T> वर्ग -

public abstract class Maybe<T>
   extends Object
      implements MaybeSource<T>

मसविदा बनाना

निम्नलिखित अनुक्रमिक प्रोटोकॉल है जो मे बी ऑब्जर्वेबल संचालित करता है -

onSubscribe (onSuccess | onError | OnComplete)?

MayBe उदाहरण

C: \> RxJava, अपनी पसंद के किसी भी संपादक का उपयोग करके निम्नलिखित जावा प्रोग्राम बनाएं।

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      //Create an observer
      Disposable disposable = Maybe.just("Hello World")
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

परिणाम सत्यापित करें

का उपयोग कर वर्ग संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac ObservableTester.java

अब वेधशाला चलाने के लिए निम्नानुसार है -

C:\RxJava>java ObservableTester

यह निम्नलिखित उत्पादन का उत्पादन करना चाहिए -

Hello World

कंप्लीटटेबल वर्ग आस्थगित प्रतिक्रिया का प्रतिनिधित्व करता है। पूर्णरूपेण अवलोकन योग्य या तो एक सफल समापन या त्रुटि का संकेत दे सकता है।

वर्ग घोषणा

निम्नलिखित के लिए घोषणा है io.reactivex.Completable वर्ग -

public abstract class Completable
extends Object
implements CompletableSource

मसविदा बनाना

निम्नलिखित अनुक्रमिक प्रोटोकॉल है जो कम्पलीटेबल ऑब्जर्वेबल संचालित होता है -

onSubscribe (onError | onComplete)?

पूर्ण उदाहरण

C: \> RxJava, अपनी पसंद के किसी भी संपादक का उपयोग करके निम्नलिखित जावा प्रोग्राम बनाएं।

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableCompletableObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {

      //Create an observer
      Disposable disposable = Completable.complete()
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableCompletableObserver() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }
         @Override
         public void onStart() {
            System.out.println("Started!");
         }
         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

परिणाम सत्यापित करें

का उपयोग कर वर्ग संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac ObservableTester.java

अब वेधशाला चलाने के लिए निम्नानुसार है -

C:\RxJava>java ObservableTester

यह निम्नलिखित उत्पादन का उत्पादन करना चाहिए -

Started!
Done!

कम्पोजिटडिसोपॉली क्लास एक कंटेनर का प्रतिनिधित्व करता है जो कई डिस्पोजेबल को पकड़ सकता है और डिस्पोजल को जोड़ने और हटाने की ओ (1) जटिलता प्रदान करता है।

वर्ग घोषणा

निम्नलिखित के लिए घोषणा है io.reactivex.disposables.CompositeDisposable वर्ग -

public final class CompositeDisposable
extends Object
implements Disposable, io.reactivex.internal.disposables.DisposableContainer

कम्पोजिटडिसोपोलेटरी उदाहरण

C: \> RxJava, अपनी पसंद के किसी भी संपादक का उपयोग करके निम्नलिखित जावा प्रोग्राम बनाएं।

ObservableTester.java

import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {    
      CompositeDisposable compositeDisposable = new CompositeDisposable();

      //Create an Single observer 
      Disposable disposableSingle = Single.just("Hello World")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(
      new DisposableSingleObserver<String>() {
         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      }); 

      //Create an observer
      Disposable disposableMayBe = Maybe.just("Hi")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 

      Thread.sleep(3000);

      compositeDisposable.add(disposableSingle);
      compositeDisposable.add(disposableMayBe);

      //start observing
      compositeDisposable.dispose();
   }
}

परिणाम सत्यापित करें

का उपयोग कर वर्ग संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac ObservableTester.java

अब वेधशाला चलाने के लिए निम्नानुसार है -

C:\RxJava>java ObservableTester

यह निम्नलिखित उत्पादन का उत्पादन करना चाहिए -

Hello World
Hi

निम्नलिखित वे ऑपरेटर हैं जिनका उपयोग एक अवलोकन योग्य बनाने के लिए किया जाता है।

अनु क्रमांक। ऑपरेटर और विवरण
1

Create

खरोंच से एक देखने योग्य बनाता है और पर्यवेक्षक विधि को प्रोग्रामेटिक रूप से कॉल करने की अनुमति देता है।

2

Defer

जब तक एक पर्यवेक्षक सदस्यता नहीं लेता है, तब तक अवलोकन न करें। प्रत्येक पर्यवेक्षक के लिए एक ताजा अवलोकन बनाता है।

3

Empty/Never/Throw

सीमित व्यवहार के साथ एक अवलोकन बनाता है।

4

From

एक ऑब्जर्वेबल में ऑब्जेक्ट / डेटा संरचना को परिवर्तित करता है।

5

Interval

निर्दिष्ट समय अंतराल के अंतराल के साथ अनुक्रम में एक अवलोकन योग्य उत्सर्जक पूर्णांक बनाता है।

6

Just

एक वस्तु या डेटा संरचना को एक ही या एक ही प्रकार की वस्तुओं के उत्सर्जन के लिए एक ऑब्जर्वेबल में परिवर्तित करता है।

7

Range

दी गई सीमा के अनुक्रम में एक अवलोकन योग्य उत्सर्जक पूर्णांक बनाता है।

8

Repeat

बार-बार अनुक्रम में एक देखने योग्य उत्सर्जक पूर्णांक बनाता है।

9

Start

किसी फ़ंक्शन के रिटर्न मान का उत्सर्जन करने के लिए एक अवलोकन योग्य बनाता है।

10

Timer

दी गई देरी के बाद किसी एक वस्तु का उत्सर्जन करने के लिए एक अवलोकन योग्य बनाता है।

संचालक उदाहरण बनाना

C: \> RxJava, अपनी पसंद के किसी भी संपादक का उपयोग करके निम्नलिखित जावा प्रोग्राम बनाएं।

ObservableTester.java

import io.reactivex.Observable;
//Using fromArray operator to create an Observable
public class ObservableTester  {
   public static void main(String[] args) { 
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

परिणाम सत्यापित करें

का उपयोग कर वर्ग संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac ObservableTester.java

अब वेधशाला चलाने के लिए निम्नानुसार है -

C:\RxJava>java ObservableTester

यह निम्नलिखित उत्पादन का उत्पादन करना चाहिए -

ABCDEFG

निम्नलिखित वे ऑपरेटर हैं जो एक ऑब्जर्वेबल से उत्सर्जित आइटम को बदलने के लिए उपयोग किए जाते हैं।

अनु क्रमांक। ऑपरेटर और विवरण
1

Buffer

वेधशालाओं के सामानों को समय-समय पर बंडलों में रखा जाता है और फिर वस्तुओं के बजाए बंडलों का उत्सर्जन होता है।

2

FlatMap

नेस्टेड वेधशालाओं में उपयोग किया जाता है। वेधशालाओं में वस्तुओं को परिवर्तित करता है। फिर आइटमों को सिंगल ऑब्जर्वेबल में समतल करें।

3

GroupBy

अलग-अलग वस्तुओं के समूह का उत्सर्जन करने के लिए कुंजी द्वारा आयोजित वेधशालाओं के सेट में एक ऑब्जर्वेबल को विभाजित करें।

4

Map

इसे बदलने के लिए प्रत्येक उत्सर्जित आइटम के लिए एक फ़ंक्शन लागू करें।

5

Scan

क्रमिक रूप से प्रत्येक उत्सर्जित वस्तु पर एक फ़ंक्शन लागू करें, और फिर क्रमिक मूल्य का उत्सर्जन करें।

6

Window

ऑब्जर्वेबल से ऑब्जर्वेबल विंडो में गैदरर्स को समय-समय पर देखा जाता है और फिर आइटम्स के बजाय विंडो को एमिट किया जाता है।

ट्रांसफॉर्मिंग ऑपरेटर उदाहरण

C: \> RxJava, अपनी पसंद के किसी भी संपादक का उपयोग करके निम्नलिखित जावा प्रोग्राम बनाएं।

ObservableTester.java

import io.reactivex.Observable;
//Using map operator to transform an Observable
public class ObservableTester  { 
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

परिणाम सत्यापित करें

का उपयोग कर वर्ग संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac ObservableTester.java

अब वेधशाला चलाने के लिए निम्नानुसार है -

C:\RxJava>java ObservableTester

यह निम्नलिखित उत्पादन का उत्पादन करना चाहिए -

ABCDEFG

निम्नलिखित वे ऑपरेटर हैं जो एक ऑब्जर्वेबल से आइटम (च) का चयन करने के लिए उपयोग किए जाते हैं।

अनु क्रमांक। ऑपरेटर और विवरण
1

Debounce

किसी अन्य आइटम का उत्सर्जन किए बिना केवल टाइमआउट होने पर आइटम का उत्सर्जन करता है।

2

Distinct

केवल अनूठी वस्तुओं का उत्सर्जन करता है।

3

ElementAt

वेधशाला द्वारा उत्सर्जित n सूचकांक में केवल आइटम का उत्सर्जन करें।

4

Filter

केवल उन वस्तुओं का उत्सर्जन करता है जो दिए गए विधेय फ़ंक्शन को पास करते हैं।

5

First

पहले आइटम या पहले आइटम का उत्सर्जन करता है जो दिए गए मानदंडों को पारित करता है।

6

IgnoreElements

ऑब्जर्वेबल से किसी भी आइटम का उत्सर्जन न करें लेकिन पूरा होने के निशान।

7

Last

ओब्जर्वेबल से अंतिम तत्व का उत्सर्जन करता है।

8

Sample

दिए गए समय अंतराल के साथ सबसे हालिया आइटम का उत्सर्जन करता है।

9

Skip

एक ऑब्जर्वेबल से पहली n आइटमों को छोड़ता है।

10

SkipLast

एक ऑब्जर्वेबल से अंतिम n आइटम को छोड़ता है।

1 1

Take

एक ऑब्जर्वेबल से पहला n आइटम लेता है।

12

TakeLast

एक ऑब्जर्वेबल से अंतिम n आइटम लेता है।

फ़िल्टरिंग ऑपरेटर उदाहरण

C: \> RxJava, अपनी पसंद के किसी भी संपादक का उपयोग करके निम्नलिखित जावा प्रोग्राम बनाएं।

ObservableTester.java

import io.reactivex.Observable;
//Using take operator to filter an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .take(2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

परिणाम सत्यापित करें

का उपयोग कर वर्ग संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac ObservableTester.java

अब वेधशाला चलाने के लिए निम्नानुसार है -

C:\RxJava>java ObservableTester

यह निम्नलिखित उत्पादन का उत्पादन करना चाहिए -

ab

निम्नलिखित वे ऑपरेटर हैं जिनका उपयोग कई वेधशालाओं में से एक एकल वेधशाला बनाने के लिए किया जाता है।

अनु क्रमांक। ऑपरेटर और विवरण
1 And/Then/When

पैटर्न और योजना बिचौलियों का उपयोग करके आइटम सेट मिलाएं।

2 CombineLatest

प्रत्येक ऑब्जर्वेबल द्वारा एक निर्दिष्ट फ़ंक्शन के माध्यम से उत्सर्जित नवीनतम आइटम को मिलाएं और परिणामित आइटम का उत्सर्जन करें।

3 Join

दूसरी वेधनीय उत्सर्जित वस्तु के समय-सीमा के दौरान उत्सर्जित होने पर दो वेधशाला द्वारा उत्सर्जित वस्तुओं को मिलाएं।

4 Merge

वेधशालाओं से उत्सर्जित वस्तुओं को जोड़ती है।

5 StartWith

स्रोत से आइटमों को देखने के लिए शुरू करने से पहले वस्तुओं के एक निर्दिष्ट अनुक्रम का पालन करें

6 Switch

वेधशालाओं द्वारा उत्सर्जित सबसे हाल की वस्तुओं का उत्सर्जन करता है।

7 Zip

वेधशालाओं की वस्तुओं को कार्य पर आधारित करता है और परिणामी वस्तुओं का उत्सर्जन करता है।

संचालक संचालक उदाहरण

C: \> RxJava, अपनी पसंद के किसी भी संपादक का उपयोग करके निम्नलिखित जावा प्रोग्राम बनाएं।

ObservableTester.java

import io.reactivex.Observable;
//Using combineLatest operator to combine Observables
public class ObservableTester {
   public static void main(String[] args) {    
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.combineLatest(observable1, observable2, (a,b) -> a + b)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

परिणाम सत्यापित करें

का उपयोग कर वर्ग संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac ObservableTester.java

अब वेधशाला चलाने के लिए निम्नानुसार है -

C:\RxJava>java ObservableTester

यह निम्नलिखित उत्पादन का उत्पादन करना चाहिए -

g1g2g3g4g5g6

निम्नलिखित वे ऑपरेटर हैं जो अक्सर वेधशालाओं के साथ उपयोगी होते हैं।

अनु क्रमांक। ऑपरेटर और विवरण
1

Delay

अवलोकनीय जीवन-चक्र की घटनाओं को संभालने के लिए कार्रवाई रजिस्टर करें।

2

Materialize/Dematerialize

भेजे गए आइटम का प्रतिनिधित्व करता है और अधिसूचना भेजी जाती है।

3

ObserveOn

अनुसूचक का अवलोकन करने के लिए निर्दिष्ट करें।

4

Serialize

सीरीयस कॉल करने के लिए फोर्स ऑब्जर्वेबल।

5

Subscribe

वेधशाला से पूर्ण की तरह वस्तुओं और सूचनाओं के उत्सर्जन पर काम करते हैं

6

SubscribeOn

जब वह सब्सक्राइब किया जाता है, तो अवलोकनकर्ता द्वारा उपयोग किए जाने वाले अनुसूचक को निर्दिष्ट करें।

7

TimeInterval

उत्सर्जन के बीच बीता समय की मात्रा के संकेत का उत्सर्जन करने के लिए एक अवलोकन के रूप में परिवर्तित करें।

8

Timeout

यदि किसी भी आइटम को छोड़ने के बिना निर्दिष्ट समय होता है, तो त्रुटि अधिसूचना जारी करता है।

9

Timestamp

उत्सर्जित प्रत्येक आइटम के लिए टाइमस्टैम्प संलग्न करें।

9

Using

ऑब्जर्वेबल के रूप में एक डिस्पोजेबल संसाधन या एक ही जीवन काल बनाता है।

उपयोगिता ऑपरेटर उदाहरण

C: \> RxJava, अपनी पसंद के किसी भी संपादक का उपयोग करके निम्नलिखित जावा प्रोग्राम बनाएं।

ObservableTester.java

import io.reactivex.Observable;
//Using subscribe operator to subscribe to an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable.subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

परिणाम सत्यापित करें

का उपयोग कर वर्ग संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac ObservableTester.java

अब वेधशाला चलाने के लिए निम्नानुसार है -

C:\RxJava>java ObservableTester

यह निम्नलिखित उत्पादन का उत्पादन करना चाहिए -

abcdefg

निम्नलिखित वे ऑपरेटर हैं जो एक या एक से अधिक वेधशाला या उत्सर्जित वस्तुओं का मूल्यांकन करते हैं।

अनु क्रमांक। ऑपरेटर और विवरण
1

All

दिए गए मानदंडों को पूरा करने के लिए उत्सर्जित सभी वस्तुओं का मूल्यांकन करता है।

2

Amb

पहले वेधशाला से सभी वस्तुओं का उत्सर्जन करता है केवल कई वेधशालाएं दी जाती हैं।

3

Contains

जांचता है कि क्या कोई ऑब्जर्वेबल किसी विशेष वस्तु का उत्सर्जन करता है या नहीं।

4

DefaultIfEmpty

डिफ़ॉल्ट आइटम का उत्सर्जन करता है अगर ऑब्जर्वेबल कुछ भी उत्सर्जित नहीं करता है।

5

SequenceEqual

जाँचता है कि क्या दो वेधशालाएँ वस्तुओं के समान अनुक्रम का उत्सर्जन करती हैं।

6

SkipUntil

पहली ऑब्जर्वेबल द्वारा उत्सर्जित वस्तुएं, जब तक कि दूसरी ऑब्जर्वेबल एक आइटम का उत्सर्जन नहीं करती है।

7

SkipWhile

एक पर्यवेक्षित द्वारा उत्सर्जित वस्तुओं को तब तक त्यागें जब तक कि दी गई स्थिति झूठी न हो जाए।

8

TakeUntil

एक ऑब्जर्वेबल द्वारा एक दूसरे ऑब्जर्वेबल के बाद उत्सर्जित वस्तुओं को त्यागता है या एक आइटम को समाप्त करता है।

9

TakeWhile

एक निर्दिष्ट स्थिति के झूठे होने के बाद एक ऑब्जर्वेबल द्वारा उत्सर्जित वस्तुओं को त्यागें।

सशर्त संचालक उदाहरण

C: \> RxJava, अपनी पसंद के किसी भी संपादक का उपयोग करके निम्नलिखित जावा प्रोग्राम बनाएं।

ObservableTester.java

import io.reactivex.Observable;
//Using defaultIfEmpty operator to operate on an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      final StringBuilder result = new StringBuilder();
      Observable.empty()
      .defaultIfEmpty("No Data")
      .subscribe(s -> result.append(s));
      System.out.println(result);
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result1 = new StringBuilder();
      Observable.fromArray(letters)
      .firstElement()
      .defaultIfEmpty("No data")   
      .subscribe(s -> result1.append(s));
      System.out.println(result1);
   }
}

परिणाम सत्यापित करें

का उपयोग कर वर्ग संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac ObservableTester.java

अब वेधशाला चलाने के लिए निम्नानुसार है -

C:\RxJava>java ObservableTester

यह निम्नलिखित उत्पादन का उत्पादन करना चाहिए -

No Data
a

निम्नलिखित वे ऑपरेटर हैं जो एक ऑब्जर्वेबल द्वारा उत्सर्जित संपूर्ण वस्तुओं पर काम करते हैं।

अनु क्रमांक। ऑपरेटर और विवरण
1

Average

सभी वस्तुओं का औसत मूल्यांकन करता है और परिणाम का उत्सर्जन करता है।

2

Concat

कई वस्तुओं से सभी वस्तुओं को बिना किसी इंटरलेयर के बाहर निकालता है।

3

Count

सभी वस्तुओं को गिनता है और परिणाम का उत्सर्जन करता है।

4

Max

सभी वस्तुओं के अधिकतम मूल्यवान आइटम का मूल्यांकन करता है और परिणाम का उत्सर्जन करता है।

5

Min

सभी वस्तुओं के न्यूनतम मूल्य का मूल्यांकन करता है और परिणाम का उत्सर्जन करता है।

6

Reduce

प्रत्येक आइटम पर एक फ़ंक्शन लागू करें और परिणाम लौटाएं।

7

Sum

सभी वस्तुओं के योग का मूल्यांकन करता है और परिणाम का उत्सर्जन करता है।

गणितीय संचालक उदाहरण

C: \> RxJava, अपनी पसंद के किसी भी संपादक का उपयोग करके निम्नलिखित जावा प्रोग्राम बनाएं।

ObservableTester.java

import io.reactivex.Observable;
//Using concat operator to operate on multiple Observables
public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {    
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.concat(observable1, observable2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

परिणाम सत्यापित करें

का उपयोग कर वर्ग संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac ObservableTester.java

अब वेधशाला चलाने के लिए निम्नानुसार है -

C:\RxJava>java ObservableTester

यह निम्नलिखित उत्पादन का उत्पादन करना चाहिए -

abcdefg123456

निम्नलिखित ऐसे ऑपरेटर हैं जो सदस्यता पर अधिक सटीक नियंत्रण रखते हैं।

अनु क्रमांक। ऑपरेटर और विवरण
1

Connect

अपने ग्राहकों के लिए आइटम उत्सर्जित करने के लिए एक कनेक्ट करने योग्य अवलोकन का निर्देश दें।

2

Publish

कनेक्ट करने योग्य एक ऑब्जर्वेबल को कंवर्टेबल ऑब्जर्वेबल में बदलता है।

3

RefCount

एक कनेक्ट करने योग्य ऑब्जर्व को साधारण ऑब्जर्व करने योग्य बनाता है।

4

Replay

प्रत्येक ग्राहक द्वारा देखे जाने वाले उत्सर्जित वस्तुओं के समान क्रम को सुनिश्चित करें, भले ही ऑब्जर्वेबल ने बाद में आइटम छोड़ना शुरू कर दिया हो और ग्राहक बाद में सदस्यता लें।

कनेक्ट करने योग्य ऑपरेटर उदाहरण

C: \> RxJava, अपनी पसंद के किसी भी संपादक का उपयोग करके निम्नलिखित जावा प्रोग्राम बनाएं।

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
//Using connect operator on a ConnectableObservable
public class ObservableTester {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      ConnectableObservable<String> connectable = Observable.fromArray(letters).publish();      
      connectable.subscribe(letter -> result.append(letter));
      System.out.println(result.length());
      connectable.connect();
      System.out.println(result.length());
      System.out.println(result);
   }
}

परिणाम सत्यापित करें

का उपयोग कर वर्ग संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac ObservableTester.java

अब वेधशाला चलाने के लिए निम्नानुसार है -

C:\RxJava>java ObservableTester

यह निम्नलिखित उत्पादन का उत्पादन करना चाहिए -

0
7
abcdefg

के अनुसार Reactive, एक सब्जेक्ट ऑब्जर्वबल के साथ-साथ ऑब्जर्वर दोनों के रूप में कार्य कर सकता है।

एक सब्जेक्ट एक तरह का ब्रिज या प्रॉक्सी होता है जो रिएक्टिवएक्स के कुछ कार्यान्वयन में उपलब्ध होता है जो ऑब्जर्वर और ऑब्जर्वेबल के रूप में कार्य करता है। क्योंकि यह एक पर्यवेक्षक है, यह एक या एक से अधिक वेधशालाओं की सदस्यता ले सकता है, और क्योंकि यह एक ऑब्जर्वेबल है, इसलिए यह उन वस्तुओं से गुजर सकता है जिन्हें वे पुन: देख सकते हैं और यह नई वस्तुओं का उत्सर्जन भी कर सकते हैं।

विषय चार प्रकार के होते हैं -

अनु क्रमांक। विषय विवरण
1

Publish Subject

केवल उन्हीं वस्तुओं का उत्सर्जन करता है जो सदस्यता के समय के बाद उत्सर्जित होती हैं।

2 Replay Subject

स्रोत ऑब्जर्वेबल द्वारा उत्सर्जित सभी वस्तुओं की परवाह किए बिना जब उन्होंने ऑब्जर्वेबल को सब्सक्राइब किया है।

3

Behavior Subject

सदस्यता के बाद, सबसे हाल ही में आइटम का उत्सर्जन करता है तो स्रोत ऑब्जर्वेबल द्वारा उत्सर्जित आइटम का उत्सर्जन जारी रखता है।

4

Async Subject

स्रोत द्वारा उत्सर्जित अंतिम वस्तु को उत्सर्जन पूरा होने के बाद उत्सर्जित करता है।

PublishSubject वर्तमान या देर से पर्यवेक्षकों के लिए पर्यवेक्षकों और टर्मिनल घटनाओं को सब्सक्राइब किए गए आइटम का उत्सर्जन करता है।

वर्ग घोषणा

निम्नलिखित के लिए घोषणा है io.reactivex.subjects.PublishSubject<T> वर्ग -

public final class PublishSubject<T>
extends Subject<T>

पब्लिशसब्यूज उदाहरण

C: \> RxJava, अपनी पसंद के किसी भी संपादक का उपयोग करके निम्नलिखित जावा प्रोग्राम बनाएं।

ObservableTester.java

import io.reactivex.subjects.PublishSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      PublishSubject<String> subject = PublishSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be abcd 
      System.out.println(result1);
      //Output will be d only
      //as subscribed after c item emitted.
      System.out.println(result2);
   }
}

परिणाम सत्यापित करें

का उपयोग कर वर्ग संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac ObservableTester.java

अब वेधशाला चलाने के लिए निम्नानुसार है -

C:\RxJava>java ObservableTester

यह निम्नलिखित उत्पादन का उत्पादन करना चाहिए -

abcd
d

BehaviorSubject ने अपने द्वारा देखे गए सबसे हाल के आइटम का उत्सर्जन किया और फिर प्रत्येक सब्सक्राइब्ड ऑब्जर्वर के लिए सभी बाद में देखे गए आइटम।

वर्ग घोषणा

निम्नलिखित के लिए घोषणा है io.reactivex.subjects.BehaviorSubject<T> वर्ग -

public final class BehaviorSubject<T>
extends Subject<T>

व्यवहार उदाहरण

C: \> RxJava, अपनी पसंद के किसी भी संपादक का उपयोग करके निम्नलिखित जावा प्रोग्राम बनाएं।

ObservableTester.java

import io.reactivex.subjects.BehaviorSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         
      BehaviorSubject<String> subject =  BehaviorSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();
      //Output will be abcd
      System.out.println(result1);
      //Output will be cd being BehaviorSubject 
      //(c is last item emitted before subscribe)
      System.out.println(result2);
   }
}

परिणाम सत्यापित करें

का उपयोग कर वर्ग संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac ObservableTester.java

अब वेधशाला चलाने के लिए निम्नानुसार है -

C:\RxJava>java ObservableTester

यह निम्नलिखित उत्पादन का उत्पादन करना चाहिए -

abcd
cd

ReplaySubject घटनाओं / वस्तुओं को वर्तमान और देर से पर्यवेक्षकों को फिर से दिखाता है।

वर्ग घोषणा

निम्नलिखित के लिए घोषणा है io.reactivex.subjects.ReplaySubject<T> वर्ग -

public final class ReplaySubject<T>
extends Subject<T>

ReplaySubject उदाहरण

C: \> RxJava, अपनी पसंद के किसी भी संपादक का उपयोग करके निम्नलिखित जावा प्रोग्राम बनाएं।

ObservableTester.java

import io.reactivex.subjects.ReplaySubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      ReplaySubject<String> subject = ReplaySubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be abcd
      System.out.println(result1);
      //Output will be abcd being ReplaySubject
      //as ReplaySubject emits all the items
      System.out.println(result2);
   }
}

परिणाम सत्यापित करें

का उपयोग कर वर्ग संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac ObservableTester.java

अब वेधशाला चलाने के लिए निम्नानुसार है -

C:\RxJava>java ObservableTester

यह निम्नलिखित उत्पादन का उत्पादन करना चाहिए -

abcd
abcd

AsyncSubject एक अंतिम ईवेंट के बाद एक अंतिम ईवेंट या ऑब्जर्वर को प्राप्त त्रुटि का अनुसरण करता है।

वर्ग घोषणा

निम्नलिखित के लिए घोषणा है io.reactivex.subjects.AsyncSubject<T> वर्ग -

public final class  AsyncSubject<T>
extends Subject<T>

AsyncSubject उदाहरण

C: \> RxJava, अपनी पसंद के किसी भी संपादक का उपयोग करके निम्नलिखित जावा प्रोग्राम बनाएं।

ObservableTester.java

import io.reactivex.subjects. AsyncSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      AsyncSubject<String> subject =  AsyncSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be d being the last item emitted
      System.out.println(result1);
      //Output will be d being the last item emitted     
      System.out.println(result2);
   }
}

परिणाम सत्यापित करें

का उपयोग कर वर्ग संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac ObservableTester.java

अब वेधशाला चलाने के लिए निम्नानुसार है -

C:\RxJava>java ObservableTester

यह निम्नलिखित उत्पादन का उत्पादन करना चाहिए -

d
d

अवलोकनीय ऑपरेटरों के साथ काम करने के लिए बहु-थ्रेडिंग वातावरण में शेड्यूलर्स का उपयोग किया जाता है।

के अनुसार Reactive, शेड्यूलर का उपयोग यह निर्धारित करने के लिए किया जाता है कि ऑपरेटरों की श्रृंखला विभिन्न थ्रेड्स पर कैसे लागू होगी।

डिफ़ॉल्ट रूप से, एक अवलोकन योग्य और आप पर लागू होने वाले ऑपरेटरों की श्रृंखला अपना काम करेगी, और अपने पर्यवेक्षकों को सूचित करेगी, उसी थ्रेड पर, जिस पर इसकी सदस्यता विधि कहा जाता है। SubscribeOn ऑपरेटर एक अलग शेड्यूलर को निर्दिष्ट करके इस व्यवहार को बदलता है, जिस पर ऑब्जर्वेबल को काम करना चाहिए। ओब्जर्वऑन ऑपरेटर एक अलग शेड्यूलर को निर्दिष्ट करता है जिसे ऑब्जर्वेबल अपने पर्यवेक्षकों को सूचनाएं भेजने के लिए उपयोग करेगा।

RxJava में निम्न प्रकार के अनुसूचक उपलब्ध हैं -

अनु क्रमांक। अनुसूचक और विवरण
1

Schedulers.computation()

कम्प्यूटेशनल काम के लिए एक शेड्यूलर बनाता है और देता है। शेड्यूल किए जाने वाले थ्रेड्स की गणना सिस्टम में मौजूद सीपीयू पर निर्भर करती है। प्रति CPU एक थ्रेड की अनुमति है। इवेंट-लूप या कॉलबैक ऑपरेशन के लिए सर्वश्रेष्ठ।

2

Schedulers.io()

IO- बाउंड कार्य के लिए एक शेड्यूलर बनाता और वापस करता है। थ्रेड पूल आवश्यकतानुसार बढ़ सकता है।

3

Schedulers.newThread()

एक समयबद्धक बनाता है और रिटर्न करता है जो प्रत्येक इकाई के काम के लिए एक नया थ्रेड बनाता है।

4

Schedulers.trampoline()

एक अनुसूचक बनाता है और वर्तमान कार्य पूरा होने के बाद निष्पादित करने के लिए वर्तमान थ्रेड पर काम करता है।

4

Schedulers.from(java.util.concurrent.Executor executor)

एक निर्धारक को एक नए समयबद्धक उदाहरण में परिवर्तित करता है।

Schedulers.trampoline () विधि एक अनुसूचक बनाता है और वर्तमान कार्य पूरा होने के बाद निष्पादित करने के लिए वर्तमान थ्रेड पर काम करता है।

Schedulers.trampoline () उदाहरण

C: \> RxJava, अपनी पसंद के किसी भी संपादक का उपयोग करके निम्नलिखित जावा प्रोग्राम बनाएं।

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.trampoline()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

परिणाम सत्यापित करें

का उपयोग कर वर्ग संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac ObservableTester.java

अब वेधशाला चलाने के लिए निम्नानुसार है -

C:\RxJava>java ObservableTester

यह निम्नलिखित उत्पादन का उत्पादन करना चाहिए -

Processing Thread main
Receiver Thread main, Item length 1
Processing Thread main
Receiver Thread main, Item length 2
Processing Thread main
Receiver Thread main, Item length 3

Schedulers.newThread () विधि एक अनुसूचक बनाता है और काम की प्रत्येक इकाई के लिए एक नया थ्रेड बनाता है।

शेड्यूलर .newThread () उदाहरण

C: \> RxJava, अपनी पसंद के किसी भी संपादक का उपयोग करके निम्नलिखित जावा प्रोग्राम बनाएं।

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.newThread()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

परिणाम सत्यापित करें

का उपयोग कर वर्ग संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac ObservableTester.java

अब वेधशाला चलाने के लिए निम्नानुसार है -

C:\RxJava>java ObservableTester

यह निम्नलिखित उत्पादन का उत्पादन करना चाहिए -

Processing Thread RxNewThreadScheduler-1
Receiver Thread RxNewThreadScheduler-1, Item length 1
Processing Thread RxNewThreadScheduler-2
Receiver Thread RxNewThreadScheduler-2, Item length 2
Processing Thread RxNewThreadScheduler-3
Receiver Thread RxNewThreadScheduler-3, Item length 3

Schedulers.computation () विधि बनाता है और कम्प्यूटेशनल काम के लिए एक समयबद्धक देता है। शेड्यूल किए जाने वाले थ्रेड्स की गणना सिस्टम में मौजूद सीपीयू पर निर्भर करती है। प्रति CPU एक थ्रेड की अनुमति है। इवेंट-लूप या कॉलबैक ऑपरेशन के लिए सर्वश्रेष्ठ।

Schedulers.computation () उदाहरण

C: \> RxJava, अपनी पसंद के किसी भी संपादक का उपयोग करके निम्नलिखित जावा प्रोग्राम बनाएं।

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.computation()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

परिणाम सत्यापित करें

का उपयोग कर वर्ग संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac ObservableTester.java

अब वेधशाला चलाने के लिए निम्नानुसार है -

C:\RxJava>java ObservableTester

यह निम्नलिखित उत्पादन का उत्पादन करना चाहिए -

Processing Thread RxComputationThreadPool-1
Receiver Thread RxComputationThreadPool-1, Item length 1
Processing Thread RxComputationThreadPool-2
Receiver Thread RxComputationThreadPool-2, Item length 2
Processing Thread RxComputationThreadPool-3
Receiver Thread RxComputationThreadPool-3, Item length 3

Schedulers.io () विधि IO- बाउंड कार्य के लिए एक समयबद्धक बनाता है और देता है। थ्रेड पूल आवश्यकतानुसार बढ़ सकता है। आई / ओ गहन संचालन के लिए सर्वश्रेष्ठ।

Schedulers.io () उदाहरण

C: \> RxJava, अपनी पसंद के किसी भी संपादक का उपयोग करके निम्नलिखित जावा प्रोग्राम बनाएं।

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.io()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

परिणाम सत्यापित करें

का उपयोग कर वर्ग संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac ObservableTester.java

अब वेधशाला चलाने के लिए निम्नानुसार है -

C:\RxJava>java ObservableTester

यह निम्नलिखित उत्पादन का उत्पादन करना चाहिए -

Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 1
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 2
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 3

Schedulers.from (एक्ज़ीक्यूटर) विधि एक एक्सक्यूज़र को एक नए शेड्यूलर उदाहरण में परिवर्तित करती है।

Schedulers.from (निष्पादनकर्ता) उदाहरण

C: \> RxJava, अपनी पसंद के किसी भी संपादक का उपयोग करके निम्नलिखित जावा प्रोग्राम बनाएं।

ObservableTester.java

import java.util.Random;
import java.util.concurrent.Executors;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3))))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

परिणाम सत्यापित करें

का उपयोग कर वर्ग संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac ObservableTester.java

अब वेधशाला चलाने के लिए निम्नानुसार है -

C:\RxJava>java ObservableTester

यह निम्नलिखित उत्पादन का उत्पादन करना चाहिए -

Processing Thread pool-1-thread-1
Processing Thread pool-3-thread-1
Receiver Thread pool-1-thread-1, Item length 1
Processing Thread pool-4-thread-1
Receiver Thread pool-4-thread-1, Item length 3
Receiver Thread pool-3-thread-1, Item length 2

बफरिंग ऑपरेटर एक ऑब्जर्वबल द्वारा उत्सर्जित वस्तुओं को एक सूची या बंडलों में इकट्ठा करने और उन बंडलों को वस्तुओं के बजाय बाहर निकालने की अनुमति देता है। नीचे दिए गए उदाहरण में, हमने 9 वस्तुओं का उत्सर्जन करने के लिए एक ऑब्जर्वेबल बनाया है और बफरिंग का उपयोग करते हुए, 3 वस्तुओं को एक साथ उत्सर्जित किया जाएगा।

बफरिंग उदाहरण

C: \> RxJava, अपनी पसंद के किसी भी संपादक का उपयोग करके निम्नलिखित जावा प्रोग्राम बनाएं।

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .buffer(3)
         .subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(List<Integer> integers) {
               System.out.println("onNext: ");
               for (Integer value : integers) {
                  System.out.println(value);
               }
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

परिणाम सत्यापित करें

का उपयोग कर वर्ग संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac ObservableTester.java

अब वेधशाला चलाने के लिए निम्नानुसार है -

C:\RxJava>java ObservableTester

यह निम्नलिखित उत्पादन का उत्पादन करना चाहिए -

Subscribed
onNext: 
1
2
3
onNext: 
4
5
6
onNext: 
7
8
9
Done!

विंडिंग ऑपरेटर बफर ऑपरेटर के समान काम करता है लेकिन यह एक ऑब्जर्वेबल द्वारा संग्रहित वस्तुओं को संग्रह के बजाय किसी अन्य वेधशाला में उत्सर्जित करने की अनुमति देता है और संग्रह के बजाय उन वेधशालाओं का उत्सर्जन करता है। नीचे दिए गए उदाहरण में, हमने 9 वस्तुओं का उत्सर्जन करने के लिए एक ऑब्जर्वेबल बनाया है और विंडो ऑपरेटर का उपयोग करते हुए, 3 ऑब्जर्वेबल को एक साथ उत्सर्जित किया जाएगा।

घुमावदार उदाहरण

C: \> RxJava, अपनी पसंद के किसी भी संपादक का उपयोग करके निम्नलिखित जावा प्रोग्राम बनाएं।

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .window(3)
         .subscribe(new Observer<Observable<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(Observable<Integer> integers) {
               System.out.println("onNext: ");
               integers.subscribe(value -> System.out.println(value));
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

परिणाम सत्यापित करें

का उपयोग कर वर्ग संकलित करें javac संकलक निम्नानुसार है -

C:\RxJava>javac ObservableTester.java

अब वेधशाला चलाने के लिए निम्नानुसार है -

C:\RxJava>java ObservableTester

यह निम्नलिखित उत्पादन का उत्पादन करना चाहिए -

Subscribed
onNext: 
1
2
3
onNext: 
4
5
6
onNext: 
7
8
9
Done!