スレッドの相互通信

実生活では、人々のチームが共通のタスクに取り組んでいる場合、タスクを適切に完了するために彼らの間でコミュニケーションが必要です。同じアナロジーがスレッドにも当てはまります。プログラミングでは、プロセッサの理想的な時間を短縮するために、複数のスレッドを作成し、すべてのスレッドに異なるサブタスクを割り当てます。したがって、通信機能が必要であり、同期してジョブを完了するには、相互に対話する必要があります。

スレッドの相互通信に関連する次の重要な点を考慮してください-

  • No performance gain −スレッドとプロセス間の適切な通信を実現できない場合、並行性と並列性によるパフォーマンスの向上は役に立ちません。

  • Accomplish task properly −スレッド間の適切な相互通信メカニズムがないと、割り当てられたタスクを正しく完了できません。

  • More efficient than inter-process communication −プロセス内のすべてのスレッドは同じアドレス空間を共有し、共有メモリを使用する必要がないため、スレッド間通信はプロセス間通信よりも効率的で使いやすいです。

スレッドセーフな通信のためのPythonデータ構造

マルチスレッドコードでは、あるスレッドから別のスレッドに情報を渡すという問題が発生します。標準の通信プリミティブはこの問題を解決しません。したがって、スレッド間でオブジェクトを共有して通信をスレッドセーフにするために、独自の複合オブジェクトを実装する必要があります。以下は、いくつかの変更を加えた後、スレッドセーフな通信を提供するいくつかのデータ構造です。

セット

セットデータ構造をスレッドセーフな方法で使用するには、セットクラスを拡張して独自のロックメカニズムを実装する必要があります。

これはクラスを拡張するPythonの例です-

class extend_class(set):
   def __init__(self, *args, **kwargs):
      self._lock = Lock()
      super(extend_class, self).__init__(*args, **kwargs)

   def add(self, elem):
      self._lock.acquire()
	  try:
      super(extend_class, self).add(elem)
      finally:
      self._lock.release()
  
   def delete(self, elem):
      self._lock.acquire()
      try:
      super(extend_class, self).delete(elem)
      finally:
      self._lock.release()

上記の例では、 extend_class Pythonからさらに継承されたものが定義されています set class。このクラスのコンストラクター内にロックオブジェクトが作成されます。今、2つの機能があります-add() そして delete()。これらの関数は定義されており、スレッドセーフです。彼らは両方ともに依存していますsuper 1つの重要な例外を除いてクラス機能。

デコレータ

これは、スレッドセーフな通信のもう1つの重要な方法であり、デコレータを使用します。

デコレータの使用方法を示すPythonの例を考えてみましょう&mminus;

def lock_decorator(method):

   def new_deco_method(self, *args, **kwargs):
      with self._lock:
         return method(self, *args, **kwargs)
return new_deco_method

class Decorator_class(set):
   def __init__(self, *args, **kwargs):
      self._lock = Lock()
      super(Decorator_class, self).__init__(*args, **kwargs)

   @lock_decorator
   def add(self, *args, **kwargs):
      return super(Decorator_class, self).add(elem)
   @lock_decorator
   def delete(self, *args, **kwargs):
      return super(Decorator_class, self).delete(elem)

上記の例では、lock_decoratorという名前のデコレータメソッドが定義されており、Pythonメソッドクラスからさらに継承されています。次に、このクラスのコンストラクター内にロックオブジェクトが作成されます。現在、add()とdelete()の2つの関数があります。これらの関数は定義されており、スレッドセーフです。どちらも、1つの重要な例外を除いて、スーパークラスの機能に依存しています。

リスト

リストのデータ構造は、スレッドセーフで迅速かつ簡単な構造で、一時的なメモリ内ストレージに使用できます。Cpythonでは、GILはそれらへの同時アクセスから保護します。リストはスレッドセーフであることがわかったので、リストにあるデータについてはどうでしょうか。実際には、リストのデータは保護されていません。例えば、L.append(x)別のスレッドが同じことを行おうとしている場合、期待される結果を返すことは保証されません。これは、append() はアトミック操作でスレッドセーフですが、他のスレッドはリストのデータを同時に変更しようとしているため、出力に対する競合状態の副作用を確認できます。

この種の問題を解決し、データを安全に変更するには、適切なロックメカニズムを実装する必要があります。これにより、複数のスレッドが競合状態に陥ることがないようになります。適切なロックメカニズムを実装するために、前の例で行ったようにクラスを拡張できます。

リストに対する他のいくつかのアトミック操作は次のとおりです-

