การประมวลผลหลายขั้นตอนใน python - สิ่งที่ได้รับสืบทอดมาจากกระบวนการ forkserver จากกระบวนการหลัก

Aug 15 2020

ฉันพยายามใช้forkserverและพบNameError: name 'xxx' is not definedในกระบวนการของผู้ปฏิบัติงาน

ฉันใช้ Python 3.6.4 แต่เอกสารควรเหมือนกันจาก https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods มันบอกว่า:

กระบวนการส้อมเซิร์ฟเวอร์เป็นเธรดเดียวดังนั้นจึงปลอดภัยสำหรับการใช้ os.fork () ไม่มีการสืบทอดทรัพยากรที่ไม่จำเป็น

นอกจากนี้ยังระบุว่า:

ดีกว่าที่จะรับมรดกดีกว่าของดอง / ไม่ดอง

เมื่อใช้เมธอด spawn หรือforkserver start หลายประเภทจากการประมวลผลหลายขั้นตอนจำเป็นต้องสามารถเลือกได้เพื่อให้โปรเซสย่อยสามารถใช้งานได้ อย่างไรก็ตามโดยทั่วไปควรหลีกเลี่ยงการส่งวัตถุที่ใช้ร่วมกันไปยังกระบวนการอื่นโดยใช้ไปป์หรือคิว แต่คุณควรจัดเรียงโปรแกรมเพื่อให้กระบวนการที่ต้องการเข้าถึงทรัพยากรที่ใช้ร่วมกันที่สร้างขึ้นจากที่อื่นสามารถสืบทอดจากกระบวนการบรรพบุรุษได้

เห็นได้ชัดว่าวัตถุสำคัญที่กระบวนการของผู้ปฏิบัติงานของฉันต้องดำเนินการไม่ได้รับการสืบทอดโดยกระบวนการเซิร์ฟเวอร์แล้วส่งต่อไปยังคนงานเหตุใดจึงเกิดขึ้น ฉันสงสัยว่ากระบวนการ forkserver สืบทอดมาจากกระบวนการหลักอย่างไร

นี่คือลักษณะของรหัสของฉัน:

import multiprocessing
import (a bunch of other modules)

def worker_func(nameList):
    global largeObject
    for item in nameList:
        # get some info from largeObject using item as index
        # do some calculation
        return [item, info]

if __name__ == '__main__':
    result = []
    largeObject # This is my large object, it's read-only and no modification will be made to it.
    nameList # Here is a list variable that I will need to get info for each item in it from the largeObject    
    ctx_in_main = multiprocessing.get_context('forkserver')
    print('Start parallel, using forking/spawning/?:', ctx_in_main.get_context())
    cores = ctx_in_main.cpu_count()
    with ctx_in_main.Pool(processes=4) as pool:
        for x in pool.imap_unordered(worker_func, nameList):
            result.append(x)

ขอขอบคุณ!

ดีที่สุด

คำตอบ

1 alex_noname Aug 16 2020 at 19:32

ทฤษฎี

ด้านล่างนี้เป็นข้อความที่ตัดตอนมาจากบล็อก Bojan Nikolic

เวอร์ชัน Python สมัยใหม่ (บน Linux) มีสามวิธีในการเริ่มต้นกระบวนการแยกกัน:

  1. Fork () -ing กระบวนการพาเรนต์และดำเนินการต่อด้วยอิมเมจกระบวนการเดียวกันทั้งในพาเรนต์และรอง วิธีนี้รวดเร็ว แต่อาจไม่น่าเชื่อถือเมื่อสถานะแม่มีความซับซ้อน

  2. วางไข่กระบวนการย่อยเช่น fork () - ing จากนั้น execv เพื่อแทนที่อิมเมจกระบวนการด้วยกระบวนการ Python ใหม่ วิธีนี้มีความน่าเชื่อถือ แต่ช้าเนื่องจากการประมวลผลอิมเมจถูกรีเฟรชอีกครั้ง

  3. กลไกforkserverซึ่งประกอบด้วยเซิร์ฟเวอร์ Python ที่แยกจากกันโดยมีสถานะที่ค่อนข้างง่ายและเป็น fork () - ed เมื่อต้องการกระบวนการใหม่ วิธีนี้รวมความเร็วของ Fork () - ing เข้ากับความน่าเชื่อถือที่ดี (เนื่องจากพาเรนต์ที่ถูกส้อมอยู่ในสถานะที่เรียบง่าย)

