DataflowBlock ITargetSource.AsObservable () non attiva OnNext ()
Sto cercando di utilizzare un dataflowblock e ho bisogno di spiare gli elementi che passano per il test unitario.
Per fare ciò, sto usando il AsObservable()metodo sul ISourceBlock<T>mio TransformBlock<Tinput, T>, così posso controllare dopo l'esecuzione che ogni blocco della mia pipeline abbia generato i valori attesi.
Tubatura
{
...
var observer = new MyObserver<string>();
_block = new TransformManyBlock<string, string>(MyHandler, options);
_block.LinkTo(_nextBlock);
_block.AsObservable().Subscribe(observer);
_block.Post("Test");
...
}
MyObserver
public class MyObserver<T> : IObserver<T>
{
public List<Exception> Errors = new List<Exception>();
public bool IsComplete = false;
public List<T> Values = new List<T>();
public void OnCompleted()
{
IsComplete = true;
}
public void OnNext(T value)
{
Values.Add(value);
}
public void OnError(Exception e)
{
Errors.Add(e);
}
}
Quindi fondamentalmente iscrivo il mio osservatore al transformblock e mi aspetto che ogni valore che passa attraverso venga registrato nella mia lista dei "valori" dell'osservatore.
Ma, mentre IsCompleteè impostato su true e l' OnError()eccezione di registrazione riuscita, il OnNext()metodo non viene mai chiamato a meno che non sia l'ultimo blocco della pipeline ... Non riesco a capire perché, perché il "nextblock" è stato collegato correttamente a questo sourceBlock ricevere i dati, dimostrando che alcuni dati stanno uscendo dal blocco.
Da quello che ho capito, AsObservabledovrebbe riportare tutti i valori che escono dal blocco e non solo i valori che non sono stati consumati da altri blocchi collegati ...
Che cosa sto facendo di sbagliato ?
Risposte
I vostri messaggi vengono consumati da _nextBlockprima di ottenere la possibilità di leggerli.
Se commentate questa riga _block.LinkTo(_nextBlock);, probabilmente funzionerebbe.
AsObservablel'unico scopo è solo quello di consentire a un blocco di essere consumato da RX . Non cambia il funzionamento interno del blocco per trasmettere messaggi a più destinazioni . Hai bisogno di un blocco speciale per questoBroadcastBlock
Suggerirei di trasmettere a un altro blocco e di usarlo perSubscribe
La missione di BroadcastBlock nella vita è consentire a tutti i bersagli collegati dal blocco di ottenere una copia di ogni elemento pubblicato
var options = new DataflowLinkOptions {PropagateCompletion = true};
var broadcastBlock = new BroadcastBlock<string>(x => x);
var bufferBlock = new BufferBlock<string>();
var actionBlock = new ActionBlock<string>(s => Console.WriteLine("Action " + s));
broadcastBlock.LinkTo(bufferBlock, options);
broadcastBlock.LinkTo(actionBlock, options);
bufferBlock.AsObservable().Subscribe(s => Console.WriteLine("peek " + s));
for (var i = 0; i < 5; i++)
await broadcastBlock.SendAsync(i.ToString());
broadcastBlock.Complete();
await actionBlock.Completion;
Produzione
peek 0
Action 0
Action 1
Action 2
Action 3
Action 4
peek 1
peek 2
peek 3
peek 4