RxJava - Hızlı Kılavuz

RxJava, ReactiveX'in Java tabanlı bir uzantısıdır. Java'da uygulama veya ReactiveX projesi sağlar. RxJava'nın temel özellikleri aşağıdadır.

  • Gözlemci modelini genişletir.

  • Veri / olay dizilerini destekleyin.

  • Operatörlerin dizileri birlikte bildirimli olarak oluşturmasını sağlar.

  • İş parçacığı, senkronizasyon, iş parçacığı güvenliği ve eşzamanlı veri yapılarını dahili olarak yönetir.

ReactiveX nedir?

ReactiveX, çeşitli programlama dillerine reaktif programlama kavramı sağlamayı amaçlayan bir projedir. Reaktif Programlama, programın veriler göründüğünde ve göründüğünde tepki verdiği senaryoyu ifade eder. Olay tabanlı bir programlama kavramıdır ve olaylar gözlemcilere kaydedilebilir.

Göre ReactiveGözlemci kalıbı, Yineleyici kalıbı ve işlevsel kalıbın en iyilerini birleştirdiler.

Gözlemci modeli doğru yapıldı. ReactiveX, Observer modelinden, Yineleyici modelinden ve fonksiyonel programlamadan gelen en iyi fikirlerin bir kombinasyonudur.

Fonksiyonel Programlama

Fonksiyonel programlama, yazılımın saf fonksiyonlar kullanılarak oluşturulması etrafında döner. Saf bir işlev önceki duruma bağlı değildir ve geçirilen aynı parametreler için her zaman aynı sonucu döndürür. Saf işlevler, paylaşılan nesneler, değişken veriler ve genellikle çok iş parçacıklı ortamlarda yaygın olan yan etkilerle ilişkili sorunlardan kaçınmaya yardımcı olur.

Reaktif Programlama

Reaktif programlama, veri akışlarının eşzamansız olarak geldiği ve ulaşıldığında işlendiği olay odaklı programlamayı ifade eder.

Fonksiyonel Reaktif Programlama

RxJava, akış verilerinin zaman içinde değiştiği ve tüketici işlevinin buna göre tepki verdiği her iki kavramı birlikte uygular.

Reaktif Manifesto

Reactive Manifesto , uygulama yazılım sistemlerinin yüksek standardını belirten çevrimiçi bir belgedir. Manifestoya göre, reaktif bir yazılımın temel özellikleri şunlardır:

  • Responsive - Her zaman zamanında yanıt vermelidir.

  • Message Driven - Bağlantının gevşek olmasını sağlamak için bileşenler arasında asenkron mesaj geçişi kullanmalıdır.

  • Elastic - Yüksek yük altında bile duyarlı kalmalıdır.

  • Resilient - Herhangi bir bileşen başarısız olsa bile yanıt vermeye devam etmelidir.

RxJava'nın temel bileşenleri

RxJava'nın iki temel bileşeni vardır: Gözlemlenebilirler ve Gözlemci.

  • Observable - Sıfır veya daha fazla veri yayabilen, hata mesajı gönderebilen, bir dizi veri yayarken hızı kontrol edilebilen, sonlu ve sonsuz veri gönderebilen Stream'e benzer bir nesneyi temsil eder.

  • Observer- Observable'ın sekans verilerine abone olur ve gözlemlenebilirlerin öğesi başına tepki verir. Gözlemlenebilir bir veri yayınladığında gözlemciler bilgilendirilir. Bir Gözlemci, verileri tek tek işler.

Öğeler yoksa veya önceki bir öğe için bir geri arama döndürülmezse, gözlemciye asla bildirimde bulunulmaz.

Yerel Ortam Kurulumu

RxJava, Java için bir kitaplıktır, bu nedenle ilk gereklilik, makinenize JDK'nın kurulu olmasıdır.

Sistem gereksinimleri

JDK 1.5 veya üstü.
Hafıza Minimum gereklilik yok.
Disk alanı Minimum gereklilik yok.
İşletim sistemi Minimum gereklilik yok.

Adım 1 - Makinenizde Java Kurulumunu Doğrulayın

Öncelikle konsolu açın ve üzerinde çalıştığınız işletim sistemine göre bir java komutu yürütün.

işletim sistemi Görev Komut
pencereler Komut Konsolunu Aç c: \> java sürümü
Linux Komut Terminalini Aç $ java sürümü
Mac Açık Terminal machine: <joseph $ java -version

Tüm işletim sistemleri için çıktıyı doğrulayalım -

işletim sistemi Çıktı
pencereler

java sürümü "1.8.0_101"

Java (TM) SE Çalışma Zamanı Ortamı (derleme 1.8.0_101)

Linux

java sürümü "1.8.0_101"

Java (TM) SE Çalışma Zamanı Ortamı (derleme 1.8.0_101)

Mac

java sürümü "1.8.0_101"

Java (TM) SE Çalışma Zamanı Ortamı (derleme 1.8.0_101)

Sisteminizde Java yüklü değilse, aşağıdaki bağlantıdan Java Yazılım Geliştirme Kitini (SDK) indirin https://www.oracle.com. Bu öğretici için Java 1.8.0_101 sürümünü yüklü sürüm olarak kabul ediyoruz.

Adım 2 - JAVA Ortamını Ayarlayın

Yı kur JAVA_HOMEJava'nın makinenizde kurulu olduğu temel dizin konumunu gösteren ortam değişkeni. Örneğin.

işletim sistemi Çıktı
pencereler JAVA_HOME ortam değişkenini C: \ Program Files \ Java \ jdk1.8.0_101 olarak ayarlayın
Linux dışa aktar JAVA_HOME = / usr / local / java-current
Mac dışa aktar JAVA_HOME = / Kitaplık / Java / Ana Sayfa

Java derleyici konumunu Sistem Yoluna ekleyin.

işletim sistemi Çıktı
pencereler Dizeyi ekleyin C:\Program Files\Java\jdk1.8.0_101\bin sistem değişkeninin sonunda, Path.
Linux dışa aktar PATH = $ PATH: $ JAVA_HOME / bin /
Mac gerekli değil

Komutu kullanarak Java kurulumunu doğrulayın java -version yukarıda açıklandığı gibi.

