스레드 내에서 스레드를 반복적으로 호출 할 수 있습니까?

Nov 20 2020

내가 사용하는 B ( "P300")을 스레드에 스레드 A ( "취득")에서 전송 샘플을 시도하고 queue샘플이 있지만,하지만 난 스레드 B의 모든 데이터를 읽을 수 있습니다 내 출력으로 판단 스레드 A에 할당되고, 내 스레드 A가 데이터를 입력하기 전에 스레드 B가 서두르고 테스트하고 있다고 생각합니다.

아래 코드 구조의 근사치를 참조하십시오.

import threading
import queue
from queue import Empty
import numpy as np
import warnings
warnings.filterwarnings("error")

class AcqThread(threading.Thread):
    def __init__(self, dataOutQ1, dataOutQ2, stopQ1, stopQ2, saveQ):
        threading.Thread.__init__(self)
        self.stopQ2 = stopQ2
        self.stopQ1 = stopQ1
        self.dataOutQ2 = dataOutQ2
        self.dataOutQ1 = dataOutQ1
        self.saveQ = saveQ

    def run(self):
        Acquisition(inlet, self.dataOutQ1, self.dataOutQ2, self.stopQ1, self.stopQ2, self.saveQ)

class P300Thread(threading.Thread):
    def __init__(self, dataInQ, featureQ, stopQ):
        threading.Thread.__init__(self)
        self.dataInQ = dataInQ
        self.featureQ = featureQ
        self.stopQ = stopQ

    def run(self):
        P300fun(self.dataInQ, self.featureQ, self.stopQ)

threadLock = threading.Lock()
SaveQ = queue.Queue()
DataOutQ1 = queue.Queue()
DataOutQ2 = queue.Queue()
StopQ1 = queue.Queue()
StopQ2 = queue.Queue()
FeatQ1 = queue.Queue()
StopQ1.put(0)
StopQ2.put(0)
#
AcqTh = AcqThread(DataOutQ1, DataOutQ2, StopQ1, StopQ2, SaveQ)
P300Th = P300Thread(DataOutQ1, FeatQ1, StopQ1)

def Acquisition(inlet, dataOutQ1, dataOutQ2, stopQ1, stopQ2, saveQ):
    i = 0
    print('Starting...')
    while i<1250: #i is the number of samples
        sample, timestamp = inlet.pull_sample() #samples coming in @ 250Hz
        ##Normalization, filtering##
        threadLock.acquire()
        dataOutQ1.put([filtsamples[:,-250:], rawtimestamps[-250:]]) #I only need the last 250 samples
        threadLock.release()
        i += 1

def P300fun(dataInQ, featureQ, stopQ):
    p300sample = []
    p300timestamp = []
    print(f"Is DataInQ size true? {DataOutQ1.qsize()}")
    print("Is dataInQ emtpy?", DataOutQ1.empty())
    while dataInQ.qsize(): #or while not dataqueue.empty():
        try:
            print("DataInQ has data")
            ss, ts = dataInQ.get(0) 
            print('<>P300\n>>Samples [', ss, ']\nTimestamp [', ts, ']')
        except Empty:
            return
    print('Thread Finished')

if __name__ == '__main__':
    print('Looking for an EEG stream...')
    streams = resolve_stream('type', 'EEG')
    inlet = StreamInlet(streams[0])
    print('Connected!\n')

    AcqTh.start()
    P300Th.start()

    AcqTh.join()
    P300Th.join()

    print("\n\n>>>DONE<<<\n\n")

그리고 출력 :

Looking for an EEG stream...
Connected!

Is DataInQ size true? 0
Starting...
Is dataInQ emtpy? True
Thread Finished

>>>DONE<<<

내 연구에서 질문 1 은 비슷한 문제를 제시하는 것처럼 보였지만 문제는 이미지 처리 부분에있는 것 같습니다 (그리고 multiprocessing패키지 를 사용합니다 ). 질문 2 에는 동시성 문제가있는 것 같습니다. 내 문제 일 수 있지만 문제로 변환하는 방법을 모르겠습니다. 내가 틀렸는 지 알려주세요. 질문 3 은 논증의 순서에 문제가 있었기 때문에 여기서는 적용 할 수 없다고 생각합니다.

어떻게해야합니까? 스레드 A 내에서 스레드 B를 반복적으로 호출해야합니까? 스레드 B에서 루프 또는 지연이 필요합니까? .join()부품에 문제가 있습니까? 가까운 장래에 더 많은 스레드를 추가해야하므로 먼저 두 개만 작업하는 방법을 알아내는 것이 좋습니다.

