RxPY-Observable 작업
Observable은 옵저버를 생성하여 값이 예상되는 소스에 연결하는 함수입니다 (예 : 클릭, dom 요소의 마우스 이벤트 등).
아래에 언급 된 주제는이 장에서 자세히 살펴볼 것입니다.
Observable 생성
Observable 구독 및 실행
관찰 가능 항목 만들기
Observable을 만들기 위해 우리는 create() 메서드를 사용하고 다음 항목이있는 함수를 전달합니다.
on_next() −이 함수는 Observable이 아이템을 방출 할 때 호출됩니다.
on_completed() −이 함수는 Observable이 완료되면 호출됩니다.
on_error() −이 함수는 Observable에 오류가 발생하면 호출됩니다.
create () 메서드를 사용하려면 먼저 아래와 같이 메서드를 가져옵니다.
from rx import create
다음은 observable을 만드는 작업 예제입니다.
testrx.py
from rx import create
deftest_observable(observer, scheduler):
observer.on_next("Hello")
observer.on_error("Error")
observer.on_completed()
source = create(test_observable).
Observable 구독 및 실행
Observable을 구독하려면 subscribe () 함수를 사용하고 on_next, on_error 및 on_completed 콜백 함수를 전달해야합니다.
다음은 작동 예입니다.
testrx.py
from rx import create
deftest_observable(observer, scheduler):
observer.on_next("Hello")
observer.on_completed()
source = create(test_observable)
source.subscribe(
on_next = lambda i: print("Got - {0}".format(i)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!"),
)
subscribe () 메서드는 Observable 실행을 처리합니다. 콜백 함수on_next, on_error 과 on_completedsubscribe 메소드로 전달되어야합니다. subscribe 메서드를 호출하면 test_observable () 함수가 실행됩니다.
세 가지 콜백 함수를 모두 subscribe () 메서드에 전달할 필요는 없습니다. 요구 사항에 따라 on_next (), on_error () 및 on_completed ()를 전달할 수 있습니다.
람다 함수는 on_next, on_error 및 on_completed에 사용됩니다. 인수를 받아 주어진 표현식을 실행합니다.
다음은 생성 된 관찰 가능 항목의 출력입니다.
E:\pyrx>python testrx.py
Got - Hello
Job Done!