여러 관찰 가능 항목을 순서 보존 및 최대 동시성으로 병합하는 방법은 무엇입니까?

Nov 15 2020

중복을 검색했지만 아무것도 찾지 못했습니다. 내가 가진 것은 중첩 된 옵저버 블 IObservable<IObservable<T>>이고 나는 그것을 IObservable<T>. Concat이전 Observable이 완료 될 때까지 각 내부 Observable에 대한 구독을 지연시키기 때문에 연산자 를 사용하고 싶지 않습니다 . 이것은 내부 관측 가능 항목이 차갑기 때문에 문제이며 T외부 관측 가능 항목에서 방출 된 직후 값을 방출하기를 원합니다 . 또한 Merge방출 된 값의 순서를 엉망으로 만들기 때문에 연산자 를 사용하고 싶지 않습니다 . 아래의 대리석 다이어그램은 Merge연산자 의 문제 (제 경우) 동작과 바람직한 병합 동작을 보여줍니다.

Stream of observables: +--1---2---3--|
Observable-1         :    +-A----------B-----|
Observable-2         :        +--C--------D-|
Observable-3         :            +-E--------F----|
Merge                : +----A----C--E--B--D--F----|
Desirable merging    : +----A----------BC-DE-F----|

Observable-1이 내 보낸 모든 값은 Observable-2가 내 보낸 값보다 우선해야합니다. Observable-2와 Observable-3도 마찬가지입니다.

Merge연산자 와 함께 내가 좋아하는 것은 내부 관찰 가능 항목에 대한 최대 동시 구독을 구성 할 수 있다는 것입니다. MergeOrdered구현하려는 사용자 지정 연산자를 사용 하여이 기능을 유지하고 싶습니다 . 내 건설 방법은 다음과 같습니다.

public static IObservable<T> MergeOrdered<T>(
    this IObservable<IObservable<T>> source,
    int maximumConcurrency = Int32.MaxValue)
{
    return source.Merge(maximumConcurrency); // How to make it ordered?
}

다음은 사용 예입니다.

var source = Observable
    .Interval(TimeSpan.FromMilliseconds(300))
    .Take(4)
    .Select(x => Observable
        .Interval(TimeSpan.FromMilliseconds(200))
        .Select(y => $"{x + 1}-{(char)(65 + y)}") .Take(3)); var results = await source.MergeOrdered(2).ToArray(); Console.WriteLine($"Results: {String.Join(", ", results)}");

출력 (바람직하지 않음) :

Results: 1-A, 1-B, 2-A, 1-C, 2-B, 3-A, 2-C, 3-B, 4-A, 3-C, 4-B, 4-C

바람직한 출력은 다음과 같습니다.

Results: 1-A, 1-B, 1-C, 2-A, 2-B, 2-C, 3-A, 3-B, 3-C, 4-A, 4-B, 4-C

설명 : 값의 순서와 관련하여 값 자체는 관련이 없습니다. 중요한 것은 원래 내부 시퀀스의 순서와 해당 시퀀스에서의 위치입니다. 첫 번째 내부 시퀀스의 모든 값을 먼저 (원래 순서대로) 내보내고 두 번째 내부 시퀀스의 모든 값을 내 보낸 다음 세 번째 내부 시퀀스의 모든 값을 내 보내야합니다.

답변

Enigmativity Nov 15 2020 at 13:47

이 옵저버 블이 내부 옵저버 블의 마지막 값이 생성되어야하는 첫 번째 값인지 알 수있는 방법은 없습니다.

예를 들어 다음과 같이 할 수 있습니다.

Stream of observables: +--1---2---3--|
Observable-1         :    +------------B--------A-|
Observable-2         :        +--C--------D-|
Observable-3         :            +-E--------F-|
Desirable merging    : +------------------------ABCDEF|

이 경우 다음을 수행합니다.

IObservable<char> query =
    sources
        .ToObservable()
        .Merge()
        .ToArray()
        .SelectMany(xs => xs.OrderBy(x => x));
TheodorZoulias Nov 18 2020 at 01:05

제어 가능한 방식으로 내부 시퀀스를 따뜻하게 (게시)하여이 문제에 대한 해결책을 찾았습니다. 이 솔루션은 Replay온도를 제어하기 위해 연산자를 사용 SemaphoreSlim하고 동시성을 제어 하기 위해 a 를 사용합니다. 최종 Concat연산자는 각 내부 시퀀스의 값이 원하는 순서로 (순차적으로) 방출되도록합니다.

/// <summary>
/// Merges elements from all inner observable sequences into a single observable
/// sequence, preserving the order of the elements based on the order of their
/// originated sequence, limiting the number of concurrent subscriptions to inner
/// sequences.
/// </summary>
public static IObservable<T> MergeOrdered<T>(
    this IObservable<IObservable<T>> source,
    int maximumConcurrency = Int32.MaxValue)
{
    return Observable.Defer(() =>
    {
        var semaphore = new SemaphoreSlim(maximumConcurrency);
        return source.Select(inner =>
        {
            var published = inner.Replay();
            _ = semaphore.WaitAsync().ContinueWith(_ => published.Connect(),
                TaskScheduler.Default);
            return published.Finally(() => semaphore.Release());
        })
        .Concat();
    });
}

Defer연산자는 다른 갖기 위해 사용되는 SemaphoreSlim각 서브 스크립 (위한 참조 ). SemaphoreSlim여러 구독에 동일하게 사용하면 문제가 발생할 수 있습니다.

에서 현재 구독중인 내부 시퀀스 Concat가 게시 될 이유가 없기 때문에 이것은 완벽한 솔루션이 아닙니다 . 이 비 효율성을 사소한 것이 아니라 최적화하므로 그대로 두겠습니다.