Adım 3 - RxJava2 Arşivini İndirin

RxJava jar dosyasının en son sürümünü RxJava @ MVNRepository ve bağımlılığı Reactive Streams @ MVNRepository'den indirin . Bu öğreticiyi yazarken, rxjava-2.2.4.jar, reactive-streams-1.0.2.jar'ı indirdik ve C: \> RxJava klasörüne kopyaladık.

işletim sistemi Arşiv adı
pencereler rxjava-2.2.4.jar, reactive-streams-1.0.2.jar
Linux rxjava-2.2.4.jar, reactive-streams-1.0.2.jar
Mac rxjava-2.2.4.jar, reactive-streams-1.0.2.jar

Adım 4 - RxJava Ortamını Ayarlayın

Yı kur RX_JAVARxJava jar dosyasının makinenizde depolandığı temel dizin konumuna işaret etmek için ortam değişkeni. Rxjava-2.2.4.jar ve reactive-streams-1.0.2.jar'ı RxJava klasöründe sakladığımızı varsayalım.

Sr.No İşletim Sistemi ve Açıklama
1

Windows

Ortam değişkenini RX_JAVA'yı C: \ RxJava olarak ayarlayın.

2

Linux

dışa aktar RX_JAVA = / usr / local / RxJava

3

Mac

dışa aktar RX_JAVA = / Kitaplık / RxJava

Adım 5 - CLASSPATH Değişkenini Ayarlama

Yı kur CLASSPATH RxJava kavanoz konumuna işaret edecek ortam değişkeni.

Sr.No İşletim Sistemi ve Açıklama
1

Windows

CLASSPATH ortam değişkenini% CLASSPATH%;% RX_JAVA% \ rxjava-2.2.4.jar;% RX_JAVA% \ reactive-streams-1.0.2.jar;.;

2

Linux

dışa aktar CLASSPATH = $ CLASSPATH: $ RX_JAVA / rxjava-2.2.4.jar: reactive-streams-1.0.2.jar :.

3

Mac

dışa aktar CLASSPATH = $ CLASSPATH: $ RX_JAVA / rxjava-2.2.4.jar: reactive-streams-1.0.2.jar :.

Adım 6 - RxJava Kurulumunu Test Edin

Aşağıda gösterildiği gibi bir TestRx.java sınıfı oluşturun -

import io.reactivex.Flowable;
public class TestRx {
   public static void main(String[] args) {
      Flowable.just("Hello World!")
         .subscribe(System.out::println);
   }
}

Adım 7 - Sonucu Doğrulayın

Kullanarak sınıfları derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac Tester.java

Çıkışı doğrulayın.

Hello World!

Observables veri kaynaklarını temsil eder. Observers (Subscribers)onları dinle. Özetle, bir Gözlemlenebilir öğeler yayar ve ardından bir Abone bu öğeleri tüketir.

Gözlenebilir

  • Gözlemlenebilir, abone dinlemeye başladığında veri sağlar.

  • Gözlemlenebilir, herhangi bir sayıda öğe yayabilir.

  • Gözlemlenebilir yalnızca tamamlanma sinyalini ve hiçbir öğe olmadan yayabilir.

  • Gözlemlenebilir başarıyla sona erdirilebilir.

  • Gözlemlenebilirlik asla sona ermeyebilir. örneğin bir düğmeye herhangi bir sayıda tıklanabilir.

  • Gözlemlenebilir herhangi bir anda hata verebilir.

Abone

  • Observable birden fazla aboneye sahip olabilir.

  • Bir Observable bir öğe yaydığında, her abone onNext () yöntemi çağrılır.

  • Bir Observable öğeleri yaymayı bitirdiğinde, her bir abone onComplete () yöntemi çağrılır.

  • Bir Observable hata verirse, her abone onError () yöntemi çağrılır.

Aşağıda, gözlemlenebilirler oluşturmak için temel sınıflar verilmiştir.

  • Flowable- 0..N akış, 0 veya n öğe yayar. Reaktif Akışları ve karşı basıncı destekler.

  • Observable - 0..N akış var, ancak karşı basınç yok.

  • Single- 1 öğe veya hata. Yöntem çağrısının reaktif bir sürümü olarak değerlendirilebilir.

  • Completable- Hiçbir öğe yayılmadı. Tamamlanma veya hata için bir sinyal olarak kullanılır. Runnable'ın reaktif bir versiyonu olarak değerlendirilebilir.

  • MayBe- Ya hiç öğe yok ya da 1 öğe yayıldı. Opsiyonel'in reaktif bir versiyonu olarak değerlendirilebilir.

Aşağıda, Observable sınıfında gözlemlenebilirler oluşturmak için uygun yöntemler verilmiştir.

  • just(T item) - Verilen (sabit referans) öğeyi işaret eden ve ardından tamamlayan bir Gözlemlenebilir döndürür.

  • fromIterable(Iterable source) - Yinelenebilir bir diziyi, dizideki öğeleri yayan bir ObservableSource'a dönüştürür.

  • fromArray(T... items) - Bir Diziyi, Dizideki öğeleri yayınlayan bir ObservableSource'a dönüştürür.

  • fromCallable(Callable supplier) - Bir gözlemci ona abone olduğunda, belirttiğiniz bir işlevi çağıran ve ardından o işlevden döndürülen değeri yayınlayan bir Gözlemlenebilir döndürür.

  • fromFuture(Future future) - Bir Geleceği Gözlemlenebilir Kaynağa Dönüştürür.

  • interval(long initialDelay, long period, TimeUnit unit) - İlk Gecikmeden sonra 0L yayan ve daha sonraki her sürenin ardından sürekli artan sayıları veren bir Gözlemlenebilir döndürür.

Single sınıfı, tek değer yanıtını temsil eder. Tek gözlemlenebilir, yalnızca tek bir başarılı değer veya bir hata yayabilir. OnComplete olayını yaymaz.

Sınıf Beyanı

Aşağıdaki beyanı io.reactivex.Single<T> sınıf -

public abstract class Single<T>
   extends Object
      implements SingleSource<T>

Protokol

