Współbieżność w Pythonie - pula wątków

Załóżmy, że musieliśmy utworzyć dużą liczbę wątków dla naszych zadań wielowątkowych. Byłoby to najbardziej kosztowne obliczeniowo, ponieważ może wystąpić wiele problemów z wydajnością z powodu zbyt wielu wątków. Głównym problemem może być ograniczenie przepustowości. Możemy rozwiązać ten problem, tworząc pulę wątków. Pula wątków może być zdefiniowana jako grupa wstępnie utworzonych i bezczynnych wątków, które są gotowe do wykonania pracy. Tworzenie puli wątków jest preferowane zamiast tworzenia wystąpienia nowych wątków dla każdego zadania, gdy potrzebujemy wykonać dużą liczbę zadań. Pula wątków może zarządzać współbieżnym wykonywaniem dużej liczby wątków w następujący sposób -

  • Jeśli wątek w puli wątków zakończy wykonywanie, można go ponownie wykorzystać.

  • Jeśli wątek zostanie zakończony, zostanie utworzony inny wątek, który zastąpi ten wątek.

Moduł Pythona - Concurrent.futures

Biblioteka standardowa Pythona zawiera rozszerzenie concurrent.futuresmoduł. Ten moduł został dodany w Pythonie 3.2, aby zapewnić programistom interfejs wysokiego poziomu do uruchamiania zadań asynchronicznych. Jest to warstwa abstrakcji znajdująca się na wierzchu modułów obsługi wątków i wieloprocesorowych Pythona, zapewniająca interfejs do uruchamiania zadań przy użyciu puli wątków lub procesów.

W kolejnych sekcjach dowiemy się o różnych klasach modułu concurrent.futures.

Klasa egzekutora

Executorjest abstrakcyjną klasą klasy concurrent.futuresModuł Pythona. Nie można go używać bezpośrednio i musimy użyć jednej z następujących konkretnych podklas -

  • ThreadPoolExecutor
  • ProcessPoolExecutor

ThreadPoolExecutor - podklasa betonu

Jest to jedna z konkretnych podklas klasy Executor. Podklasa korzysta z wielowątkowości i otrzymujemy pulę wątków do przesyłania zadań. Ta pula przypisuje zadania do dostępnych wątków i planuje ich wykonanie.

Jak stworzyć ThreadPoolExecutor?

Z pomocą concurrent.futures moduł i jego konkretną podklasę Executor, możemy łatwo stworzyć pulę wątków. W tym celu musimy skonstruować plikThreadPoolExecutorz liczbą wątków, które chcemy w puli. Domyślnie jest to liczba 5. Następnie możemy przesłać zadanie do puli wątków. Kiedy mysubmit() zadanie, otrzymujemy z powrotem Future. Obiekt Future ma metodę o nazwiedone(), który mówi, czy przyszłość się rozwiązała. W ten sposób została ustawiona wartość dla tego konkretnego przyszłego obiektu. Po zakończeniu zadania moduł wykonawczy puli wątków ustawia wartość na przyszły obiekt.

Przykład

from concurrent.futures import ThreadPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message

def main():
   executor = ThreadPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

Wynik

False
True
Completed

W powyższym przykładzie a ThreadPoolExecutorzostał zbudowany z 5 wątków. Następnie zadanie, które będzie czekało 2 sekundy przed przekazaniem komunikatu, jest przesyłane do modułu wykonawczego puli wątków. Jak widać z danych wyjściowych, zadanie nie kończy się przed upływem 2 sekund, więc pierwsze wywołaniedone()zwróci wartość False. Po 2 sekundach zadanie jest wykonane, a wynik na przyszłość uzyskujemy dzwoniąc doresult() metoda na nim.

Tworzenie wystąpienia ThreadPoolExecutor - Menedżer kontekstu

Inny sposób na utworzenie wystąpienia ThreadPoolExecutorodbywa się z pomocą menedżera kontekstu. Działa podobnie do metody zastosowanej w powyższym przykładzie. Główną zaletą używania menedżera kontekstu jest to, że wygląda dobrze składniowo. Tworzenie instancji można wykonać za pomocą następującego kodu -

with ThreadPoolExecutor(max_workers = 5) as executor

Przykład

Poniższy przykład został zapożyczony z dokumentacji Pythona. W tym przykładzie przede wszystkimconcurrent.futuresmoduł musi zostać zaimportowany. Następnie funkcja o nazwieload_url()zostanie utworzony, który załaduje żądany adres URL. Następnie funkcja tworzyThreadPoolExecutor z 5 wątkami w basenie. PlikThreadPoolExecutorzostał wykorzystany jako menedżer kontekstu. Możemy uzyskać wynik w przyszłości, dzwoniąc doresult() metoda na nim.

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
   'http://www.cnn.com/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'http://some-made-up-domain.com/']

def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
   return conn.read()

with concurrent.futures.ThreadPoolExecutor(max_workers = 5) as executor:

   future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
   for future in concurrent.futures.as_completed(future_to_url):
   url = future_to_url[future]
   try:
      data = future.result()
   except Exception as exc:
      print('%r generated an exception: %s' % (url, exc))
   else:
      print('%r page is %d bytes' % (url, len(data)))

Wynik

Oto wynik powyższego skryptu w Pythonie -

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229313 bytes
'http://www.cnn.com/' page is 168933 bytes
'http://www.bbc.co.uk/' page is 283893 bytes
'http://europe.wsj.com/' page is 938109 bytes

Użycie funkcji Executor.map ()

Python map()funkcja jest szeroko stosowana w wielu zadaniach. Jednym z takich zadań jest zastosowanie określonej funkcji do każdego elementu w elementach iterable. Podobnie, możemy odwzorować wszystkie elementy iteratora na funkcję i przesłać je jako niezależne zadaniaThreadPoolExecutor. Rozważ następujący przykład skryptu w języku Python, aby zrozumieć, jak działa ta funkcja.

Przykład

W poniższym przykładzie funkcja mapy jest używana do zastosowania rozszerzenia square() funkcji do każdej wartości w tablicy wartości.

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ThreadPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
for result in results:
      print(result)
if __name__ == '__main__':
   main()

Wynik

Powyższy skrypt w języku Python generuje następujące dane wyjściowe -

4
9
16
25