DataflowBlock ITargetSource.AsObservable () non attiva OnNext ()

Aug 28 2020

Sto cercando di utilizzare un dataflowblock e ho bisogno di spiare gli elementi che passano per il test unitario.

Per fare ciò, sto usando il AsObservable()metodo sul ISourceBlock<T>mio TransformBlock<Tinput, T>, così posso controllare dopo l'esecuzione che ogni blocco della mia pipeline abbia generato i valori attesi.

Tubatura

{
   ...
   var observer = new MyObserver<string>();
   _block  = new TransformManyBlock<string, string>(MyHandler, options);
   _block.LinkTo(_nextBlock);
   _block.AsObservable().Subscribe(observer);
   _block.Post("Test");
   ...
}

MyObserver

public class MyObserver<T> : IObserver<T>
{
    public List<Exception> Errors = new List<Exception>();
    public bool IsComplete = false;
    public List<T> Values = new List<T>();

    public void OnCompleted()
    {
        IsComplete = true;
    }

    public void OnNext(T value)
    {
        Values.Add(value);
    }

    public void OnError(Exception e)
    {
        Errors.Add(e);
    }
}

Quindi fondamentalmente iscrivo il mio osservatore al transformblock e mi aspetto che ogni valore che passa attraverso venga registrato nella mia lista dei "valori" dell'osservatore.

Ma, mentre IsCompleteè impostato su true e l' OnError()eccezione di registrazione riuscita, il OnNext()metodo non viene mai chiamato a meno che non sia l'ultimo blocco della pipeline ... Non riesco a capire perché, perché il "nextblock" è stato collegato correttamente a questo sourceBlock ricevere i dati, dimostrando che alcuni dati stanno uscendo dal blocco.

Da quello che ho capito, AsObservabledovrebbe riportare tutti i valori che escono dal blocco e non solo i valori che non sono stati consumati da altri blocchi collegati ...

Che cosa sto facendo di sbagliato ?

Risposte

2 00110001 Aug 28 2020 at 17:08

I vostri messaggi vengono consumati da _nextBlockprima di ottenere la possibilità di leggerli.

Se commentate questa riga _block.LinkTo(_nextBlock);, probabilmente funzionerebbe.

AsObservablel'unico scopo è solo quello di consentire a un blocco di essere consumato da RX . Non cambia il funzionamento interno del blocco per trasmettere messaggi a più destinazioni . Hai bisogno di un blocco speciale per questoBroadcastBlock

Suggerirei di trasmettere a un altro blocco e di usarlo perSubscribe

La missione di BroadcastBlock nella vita è consentire a tutti i bersagli collegati dal blocco di ottenere una copia di ogni elemento pubblicato

var options = new DataflowLinkOptions {PropagateCompletion = true};


var broadcastBlock = new BroadcastBlock<string>(x => x);
var bufferBlock = new BufferBlock<string>();
var actionBlock = new ActionBlock<string>(s => Console.WriteLine("Action " + s));

broadcastBlock.LinkTo(bufferBlock, options);
broadcastBlock.LinkTo(actionBlock, options);

bufferBlock.AsObservable().Subscribe(s => Console.WriteLine("peek " + s));

for (var i = 0; i < 5; i++)
   await broadcastBlock.SendAsync(i.ToString());

broadcastBlock.Complete();
await actionBlock.Completion;

Produzione

peek 0
Action 0
Action 1
Action 2
Action 3
Action 4
peek 1
peek 2
peek 3
peek 4