RxPY-Observable 작업

Observable은 옵저버를 생성하여 값이 예상되는 소스에 연결하는 함수입니다 (예 : 클릭, dom 요소의 마우스 이벤트 등).

아래에 언급 된 주제는이 장에서 자세히 살펴볼 것입니다.

  • Observable 생성

  • Observable 구독 및 실행

관찰 가능 항목 만들기

Observable을 만들기 위해 우리는 create() 메서드를 사용하고 다음 항목이있는 함수를 전달합니다.

  • on_next() −이 함수는 Observable이 아이템을 방출 할 때 호출됩니다.

  • on_completed() −이 함수는 Observable이 완료되면 호출됩니다.

  • on_error() −이 함수는 Observable에 오류가 발생하면 호출됩니다.

create () 메서드를 사용하려면 먼저 아래와 같이 메서드를 가져옵니다.

from rx import create

다음은 observable을 만드는 작업 예제입니다.

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_error("Error")
   observer.on_completed()
source = create(test_observable).

Observable 구독 및 실행

Observable을 구독하려면 subscribe () 함수를 사용하고 on_next, on_error 및 on_completed 콜백 함수를 전달해야합니다.

다음은 작동 예입니다.

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

subscribe () 메서드는 Observable 실행을 처리합니다. 콜백 함수on_next, on_erroron_completedsubscribe 메소드로 전달되어야합니다. subscribe 메서드를 호출하면 test_observable () 함수가 실행됩니다.

세 가지 콜백 함수를 모두 subscribe () 메서드에 전달할 필요는 없습니다. 요구 사항에 따라 on_next (), on_error () 및 on_completed ()를 전달할 수 있습니다.

람다 함수는 on_next, on_error 및 on_completed에 사용됩니다. 인수를 받아 주어진 표현식을 실행합니다.

다음은 생성 된 관찰 가능 항목의 출력입니다.

E:\pyrx>python testrx.py
Got - Hello
Job Done!