모든 도움이 감사합니다!

답변

mgmussi Nov 20 2020 at 19:02

멍청이가되는 것은 까다로울 수 있습니다. 그래서이 문제를 접할 수있는 다른 초보자를 돕기 위해 제 질문에 답하겠습니다.

글쎄요, 우선 먼저 : 아니오, 각 스레드는 한 번만 호출 될 수 있기 때문에 스레드 내에서 스레드를 반복적으로 호출 할 수 없습니다.

그러나 스레드가 종료되는 것을 방지하여 스레드가 계속 될 수있는 트리거를 기다리게하는 방법이 있습니다. 좀 더 조사한 끝에 스레드에 대한 이벤트를 만드는 방법이 있음을 보여주는 이 질문 을 발견 했습니다 . 문서는 여기 에서 찾을 수 있습니다 . 그리고 이것은 매우 간단합니다. 이벤트 객체는 플래그처럼 동작하며 set()(True를 나타냄) 또는 clear()(원래 값인 False를 나타냄 ) 일 수 있습니다 . 이벤트를 테스트하려면 is_set()부울 문제에 대한 메서드를 사용하거나 wait()타이머 대신 메서드를 사용할 수 있습니다 . 제 경우에는 사용할 대기열을 절약했습니다.

import threading
import queue
from queue import Empty
import numpy as np


class AcqThread(threading.Thread):
    def __init__(self, dataOutQ1, dataOutQ2, saveQ):
        threading.Thread.__init__(self)
        self.dataOutQ2 = dataOutQ2
        self.dataOutQ1 = dataOutQ1
        self.saveQ = saveQ

    def run(self):
        Acquisition(inlet, self.dataOutQ1, self.dataOutQ2, self.saveQ)

class P300Thread(threading.Thread):
    def __init__(self, dataInQ, featureQ):
        threading.Thread.__init__(self)
        self.dataInQ = dataInQ
        self.featureQ = featureQ

    def run(self):
        P300fun(self.dataInQ, self.featureQ)

threadLock = threading.Lock()
SaveQ = queue.Queue()
DataOutQ1 = queue.Queue()
DataOutQ2 = queue.Queue()
FeatQ1 = queue.Queue()
FeatQ2 = queue.Queue()

#NEW:: initializes Events
E = threading.Event()
EP300 = threading.Event()
#
AcqTh = AcqThread(DataOutQ1, DataOutQ2, SaveQ)
P300Th = P300Thread(DataOutQ1, FeatQ1)

그리고 스레드 B를 "반복적으로" "호출"할 수 있습니다. 활성화 된 동안 (이벤트 E로 인해) 첫 번째를 유지하고 이벤트 EP300이 설정된 경우에만 처리 ​​부분으로 들어가기 때문입니다. 그런 다음 프로세스가 완료되면 EP300이 지워집니다.

def Acquisition(inlet, dataOutQ1, dataOutQ2 saveQ):
    i = 0
    print('Starting...')
    while i<1250:
        sample, timestamp = inlet.pull_sample()
        ##Normalization, filtering##
        if _condition_:
            threadLock.acquire()
            dataOutQ1.put([filtsamples[:,-250:], rawtimestamps[-250:]])
            threadLock.release()
            EP300.set() #NEW:: allows the P300 function to collect data from queue
        i += 1
    E.set() #NEW:: flaggs end data collection

def P300fun(dataInQ, featureQ):
    p300sample = []
    p300timestamp = []
    while not E.is_set(): #NEW:: loop until collection is ended
        if EP300.is_set(): #NEW:: activated when Event is triggered
            while dataInQ.qsize():
                try:
                    print("DataInQ has data")
                    ss, ts = dataInQ.get(0) 
                    print('<>P300\n>>Samples [', ss, ']\nTimestamp [', ts, ']')
                except Empty:
                    return
        if not E.is_set(): #NEW:: Event is cleared in case data collection is not over, waiting for a new set()
            EP300.clear()
    print('Thread Finished')

if __name__ == '__main__':
    print('Looking for an EEG stream...')
    streams = resolve_stream('type', 'EEG')
    inlet = StreamInlet(streams[0])
    print('Connected!\n')

    AcqTh.start()
    P300Th.start()

    AcqTh.join()
    P300Th.join()

    print("\n\n>>>DONE<<<\n\n")