Tek Gözlemlenebilir'in çalıştırdığı ardışık protokol aşağıdadır -

onSubscribe (onSuccess | onError)?

Tek Örnek

C: \> RxJava'da seçtiğiniz herhangi bir düzenleyiciyi kullanarak aşağıdaki Java programını oluşturun.

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {
      //Create the observable
      Single<String> testSingle = Single.just("Hello World");

      //Create an observer
      Disposable disposable = testSingle
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(
         new DisposableSingleObserver<String>() {

         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

Sonucu Doğrulayın

Kullanarak sınıfı derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac ObservableTester.java

Şimdi ObservableTester'ı aşağıdaki gibi çalıştırın -

C:\RxJava>java ObservableTester

Aşağıdaki çıktıyı üretmelidir -

Hello World

MayBe sınıfı, ertelenmiş yanıtı temsil eder. MayBe gözlemlenebilir, tek bir başarılı değer yayabilir veya hiçbir değer yayınlamayabilir.

Sınıf Beyanı

Aşağıdaki beyanı io.reactivex.Single<T> sınıf -

public abstract class Maybe<T>
   extends Object
      implements MaybeSource<T>

Protokol

MayBe Observable'ın çalıştırdığı sıralı protokol aşağıdadır -

onSubscribe (onSuccess | onError | OnComplete)?

MayBe Örneği

C: \> RxJava'da seçtiğiniz herhangi bir düzenleyiciyi kullanarak aşağıdaki Java programını oluşturun.

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      //Create an observer
      Disposable disposable = Maybe.just("Hello World")
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

Sonucu Doğrulayın

Kullanarak sınıfı derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac ObservableTester.java

Şimdi ObservableTester'ı aşağıdaki gibi çalıştırın -

C:\RxJava>java ObservableTester

Aşağıdaki çıktıyı üretmelidir -

Hello World

Tamamlanabilir sınıf, ertelenmiş yanıtı temsil eder. Tamamlanabilir gözlemlenebilir, başarılı bir tamamlanma veya hatayı gösterebilir.

Sınıf Beyanı

Aşağıdaki beyanı io.reactivex.Completable sınıf -

public abstract class Completable
extends Object
implements CompletableSource

Protokol

Completable Observable'ın çalıştırdığı sıralı protokol aşağıdadır -

onSubscribe (onError | onComplete)?

Tamamlanabilir Örnek

C: \> RxJava'da seçtiğiniz herhangi bir düzenleyiciyi kullanarak aşağıdaki Java programını oluşturun.

ObservableTester.java

import java.util.concurrent.TimeUnit;

import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableCompletableObserver;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {

      //Create an observer
      Disposable disposable = Completable.complete()
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .subscribeWith(new DisposableCompletableObserver() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }
         @Override
         public void onStart() {
            System.out.println("Started!");
         }
         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 
      Thread.sleep(3000);
      //start observing
      disposable.dispose();
   }
}

Sonucu Doğrulayın

Kullanarak sınıfı derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac ObservableTester.java

Şimdi ObservableTester'ı aşağıdaki gibi çalıştırın -

C:\RxJava>java ObservableTester

Aşağıdaki çıktıyı üretmelidir -

Started!
Done!

CompositeDisposable sınıfı, birden çok tek kullanımlık malzemeyi tutabilen ve tek kullanımlık malzemelerin eklenmesi ve çıkarılması için O (1) karmaşıklığı sunan bir kabı temsil eder.

Sınıf Beyanı

Aşağıdaki beyanı io.reactivex.disposables.CompositeDisposable sınıf -

public final class CompositeDisposable
extends Object
implements Disposable, io.reactivex.internal.disposables.DisposableContainer

Kompozit Tek Kullanımlık Örnek

C: \> RxJava'da seçtiğiniz herhangi bir düzenleyiciyi kullanarak aşağıdaki Java programını oluşturun.

ObservableTester.java

import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {    
      CompositeDisposable compositeDisposable = new CompositeDisposable();

      //Create an Single observer 
      Disposable disposableSingle = Single.just("Hello World")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(
      new DisposableSingleObserver<String>() {
         @Override
         public void onError(Throwable e) {
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }
      }); 

      //Create an observer
      Disposable disposableMayBe = Maybe.just("Hi")
      .delay(2, TimeUnit.SECONDS, Schedulers.io())
      .subscribeWith(new DisposableMaybeObserver<String>() {
         @Override
         public void onError(Throwable e) { 
            e.printStackTrace();
         }

         @Override
         public void onSuccess(String value) {
            System.out.println(value);
         }

         @Override
         public void onComplete() {
            System.out.println("Done!");
         }
      }); 

      Thread.sleep(3000);

      compositeDisposable.add(disposableSingle);
      compositeDisposable.add(disposableMayBe);

      //start observing
      compositeDisposable.dispose();
   }
}

Sonucu Doğrulayın

Kullanarak sınıfı derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac ObservableTester.java

Şimdi ObservableTester'ı aşağıdaki gibi çalıştırın -

C:\RxJava>java ObservableTester

Aşağıdaki çıktıyı üretmelidir -

Hello World
Hi

Aşağıda, bir Gözlemlenebilir oluşturmak için kullanılan operatörler verilmiştir.

Sr.No. Operatör ve Açıklama
1

Create

Sıfırdan bir Gözlemlenebilir oluşturur ve gözlemci yönteminin programlı olarak çağırmasına izin verir.

2

Defer

Bir gözlemci abone olana kadar bir Observable oluşturmayın. Her gözlemci için taze bir gözlemlenebilir yaratır.

3

Empty/Never/Throw

Sınırlı davranışa sahip bir Gözlemlenebilir oluşturur.

4

From

Bir nesneyi / veri yapısını bir Gözlemlenebilir'e dönüştürür.

5

Interval

Belirtilen zaman aralığı boşluğuyla sırayla bir Gözlemlenebilir yayan tamsayı oluşturur.

6

Just

Bir nesneyi / veri yapısını, aynı veya aynı türde nesneleri yaymak için bir Gözlemlenebilir'e dönüştürür.

7

Range

Belirli bir aralıkta bir Gözlemlenebilir yayan tamsayı oluşturur.

8

Repeat

Art arda sırayla bir Observable yayan tamsayı oluşturur.

9

Start

Bir işlevin dönüş değerini yaymak için bir Observable oluşturur.

10

Timer

Belirli bir gecikmeden sonra tek bir öğe yaymak için bir Gözlemlenebilir oluşturur.

Operatör Örneği Oluşturma

C: \> RxJava'da seçtiğiniz herhangi bir düzenleyiciyi kullanarak aşağıdaki Java programını oluşturun.

ObservableTester.java

import io.reactivex.Observable;
//Using fromArray operator to create an Observable
public class ObservableTester  {
   public static void main(String[] args) { 
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Sonucu Doğrulayın

Kullanarak sınıfı derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac ObservableTester.java

Şimdi ObservableTester'ı aşağıdaki gibi çalıştırın -

C:\RxJava>java ObservableTester

Aşağıdaki çıktıyı üretmelidir -

ABCDEFG

Aşağıda, bir Gözlemlenebilirden yayılan bir öğeyi dönüştürmek için kullanılan operatörler verilmiştir.

Sr.No. Operatör ve Açıklama
1

Buffer

Observable'daki öğeleri periyodik olarak paketler halinde toplar ve ardından öğeler yerine demetleri yayınlar.

2

FlatMap

İç içe gözlemlenebilirlerde kullanılır. Öğeleri Gözlemlenebilirlere dönüştürür. Ardından öğeleri tek bir Gözlemlenebilir olarak düzleştirin.

3

GroupBy

Bir Gözlemlenebilir Öğeyi, farklı öğe gruplarını yaymak için anahtara göre düzenlenmiş bir Gözlemlenebilirler grubuna bölün.

4

Map

Verilen her öğeye dönüştürmek için bir işlev uygulayın.

5

Scan

Yayılan her öğeye sırayla bir işlev uygulayın ve ardından ardışık değeri yayınlayın.

6

Window

Gözlemlenebilir öğelerden düzenli aralıklarla Gözlemlenebilir pencerelerdeki öğeleri toplar ve ardından öğeler yerine pencereleri yayar.

Operatör Örneğini Dönüştürme

C: \> RxJava'da seçtiğiniz herhangi bir düzenleyiciyi kullanarak aşağıdaki Java programını oluşturun.

ObservableTester.java

import io.reactivex.Observable;
//Using map operator to transform an Observable
public class ObservableTester  { 
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .map(String::toUpperCase)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Sonucu Doğrulayın

Kullanarak sınıfı derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac ObservableTester.java

Şimdi ObservableTester'ı aşağıdaki gibi çalıştırın -

C:\RxJava>java ObservableTester

Aşağıdaki çıktıyı üretmelidir -

ABCDEFG

Aşağıdakiler, bir Gözlemlenebilir Öğeden seçici olarak öğe (leri) yaymak için kullanılan operatörlerdir.

Sr.No. Operatör ve Açıklama
1

Debounce

Öğeleri yalnızca, başka bir öğe yaymadan zaman aşımı oluştuğunda yayar.

2

Distinct

Yalnızca benzersiz öğeleri yayar.

3

ElementAt

Yalnızca bir Observable tarafından yayılan n dizindeki öğeyi yayar.

4

Filter

Yalnızca verilen yüklem işlevini geçen öğeleri yayar.

5

First

Verilen ölçütü geçen ilk öğeyi veya ilk öğeyi yayar.

6

IgnoreElements

Observable'dan herhangi bir öğe yaymayın, ancak tamamlandığını gösterir.

7

Last

Observable'dan son öğeyi yayar.

8

Sample

Belirli bir zaman aralığına sahip en son öğeyi yayınlar.

9

Skip

Bir Gözlemlenebilirden ilk n öğeyi atlar.

10

SkipLast

Bir Gözlemlenebilirden son n öğeyi atlar.

11

Take

Bir Gözlemlenebilirden ilk n maddeyi alır.

12

TakeLast

bir Gözlemlenebilirden son n maddeyi alır.

Filtreleme Operatörü Örneği

C: \> RxJava'da seçtiğiniz herhangi bir düzenleyiciyi kullanarak aşağıdaki Java programını oluşturun.

ObservableTester.java

import io.reactivex.Observable;
//Using take operator to filter an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable
         .take(2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Sonucu Doğrulayın

Kullanarak sınıfı derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac ObservableTester.java

Şimdi ObservableTester'ı aşağıdaki gibi çalıştırın -

C:\RxJava>java ObservableTester

Aşağıdaki çıktıyı üretmelidir -

ab

Aşağıdakiler, birden fazla Gözlemlenebilirden tek bir Gözlemlenebilir oluşturmak için kullanılan operatörlerdir.

Sr.No. Operatör ve Açıklama
1 And/Then/When

Kalıp ve Plan aracılarını kullanarak eşya setlerini birleştirin.

2 CombineLatest

Her bir Gözlemlenebilir tarafından yayılan en son öğeyi belirli bir işlev aracılığıyla birleştirin ve sonuçlanan öğeyi yayınlayın.

3 Join

İkinci Gözlemlenebilir yayılan öğenin zaman dilimi sırasında yayılırsa, iki Gözlemlenebilir Öğe tarafından yayılan öğeleri birleştirin.

4 Merge

Gözlemlenebilirlerden yayılan öğeleri birleştirir.

5 StartWith

Gözlemlenebilir kaynaktan öğeleri yaymaya başlamadan önce belirli bir öğe dizisi yayınlayın

6 Switch

Gözlemlenebilirler tarafından yayılan en son öğeleri yayar.

7 Zip

Gözlemlenebilir öğelerini işleve dayalı olarak birleştirir ve sonuçlanan öğeleri yayar.

Operatör Örneğini Birleştirme

C: \> RxJava'da seçtiğiniz herhangi bir düzenleyiciyi kullanarak aşağıdaki Java programını oluşturun.

ObservableTester.java

import io.reactivex.Observable;
//Using combineLatest operator to combine Observables
public class ObservableTester {
   public static void main(String[] args) {    
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.combineLatest(observable1, observable2, (a,b) -> a + b)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Sonucu Doğrulayın

Kullanarak sınıfı derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac ObservableTester.java

Şimdi ObservableTester'ı aşağıdaki gibi çalıştırın -

C:\RxJava>java ObservableTester

Aşağıdaki çıktıyı üretmelidir -

g1g2g3g4g5g6

Gözlemlenebilirler ile genellikle yararlı olan operatörler aşağıdadır.

Sr.No. Operatör ve Açıklama
1

Delay

Gözlemlenebilir yaşam döngüsü olaylarını işlemek için eylem kaydedin.

2

Materialize/Dematerialize

Yayılan öğeyi ve gönderilen bildirimi temsil eder.

3

ObserveOn

Gözlemlenecek planlayıcıyı belirtin.

4

Serialize

Gözlemlenebilirliği seri hale getirilmiş aramalar yapmaya zorla.

5

Subscribe

Gözlemlenebilir bir cihazdan tamamlanmış gibi öğelerin ve bildirimlerin emisyonları üzerinde çalışın

6

SubscribeOn

Abone olduğunda bir Observable tarafından kullanılacak planlayıcıyı belirtin.

7

TimeInterval

Bir Gözlemlenebilir Öğeyi, emisyonlar arasında geçen sürenin göstergelerini yayacak şekilde dönüştürün.

8

Timeout

Belirtilen süre herhangi bir öğe yaymadan oluşursa hata bildirimi verir.

9

Timestamp

Yayınlanan her öğeye zaman damgası ekleyin.

9

Using

Tek kullanımlık bir kaynak oluşturur veya Gözlemlenebilir ile aynı ömrü oluşturur.

Yardımcı Operatör Örneği

C: \> RxJava'da seçtiğiniz herhangi bir düzenleyiciyi kullanarak aşağıdaki Java programını oluşturun.

ObservableTester.java

import io.reactivex.Observable;
//Using subscribe operator to subscribe to an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable = Observable.fromArray(letters);
      observable.subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Sonucu Doğrulayın

Kullanarak sınıfı derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac ObservableTester.java

Şimdi ObservableTester'ı aşağıdaki gibi çalıştırın -

C:\RxJava>java ObservableTester

Aşağıdaki çıktıyı üretmelidir -

abcdefg

Aşağıda, bir veya daha fazla Gözlemlenebilir Öğeyi veya yayılan öğeyi değerlendiren operatörler yer almaktadır.

Sr.No. Operatör ve Açıklama
1

All

Verilen kriterleri karşılamak için yayılan tüm öğeleri değerlendirir.

2

Amb

Yalnızca birden fazla Gözlemlenebilir verildiğinde, ilk Gözlenebilirden tüm öğeleri yayar.

3

Contains

Bir Gözlemlenebilir Öğenin belirli bir öğeyi yayıp yaymadığını kontrol eder.

4

DefaultIfEmpty

Observable hiçbir şey yaymazsa varsayılan öğeyi yayar.

5

SequenceEqual

İki Gözlemlenebilir Öğenin aynı öğe sırasını yayıp yaymadığını kontrol eder.

6

SkipUntil

İlk Gözlemlenebilir tarafından yayılan öğeleri, ikinci bir Gözlemlenebilir öğe bir öğe yayana kadar atar.

7

SkipWhile

Belirli bir koşul yanlış hale gelene kadar bir Gözlemlenebilirlik tarafından yayılan öğeleri atın.

8

TakeUntil

İkinci bir Gözlemlenebilir Öğeyi yayınladıktan veya sona erdikten sonra bir Gözlemlenebilirlik tarafından yayılan öğeleri atar.

9

TakeWhile

Belirli bir koşul yanlış hale geldikten sonra bir Gözlemlenebilirlik tarafından yayılan öğeleri atın.

Koşullu Operatör Örneği

C: \> RxJava'da seçtiğiniz herhangi bir düzenleyiciyi kullanarak aşağıdaki Java programını oluşturun.

ObservableTester.java

import io.reactivex.Observable;
//Using defaultIfEmpty operator to operate on an Observable
public class ObservableTester  {
   public static void main(String[] args) {    
      final StringBuilder result = new StringBuilder();
      Observable.empty()
      .defaultIfEmpty("No Data")
      .subscribe(s -> result.append(s));
      System.out.println(result);
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result1 = new StringBuilder();
      Observable.fromArray(letters)
      .firstElement()
      .defaultIfEmpty("No data")   
      .subscribe(s -> result1.append(s));
      System.out.println(result1);
   }
}

Sonucu Doğrulayın

Kullanarak sınıfı derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac ObservableTester.java

Şimdi ObservableTester'ı aşağıdaki gibi çalıştırın -

C:\RxJava>java ObservableTester

Aşağıdaki çıktıyı üretmelidir -

No Data
a

Aşağıda, bir Gözlemlenebilir Cihaz tarafından yayılan tüm öğeler üzerinde çalışan operatörler yer almaktadır.

Sr.No. Operatör ve Açıklama
1

Average

Tüm öğelerin ortalamasını değerlendirir ve sonucu yayınlar.

2

Concat

Araya girmeden birden çok Gözlemlenebilirden tüm öğeleri yayar.

3

Count

Tüm öğeleri sayar ve sonucu yayınlar.

4

Max

Tüm öğelerin maksimum değerli öğesini değerlendirir ve sonucu yayınlar.

5

Min

Tüm öğelerin minimum değerli öğesini değerlendirir ve sonucu yayınlar.

6

Reduce

Her öğeye bir işlev uygulayın ve sonucu döndürün.

7

Sum

Tüm öğelerin toplamını değerlendirir ve sonucu yayınlar.

Matematiksel Operatör Örneği

C: \> RxJava'da seçtiğiniz herhangi bir düzenleyiciyi kullanarak aşağıdaki Java programını oluşturun.

ObservableTester.java

import io.reactivex.Observable;
//Using concat operator to operate on multiple Observables
public class ObservableTester  {
   public static void main(String[] args)  throws InterruptedException {    
      Integer[] numbers = { 1, 2, 3, 4, 5, 6};
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      Observable<String> observable1 = Observable.fromArray(letters);
      Observable<Integer> observable2 = Observable.fromArray(numbers);
      Observable.concat(observable1, observable2)
         .subscribe( letter -> result.append(letter));
      System.out.println(result);
   }
}

Sonucu Doğrulayın

Kullanarak sınıfı derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac ObservableTester.java

Şimdi ObservableTester'ı aşağıdaki gibi çalıştırın -

C:\RxJava>java ObservableTester

Aşağıdaki çıktıyı üretmelidir -

abcdefg123456

Abonelik üzerinde daha kesin bir kontrole sahip olan operatörler aşağıdadır.

Sr.No. Operatör ve Açıklama
1

Connect

Bağlanabilir bir Observable'ı abonelerine öğe göndermesi için yönlendirin.

2

Publish

Bir Gözlemlenebilir Öğeyi bağlanabilir Gözlemlenebilir'e dönüştürür.

3

RefCount

Bağlanabilir bir Gözlemlenebilir'i sıradan Gözlenebilir'e dönüştürür.

4

Replay

Gözlemlenebilir öğe yaymaya başladıktan ve aboneler daha sonra abone olduktan sonra bile, her abone tarafından aynı sırayla yayımlanan öğelerin görülmesini sağlayın.

Bağlanabilir Operatör Örneği

C: \> RxJava'da seçtiğiniz herhangi bir düzenleyiciyi kullanarak aşağıdaki Java programını oluşturun.

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
//Using connect operator on a ConnectableObservable
public class ObservableTester {
   public static void main(String[] args) {
      String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
      final StringBuilder result = new StringBuilder();
      ConnectableObservable<String> connectable = Observable.fromArray(letters).publish();      
      connectable.subscribe(letter -> result.append(letter));
      System.out.println(result.length());
      connectable.connect();
      System.out.println(result.length());
      System.out.println(result);
   }
}

Sonucu Doğrulayın

Kullanarak sınıfı derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac ObservableTester.java

Şimdi ObservableTester'ı aşağıdaki gibi çalıştırın -

C:\RxJava>java ObservableTester

Aşağıdaki çıktıyı üretmelidir -

0
7
abcdefg

Göre Reactive, bir Özne hem Gözlemlenebilir hem de Gözlemci olarak hareket edebilir.

Özne, hem gözlemci hem de Gözlemlenebilir olarak hareket eden bazı ReactiveX uygulamalarında bulunan bir tür köprü veya proxy'dir. Gözlemci olduğu için bir veya daha fazla Gözlemlenebilir'e abone olabilir ve bir Gözlemlenebilir olduğu için gözlemlediği öğeleri tekrar göndererek geçebilir ve yeni öğeler de yayabilir.

Dört tür Konu vardır -

Sr.No. Konu açıklaması
1

Publish Subject

Yalnızca abonelikten sonra yayınlanan öğeleri yayar.

2 Replay Subject

Observable kaynağına ne zaman abone olduğuna bakılmaksızın Observable tarafından yayılan tüm öğeleri yayar.

3

Behavior Subject

Abonelikten sonra, en son öğeyi yayar ve ardından Gözlemlenebilir kaynak tarafından yayımlanan öğeyi yaymaya devam eder.

4

Async Subject

Yayımı tamamlandıktan sonra Gözlemlenebilir kaynak tarafından yayılan son öğeyi yayar.

PublishSubject öğeleri şu anda abone olan Gözlemcilere ve terminal olaylarını mevcut veya geç Gözlemcilere gönderir.

Sınıf Beyanı

Aşağıdaki beyanı io.reactivex.subjects.PublishSubject<T> sınıf -

public final class PublishSubject<T>
extends Subject<T>

PublishSubject Örneği

C: \> RxJava'da seçtiğiniz herhangi bir düzenleyiciyi kullanarak aşağıdaki Java programını oluşturun.

ObservableTester.java

import io.reactivex.subjects.PublishSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      PublishSubject<String> subject = PublishSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be abcd 
      System.out.println(result1);
      //Output will be d only
      //as subscribed after c item emitted.
      System.out.println(result2);
   }
}

Sonucu Doğrulayın

Kullanarak sınıfı derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac ObservableTester.java

Şimdi ObservableTester'ı aşağıdaki gibi çalıştırın -

C:\RxJava>java ObservableTester

Aşağıdaki çıktıyı üretmelidir -

abcd
d

BehaviorSubject, gözlemlediği en son öğeyi ve ardından tüm gözlemlenen öğeleri abone olunan her bir Observer'a gönderir.

Sınıf Beyanı

Aşağıdaki beyanı io.reactivex.subjects.BehaviorSubject<T> sınıf -

public final class BehaviorSubject<T>
extends Subject<T>

BehaviorSubject Örneği

C: \> RxJava'da seçtiğiniz herhangi bir düzenleyiciyi kullanarak aşağıdaki Java programını oluşturun.

ObservableTester.java

import io.reactivex.subjects.BehaviorSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         
      BehaviorSubject<String> subject =  BehaviorSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();
      //Output will be abcd
      System.out.println(result1);
      //Output will be cd being BehaviorSubject 
      //(c is last item emitted before subscribe)
      System.out.println(result2);
   }
}

Sonucu Doğrulayın

Kullanarak sınıfı derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac ObservableTester.java

Şimdi ObservableTester'ı aşağıdaki gibi çalıştırın -

C:\RxJava>java ObservableTester

Aşağıdaki çıktıyı üretmelidir -

abcd
cd

ReplaySubject olayları / öğeleri mevcut ve geç Gözlemciler için tekrar oynatır.

Sınıf Beyanı

Aşağıdaki beyanı io.reactivex.subjects.ReplaySubject<T> sınıf -

public final class ReplaySubject<T>
extends Subject<T>

ReplaySubject Örneği

C: \> RxJava'da seçtiğiniz herhangi bir düzenleyiciyi kullanarak aşağıdaki Java programını oluşturun.

ObservableTester.java

import io.reactivex.subjects.ReplaySubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      ReplaySubject<String> subject = ReplaySubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be abcd
      System.out.println(result1);
      //Output will be abcd being ReplaySubject
      //as ReplaySubject emits all the items
      System.out.println(result2);
   }
}

Sonucu Doğrulayın

Kullanarak sınıfı derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac ObservableTester.java

Şimdi ObservableTester'ı aşağıdaki gibi çalıştırın -

C:\RxJava>java ObservableTester

Aşağıdaki çıktıyı üretmelidir -

abcd
abcd

AsyncSubject tek son değeri, ardından bir tamamlama olayını veya alınan hatayı Gözlemciler'e yayar.

Sınıf Beyanı

Aşağıdaki beyanı io.reactivex.subjects.AsyncSubject<T> sınıf -

public final class  AsyncSubject<T>
extends Subject<T>

AsyncSubject Örneği

C: \> RxJava'da seçtiğiniz herhangi bir düzenleyiciyi kullanarak aşağıdaki Java programını oluşturun.

ObservableTester.java

import io.reactivex.subjects. AsyncSubject;
public class ObservableTester  {
   public static void main(String[] args) {   
      final StringBuilder result1 = new StringBuilder();
      final StringBuilder result2 = new StringBuilder();         

      AsyncSubject<String> subject =  AsyncSubject.create(); 
      subject.subscribe(value -> result1.append(value) ); 
      subject.onNext("a"); 
      subject.onNext("b"); 
      subject.onNext("c"); 
      subject.subscribe(value -> result2.append(value)); 
      subject.onNext("d"); 
      subject.onComplete();

      //Output will be d being the last item emitted
      System.out.println(result1);
      //Output will be d being the last item emitted     
      System.out.println(result2);
   }
}

Sonucu Doğrulayın

Kullanarak sınıfı derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac ObservableTester.java

Şimdi ObservableTester'ı aşağıdaki gibi çalıştırın -

C:\RxJava>java ObservableTester

Aşağıdaki çıktıyı üretmelidir -

d
d

Zamanlayıcılar, Gözlemlenebilir operatörlerle çalışmak için çoklu iş parçacığı ortamında kullanılır.

Göre Reactive, Zamanlayıcı, işleçler zincirinin farklı iş parçacıklarına nasıl uygulanacağını planlamak için kullanılır.

Varsayılan olarak, bir Observable ve ona uyguladığınız işleçler zinciri işini yapacak ve gözlemcilerini Abone olma yönteminin çağrıldığı aynı iş parçacığı üzerinden bilgilendirecektir. Abone Olma işleci, Gözlemlenebilir öğenin üzerinde çalışması gereken farklı bir Zamanlayıcı belirleyerek bu davranışı değiştirir. ObserveOn işleci, Gözlemlenebilir'in gözlemcilerine bildirim göndermek için kullanacağı farklı bir Zamanlayıcı belirtir.

RxJava'da aşağıdaki Planlayıcı türleri mevcuttur -

Sr.No. Zamanlayıcı ve Açıklama
1

Schedulers.computation()

Hesaplama çalışması için tasarlanmış bir Zamanlayıcı oluşturur ve döndürür. Programlanacak iş parçacığı sayısı, sistemde bulunan CPU'lara bağlıdır. CPU başına bir iş parçacığına izin verilir. Olay döngüleri veya geri arama işlemleri için idealdir.

2

Schedulers.io()

GÇ'ye bağlı çalışmaya yönelik bir Zamanlayıcı oluşturur ve döndürür. İş parçacığı havuzu gerektiği kadar genişletilebilir.

3

Schedulers.newThread()

Her çalışma birimi için yeni bir İş Parçacığı oluşturan bir Zamanlayıcı oluşturur ve döndürür.

4

Schedulers.trampoline()

Geçerli iş tamamlandıktan sonra yürütülecek olan mevcut iş parçacığı üzerinde kuyrukların çalıştığı bir Zamanlayıcı oluşturur ve döndürür.

4

Schedulers.from(java.util.concurrent.Executor executor)

Bir Yürütücüyü yeni bir Zamanlayıcı örneğine dönüştürür.

Schedulers.trampoline () yöntemi, geçerli iş tamamlandıktan sonra yürütülecek geçerli iş parçacığı üzerinde çalışmayı sıraya koyan bir Zamanlayıcı oluşturur ve döndürür.

Schedulers.trampoline () Örnek

C: \> RxJava'da seçtiğiniz herhangi bir düzenleyiciyi kullanarak aşağıdaki Java programını oluşturun.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.trampoline()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Sonucu Doğrulayın

Kullanarak sınıfı derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac ObservableTester.java

Şimdi ObservableTester'ı aşağıdaki gibi çalıştırın -

C:\RxJava>java ObservableTester

Aşağıdaki çıktıyı üretmelidir -

Processing Thread main
Receiver Thread main, Item length 1
Processing Thread main
Receiver Thread main, Item length 2
Processing Thread main
Receiver Thread main, Item length 3

Schedulers.newThread () yöntemi, her çalışma birimi için yeni bir İş Parçacığı oluşturan bir Zamanlayıcı oluşturur ve döndürür.

Schedulers.newThread () Örnek

C: \> RxJava'da seçtiğiniz herhangi bir düzenleyiciyi kullanarak aşağıdaki Java programını oluşturun.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.newThread()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Sonucu Doğrulayın

Kullanarak sınıfı derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac ObservableTester.java

Şimdi ObservableTester'ı aşağıdaki gibi çalıştırın -

C:\RxJava>java ObservableTester

Aşağıdaki çıktıyı üretmelidir -

Processing Thread RxNewThreadScheduler-1
Receiver Thread RxNewThreadScheduler-1, Item length 1
Processing Thread RxNewThreadScheduler-2
Receiver Thread RxNewThreadScheduler-2, Item length 2
Processing Thread RxNewThreadScheduler-3
Receiver Thread RxNewThreadScheduler-3, Item length 3

Schedulers.computation () yöntemi, hesaplama çalışması için tasarlanmış bir Zamanlayıcı oluşturur ve döndürür. Programlanacak iş parçacığı sayısı, sistemde bulunan CPU'lara bağlıdır. CPU başına bir iş parçacığına izin verilir. Olay döngüleri veya geri arama işlemleri için idealdir.

Schedulers.computation () Örnek

C: \> RxJava'da seçtiğiniz herhangi bir düzenleyiciyi kullanarak aşağıdaki Java programını oluşturun.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.computation()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Sonucu Doğrulayın

Kullanarak sınıfı derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac ObservableTester.java

Şimdi ObservableTester'ı aşağıdaki gibi çalıştırın -

C:\RxJava>java ObservableTester

Aşağıdaki çıktıyı üretmelidir -

Processing Thread RxComputationThreadPool-1
Receiver Thread RxComputationThreadPool-1, Item length 1
Processing Thread RxComputationThreadPool-2
Receiver Thread RxComputationThreadPool-2, Item length 2
Processing Thread RxComputationThreadPool-3
Receiver Thread RxComputationThreadPool-3, Item length 3

Schedulers.io () yöntemi, GÇ'ye bağlı çalışmaya yönelik bir Zamanlayıcı oluşturur ve döndürür. İş parçacığı havuzu gerektiği kadar genişletilebilir. Yoğun I / O operasyonları için idealdir.

Schedulers.io () Örnek

C: \> RxJava'da seçtiğiniz herhangi bir düzenleyiciyi kullanarak aşağıdaki Java programını oluşturun.

ObservableTester.java

import java.util.Random;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.io()))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Sonucu Doğrulayın

