Đồng tiền trong Python - Nhóm chủ đề
Giả sử chúng ta phải tạo một số lượng lớn các luồng cho các tác vụ đa luồng của mình. Nó sẽ tốn kém nhất về mặt tính toán vì có thể có nhiều vấn đề về hiệu suất, do quá nhiều luồng. Một vấn đề lớn có thể là thông lượng bị hạn chế. Chúng ta có thể giải quyết vấn đề này bằng cách tạo một nhóm các chủ đề. Nhóm luồng có thể được định nghĩa là nhóm các luồng được khởi tạo trước và không hoạt động, sẵn sàng được giao công việc. Tạo nhóm luồng được ưu tiên hơn là tạo luồng mới cho mọi tác vụ khi chúng ta cần thực hiện một số lượng lớn tác vụ. Nhóm luồng có thể quản lý việc thực thi đồng thời số lượng lớn các luồng như sau:
Nếu một luồng trong nhóm luồng hoàn thành việc thực thi thì luồng đó có thể được sử dụng lại.
Nếu một luồng bị kết thúc, một luồng khác sẽ được tạo để thay thế luồng đó.
Mô-đun Python - Concurrent.futures
Thư viện chuẩn Python bao gồm concurrent.futuresmô-đun. Mô-đun này đã được thêm vào Python 3.2 để cung cấp cho các nhà phát triển một giao diện cấp cao để khởi chạy các tác vụ không đồng bộ. Nó là một lớp trừu tượng trên đầu các mô-đun phân luồng và đa xử lý của Python để cung cấp giao diện để chạy các tác vụ bằng cách sử dụng nhóm luồng hoặc quy trình.
Trong các phần tiếp theo, chúng ta sẽ tìm hiểu về các lớp khác nhau của mô-đun concurrent.futures.
Lớp thừa hành
Executorlà một lớp trừu tượng của concurrent.futuresMô-đun Python. Nó không thể được sử dụng trực tiếp và chúng tôi cần sử dụng một trong các lớp con bê tông sau:
- ThreadPoolExecutor
- ProcessPoolExecutor
ThreadPoolExecutor - Một lớp con Concrete
Nó là một trong những lớp con cụ thể của lớp Executor. Lớp con sử dụng đa luồng và chúng tôi nhận được một nhóm luồng để gửi các tác vụ. Nhóm này chỉ định các nhiệm vụ cho các luồng có sẵn và lên lịch chúng chạy.
Làm cách nào để tạo ThreadPoolExecutor?
Với sự giúp đỡ của concurrent.futures mô-đun và lớp con cụ thể của nó Executor, chúng ta có thể dễ dàng tạo một nhóm các chủ đề. Đối với điều này, chúng ta cần xây dựng mộtThreadPoolExecutorvới số lượng chủ đề chúng tôi muốn trong nhóm. Theo mặc định, con số là 5. Sau đó, chúng tôi có thể gửi một nhiệm vụ đến nhóm luồng. Khi nào chúng tasubmit() một nhiệm vụ, chúng tôi nhận lại một Future. Đối tượng Tương lai có một phương thức được gọi làdone(), cho biết liệu tương lai có được giải quyết hay không. Với điều này, một giá trị đã được đặt cho đối tượng tương lai cụ thể đó. Khi một nhiệm vụ kết thúc, trình thực thi nhóm luồng đặt giá trị cho đối tượng tương lai.
Thí dụ
from concurrent.futures import ThreadPoolExecutor
from time import sleep
def task(message):
sleep(2)
return message
def main():
executor = ThreadPoolExecutor(5)
future = executor.submit(task, ("Completed"))
print(future.done())
sleep(2)
print(future.done())
print(future.result())
if __name__ == '__main__':
main()
Đầu ra
False
True
Completed
Trong ví dụ trên, ThreadPoolExecutorđã được xây dựng với 5 chủ đề. Sau đó, một tác vụ, sẽ đợi 2 giây trước khi đưa ra thông báo, sẽ được gửi cho người thực thi nhóm luồng. Như đã thấy từ đầu ra, nhiệm vụ không hoàn thành cho đến 2 giây, vì vậy cuộc gọi đầu tiên đếndone()sẽ trả về Sai. Sau 2 giây, nhiệm vụ được hoàn thành và chúng tôi nhận được kết quả của tương lai bằng cách gọiresult() phương pháp trên đó.
Instantiating ThreadPoolExecutor - Trình quản lý ngữ cảnh
Một cách khác để khởi tạo ThreadPoolExecutorlà với sự trợ giúp của trình quản lý ngữ cảnh. Nó hoạt động tương tự như phương pháp được sử dụng trong ví dụ trên. Ưu điểm chính của việc sử dụng trình quản lý ngữ cảnh là nó trông tốt về mặt cú pháp. Việc khởi tạo có thể được thực hiện với sự trợ giúp của đoạn mã sau:
with ThreadPoolExecutor(max_workers = 5) as executor
Thí dụ
Ví dụ sau được mượn từ tài liệu Python. Trong ví dụ này, trước hếtconcurrent.futuresmô-đun phải được nhập. Sau đó, một hàm có tênload_url()được tạo sẽ tải url được yêu cầu. Sau đó, hàm tạoThreadPoolExecutorvới 5 chủ đề trong hồ bơi. CácThreadPoolExecutorđã được sử dụng làm trình quản lý ngữ cảnh. Chúng ta có thể nhận được kết quả của tương lai bằng cách gọiresult() phương pháp trên đó.
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout = timeout) as conn:
return conn.read()
with concurrent.futures.ThreadPoolExecutor(max_workers = 5) as executor:
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
Đầu ra
Sau đây sẽ là đầu ra của tập lệnh Python ở trên:
'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229313 bytes
'http://www.cnn.com/' page is 168933 bytes
'http://www.bbc.co.uk/' page is 283893 bytes
'http://europe.wsj.com/' page is 938109 bytes
Sử dụng hàm Executor.map ()
Con trăn map()chức năng được sử dụng rộng rãi trong một số nhiệm vụ. Một trong những nhiệm vụ như vậy là áp dụng một hàm nhất định cho mọi phần tử trong các vòng lặp. Tương tự, chúng ta có thể ánh xạ tất cả các phần tử của một trình vòng lặp tới một hàm và gửi chúng dưới dạng các công việc độc lậpThreadPoolExecutor. Hãy xem xét ví dụ sau về tập lệnh Python để hiểu cách hoạt động của hàm.
Thí dụ
Trong ví dụ dưới đây, hàm bản đồ được sử dụng để áp dụng square() hàm cho mọi giá trị trong mảng giá trị.
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
return n * n
def main():
with ThreadPoolExecutor(max_workers = 3) as executor:
results = executor.map(square, values)
for result in results:
print(result)
if __name__ == '__main__':
main()
Đầu ra
Tập lệnh Python ở trên tạo ra kết quả sau:
4
9
16
25