Kann ich einen Thread wiederholt aus einem Thread heraus aufrufen?

Nov 20 2020

Ich versuche , von Thread A zu übertragen Proben ( „Erwerb“) B ( „P300“) einzufädeln verwenden , queueaber ich kann keine Daten in Thread B lesen, obwohl Proben werden in Gewinden A. zugeordnet sind Gemessen an meiner Ausgabe, Ich denke, mein Thread B rast und testet Dinge, bevor mein Thread A anfängt, Daten einzugeben.

Unten sehen Sie eine Annäherung an meine Codestruktur:

import threading
import queue
from queue import Empty
import numpy as np
import warnings
warnings.filterwarnings("error")

class AcqThread(threading.Thread):
    def __init__(self, dataOutQ1, dataOutQ2, stopQ1, stopQ2, saveQ):
        threading.Thread.__init__(self)
        self.stopQ2 = stopQ2
        self.stopQ1 = stopQ1
        self.dataOutQ2 = dataOutQ2
        self.dataOutQ1 = dataOutQ1
        self.saveQ = saveQ

    def run(self):
        Acquisition(inlet, self.dataOutQ1, self.dataOutQ2, self.stopQ1, self.stopQ2, self.saveQ)

class P300Thread(threading.Thread):
    def __init__(self, dataInQ, featureQ, stopQ):
        threading.Thread.__init__(self)
        self.dataInQ = dataInQ
        self.featureQ = featureQ
        self.stopQ = stopQ

    def run(self):
        P300fun(self.dataInQ, self.featureQ, self.stopQ)

threadLock = threading.Lock()
SaveQ = queue.Queue()
DataOutQ1 = queue.Queue()
DataOutQ2 = queue.Queue()
StopQ1 = queue.Queue()
StopQ2 = queue.Queue()
FeatQ1 = queue.Queue()
StopQ1.put(0)
StopQ2.put(0)
#
AcqTh = AcqThread(DataOutQ1, DataOutQ2, StopQ1, StopQ2, SaveQ)
P300Th = P300Thread(DataOutQ1, FeatQ1, StopQ1)

def Acquisition(inlet, dataOutQ1, dataOutQ2, stopQ1, stopQ2, saveQ):
    i = 0
    print('Starting...')
    while i<1250: #i is the number of samples
        sample, timestamp = inlet.pull_sample() #samples coming in @ 250Hz
        ##Normalization, filtering##
        threadLock.acquire()
        dataOutQ1.put([filtsamples[:,-250:], rawtimestamps[-250:]]) #I only need the last 250 samples
        threadLock.release()
        i += 1

def P300fun(dataInQ, featureQ, stopQ):
    p300sample = []
    p300timestamp = []
    print(f"Is DataInQ size true? {DataOutQ1.qsize()}")
    print("Is dataInQ emtpy?", DataOutQ1.empty())
    while dataInQ.qsize(): #or while not dataqueue.empty():
        try:
            print("DataInQ has data")
            ss, ts = dataInQ.get(0) 
            print('<>P300\n>>Samples [', ss, ']\nTimestamp [', ts, ']')
        except Empty:
            return
    print('Thread Finished')

if __name__ == '__main__':
    print('Looking for an EEG stream...')
    streams = resolve_stream('type', 'EEG')
    inlet = StreamInlet(streams[0])
    print('Connected!\n')

    AcqTh.start()
    P300Th.start()

    AcqTh.join()
    P300Th.join()

    print("\n\n>>>DONE<<<\n\n")

Und Ausgabe:

Looking for an EEG stream...
Connected!

Is DataInQ size true? 0
Starting...
Is dataInQ emtpy? True
Thread Finished

>>>DONE<<<

In meiner Forschung schien Frage 1 ein ähnliches Problem darzustellen, aber es scheint, dass das Problem im Bildverarbeitungsteil lag (und sie verwenden das multiprocessingPaket). Frage 2 scheint ein Parallelitätsproblem zu haben, das mein Problem sein könnte, aber ich bin nicht sicher, wie ich es in mein Problem übersetzen soll. Ich weiß, ob ich falsch liege. Frage 3 hatte nur ein Problem mit der Reihenfolge der Argumente, daher denke ich, dass dies hier nicht anwendbar ist.

Wie soll ich das machen? Sollte ich Thread B wiederholt aus Thread A heraus aufrufen? Benötige ich eine Schleife oder eine Verzögerung für Thread B? Gibt es .join()vielleicht ein Problem mit dem Teil? Ich werde in naher Zukunft weitere Threads hinzufügen müssen, daher wäre es gut, zuerst herauszufinden, wie man mit nur zwei arbeitet ...