Kullanarak sınıfı derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac ObservableTester.java

Şimdi ObservableTester'ı aşağıdaki gibi çalıştırın -

C:\RxJava>java ObservableTester

Aşağıdaki çıktıyı üretmelidir -

Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 1
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 2
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 3

Schedulers.from (Executor) yöntemi, bir Yürütücüyü yeni bir Zamanlayıcı örneğine dönüştürür.

Schedulers.from (Yürütücü) Örnek

C: \> RxJava'da seçtiğiniz herhangi bir düzenleyiciyi kullanarak aşağıdaki Java programını oluşturun.

ObservableTester.java

import java.util.Random;
import java.util.concurrent.Executors;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class ObservableTester  {
   public static void main(String[] args) throws InterruptedException {
      Observable.just("A", "AB", "ABC")
         .flatMap(v -> getLengthWithDelay(v)
         .doOnNext(s -> System.out.println("Processing Thread " 
            + Thread.currentThread().getName()))
         .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3))))
         .subscribe(length -> System.out.println("Receiver Thread " 
            + Thread.currentThread().getName() 
            + ", Item length " + length));

         Thread.sleep(10000);
   }
   protected static Observable<Integer> getLengthWithDelay(String v) {
      Random random = new Random();
      try {
         Thread.sleep(random.nextInt(3) * 1000);
         return Observable.just(v.length());
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
      return null;
   }
}