L.append(x)
L1.extend(L2)
x = L[i]
x = L.pop()
L1[i:j] = L2
L.sort()
x = y
x.field = y
D[x] = y
D1.update(D2)
D.keys()

ここで−

  • L、L1、L2はすべてリストです
  • D、D1、D2は口述です
  • x、yはオブジェクトです
  • i、jはintです

キュー

リストのデータが保護されていない場合、その結果に直面しなければならない可能性があります。競合状態の間違ったデータ項目を取得または削除する場合があります。そのため、キューデータ構造を使用することをお勧めします。キューの実際の例としては、車両が最初に進入し、最初に退出する単一車線の一方通行道路があります。切符売り場やバス停の列の実例をもっと見ることができます。

キューはデフォルトでスレッドセーフなデータ構造であり、複雑なロックメカニズムの実装について心配する必要はありません。Pythonは私たちに アプリケーションでさまざまなタイプのキューを使用するためのモジュール。

キューの種類

このセクションでは、さまざまな種類のキューについて説明します。Pythonには、から使用するキューの3つのオプションがあります。<queue> モジュール-

  • 通常のキュー(FIFO、先入れ先出し)
  • LIFO、後入れ先出し
  • Priority

後続のセクションで、さまざまなキューについて学習します。

通常のキュー(FIFO、先入れ先出し)

これは、Pythonが提供する最も一般的に使用されるキューの実装です。このキューイングメカニズムでは、最初に来る人は誰でも、最初にサービスを取得します。FIFOは通常のキューとも呼ばれます。FIFOキューは次のように表すことができます-

FIFOキューのPython実装

Pythonでは、FIFOキューはマルチスレッドだけでなくシングルスレッドでも実装できます。

シングルスレッドのFIFOキュー

シングルスレッドでFIFOキューを実装する場合、 Queueクラスは、基本的な先入れ先出しコンテナを実装します。要素は、を使用してシーケンスの1つの「終わり」に追加されます。put()、を使用してもう一方の端から削除 get()

以下は、シングルスレッドでFIFOキューを実装するためのPythonプログラムです-

import queue

q = queue.Queue()

for i in range(8):
   q.put("item-" + str(i))

while not q.empty():
   print (q.get(), end = " ")

出力

item-0 item-1 item-2 item-3 item-4 item-5 item-6 item-7

出力は、上記のプログラムが単一のスレッドを使用して、要素が挿入されたのと同じ順序でキューから削除されることを示しています。

複数のスレッドを持つFIFOキュー

複数のスレッドでFIFOを実装するには、キューモジュールから拡張されたmyqueue()関数を定義する必要があります。get()メソッドとput()メソッドの動作は、シングルスレッドでFIFOキューを実装する場合の上記と同じです。次に、マルチスレッドにするには、スレッドを宣言してインスタンス化する必要があります。これらのスレッドは、FIFO方式でキューを消費します。

以下は、複数のスレッドでFIFOキューを実装するためのPythonプログラムです。

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
   item = queue.get()
   if item is None:
   break
   print("{} removed {} from the queue".format(threading.current_thread(), item))
   queue.task_done()
   time.sleep(2)
q = queue.Queue()
for i in range(5):
   q.put(i)
threads = []
for i in range(4):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

出力

<Thread(Thread-3654, started 5044)> removed 0 from the queue
<Thread(Thread-3655, started 3144)> removed 1 from the queue
<Thread(Thread-3656, started 6996)> removed 2 from the queue
<Thread(Thread-3657, started 2672)> removed 3 from the queue
<Thread(Thread-3654, started 5044)> removed 4 from the queue

LIFO、後入れ先出しキュー

このキューは、FIFO(先入れ先出し)キューとはまったく逆のアナロジーを使用します。このキューイングメカニズムでは、最後に来た人が最初にサービスを取得します。これは、スタックデータ構造の実装に似ています。LIFOキューは、人工知能のアルゴリズムのような深さ優先探索を実装する際に役立ちます。

LIFOキューのPython実装

Pythonでは、LIFOキューはマルチスレッドだけでなくシングルスレッドでも実装できます。

シングルスレッドのLIFOキュー

シングルスレッドでLIFOキューを実装する場合、 Queue クラスは、構造を使用して基本的な後入れ先出しコンテナを実装します Queue.LifoQueue。さて、電話でput()、要素はコンテナのヘッドに追加され、使用時にもヘッドから削除されます get()

以下は、シングルスレッドでLIFOキューを実装するためのPythonプログラムです-

import queue

q = queue.LifoQueue()

for i in range(8):
   q.put("item-" + str(i))

while not q.empty():
   print (q.get(), end=" ")
