Intercomunicação de Threads

Na vida real, se uma equipe de pessoas está trabalhando em uma tarefa comum, deve haver comunicação entre eles para concluir a tarefa corretamente. A mesma analogia também se aplica a threads. Na programação, para reduzir o tempo ideal do processador, criamos vários threads e atribuímos diferentes subtarefas a cada thread. Portanto, deve haver uma facilidade de comunicação e eles devem interagir uns com os outros para terminar o trabalho de forma sincronizada.

Considere os seguintes pontos importantes relacionados à intercomunicação de thread -

  • No performance gain - Se não conseguirmos uma comunicação adequada entre threads e processos, os ganhos de desempenho com a simultaneidade e o paralelismo não terão utilidade.

  • Accomplish task properly - Sem mecanismo de intercomunicação adequado entre threads, a tarefa atribuída não pode ser concluída corretamente.

  • More efficient than inter-process communication - A comunicação entre threads é mais eficiente e fácil de usar do que a comunicação entre processos porque todos os threads de um processo compartilham o mesmo espaço de endereço e não precisam usar memória compartilhada.

Estruturas de dados Python para comunicação thread-safe

O código multithread surge com o problema de passar informações de um thread para outro. As primitivas de comunicação padrão não resolvem esse problema. Portanto, precisamos implementar nosso próprio objeto composto para compartilhar objetos entre threads para tornar a comunicação segura para threads. A seguir estão algumas estruturas de dados, que fornecem comunicação thread-safe depois de fazer algumas alterações nelas -

Jogos

Para usar a estrutura de dados definida de uma maneira segura para thread, precisamos estender a classe definida para implementar nosso próprio mecanismo de bloqueio.

Exemplo

Aqui está um exemplo Python de extensão da classe -

class extend_class(set):
   def __init__(self, *args, **kwargs):
      self._lock = Lock()
      super(extend_class, self).__init__(*args, **kwargs)

   def add(self, elem):
      self._lock.acquire()
	  try:
      super(extend_class, self).add(elem)
      finally:
      self._lock.release()
  
   def delete(self, elem):
      self._lock.acquire()
      try:
      super(extend_class, self).delete(elem)
      finally:
      self._lock.release()

No exemplo acima, um objeto de classe chamado extend_class foi definido, o que é posteriormente herdado do Python set class. Um objeto de bloqueio é criado dentro do construtor desta classe. Agora, existem duas funções -add() e delete(). Essas funções são definidas e são thread-safe. Ambos contam com osuper funcionalidade de classe com uma exceção chave.

Decorador

Este é outro método importante para a comunicação thread-safe é o uso de decoradores.

Exemplo

Considere um exemplo de Python que mostra como usar decoradores & mminus;

def lock_decorator(method):

   def new_deco_method(self, *args, **kwargs):
      with self._lock:
         return method(self, *args, **kwargs)
return new_deco_method

class Decorator_class(set):
   def __init__(self, *args, **kwargs):
      self._lock = Lock()
      super(Decorator_class, self).__init__(*args, **kwargs)

   @lock_decorator
   def add(self, *args, **kwargs):
      return super(Decorator_class, self).add(elem)
   @lock_decorator
   def delete(self, *args, **kwargs):
      return super(Decorator_class, self).delete(elem)

No exemplo acima, um método decorador denominado lock_decorator foi definido, o qual é posteriormente herdado da classe de método Python. Em seguida, um objeto de bloqueio é criado dentro do construtor desta classe. Agora, existem duas funções - add () e delete (). Essas funções são definidas e são thread-safe. Ambos contam com a funcionalidade de superclasse com uma exceção chave.

Listas

A estrutura de dados da lista é thread-safe, rápida e fácil para armazenamento temporário na memória. No Cpython, o GIL protege contra acesso simultâneo a eles. Como vimos, essas listas são seguras para threads, mas e os dados que estão nelas. Na verdade, os dados da lista não estão protegidos. Por exemplo,L.append(x)não é garantia de retornar o resultado esperado se outro encadeamento estiver tentando fazer a mesma coisa. Isso ocorre porque, emboraappend() é uma operação atômica e segura para threads, mas a outra thread está tentando modificar os dados da lista de maneira simultânea, portanto, podemos ver os efeitos colaterais das condições de corrida na saída.

Para resolver esse tipo de problema e modificar os dados com segurança, devemos implementar um mecanismo de bloqueio adequado, o que garante ainda que vários threads não possam entrar em condições de corrida. Para implementar o mecanismo de bloqueio adequado, podemos estender a classe como fizemos nos exemplos anteriores.

Algumas outras operações atômicas nas listas são as seguintes -

L.append(x)
L1.extend(L2)
x = L[i]
x = L.pop()
L1[i:j] = L2
L.sort()
x = y
x.field = y
D[x] = y
D1.update(D2)
D.keys()