Sonucu Doğrulayın

Kullanarak sınıfı derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac ObservableTester.java

Şimdi ObservableTester'ı aşağıdaki gibi çalıştırın -

C:\RxJava>java ObservableTester

Aşağıdaki çıktıyı üretmelidir -

Processing Thread pool-1-thread-1
Processing Thread pool-3-thread-1
Receiver Thread pool-1-thread-1, Item length 1
Processing Thread pool-4-thread-1
Receiver Thread pool-4-thread-1, Item length 3
Receiver Thread pool-3-thread-1, Item length 2

Tamponlama operatörü, bir Gözlemlenebilir tarafından yayılan öğeleri bir liste veya paketler halinde toplamaya ve öğeler yerine bu paketleri yaymaya izin verir. Aşağıdaki örnekte, 9 öğe yaymak için bir Observable oluşturduk ve tamponlama kullanarak, 3 öğe birlikte yayınlanacak.

Arabelleğe Alma Örneği

C: \> RxJava'da seçtiğiniz herhangi bir düzenleyiciyi kullanarak aşağıdaki Java programını oluşturun.

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .buffer(3)
         .subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(List<Integer> integers) {
               System.out.println("onNext: ");
               for (Integer value : integers) {
                  System.out.println(value);
               }
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

Sonucu Doğrulayın

Kullanarak sınıfı derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac ObservableTester.java

Şimdi ObservableTester'ı aşağıdaki gibi çalıştırın -

C:\RxJava>java ObservableTester

Aşağıdaki çıktıyı üretmelidir -

Subscribed
onNext: 
1
2
3
onNext: 
4
5
6
onNext: 
7
8
9
Done!

Pencereleme operatörü, tampon operatörüne benzer şekilde çalışır, ancak bir Gözlemlenebilir tarafından yayılan öğeleri toplama yerine başka bir gözlemlenebilirde toplamaya ve koleksiyonlar yerine bu Gözlemlenebilir Öğeleri yaymaya izin verir. Aşağıdaki örnekte, 9 öğe yaymak için bir Gözlemlenebilir oluşturduk ve pencere operatörü kullanarak 3 Gözlemlenebilir birlikte yayımlanacak.

Pencereleme Örneği

C: \> RxJava'da seçtiğiniz herhangi bir düzenleyiciyi kullanarak aşağıdaki Java programını oluşturun.

ObservableTester.java

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class ObservableTester {
   public static void main(String[] args) throws InterruptedException {
      Observable<Integer> observable = Observable.just(1, 2, 3, 4,
         5, 6, 7, 8, 9);

      observable.subscribeOn(Schedulers.io())
         .delay(2, TimeUnit.SECONDS, Schedulers.io())
         .window(3)
         .subscribe(new Observer<Observable<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {
               System.out.println("Subscribed");
            }
            @Override
            public void onNext(Observable<Integer> integers) {
               System.out.println("onNext: ");
               integers.subscribe(value -> System.out.println(value));
            }
            @Override
            public void onError(Throwable e) {
               System.out.println("Error");
            }

            @Override
            public void onComplete() {
               System.out.println("Done! ");
            }
         });
      Thread.sleep(3000);
   }
}

Sonucu Doğrulayın

Kullanarak sınıfı derleyin javac aşağıdaki gibi derleyici -

C:\RxJava>javac ObservableTester.java

Şimdi ObservableTester'ı aşağıdaki gibi çalıştırın -

C:\RxJava>java ObservableTester

Aşağıdaki çıktıyı üretmelidir -

Subscribed
onNext: 
1
2
3
onNext: 
4
5
6
onNext: 
7
8
9
Done!