Zwei eingehende Datenströme verarbeiten und in Python kombinieren?

Nov 19 2020

Ich habe verschiedene Optionen in Python für Threading, Multiprocessing Async usw. untersucht, um zwei eingehende Streams zu verarbeiten und zu kombinieren. Es gibt viele Informationen darüber, aber die Beispiele sind oft kompliziert und kompliziert. Häufiger ist es, eine einzelne Aufgabe in mehrere Threads oder Prozesse aufzuteilen, um das Endergebnis der Aufgabe zu beschleunigen.

Ich habe einen Datenstrom, der über einen Socket eingeht (derzeit wird UDP als andere Anwendung verwendet, die lokal auf meinem PC ausgeführt wird, kann aber in Zukunft möglicherweise auf TCP umsteigen, wenn die Anwendung auf einem separaten PC ausgeführt werden muss) und einen seriellen Datenstrom Ich komme über einen RS232-Adapter herein und muss die Streams kombinieren. Dieser neue Stream wird dann auf einem anderen Socket erneut übertragen.

Das Problem ist, dass sie mit unterschiedlichen Raten eingehen (serielle Daten kommen mit 125 Hz, Socket-Daten mit 60-120 Hz), daher möchte ich den Socket-Daten die neuesten seriellen Daten hinzufügen.

Meine Frage ist im Wesentlichen, wie ich am besten damit umgehen kann, basierend auf den früheren Erfahrungen anderer Leute. Da dies im Wesentlichen eine E / A-Aufgabe ist, eignet sie sich eher für das Threading (von dem ich weiß, dass es durch die GIL auf Parallelität beschränkt ist). Aufgrund der hohen Eingaberate frage ich mich jedoch, ob Multi-Processing der richtige Weg ist.

Wenn Sie Threading verwenden, ist der beste Weg, auf jede gemeinsam genutzte Ressource zuzugreifen, die Verwendung einer Sperre, um die seriellen Daten in ein Objekt zu schreiben, und in einem separaten Thread, wenn neue Socket-Daten vorhanden sind, und dann die Sperre zu erwerben, um auf die neuesten seriellen Daten in der zuzugreifen Objekt, verarbeitet es und sendet es dann an den anderen Socket. Der Haupt-Thread hat jedoch viel Arbeit zwischen jeder neuen eingehenden Socket-Nachricht zu erledigen.

Mit Multi-Processing könnte ich eine Pipe verwenden, um die neuesten seriellen Daten vom anderen Prozess anzufordern und zu empfangen, aber das entlastet nur die serielle Datenverarbeitung und lässt noch viel für den Hauptprozess übrig.

Antworten

2 QuadU Nov 23 2020 at 15:18

Sind Sie sicher, dass Sie hier Multithreading benötigen ? Wenn es nicht unbedingt benötigt wird, würde ich es auf jeden Fall vermeiden.

  • Ich habe in letzter Zeit nicht zu viel gegen serielle Schnittstellen und Sockets programmiert, aber soweit ich weiß, werden beide Daten von HW / Middleware gepuffert, sodass aus dieser Perspektive kein Thread pro eingehendem Stream erforderlich sein sollte.
  • In Bezug auf den Haupt-Thread, der viel zu tun hat: Sind Sie sicher, dass dies nicht in dem Thread kombiniert werden kann, der die E / A ausführt?

Wenn es irgendwie machbar ist, würde ich eine Schleife schreiben, die alternativ aus beiden Streams liest, sie verarbeiten / kombinieren und in den Out-Socket schreiben:

while True:
  serial_data_in = serial_in.read()
  socket_data_in = socket_in.read()
  socket_out.write(combine(serial_data_in, socket_data_in))

Möglicherweise ist ein Tweeking über die Zeitüberschreitungen der read () s erforderlich, um zu vermeiden, dass Daten auf einem fehlen, wenn auf dem anderen keine Daten eingehen.