Forkserver

วิธีที่สามforkserverแสดงอยู่ด้านล่าง โปรดทราบว่าเด็ก ๆ จะเก็บสำเนาของสถานะของเซิร์ฟเวอร์ สถานะนี้มีจุดมุ่งหมายให้ค่อนข้างง่าย แต่สามารถปรับเปลี่ยนได้ผ่านหลายกระบวนการ API โดยใช้set_forkserver_preload()วิธีนี้

การปฏิบัติ

ดังนั้นหากคุณต้องการให้ simething ถูกสืบทอดโดยโปรเซสลูกจากพาเรนต์ต้องระบุสิ่งนี้ในสถานะforkserverด้วยวิธีการset_forkserver_preload(modules_names)ซึ่งตั้งค่ารายการชื่อโมดูลเพื่อพยายามโหลดในกระบวนการ forkserver ฉันยกตัวอย่างด้านล่าง:

# inherited.py
large_obj = {"one": 1, "two": 2, "three": 3}
# main.py
import multiprocessing
import os
from time import sleep

from inherited import large_obj


def worker_func(key: str):
    print(os.getpid(), id(large_obj))
    sleep(1)
    return large_obj[key]


if __name__ == '__main__':
    result = []
    ctx_in_main = multiprocessing.get_context('forkserver')
    ctx_in_main.set_forkserver_preload(['inherited'])
    cores = ctx_in_main.cpu_count()
    with ctx_in_main.Pool(processes=cores) as pool:
        for x in pool.imap(worker_func, ["one", "two", "three"]):
            result.append(x)
    for res in result:
        print(res)

เอาท์พุต:

# The PIDs are different but the address is always the same
PID=18603, obj id=139913466185024
PID=18604, obj id=139913466185024
PID=18605, obj id=139913466185024

และถ้าเราไม่ใช้การโหลดล่วงหน้า

...
    ctx_in_main = multiprocessing.get_context('forkserver')
    # ctx_in_main.set_forkserver_preload(['inherited']) 
    cores = ctx_in_main.cpu_count()
...
# The PIDs are different, the addresses are different too
# (but sometimes they can coincide)
PID=19046, obj id=140011789067776
PID=19047, obj id=140011789030976
PID=19048, obj id=140011789030912
1 sgyzetrov Aug 17 2020 at 03:00

ดังนั้นหลังจากการสนทนาที่สร้างแรงบันดาลใจกับ Alex ฉันคิดว่าฉันมีข้อมูลเพียงพอที่จะตอบคำถามของฉัน: อะไรคือสิ่งที่ได้รับสืบทอดมาจากกระบวนการ forkserver จากกระบวนการหลัก?

โดยทั่วไปเมื่อกระบวนการเซิร์ฟเวอร์เริ่มต้นขึ้นระบบจะนำเข้าโมดูลหลักของคุณและทุกอย่างก่อนif __name__ == '__main__'จะถูกดำเนินการ นั่นเป็นเหตุผลที่รหัสของฉันจะไม่ได้ทำงานเพราะlarge_objectไม่มีที่ไหนเลยที่จะพบในserverกระบวนการและในทุกกระบวนการของผู้ปฏิบัติงานที่ส้อมจากกระบวนการserver

วิธีการแก้ปัญหาของอเล็กซ์ทำงานเพราะตอนนี้ได้รับนำเข้าทั้งกระบวนการหลักและเซิร์ฟเวอร์เพื่อให้ผู้ปฏิบัติงานคดเคี้ยวจากเซิร์ฟเวอร์ทุกคนจะยังได้รับlarge_object large_objectถ้ารวมกับset_forkserver_preload(modules_names)คนงานทั้งหมดก็อาจจะได้รับสิ่งที่เหมือนกัน large_objectจากสิ่งที่ฉันเห็น เหตุผลในการใช้forkserverมีการอธิบายไว้อย่างชัดเจนในเอกสาร Python และในบล็อกของ Bojan:

