Puis-je appeler un thread de manière récurrente à partir d'un thread?

Nov 20 2020

J'essaie de transférer des échantillons du thread A ("Acquisition") vers le thread B ("P300") en utilisant queuemais je ne peux lire aucune donnée dans le thread B, bien que des échantillons soient alloués dans le thread A. À en juger par ma sortie, Je pense que mon thread B se précipite et teste des choses avant que mon thread A ne commence à mettre des données.

Voir une approximation de ma structure de code ci-dessous:

import threading
import queue
from queue import Empty
import numpy as np
import warnings
warnings.filterwarnings("error")

class AcqThread(threading.Thread):
    def __init__(self, dataOutQ1, dataOutQ2, stopQ1, stopQ2, saveQ):
        threading.Thread.__init__(self)
        self.stopQ2 = stopQ2
        self.stopQ1 = stopQ1
        self.dataOutQ2 = dataOutQ2
        self.dataOutQ1 = dataOutQ1
        self.saveQ = saveQ

    def run(self):
        Acquisition(inlet, self.dataOutQ1, self.dataOutQ2, self.stopQ1, self.stopQ2, self.saveQ)

class P300Thread(threading.Thread):
    def __init__(self, dataInQ, featureQ, stopQ):
        threading.Thread.__init__(self)
        self.dataInQ = dataInQ
        self.featureQ = featureQ
        self.stopQ = stopQ

    def run(self):
        P300fun(self.dataInQ, self.featureQ, self.stopQ)

threadLock = threading.Lock()
SaveQ = queue.Queue()
DataOutQ1 = queue.Queue()
DataOutQ2 = queue.Queue()
StopQ1 = queue.Queue()
StopQ2 = queue.Queue()
FeatQ1 = queue.Queue()
StopQ1.put(0)
StopQ2.put(0)
#
AcqTh = AcqThread(DataOutQ1, DataOutQ2, StopQ1, StopQ2, SaveQ)
P300Th = P300Thread(DataOutQ1, FeatQ1, StopQ1)

def Acquisition(inlet, dataOutQ1, dataOutQ2, stopQ1, stopQ2, saveQ):
    i = 0
    print('Starting...')
    while i<1250: #i is the number of samples
        sample, timestamp = inlet.pull_sample() #samples coming in @ 250Hz
        ##Normalization, filtering##
        threadLock.acquire()
        dataOutQ1.put([filtsamples[:,-250:], rawtimestamps[-250:]]) #I only need the last 250 samples
        threadLock.release()
        i += 1

def P300fun(dataInQ, featureQ, stopQ):
    p300sample = []
    p300timestamp = []
    print(f"Is DataInQ size true? {DataOutQ1.qsize()}")
    print("Is dataInQ emtpy?", DataOutQ1.empty())
    while dataInQ.qsize(): #or while not dataqueue.empty():
        try:
            print("DataInQ has data")
            ss, ts = dataInQ.get(0) 
            print('<>P300\n>>Samples [', ss, ']\nTimestamp [', ts, ']')
        except Empty:
            return
    print('Thread Finished')

if __name__ == '__main__':
    print('Looking for an EEG stream...')
    streams = resolve_stream('type', 'EEG')
    inlet = StreamInlet(streams[0])
    print('Connected!\n')

    AcqTh.start()
    P300Th.start()

    AcqTh.join()
    P300Th.join()

    print("\n\n>>>DONE<<<\n\n")

Et sortie:

Looking for an EEG stream...
Connected!

Is DataInQ size true? 0
Starting...
Is dataInQ emtpy? True
Thread Finished

>>>DONE<<<

Dans mes recherches, la question 1 semblait présenter un problème similaire, mais il semble que le problème se situait dans la partie traitement d'image (et ils utilisent le multiprocessingpackage). La question 2 semble avoir un problème de concurrence d'accès, ce qui pourrait être mon problème, mais je ne sais pas comment le traduire dans mon problème, laissez-moi savoir si je me trompe, tho). La question 3 avait juste un problème avec l'ordre des arguments, donc non applicable ici, je pense.

Comment dois-je procéder? Dois-je appeler régulièrement le thread B à partir du thread A ?? Ai-je besoin d'une boucle ou d'un retard sur le thread B? Y a-t-il un problème avec la .join()pièce peut-être? J'aurai besoin d'ajouter plus de threads dans un proche avenir, il serait donc bon de comprendre comment travailler avec seulement deux premiers ...