Jede Hilfe wird geschätzt!

Antworten

mgmussi Nov 20 2020 at 19:02

Ein Noob zu sein kann schwierig sein ... Also werde ich meine eigene Frage beantworten, um anderen Anfängern zu helfen, die möglicherweise auch auf dieses Problem stoßen.

Nun, das Wichtigste zuerst: Nein, es ist nicht möglich, einen Thread innerhalb eines Threads wiederholt aufzurufen, da jeder Thread nur einmal aufgerufen werden kann.

Es gibt jedoch eine Möglichkeit, das Beenden des Threads zu verhindern und sie auf Auslöser warten zu lassen, mit denen sie fortfahren können. Nach einigen weiteren Recherchen stieß ich auf diese Frage , die mir zeigte, dass es eine Möglichkeit gibt, Ereignisse für Threads zu erstellen. Die Dokumentation finden Sie hier . Und es ist ganz einfach: Die Ereignisobjekte verhalten sich wie Flags und können set()(Anzeige von True) oder clear()(Anzeige von False, was der ursprüngliche Wert ist) sein. Um ein Ereignis zu testen, kann man die is_set()Methode für boolesche Probleme verwenden oder die wait()Methode anstelle eines Timers verwenden. In meinem Fall hat es mir einige Warteschlangen erspart, die ich verwenden wollte:

import threading
import queue
from queue import Empty
import numpy as np


class AcqThread(threading.Thread):
    def __init__(self, dataOutQ1, dataOutQ2, saveQ):
        threading.Thread.__init__(self)
        self.dataOutQ2 = dataOutQ2
        self.dataOutQ1 = dataOutQ1
        self.saveQ = saveQ

    def run(self):
        Acquisition(inlet, self.dataOutQ1, self.dataOutQ2, self.saveQ)

class P300Thread(threading.Thread):
    def __init__(self, dataInQ, featureQ):
        threading.Thread.__init__(self)
        self.dataInQ = dataInQ
        self.featureQ = featureQ

    def run(self):
        P300fun(self.dataInQ, self.featureQ)

threadLock = threading.Lock()
SaveQ = queue.Queue()
DataOutQ1 = queue.Queue()
DataOutQ2 = queue.Queue()
FeatQ1 = queue.Queue()
FeatQ2 = queue.Queue()

#NEW:: initializes Events
E = threading.Event()
EP300 = threading.Event()
#
AcqTh = AcqThread(DataOutQ1, DataOutQ2, SaveQ)
P300Th = P300Thread(DataOutQ1, FeatQ1)

Und es erlaubt mir, Thread B "wiederholt" "aufzurufen", da es meinen ersten aktiv hält (wegen Ereignis E) und nur dann in den Verarbeitungsteil eintritt, wenn das Ereignis EP300 gesetzt ist. Anschließend wird EP300 nach Abschluss des Vorgangs gelöscht:

def Acquisition(inlet, dataOutQ1, dataOutQ2 saveQ):
    i = 0
    print('Starting...')
    while i<1250:
        sample, timestamp = inlet.pull_sample()
        ##Normalization, filtering##
        if _condition_:
            threadLock.acquire()
            dataOutQ1.put([filtsamples[:,-250:], rawtimestamps[-250:]])
            threadLock.release()
            EP300.set() #NEW:: allows the P300 function to collect data from queue
        i += 1
    E.set() #NEW:: flaggs end data collection

def P300fun(dataInQ, featureQ):
    p300sample = []
    p300timestamp = []
    while not E.is_set(): #NEW:: loop until collection is ended
        if EP300.is_set(): #NEW:: activated when Event is triggered
            while dataInQ.qsize():
                try:
                    print("DataInQ has data")
                    ss, ts = dataInQ.get(0) 
                    print('<>P300\n>>Samples [', ss, ']\nTimestamp [', ts, ']')
                except Empty:
                    return
        if not E.is_set(): #NEW:: Event is cleared in case data collection is not over, waiting for a new set()
            EP300.clear()
    print('Thread Finished')

if __name__ == '__main__':
    print('Looking for an EEG stream...')
    streams = resolve_stream('type', 'EEG')
    inlet = StreamInlet(streams[0])
    print('Connected!\n')

    AcqTh.start()
    P300Th.start()

    AcqTh.join()
    P300Th.join()

    print("\n\n>>>DONE<<<\n\n")