Aqui -

  • L, L1, L2 são todas listas
  • D, D1, D2 são dictos
  • x, y são objetos
  • eu, j são ints

Filas

Se os dados da lista não estiverem protegidos, talvez tenhamos que enfrentar as consequências. Podemos obter ou excluir itens de dados errados das condições de corrida. É por isso que é recomendável usar a estrutura de dados da fila. Um exemplo do mundo real de fila pode ser uma estrada de mão única de faixa única, onde o veículo entra primeiro e sai primeiro. Mais exemplos do mundo real podem ser vistos das filas nos guichês e pontos de ônibus.

As filas são, por padrão, estrutura de dados thread-safe e não precisamos nos preocupar com a implementação de um mecanismo de bloqueio complexo. Python nos fornece o módulo para usar diferentes tipos de filas em nosso aplicativo.

Tipos de filas

Nesta seção, vamos aprender sobre os diferentes tipos de filas. Python oferece três opções de filas para usar a partir do<queue> módulo -

  • Filas normais (FIFO, primeiro a entrar, primeiro a sair)
  • UEPS, último a entrar, primeiro a sair
  • Priority

Aprenderemos sobre as diferentes filas nas seções subsequentes.

Filas normais (FIFO, primeiro a entrar, primeiro a sair)

São as implementações de fila mais comumente usadas oferecidas pelo Python. Nesse mecanismo de enfileiramento, quem vier primeiro, obterá o serviço primeiro. FIFO também é chamado de filas normais. As filas FIFO podem ser representadas da seguinte forma -

Implementação Python da Fila FIFO

Em python, a fila FIFO pode ser implementada com thread único, bem como multithreads.

Fila FIFO com thread único

Para implementar a fila FIFO com thread único, o Queueclasse implementará um contêiner básico primeiro a entrar, primeiro a sair. Os elementos serão adicionados a uma "extremidade" da sequência usandoput(), e removido da outra extremidade usando get().

Exemplo

A seguir está um programa Python para implementação de fila FIFO com thread único -

import queue

q = queue.Queue()

for i in range(8):
   q.put("item-" + str(i))

while not q.empty():
   print (q.get(), end = " ")

Resultado

item-0 item-1 item-2 item-3 item-4 item-5 item-6 item-7

A saída mostra que o programa acima usa um único thread para ilustrar que os elementos são removidos da fila na mesma ordem em que são inseridos.

Fila FIFO com vários threads

Para implementar FIFO com vários threads, precisamos definir a função myqueue (), que é estendida do módulo de fila. O funcionamento dos métodos get () e put () são os mesmos discutidos acima durante a implementação da fila FIFO com uma única thread. Então, para torná-lo multithread, precisamos declarar e instanciar os threads. Esses encadeamentos irão consumir a fila de maneira FIFO.

Exemplo

A seguir está um programa Python para implementação de fila FIFO com vários threads

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
   item = queue.get()
   if item is None:
   break
   print("{} removed {} from the queue".format(threading.current_thread(), item))
   queue.task_done()
   time.sleep(2)
q = queue.Queue()
for i in range(5):
   q.put(i)
threads = []
for i in range(4):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

Resultado

<Thread(Thread-3654, started 5044)> removed 0 from the queue
<Thread(Thread-3655, started 3144)> removed 1 from the queue
<Thread(Thread-3656, started 6996)> removed 2 from the queue
<Thread(Thread-3657, started 2672)> removed 3 from the queue
<Thread(Thread-3654, started 5044)> removed 4 from the queue

UEPS, fila de último a entrar, primeiro a sair

Essa fila usa uma analogia totalmente oposta às filas FIFO (First in First Out). Nesse mecanismo de enfileiramento, quem chegar por último receberá o serviço primeiro. Isso é semelhante para implementar a estrutura de dados da pilha. As filas LIFO são úteis ao implementar algoritmos de inteligência artificial como a busca em profundidade.

Implementação Python da fila LIFO

Em python, a fila LIFO pode ser implementada com thread único, bem como multithreads.

Fila LIFO com thread único

Para implementar a fila LIFO com thread único, o Queue classe implementará um contêiner básico último a entrar, primeiro a sair usando a estrutura Queue.LifoQueue. Agora, ao ligarput(), os elementos são adicionados na cabeça do recipiente e removidos da cabeça também ao usar get().

Exemplo

A seguir está um programa Python para implementação da fila LIFO com thread único -

import queue

q = queue.LifoQueue()

for i in range(8):
   q.put("item-" + str(i))

while not q.empty():
   print (q.get(), end=" ")
Output:
item-7 item-6 item-5 item-4 item-3 item-2 item-1 item-0

