Gestire due flussi di dati in arrivo e combinarli in Python?

Nov 19 2020

Ho cercato varie opzioni in Python di threading, multiprocessing async ecc.Come modi per gestire due flussi in arrivo e combinarli. Ci sono molte informazioni in merito, ma gli esempi sono spesso contorti e complicati e più comunemente servono a suddividere una singola attività in più thread o processi per accelerare il risultato finale dell'attività.

Ho un flusso di dati in arrivo su un socket (attualmente utilizzo UDP come un'altra applicazione in esecuzione localmente sul mio PC, ma potrei considerare di passare a TCP in futuro se l'applicazione deve essere eseguita su un PC separato) e un flusso seriale in arrivo tramite un adattatore RS232 e devo combinare i flussi. Questo nuovo flusso viene quindi ritrasmesso su un altro socket.

Il problema è che arrivano a velocità diverse (i dati seriali arrivano a 125 Hz, i dati del socket a 60-120 Hz), quindi voglio aggiungere i dati seriali più recenti ai dati del socket.

La mia domanda è essenzialmente quale sia il modo migliore per gestire questo, sulla base dell'esperienza precedente di altre persone. Poiché si tratta essenzialmente di un'attività di I / O, si presta maggiormente al threading (che so è limitato alla concorrenza dal GIL), ma a causa dell'elevata velocità di input, mi chiedo se la multielaborazione sia la strada da percorrere?

Se si utilizza il threading, immagino che il modo migliore per accedere a ciascuna risorsa condivisa sia utilizzare un blocco per scrivere i dati seriali su un oggetto e in un thread separato ogni volta che sono presenti nuovi dati socket, quindi acquisire il blocco, accedere agli ultimi dati seriali nel oggetto, elaborandolo e quindi inviandolo sull'altro socket. Tuttavia, il thread principale deve lavorare molto tra ogni nuovo messaggio socket in arrivo.

Con il multielaborazione potrei usare una pipe per richiedere e ricevere i dati seriali più recenti dall'altro processo, ma questo scarica solo la gestione dei dati seriali e lascia ancora molto per il processo principale.

Risposte

2 QuadU Nov 23 2020 at 15:18

Sei sicuro di aver bisogno del multi-threading qui? Se non strettamente necessario, lo eviterei di sicuro.

  • Non ho programmato molto ultimamente contro porte seriali e socket, ma per quanto ne so, per entrambi i dati sono bufferizzati da HW / middleware, quindi da quella prospettiva non dovrebbe esserci bisogno di un thread per flusso in entrata.
  • riguardo al thread principale che ha molto lavoro da fare: sei sicuro che questo non possa essere combinato nel thread che fa l'I / O?

Se è in qualche modo fattibile, scriverei un ciclo che legge alternativamente da entrambi i flussi, lo elaboro / combini e lo scrivo nel socket in uscita:

while True:
  serial_data_in = serial_in.read()
  socket_data_in = socket_in.read()
  socket_out.write(combine(serial_data_in, socket_data_in))

Forse è necessario qualche ritocco sui timeout delle read (), per evitare di perdere dati su uno se non ci fossero dati in arrivo nell'altro.

Se non funzionasse , terrei comunque il minor numero di thread possibile. Ad esempio, potresti utilizzare un thread per la lettura (come sopra) e utilizzare una coda per comunicare con un thread che esegue l'elaborazione e la scrittura sul socket in uscita:

q = queue.Queue()

def worker_1:
  while True:
    serial_data_in = serial_in.read()
    socket_data_in = socket_in.read()
    q.put((serial_data_in, socket_data_in))

def worker_2:
  while True:
    (serial_data_in, socket_data_in) = q.get()
    socket_out.write(combine(serial_data_in, socket_data_in))
    q.task_done()

Le code eliminano la complessità di sincronizzazione di livello inferiore degli oggetti di blocco.

2 VPfB Nov 24 2020 at 15:28

Penso che usare select sia molto semplice. Ti dice quale socket ha dati (o EOF) da leggere.

In realtà, una domanda simile è stata posta prima: Python - Server in ascolto da due socket UDP

Si noti che selectè garantito che solo una lettura da un socket restituito da non si blocchi. Controlla di nuovo prima di continuare a leggere. Ciò significa che se stai leggendo un flusso di dati, leggilo in un buffer finché non ricevi un'intera riga o un'altra unità di dati che può essere elaborata.

La tua domanda è diversa da quella collegata, perché devi leggere dalla rete e da un'interfaccia seriale. Linux non ha problemi con esso, qualsiasi descrittore di file può essere utilizzato con select. Tuttavia, su Windows, è possibile utilizzare solo i socket con select. Non lavoro con Windows, ma sembra che avrai bisogno di un thread dedicato per leggere la linea seriale.

1 DivyeshPeshavaria Nov 30 2020 at 07:43

Posso suggerire l'approccio usato qui - https://stackoverflow.com/a/641488/4895189. Se hai una struttura per i dati che ricevi attraverso il socket e il seriale puoi scrivere quelle strutture con timestamp sui singoli oggetti pipe.

Preferirei il multiprocessing rispetto al threading in base alla mia esperienza. Ho usato pyserial per la lettura e la scrittura per UART, in cui il thread principale è stato utilizzato per la scrittura e un thread separato per la lettura. Per motivi che non sono riuscito a scoprire, ho perso frame sia in input che in output se scrivevo dati senza aggiungere un ritardo piuttosto ampio (~ 1000ms) tra le chiamate di scrittura sequenziali. In generale, trovo che l'uso di pyserial con Threading di Python abbia un comportamento strano. Al momento, non sono sicuro che sia dovuto all'implementazione di pyserial o al GIL di Python.

Detto questo, penso che tu possa utilizzare la seguente struttura per la tua configurazione in base alla risposta che ho collegato sopra:

Processo figlio 1 - Leggi i dati da Socket e scrivi su Pipe con il timestamp
Processo figlio 2 - Leggi i dati usando pyserial e scrivi su Pipe con il timestamp
Processo principale - Esegui la selezione su entrambi gli oggetti pipe a un intervallo di tua scelta, combina i flussi e trasmettere alla presa di uscita.