Concurrency dengan Python - Kumpulan Proses

Kumpulan proses dapat dibuat dan digunakan dengan cara yang sama seperti yang kita buat dan gunakan kumpulan utas. Kumpulan proses dapat didefinisikan sebagai grup proses yang telah dibuat sebelumnya dan tidak aktif, yang siap untuk diberi pekerjaan. Membuat kumpulan proses lebih disukai daripada membuat contoh proses baru untuk setiap tugas ketika kita perlu melakukan banyak tugas.

Modul Python - Concurrent.futures

Perpustakaan standar Python memiliki modul yang disebut concurrent.futures. Modul ini ditambahkan dengan Python 3.2 untuk menyediakan antarmuka tingkat tinggi bagi pengembang untuk meluncurkan tugas-tugas asinkron. Ini adalah lapisan abstraksi di atas modul threading dan multiprocessing Python untuk menyediakan antarmuka untuk menjalankan tugas menggunakan kumpulan utas atau proses.

Di bagian selanjutnya, kita akan melihat subclass berbeda dari modul concurrent.futures.

Kelas Pelaksana

Executor adalah kelas abstrak dari concurrent.futuresModul Python. Itu tidak dapat digunakan secara langsung dan kita perlu menggunakan salah satu dari subclass konkret berikut -

  • ThreadPoolExecutor
  • ProcessPoolExecutor

ProcessPoolExecutor - Sebuah subclass konkret

Ini adalah salah satu subclass konkret dari kelas Executor. Ini menggunakan multi-pemrosesan dan kami mendapatkan kumpulan proses untuk mengirimkan tugas. Kumpulan ini menetapkan tugas ke proses yang tersedia dan menjadwalkannya untuk dijalankan.

Bagaimana cara membuat ProcessPoolExecutor?

Dengan bantuan concurrent.futures modul dan subkelas betonnya Executor, kita dapat dengan mudah membuat kumpulan proses. Untuk ini, kita perlu membangun fileProcessPoolExecutordengan jumlah proses yang kita inginkan di kumpulan. Secara default, jumlahnya adalah 5. Ini diikuti dengan mengirimkan tugas ke pool proses.

Contoh

Kami sekarang akan mempertimbangkan contoh yang sama yang kami gunakan saat membuat kumpulan utas, satu-satunya perbedaan adalah bahwa sekarang kami akan menggunakannya ProcessPoolExecutor dari pada ThreadPoolExecutor .

from concurrent.futures import ProcessPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message

def main():
   executor = ProcessPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

Keluaran

False
False
Completed

Dalam contoh di atas, ProsesPoolExecutortelah dibuat dengan 5 utas. Kemudian sebuah tugas, yang akan menunggu selama 2 detik sebelum memberikan pesan, dikirimkan ke pelaksana kumpulan proses. Dilihat dari keluarannya, tugas tidak selesai sampai 2 detik, begitu juga panggilan pertama kedone()akan mengembalikan False. Setelah 2 detik, tugas selesai dan kita mendapatkan hasil di masa mendatang dengan memanggilresult() metode di atasnya.

Instantiating ProcessPoolExecutor - Manajer Konteks

Cara lain untuk membuat instance ProcessPoolExecutor adalah dengan bantuan manajer konteks. Cara kerjanya mirip dengan metode yang digunakan dalam contoh di atas. Keuntungan utama menggunakan pengelola konteks adalah tampilannya bagus secara sintaksis. Instansiasi dapat dilakukan dengan bantuan kode berikut -

with ProcessPoolExecutor(max_workers = 5) as executor

Contoh

Untuk pemahaman yang lebih baik, kami mengambil contoh yang sama seperti yang digunakan saat membuat kumpulan utas. Dalam contoh ini, kita perlu mulai dengan mengimpor fileconcurrent.futuresmodul. Kemudian sebuah fungsi bernamaload_url()dibuat yang akan memuat url yang diminta. ItuProcessPoolExecutorkemudian dibuat dengan 5 jumlah utas di kumpulan. ProsesPoolExecutortelah digunakan sebagai manajer konteks. Kita bisa mendapatkan hasil masa depan dengan memanggilresult() metode di atasnya.

import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.request

URLS = ['http://www.foxnews.com/',
   'http://www.cnn.com/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'http://some-made-up-domain.com/']

def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
      return conn.read()

def main():
   with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
      future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
      for future in concurrent.futures.as_completed(future_to_url):
      url = future_to_url[future]
      try:
         data = future.result()
      except Exception as exc:
         print('%r generated an exception: %s' % (url, exc))
      else:
         print('%r page is %d bytes' % (url, len(data)))

if __name__ == '__main__':
   main()

Keluaran

Skrip Python di atas akan menghasilkan output berikut -

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes

Penggunaan fungsi Executor.map ()

Python map()fungsi banyak digunakan untuk melakukan sejumlah tugas. Salah satu tugas tersebut adalah menerapkan fungsi tertentu ke setiap elemen dalam iterable. Demikian pula, kita dapat memetakan semua elemen iterator ke suatu fungsi dan mengirimkannya sebagai tugas independen keProcessPoolExecutor. Perhatikan contoh skrip Python berikut untuk memahami ini.

Contoh

Kami akan mempertimbangkan contoh yang sama yang kami gunakan saat membuat kumpulan utas menggunakan Executor.map()fungsi. Dalam contoh yang diberikan di bawah ini, fungsi peta digunakan untuk diterapkansquare() berfungsi untuk setiap nilai dalam larik nilai.

from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ProcessPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
   for result in results:
      print(result)
if __name__ == '__main__':
   main()

Keluaran

Skrip Python di atas akan menghasilkan keluaran sebagai berikut

4
9
16
25

Kapan menggunakan ProcessPoolExecutor dan ThreadPoolExecutor?

Sekarang kita telah mempelajari tentang kedua kelas Executor - ThreadPoolExecutor dan ProcessPoolExecutor, kita perlu tahu kapan harus menggunakan eksekutor mana. Kita perlu memilih ProcessPoolExecutor jika beban kerja terikat CPU dan ThreadPoolExecutor jika beban kerja terikat I / O.

Jika kami menggunakan ProcessPoolExecutor, maka kita tidak perlu khawatir dengan GIL karena menggunakan multiprocessing. Apalagi waktu eksekusinya akan lebih sedikit jika dibandingkanThreadPoolExecution. Pertimbangkan contoh skrip Python berikut untuk memahami ini.

Contoh

import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ProcessPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
   print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

Keluaran

Start: 8000000 Time taken: 1.5509998798370361
Start: 7000000 Time taken: 1.3259999752044678
Total time taken: 2.0840001106262207

Example- Python script with ThreadPoolExecutor:
import time
import concurrent.futures

value = [8000000, 7000000]

def counting(n):
   start = time.time()
   while n > 0:
      n -= 1
   return time.time() - start

def main():
   start = time.time()
   with concurrent.futures.ThreadPoolExecutor() as executor:
      for number, time_taken in zip(value, executor.map(counting, value)):
         print('Start: {} Time taken: {}'.format(number, time_taken))
      print('Total time taken: {}'.format(time.time() - start))

if __name__ == '__main__':
main()

Keluaran

Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645

Dari keluaran kedua program di atas, kita dapat melihat perbedaan waktu eksekusi saat menggunakan ProcessPoolExecutor dan ThreadPoolExecutor.