เมื่อโปรแกรมเริ่มทำงานและเลือกวิธีการเริ่มต้นของ forkserver กระบวนการเซิร์ฟเวอร์จะเริ่มทำงาน จากนั้นเมื่อใดก็ตามที่จำเป็นต้องมีกระบวนการใหม่กระบวนการพาเรนต์จะเชื่อมต่อกับเซิร์ฟเวอร์และร้องขอให้แยกกระบวนการใหม่ กระบวนการส้อมเซิร์ฟเวอร์เป็นเธรดเดียวดังนั้นจึงปลอดภัยสำหรับการใช้ os.fork () ไม่มีทรัพยากรที่ไม่จำเป็นได้รับมา

กลไก forkserver ซึ่งประกอบด้วยเซิร์ฟเวอร์ Python ที่แยกจากกันโดยมีสถานะที่ค่อนข้างง่ายและเป็น fork () - ed เมื่อต้องการกระบวนการใหม่ วิธีการนี้รวมความเร็วของส้อม () - ไอเอ็นจีมีความน่าเชื่อถือที่ดี (เพราะแม่ถูกคดเคี้ยวอยู่ในสถานะที่ง่าย)

ดังนั้นจึงเป็นเรื่องที่น่ากังวลมากกว่าที่นี่

โปรดทราบว่าหากคุณใช้forkเป็นวิธีการเริ่มต้นคุณไม่จำเป็นต้องนำเข้าใด ๆ เนื่องจากกระบวนการย่อยทั้งหมดได้รับสำเนาของหน่วยความจำกระบวนการของผู้ปกครอง (หรือข้อมูลอ้างอิงหากระบบใช้ COW- copy-on-writeโปรดแก้ไขฉันถ้าฉันเป็น ไม่ถูกต้อง). ในกรณีนี้ใช้global large_objectคุณจะได้รับการเข้าถึงlarge_objectในworker_funcโดยตรง

วิธีนี้forkserverอาจไม่ใช่แนวทางที่เหมาะสมสำหรับฉันเนื่องจากปัญหาที่ฉันกำลังเผชิญคือค่าใช้จ่ายด้านหน่วยความจำ การดำเนินการทั้งหมดที่ทำให้ฉันได้รับlarge_objectในตอนแรกนั้นใช้หน่วยความจำมากดังนั้นฉันจึงไม่ต้องการทรัพยากรที่ไม่จำเป็นในกระบวนการทำงานของฉัน

หากฉันใส่การคำนวณทั้งหมดเหล่านั้นโดยตรงinherited.pyตามที่ Alex แนะนำมันจะถูกดำเนินการสองครั้ง (ครั้งเดียวเมื่อฉันนำเข้าโมดูลในหลักและอีกครั้งเมื่อเซิร์ฟเวอร์นำเข้าอาจจะมากกว่านั้นเมื่อกระบวนการของผู้ปฏิบัติงานเกิดขึ้น?) สิ่งนี้เหมาะสมถ้าฉัน เพียงแค่ต้องการกระบวนการที่ปลอดภัยแบบเธรดเดียวที่พนักงานสามารถแยกออกจากกันได้ แต่เนื่องจากฉันพยายามดึงคนงานไม่ให้สืบทอดทรัพยากรที่ไม่จำเป็นและได้รับเพียงอย่างเดียวlarge_objectจึงไม่ได้ผล และการใส่การคำนวณเหล่านั้น__main__เข้าไปinherited.pyจะไม่ได้ผลเนื่องจากตอนนี้ไม่มีกระบวนการใดที่จะดำเนินการได้รวมถึงเซิร์ฟเวอร์หลักและเซิร์ฟเวอร์

จึงเป็นข้อสรุปถ้าเป้าหมายของที่นี่คือการรับคนงานทรัพยากรน้อยที่สุดสืบทอดผมดีกว่าที่จะหมดรหัสของฉันเป็น 2 ไม่calculation.pyแรกดองออกจากล่ามหนึ่งและเริ่มต้นใหม่เพื่อโหลดดองlarge_object large_objectจากนั้นฉันก็สามารถถั่วด้วยอย่างใดอย่างหนึ่งforkหรือforkserver.