Output:
item-7 item-6 item-5 item-4 item-3 item-2 item-1 item-0

出力は、上記のプログラムが単一のスレッドを使用して、要素が挿入されたのとは逆の順序でキューから削除されることを示しています。

複数のスレッドを持つLIFOキュー

実装は、複数のスレッドでFIFOキューの実装を行ったのと似ています。唯一の違いは、を使用する必要があるということですQueue 構造体を使用して基本的な後入れ先出しコンテナを実装するクラス Queue.LifoQueue

以下は、複数のスレッドでLIFOキューを実装するためのPythonプログラムです-

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
      item = queue.get()
      if item is None:
      break
	  print("{} removed {} from the queue".format(threading.current_thread(), item))
      queue.task_done()
      time.sleep(2)
q = queue.LifoQueue()
for i in range(5):
   q.put(i)
threads = []
for i in range(4):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

出力

<Thread(Thread-3882, started 4928)> removed 4 from the queue
<Thread(Thread-3883, started 4364)> removed 3 from the queue
<Thread(Thread-3884, started 6908)> removed 2 from the queue
<Thread(Thread-3885, started 3584)> removed 1 from the queue
<Thread(Thread-3882, started 4928)> removed 0 from the queue

優先キュー

FIFOおよびLIFOキューでは、アイテムの順序は挿入の順序に関連しています。ただし、挿入順序よりも優先度が重要な場合が多くあります。実際の例を考えてみましょう。空港のセキュリティがさまざまなカテゴリの人々をチェックしているとします。VVIPの人々、航空会社のスタッフ、税関職員、カテゴリーは、庶民のように到着に基づいてチェックされるのではなく、優先的にチェックされる場合があります。

優先キューで考慮する必要があるもう1つの重要な側面は、タスクスケジューラを開発する方法です。一般的な設計の1つは、キュー内で最も多くのエージェントタスクを優先度に基づいて処理することです。このデータ構造を使用して、優先度の値に基づいてキューからアイテムを取得できます。

優先度付きキューのPython実装

Pythonでは、優先キューはマルチスレッドだけでなくシングルスレッドでも実装できます。

シングルスレッドの優先キュー

シングルスレッドで優先キューを実装する場合、 Queue クラスは、構造を使用して優先コンテナにタスクを実装します Queue.PriorityQueue。さて、電話でput()、要素には、最小値が最高の優先度を持つ値が追加されるため、を使用して最初に取得されます。 get()

シングルスレッドで優先キューを実装するための次のPythonプログラムを検討してください-

import queue as Q
p_queue = Q.PriorityQueue()

p_queue.put((2, 'Urgent'))
p_queue.put((1, 'Most Urgent'))
p_queue.put((10, 'Nothing important'))
prio_queue.put((5, 'Important'))

while not p_queue.empty():
   item = p_queue.get()
   print('%s - %s' % item)

出力

1 – Most Urgent
2 - Urgent
5 - Important
10 – Nothing important

上記の出力では、キューが優先度に基づいてアイテムを格納していることがわかります。値が少ないほど優先度が高くなります。

マルチスレッドの優先キュー

実装は、複数のスレッドを持つFIFOおよびLIFOキューの実装に似ています。唯一の違いは、を使用する必要があるということですQueue 構造体を使用して優先度を初期化するためのクラス Queue.PriorityQueue。もう1つの違いは、キューの生成方法です。以下の例では、2つの同一のデータセットを使用して生成されます。

次のPythonプログラムは、複数のスレッドを使用した優先キューの実装に役立ちます-

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
      item = queue.get()
      if item is None:
      break
      print("{} removed {} from the queue".format(threading.current_thread(), item))
      queue.task_done()
      time.sleep(1)
q = queue.PriorityQueue()
for i in range(5):
   q.put(i,1)

for i in range(5):
   q.put(i,1)

threads = []
for i in range(2):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

出力

<Thread(Thread-4939, started 2420)> removed 0 from the queue
<Thread(Thread-4940, started 3284)> removed 0 from the queue
<Thread(Thread-4939, started 2420)> removed 1 from the queue
<Thread(Thread-4940, started 3284)> removed 1 from the queue
<Thread(Thread-4939, started 2420)> removed 2 from the queue
<Thread(Thread-4940, started 3284)> removed 2 from the queue
<Thread(Thread-4939, started 2420)> removed 3 from the queue
<Thread(Thread-4940, started 3284)> removed 3 from the queue
<Thread(Thread-4939, started 2420)> removed 4 from the queue
<Thread(Thread-4940, started 3284)> removed 4 from the queue