Gérer deux flux de données entrants et les combiner en python?
J'ai recherché diverses options en python de threading, multitraitement asynchrone, etc. comme moyens de gérer deux flux entrants et de les combiner. Il y a beaucoup d'informations sur, mais les exemples sont souvent compliqués et compliqués, et consistent le plus souvent à diviser une seule tâche en plusieurs threads ou processus pour accélérer le résultat final de la tâche.
J'ai un flux de données entrant sur une socket (en utilisant actuellement UDP comme une autre application exécutée localement sur mon PC, mais je peux envisager de passer à TCP à l'avenir si l'application doit être exécutée sur un PC séparé), et un flux série venant via un adaptateur RS232, et je dois combiner les flux. Ce nouveau flux est ensuite retransmis sur une autre socket.
Le problème est qu'ils arrivent à des taux différents (les données série arrivent à 125 Hz, les données de socket à 60-120 Hz), je veux donc ajouter les dernières données série aux données de socket.
Ma question est essentiellement de savoir quelle est la meilleure façon de gérer cela, sur la base de l'expérience antérieure d'autres peuples. Comme il s'agit essentiellement d'une tâche d'E / S, cela se prête davantage au threading (qui, je le sais, est limité à la concurrence par le GIL), mais en raison du taux d'entrée élevé, je me demande si le multi-traitement est la voie à suivre.
Si vous utilisez le threading, je suppose que le meilleur moyen d'accéder à chaque ressource partagée consiste à utiliser un verrou pour écrire les données série sur un objet, et dans un thread séparé chaque fois qu'il y a de nouvelles données de socket, puis en acquérant le verrou, en accédant aux dernières données série dans le objet, le traitant puis l'envoyant sur l'autre socket. Cependant, le thread principal a beaucoup de travail entre chaque nouveau message de socket entrant.
Avec le multi-traitement, je pourrais utiliser un tube pour demander et recevoir les dernières données série de l'autre processus, mais cela ne fait que se décharger de la gestion des données série et laisse encore beaucoup pour le processus principal.
Réponses
Êtes-vous sûr d' avoir besoin du multi-threading ici? Si ce n'est pas strictement nécessaire, je l'éviterais à coup sûr.
- Je n'ai pas trop programmé ces derniers temps contre les ports série et les sockets, mais pour autant que je sache, car les données sont toutes deux mises en mémoire tampon par HW / middleware, donc de ce point de vue, il ne devrait pas être nécessaire d'avoir un thread par flux entrant.
- en ce qui concerne le thread principal qui a beaucoup de travail à faire: êtes-vous sûr que cela ne peut pas être combiné dans le thread qui effectue les E / S?
Si c'est faisable, j'écrirais une boucle qui lit alternativement les deux flux, la traiter / combiner et l'écrire dans la prise de sortie:
while True:
serial_data_in = serial_in.read()
socket_data_in = socket_in.read()
socket_out.write(combine(serial_data_in, socket_data_in))
Peut-être que quelques ajustements sur les délais d'attente des read () s sont nécessaires, pour éviter de manquer des données sur l'un s'il n'y aurait pas de données entrantes dans l'autre.
Si cela ne fonctionnait pas , je garderais toujours le moins de fils possible. Par exemple, vous pouvez utiliser un thread pour la lecture (comme ci-dessus) et utiliser une file d' attente pour communiquer avec un thread qui effectue le traitement et l'écriture dans le socket out:
q = queue.Queue()
def worker_1:
while True:
serial_data_in = serial_in.read()
socket_data_in = socket_in.read()
q.put((serial_data_in, socket_data_in))
def worker_2:
while True:
(serial_data_in, socket_data_in) = q.get()
socket_out.write(combine(serial_data_in, socket_data_in))
q.task_done()
Les files d'attente suppriment la complexité de synchronisation de niveau inférieur des objets de verrouillage.
Je pense que l'utilisation de select est très simple. Il vous indique quelle socket a des données (ou EOF
) à lire.
En fait, une question similaire a déjà été posée: Python - Serveur écoutant depuis deux sockets UDP
Veuillez noter qu'une seule lecture à partir d'une socket renvoyée par select
est garantie de ne pas bloquer. Vérifiez à nouveau avant de continuer la lecture. Cela signifie que si vous lisez un flux de données, lisez-le dans un tampon jusqu'à ce que vous receviez une ligne entière ou une autre unité de données pouvant être traitée.
Votre question diffère de celle liée, car vous devez lire à partir du réseau et d'une interface série. Linux n'a aucun problème avec cela, n'importe quel descripteur de fichier peut être utilisé avec select
. Cependant, sous Windows, seuls les sockets peuvent être utilisés avec select
. Je ne travaille pas avec Windows, mais il semble que vous aurez besoin d'un thread dédié pour lire la ligne série.
Je peux suggérer l'approche utilisée ici - https://stackoverflow.com/a/641488/4895189. Si vous disposez d'une structure pour les données que vous recevez via le socket et le numéro de série, vous pouvez écrire ces structures avec des horodatages sur des objets de canal individuels.
Je préférerais le multitraitement au threading d'après mon expérience. J'ai utilisé pyserial pour lire et écrire pour UART, dans lequel le fil principal était utilisé pour l'écriture et un fil séparé pour la lecture. Pour des raisons que je n'ai pas pu découvrir, j'ai manqué des images à la fois en entrée et en sortie si j'écrivais des données sans ajouter un délai assez important (~ 1000 ms) entre les appels d'écriture séquentielle. En général, je trouve que l'utilisation de pyserial avec le thread de Python a un comportement étrange. Actuellement, je ne sais pas si cela est dû à l'implémentation de pyserial ou au GIL de Python.
Cela étant dit, je pense que vous pouvez utiliser la structure suivante pour votre configuration en fonction de la réponse que j'ai liée ci-dessus:
Processus enfant 1 - Lire les données de Socket et écrire dans Pipe avec l'horodatage
Child Process 2 - Lire les données à l'aide de pyserial et écrire dans Pipe avec l'horodatage
Processus principal - Effectuez une sélection sur les deux objets de tuyau à un intervalle de votre choix, combinez les flux transmettre à la prise de sortie.