A saída mostra que o programa acima usa um único thread para ilustrar que os elementos são removidos da fila na ordem oposta em que são inseridos.

Fila LIFO com vários threads

A implementação é semelhante, pois fizemos a implementação de filas FIFO com vários threads. A única diferença é que precisamos usar oQueue classe que implementará um contêiner básico último a entrar, primeiro a sair usando a estrutura Queue.LifoQueue.

Exemplo

A seguir está um programa Python para implementação de fila LIFO com vários threads -

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
      item = queue.get()
      if item is None:
      break
	  print("{} removed {} from the queue".format(threading.current_thread(), item))
      queue.task_done()
      time.sleep(2)
q = queue.LifoQueue()
for i in range(5):
   q.put(i)
threads = []
for i in range(4):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

Resultado

<Thread(Thread-3882, started 4928)> removed 4 from the queue
<Thread(Thread-3883, started 4364)> removed 3 from the queue
<Thread(Thread-3884, started 6908)> removed 2 from the queue
<Thread(Thread-3885, started 3584)> removed 1 from the queue
<Thread(Thread-3882, started 4928)> removed 0 from the queue

Fila de prioridade

Nas filas FIFO e LIFO, a ordem dos itens está relacionada à ordem de inserção. No entanto, existem muitos casos em que a prioridade é mais importante do que a ordem de inserção. Vamos considerar um exemplo do mundo real. Suponha que a segurança do aeroporto esteja verificando pessoas de diferentes categorias. Pessoas do VVIP, pessoal da companhia aérea, oficial de alfândega, categorias podem ser verificadas com prioridade em vez de serem verificadas com base na chegada, como acontece com os plebeus.

Outro aspecto importante que precisa ser considerado para fila de prioridade é como desenvolver um agendador de tarefas. Um design comum é atender a maioria das tarefas do agente com base na prioridade na fila. Essa estrutura de dados pode ser usada para selecionar os itens da fila com base em seu valor de prioridade.

Implementação Python da fila prioritária

Em python, a fila de prioridade pode ser implementada com thread único, bem como multithreads.

Fila de prioridade com thread único

Para implementar a fila de prioridade com um único thread, o Queue classe irá implementar uma tarefa no recipiente de prioridade usando a estrutura Queue.Fila de prioridade. Agora, ao ligarput(), os elementos são adicionados com um valor em que o valor mais baixo terá a prioridade mais alta e, portanto, recuperado primeiro usando get().

Exemplo

Considere o seguinte programa Python para implementação da fila Priority com thread único -

import queue as Q
p_queue = Q.PriorityQueue()

p_queue.put((2, 'Urgent'))
p_queue.put((1, 'Most Urgent'))
p_queue.put((10, 'Nothing important'))
prio_queue.put((5, 'Important'))

while not p_queue.empty():
   item = p_queue.get()
   print('%s - %s' % item)

Resultado

1 – Most Urgent
2 - Urgent
5 - Important
10 – Nothing important

Na saída acima, podemos ver que a fila armazenou os itens com base na prioridade - o valor menos prioritário é o de alta prioridade.

Fila de prioridade com vários threads

A implementação é semelhante à implementação de filas FIFO e LIFO com vários threads. A única diferença é que precisamos usar oQueue classe para inicializar a prioridade usando a estrutura Queue.PriorityQueue. Outra diferença está na forma como a fila seria gerada. No exemplo abaixo, ele será gerado com dois conjuntos de dados idênticos.

Exemplo

O programa Python a seguir ajuda na implementação da fila de prioridade com vários threads -

import threading
import queue
import random
import time
def myqueue(queue):
   while not queue.empty():
      item = queue.get()
      if item is None:
      break
      print("{} removed {} from the queue".format(threading.current_thread(), item))
      queue.task_done()
      time.sleep(1)
q = queue.PriorityQueue()
for i in range(5):
   q.put(i,1)

for i in range(5):
   q.put(i,1)

threads = []
for i in range(2):
   thread = threading.Thread(target=myqueue, args=(q,))
   thread.start()
   threads.append(thread)
for thread in threads:
   thread.join()

Resultado

<Thread(Thread-4939, started 2420)> removed 0 from the queue
<Thread(Thread-4940, started 3284)> removed 0 from the queue
<Thread(Thread-4939, started 2420)> removed 1 from the queue
<Thread(Thread-4940, started 3284)> removed 1 from the queue
<Thread(Thread-4939, started 2420)> removed 2 from the queue
<Thread(Thread-4940, started 3284)> removed 2 from the queue
<Thread(Thread-4939, started 2420)> removed 3 from the queue
<Thread(Thread-4940, started 3284)> removed 3 from the queue
<Thread(Thread-4939, started 2420)> removed 4 from the queue
<Thread(Thread-4940, started 3284)> removed 4 from the queue