Wenn das nicht funktionieren würde , würde ich immer noch so wenig Threads wie möglich behalten. Sie können beispielsweise einen Thread zum Lesen verwenden (wie oben) und eine Warteschlange verwenden , um mit einem Thread zu kommunizieren, der die Verarbeitung und das Schreiben in den Out-Socket übernimmt:

q = queue.Queue()

def worker_1:
  while True:
    serial_data_in = serial_in.read()
    socket_data_in = socket_in.read()
    q.put((serial_data_in, socket_data_in))

def worker_2:
  while True:
    (serial_data_in, socket_data_in) = q.get()
    socket_out.write(combine(serial_data_in, socket_data_in))
    q.task_done()

Warteschlangen verringern die Komplexität der Synchronisierung auf niedrigerer Ebene beim Sperren von Objekten.

2 VPfB Nov 24 2020 at 15:28

Ich denke, die Verwendung von select ist sehr einfach. Hier erfahren Sie, welcher Socket Daten (oder EOF) zu lesen hat.

Tatsächlich wurde zuvor eine ähnliche Frage gestellt: Python - Server, der von zwei UDP-Sockets abhört

Bitte beachten Sie, dass nur ein Lesevorgang von einem von zurückgegebenen Socket selectgarantiert nicht blockiert. Überprüfen Sie dies erneut, bevor Sie mit dem Lesen fortfahren. Das heißt, wenn Sie einen Datenstrom lesen, lesen Sie in einen Puffer, bis Sie eine ganze Zeile oder eine andere Dateneinheit erhalten, die verarbeitet werden kann.

Ihre Frage unterscheidet sich von der verknüpften, da Sie aus dem Netzwerk und einer seriellen Schnittstelle lesen müssen. Linux hat kein Problem damit, jeder Dateideskriptor kann mit verwendet werden select. Unter Windows können jedoch nur Sockets verwendet werden select. Ich arbeite nicht mit Windows, aber es sieht so aus, als würden Sie einen dedizierten Thread zum Lesen der seriellen Leitung benötigen.

1 DivyeshPeshavaria Nov 30 2020 at 07:43

Ich kann den hier verwendeten Ansatz vorschlagen - https://stackoverflow.com/a/641488/4895189. Wenn Sie eine Struktur für die Daten haben, die Sie über den Socket und die Seriennummer erhalten, können Sie diese Strukturen mit Zeitstempeln in einzelne Pipe-Objekte schreiben.

Aus meiner Erfahrung würde ich Multiprocessing dem Threading vorziehen. Ich habe Pyserial zum Lesen und Schreiben für UART verwendet, wobei der Haupt-Thread zum Schreiben und ein separater Thread zum Lesen verwendet wurde. Aus Gründen, die ich nicht herausfinden konnte, habe ich sowohl bei der Eingabe als auch bei der Ausgabe Frames verpasst, wenn ich Daten geschrieben habe, ohne eine ziemlich große Verzögerung (~ 1000 ms) zwischen aufeinanderfolgenden Schreibaufrufen hinzuzufügen. Im Allgemeinen finde ich die Verwendung von Pyserial mit Pythons Threading ein seltsames Verhalten. Derzeit bin ich mir nicht sicher, ob es an der Implementierung von pyserial oder an Pythons GIL liegt.

Abgesehen davon denke ich, dass Sie die folgende Struktur für Ihr Setup verwenden können, basierend auf der Antwort, die ich oben verlinkt habe:

Untergeordneter Prozess 1 - Lesen von Daten aus Socket und Schreiben mit dem Zeitstempel in Pipe
Untergeordneter Prozess 2 - Lesen von Daten mit Pyserial und Schreiben mit dem Zeitstempel in Pipe
Hauptprozess - Wählen Sie beide Pipe-Objekte in einem Intervall Ihrer Wahl aus, kombinieren Sie die Streams und an die Ausgangsbuchse senden.