Toute aide est appréciée!

Réponses

mgmussi Nov 20 2020 at 19:02

Être un noob peut être délicat ... Je vais donc répondre à ma propre question pour aider d'autres débutants qui pourraient également rencontrer ce problème.

Eh bien, tout d'abord: non, il n'est pas possible d'appeler un thread depuis un thread de manière récurrente, car chaque thread ne peut être appelé qu'une seule fois.

Mais il existe un moyen d'empêcher le thread de se terminer, en les faisant attendre les déclencheurs qui leur permettront de continuer. Après quelques recherches supplémentaires, je suis tombé sur cette question qui m'a montré qu'il existe un moyen de créer des événements pour les fils. La documentation peut être trouvée ici . Et c'est assez simple: les objets événement se comportent comme des indicateurs et peuvent être set()(indiquant True) ou clear()(indiquant False, qui est la valeur d'origine). Pour tester un événement, on peut utiliser la is_set()méthode pour les problèmes booléens ou utiliser la wait()méthode à la place d'une minuterie. Dans mon cas, cela m'a évité quelques files d'attente que j'allais utiliser:

import threading
import queue
from queue import Empty
import numpy as np


class AcqThread(threading.Thread):
    def __init__(self, dataOutQ1, dataOutQ2, saveQ):
        threading.Thread.__init__(self)
        self.dataOutQ2 = dataOutQ2
        self.dataOutQ1 = dataOutQ1
        self.saveQ = saveQ

    def run(self):
        Acquisition(inlet, self.dataOutQ1, self.dataOutQ2, self.saveQ)

class P300Thread(threading.Thread):
    def __init__(self, dataInQ, featureQ):
        threading.Thread.__init__(self)
        self.dataInQ = dataInQ
        self.featureQ = featureQ

    def run(self):
        P300fun(self.dataInQ, self.featureQ)

threadLock = threading.Lock()
SaveQ = queue.Queue()
DataOutQ1 = queue.Queue()
DataOutQ2 = queue.Queue()
FeatQ1 = queue.Queue()
FeatQ2 = queue.Queue()

#NEW:: initializes Events
E = threading.Event()
EP300 = threading.Event()
#
AcqTh = AcqThread(DataOutQ1, DataOutQ2, SaveQ)
P300Th = P300Thread(DataOutQ1, FeatQ1)

Et cela me permet d '"appeler" le thread B "de façon récurrente", car il garde mon premier alors qu'il est actif (à cause de l'événement E) et n'entre dans la partie traitement que lorsque l'événement EP300 est défini. Ensuite, EP300 est effacé une fois le processus terminé:

def Acquisition(inlet, dataOutQ1, dataOutQ2 saveQ):
    i = 0
    print('Starting...')
    while i<1250:
        sample, timestamp = inlet.pull_sample()
        ##Normalization, filtering##
        if _condition_:
            threadLock.acquire()
            dataOutQ1.put([filtsamples[:,-250:], rawtimestamps[-250:]])
            threadLock.release()
            EP300.set() #NEW:: allows the P300 function to collect data from queue
        i += 1
    E.set() #NEW:: flaggs end data collection

def P300fun(dataInQ, featureQ):
    p300sample = []
    p300timestamp = []
    while not E.is_set(): #NEW:: loop until collection is ended
        if EP300.is_set(): #NEW:: activated when Event is triggered
            while dataInQ.qsize():
                try:
                    print("DataInQ has data")
                    ss, ts = dataInQ.get(0) 
                    print('<>P300\n>>Samples [', ss, ']\nTimestamp [', ts, ']')
                except Empty:
                    return
        if not E.is_set(): #NEW:: Event is cleared in case data collection is not over, waiting for a new set()
            EP300.clear()
    print('Thread Finished')

if __name__ == '__main__':
    print('Looking for an EEG stream...')
    streams = resolve_stream('type', 'EEG')
    inlet = StreamInlet(streams[0])
    print('Connected!\n')

    AcqTh.start()
    P300Th.start()

    AcqTh.join()
    P300Th.join()

    print("\n\n>>>DONE<<<\n\n")