Kann ich einen Thread wiederholt aus einem Thread heraus aufrufen?
Ich versuche , von Thread A zu übertragen Proben ( „Erwerb“) B ( „P300“) einzufädeln verwenden , queue
aber ich kann keine Daten in Thread B lesen, obwohl Proben werden in Gewinden A. zugeordnet sind Gemessen an meiner Ausgabe, Ich denke, mein Thread B rast und testet Dinge, bevor mein Thread A anfängt, Daten einzugeben.
Unten sehen Sie eine Annäherung an meine Codestruktur:
import threading
import queue
from queue import Empty
import numpy as np
import warnings
warnings.filterwarnings("error")
class AcqThread(threading.Thread):
def __init__(self, dataOutQ1, dataOutQ2, stopQ1, stopQ2, saveQ):
threading.Thread.__init__(self)
self.stopQ2 = stopQ2
self.stopQ1 = stopQ1
self.dataOutQ2 = dataOutQ2
self.dataOutQ1 = dataOutQ1
self.saveQ = saveQ
def run(self):
Acquisition(inlet, self.dataOutQ1, self.dataOutQ2, self.stopQ1, self.stopQ2, self.saveQ)
class P300Thread(threading.Thread):
def __init__(self, dataInQ, featureQ, stopQ):
threading.Thread.__init__(self)
self.dataInQ = dataInQ
self.featureQ = featureQ
self.stopQ = stopQ
def run(self):
P300fun(self.dataInQ, self.featureQ, self.stopQ)
threadLock = threading.Lock()
SaveQ = queue.Queue()
DataOutQ1 = queue.Queue()
DataOutQ2 = queue.Queue()
StopQ1 = queue.Queue()
StopQ2 = queue.Queue()
FeatQ1 = queue.Queue()
StopQ1.put(0)
StopQ2.put(0)
#
AcqTh = AcqThread(DataOutQ1, DataOutQ2, StopQ1, StopQ2, SaveQ)
P300Th = P300Thread(DataOutQ1, FeatQ1, StopQ1)
def Acquisition(inlet, dataOutQ1, dataOutQ2, stopQ1, stopQ2, saveQ):
i = 0
print('Starting...')
while i<1250: #i is the number of samples
sample, timestamp = inlet.pull_sample() #samples coming in @ 250Hz
##Normalization, filtering##
threadLock.acquire()
dataOutQ1.put([filtsamples[:,-250:], rawtimestamps[-250:]]) #I only need the last 250 samples
threadLock.release()
i += 1
def P300fun(dataInQ, featureQ, stopQ):
p300sample = []
p300timestamp = []
print(f"Is DataInQ size true? {DataOutQ1.qsize()}")
print("Is dataInQ emtpy?", DataOutQ1.empty())
while dataInQ.qsize(): #or while not dataqueue.empty():
try:
print("DataInQ has data")
ss, ts = dataInQ.get(0)
print('<>P300\n>>Samples [', ss, ']\nTimestamp [', ts, ']')
except Empty:
return
print('Thread Finished')
if __name__ == '__main__':
print('Looking for an EEG stream...')
streams = resolve_stream('type', 'EEG')
inlet = StreamInlet(streams[0])
print('Connected!\n')
AcqTh.start()
P300Th.start()
AcqTh.join()
P300Th.join()
print("\n\n>>>DONE<<<\n\n")
Und Ausgabe:
Looking for an EEG stream...
Connected!
Is DataInQ size true? 0
Starting...
Is dataInQ emtpy? True
Thread Finished
>>>DONE<<<
In meiner Forschung schien Frage 1 ein ähnliches Problem darzustellen, aber es scheint, dass das Problem im Bildverarbeitungsteil lag (und sie verwenden das multiprocessing
Paket). Frage 2 scheint ein Parallelitätsproblem zu haben, das mein Problem sein könnte, aber ich bin nicht sicher, wie ich es in mein Problem übersetzen soll. Ich weiß, ob ich falsch liege. Frage 3 hatte nur ein Problem mit der Reihenfolge der Argumente, daher denke ich, dass dies hier nicht anwendbar ist.
Wie soll ich das machen? Sollte ich Thread B wiederholt aus Thread A heraus aufrufen? Benötige ich eine Schleife oder eine Verzögerung für Thread B? Gibt es .join()
vielleicht ein Problem mit dem Teil? Ich werde in naher Zukunft weitere Threads hinzufügen müssen, daher wäre es gut, zuerst herauszufinden, wie man mit nur zwei arbeitet ...
Jede Hilfe wird geschätzt!
Antworten
Ein Noob zu sein kann schwierig sein ... Also werde ich meine eigene Frage beantworten, um anderen Anfängern zu helfen, die möglicherweise auch auf dieses Problem stoßen.
Nun, das Wichtigste zuerst: Nein, es ist nicht möglich, einen Thread innerhalb eines Threads wiederholt aufzurufen, da jeder Thread nur einmal aufgerufen werden kann.
Es gibt jedoch eine Möglichkeit, das Beenden des Threads zu verhindern und sie auf Auslöser warten zu lassen, mit denen sie fortfahren können. Nach einigen weiteren Recherchen stieß ich auf diese Frage , die mir zeigte, dass es eine Möglichkeit gibt, Ereignisse für Threads zu erstellen. Die Dokumentation finden Sie hier . Und es ist ganz einfach: Die Ereignisobjekte verhalten sich wie Flags und können set()
(Anzeige von True) oder clear()
(Anzeige von False, was der ursprüngliche Wert ist) sein. Um ein Ereignis zu testen, kann man die is_set()
Methode für boolesche Probleme verwenden oder die wait()
Methode anstelle eines Timers verwenden. In meinem Fall hat es mir einige Warteschlangen erspart, die ich verwenden wollte:
import threading
import queue
from queue import Empty
import numpy as np
class AcqThread(threading.Thread):
def __init__(self, dataOutQ1, dataOutQ2, saveQ):
threading.Thread.__init__(self)
self.dataOutQ2 = dataOutQ2
self.dataOutQ1 = dataOutQ1
self.saveQ = saveQ
def run(self):
Acquisition(inlet, self.dataOutQ1, self.dataOutQ2, self.saveQ)
class P300Thread(threading.Thread):
def __init__(self, dataInQ, featureQ):
threading.Thread.__init__(self)
self.dataInQ = dataInQ
self.featureQ = featureQ
def run(self):
P300fun(self.dataInQ, self.featureQ)
threadLock = threading.Lock()
SaveQ = queue.Queue()
DataOutQ1 = queue.Queue()
DataOutQ2 = queue.Queue()
FeatQ1 = queue.Queue()
FeatQ2 = queue.Queue()
#NEW:: initializes Events
E = threading.Event()
EP300 = threading.Event()
#
AcqTh = AcqThread(DataOutQ1, DataOutQ2, SaveQ)
P300Th = P300Thread(DataOutQ1, FeatQ1)
Und es erlaubt mir, Thread B "wiederholt" "aufzurufen", da es meinen ersten aktiv hält (wegen Ereignis E) und nur dann in den Verarbeitungsteil eintritt, wenn das Ereignis EP300 gesetzt ist. Anschließend wird EP300 nach Abschluss des Vorgangs gelöscht:
def Acquisition(inlet, dataOutQ1, dataOutQ2 saveQ):
i = 0
print('Starting...')
while i<1250:
sample, timestamp = inlet.pull_sample()
##Normalization, filtering##
if _condition_:
threadLock.acquire()
dataOutQ1.put([filtsamples[:,-250:], rawtimestamps[-250:]])
threadLock.release()
EP300.set() #NEW:: allows the P300 function to collect data from queue
i += 1
E.set() #NEW:: flaggs end data collection
def P300fun(dataInQ, featureQ):
p300sample = []
p300timestamp = []
while not E.is_set(): #NEW:: loop until collection is ended
if EP300.is_set(): #NEW:: activated when Event is triggered
while dataInQ.qsize():
try:
print("DataInQ has data")
ss, ts = dataInQ.get(0)
print('<>P300\n>>Samples [', ss, ']\nTimestamp [', ts, ']')
except Empty:
return
if not E.is_set(): #NEW:: Event is cleared in case data collection is not over, waiting for a new set()
EP300.clear()
print('Thread Finished')
if __name__ == '__main__':
print('Looking for an EEG stream...')
streams = resolve_stream('type', 'EEG')
inlet = StreamInlet(streams[0])
print('Connected!\n')
AcqTh.start()
P300Th.start()
AcqTh.join()
P300Th.join()
print("\n\n>>>DONE<<<\n\n")