여러 관찰 가능 항목을 순서 보존 및 최대 동시성으로 병합하는 방법은 무엇입니까?
중복을 검색했지만 아무것도 찾지 못했습니다. 내가 가진 것은 중첩 된 옵저버 블 IObservable<IObservable<T>>
이고 나는 그것을 IObservable<T>
. Concat이전 Observable이 완료 될 때까지 각 내부 Observable에 대한 구독을 지연시키기 때문에 연산자 를 사용하고 싶지 않습니다 . 이것은 내부 관측 가능 항목이 차갑기 때문에 문제이며 T
외부 관측 가능 항목에서 방출 된 직후 값을 방출하기를 원합니다 . 또한 Merge방출 된 값의 순서를 엉망으로 만들기 때문에 연산자 를 사용하고 싶지 않습니다 . 아래의 대리석 다이어그램은 Merge
연산자 의 문제 (제 경우) 동작과 바람직한 병합 동작을 보여줍니다.
Stream of observables: +--1---2---3--|
Observable-1 : +-A----------B-----|
Observable-2 : +--C--------D-|
Observable-3 : +-E--------F----|
Merge : +----A----C--E--B--D--F----|
Desirable merging : +----A----------BC-DE-F----|
Observable-1이 내 보낸 모든 값은 Observable-2가 내 보낸 값보다 우선해야합니다. Observable-2와 Observable-3도 마찬가지입니다.
Merge
연산자 와 함께 내가 좋아하는 것은 내부 관찰 가능 항목에 대한 최대 동시 구독을 구성 할 수 있다는 것입니다. MergeOrdered
구현하려는 사용자 지정 연산자를 사용 하여이 기능을 유지하고 싶습니다 . 내 건설 방법은 다음과 같습니다.
public static IObservable<T> MergeOrdered<T>(
this IObservable<IObservable<T>> source,
int maximumConcurrency = Int32.MaxValue)
{
return source.Merge(maximumConcurrency); // How to make it ordered?
}
다음은 사용 예입니다.
var source = Observable
.Interval(TimeSpan.FromMilliseconds(300))
.Take(4)
.Select(x => Observable
.Interval(TimeSpan.FromMilliseconds(200))
.Select(y => $"{x + 1}-{(char)(65 + y)}") .Take(3)); var results = await source.MergeOrdered(2).ToArray(); Console.WriteLine($"Results: {String.Join(", ", results)}");
출력 (바람직하지 않음) :
Results: 1-A, 1-B, 2-A, 1-C, 2-B, 3-A, 2-C, 3-B, 4-A, 3-C, 4-B, 4-C
바람직한 출력은 다음과 같습니다.
Results: 1-A, 1-B, 1-C, 2-A, 2-B, 2-C, 3-A, 3-B, 3-C, 4-A, 4-B, 4-C
설명 : 값의 순서와 관련하여 값 자체는 관련이 없습니다. 중요한 것은 원래 내부 시퀀스의 순서와 해당 시퀀스에서의 위치입니다. 첫 번째 내부 시퀀스의 모든 값을 먼저 (원래 순서대로) 내보내고 두 번째 내부 시퀀스의 모든 값을 내 보낸 다음 세 번째 내부 시퀀스의 모든 값을 내 보내야합니다.
답변
이 옵저버 블이 내부 옵저버 블의 마지막 값이 생성되어야하는 첫 번째 값인지 알 수있는 방법은 없습니다.
예를 들어 다음과 같이 할 수 있습니다.
Stream of observables: +--1---2---3--|
Observable-1 : +------------B--------A-|
Observable-2 : +--C--------D-|
Observable-3 : +-E--------F-|
Desirable merging : +------------------------ABCDEF|
이 경우 다음을 수행합니다.
IObservable<char> query =
sources
.ToObservable()
.Merge()
.ToArray()
.SelectMany(xs => xs.OrderBy(x => x));
제어 가능한 방식으로 내부 시퀀스를 따뜻하게 (게시)하여이 문제에 대한 해결책을 찾았습니다. 이 솔루션은 Replay온도를 제어하기 위해 연산자를 사용 SemaphoreSlim하고 동시성을 제어 하기 위해 a 를 사용합니다. 최종 Concat연산자는 각 내부 시퀀스의 값이 원하는 순서로 (순차적으로) 방출되도록합니다.
/// <summary>
/// Merges elements from all inner observable sequences into a single observable
/// sequence, preserving the order of the elements based on the order of their
/// originated sequence, limiting the number of concurrent subscriptions to inner
/// sequences.
/// </summary>
public static IObservable<T> MergeOrdered<T>(
this IObservable<IObservable<T>> source,
int maximumConcurrency = Int32.MaxValue)
{
return Observable.Defer(() =>
{
var semaphore = new SemaphoreSlim(maximumConcurrency);
return source.Select(inner =>
{
var published = inner.Replay();
_ = semaphore.WaitAsync().ContinueWith(_ => published.Connect(),
TaskScheduler.Default);
return published.Finally(() => semaphore.Release());
})
.Concat();
});
}
Defer
연산자는 다른 갖기 위해 사용되는 SemaphoreSlim
각 서브 스크립 (위한 참조 ). SemaphoreSlim
여러 구독에 동일하게 사용하면 문제가 발생할 수 있습니다.
에서 현재 구독중인 내부 시퀀스 Concat
가 게시 될 이유가 없기 때문에 이것은 완벽한 솔루션이 아닙니다 . 이 비 효율성을 사소한 것이 아니라 최적화하므로 그대로 두겠습니다.