wieloprocesorowość w Pythonie - co jest dziedziczone przez proces forkserver z procesu nadrzędnego?

Aug 15 2020

Próbuję wykorzystać forkserveri napotkałem NameError: name 'xxx' is not definedw procesach roboczych.

Używam Pythona 3.6.4, ale dokumentacja powinna być taka sama, z https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods tu jest napisane:

Proces serwera fork jest jednowątkowy, więc użycie os.fork () jest bezpieczne. Żadne niepotrzebne zasoby nie są dziedziczone.

Mówi też:

Lepiej dziedziczyć niż marynować / unpickle

Podczas korzystania z metod startowych spawn lub forkserver wiele typów z przetwarzania wieloprocesowego musi być możliwych do pobrania, aby procesy potomne mogły ich używać . Jednak generalnie należy unikać wysyłania obiektów współdzielonych do innych procesów przy użyciu potoków lub kolejek. Zamiast tego należy tak zorganizować program, aby proces wymagający dostępu do współdzielonego zasobu utworzonego w innym miejscu mógł odziedziczyć go po procesie nadrzędnym.

Najwyraźniej kluczowy obiekt, nad którym musi pracować mój proces roboczy, nie został odziedziczony przez proces serwera, a następnie przekazany pracownikom. Dlaczego tak się stało? Zastanawiam się, co dokładnie jest dziedziczone przez proces forkserver z procesu nadrzędnego?

Oto jak wygląda mój kod:

import multiprocessing
import (a bunch of other modules)

def worker_func(nameList):
    global largeObject
    for item in nameList:
        # get some info from largeObject using item as index
        # do some calculation
        return [item, info]

if __name__ == '__main__':
    result = []
    largeObject # This is my large object, it's read-only and no modification will be made to it.
    nameList # Here is a list variable that I will need to get info for each item in it from the largeObject    
    ctx_in_main = multiprocessing.get_context('forkserver')
    print('Start parallel, using forking/spawning/?:', ctx_in_main.get_context())
    cores = ctx_in_main.cpu_count()
    with ctx_in_main.Pool(processes=4) as pool:
        for x in pool.imap_unordered(worker_func, nameList):
            result.append(x)

Dziękuję Ci!

Najlepsza,

Odpowiedzi

1 alex_noname Aug 16 2020 at 19:32

Teoria

Poniżej fragment bloga Bojan Nikolic

Nowoczesne wersje Pythona (w systemie Linux) zapewniają trzy sposoby uruchamiania oddzielnych procesów:

  1. Fork () - kontynuowanie procesów nadrzędnych i kontynuowanie z tym samym obrazem procesów zarówno w rodzicu, jak iu potomku. Ta metoda jest szybka, ale potencjalnie zawodna, gdy stan nadrzędny jest złożony

  2. Odradzanie procesów potomnych, tj. Fork () - ing, a następnie execv w celu zastąpienia obrazu procesu nowym procesem w Pythonie. Ta metoda jest niezawodna, ale powolna, ponieważ obraz procesu jest ponownie ładowany.

  3. Mechanizm forkserver , który składa się z oddzielnego serwera Pythona o stosunkowo prostym stanie, który jest fork () - uruchamiany, gdy potrzebny jest nowy proces. Ta metoda łączy prędkość Fork () z dobrą niezawodnością (ponieważ rozwidlany rodzic jest w prostym stanie).

Forkserver

Trzecią metodę, forkserver , przedstawiono poniżej. Zauważ, że dzieci zachowują kopię stanu serwera widelca. Stan ten ma być stosunkowo prosty, ale można go dostosować za pomocą wieloprocesowego interfejsu API za pomocą set_forkserver_preload()metody.

Ćwiczyć

Tak więc, jeśli chcesz, aby simething był dziedziczony przez procesy potomne z rodzica, musi to być określone w stanie forkserver za pomocą set_forkserver_preload(modules_names), który ustawia listę nazw modułów, które próbują załadować w procesie forkserver. Poniżej podaję przykład:

# inherited.py
large_obj = {"one": 1, "two": 2, "three": 3}
# main.py
import multiprocessing
import os
from time import sleep

from inherited import large_obj


def worker_func(key: str):
    print(os.getpid(), id(large_obj))
    sleep(1)
    return large_obj[key]


if __name__ == '__main__':
    result = []
    ctx_in_main = multiprocessing.get_context('forkserver')
    ctx_in_main.set_forkserver_preload(['inherited'])
    cores = ctx_in_main.cpu_count()
    with ctx_in_main.Pool(processes=cores) as pool:
        for x in pool.imap(worker_func, ["one", "two", "three"]):
            result.append(x)
    for res in result:
        print(res)

Wynik:

# The PIDs are different but the address is always the same
PID=18603, obj id=139913466185024
PID=18604, obj id=139913466185024
PID=18605, obj id=139913466185024

A jeśli nie używamy wstępnego ładowania

...
    ctx_in_main = multiprocessing.get_context('forkserver')
    # ctx_in_main.set_forkserver_preload(['inherited']) 
    cores = ctx_in_main.cpu_count()
...
# The PIDs are different, the addresses are different too
# (but sometimes they can coincide)
PID=19046, obj id=140011789067776
PID=19047, obj id=140011789030976
PID=19048, obj id=140011789030912
1 sgyzetrov Aug 17 2020 at 03:00

Więc po inspirującej dyskusji z Alexem wydaje mi się, że mam wystarczające informacje, aby odpowiedzieć na moje pytanie: co dokładnie jest dziedziczone przez proces forkserver z procesu nadrzędnego?

Zasadniczo, gdy proces serwera się rozpocznie, zaimportuje twój główny moduł i wszystko, co było wcześniej, if __name__ == '__main__'zostanie wykonane. Dlatego mój kod nie działa, ponieważ large_objectnigdzie nie można go znaleźć w serverprocesie i we wszystkich tych procesach roboczych, które rozwidlają się z serverprocesu .

Rozwiązanie Alexa działa, ponieważ large_objectteraz jest importowane zarówno do procesu głównego, jak i serwera, więc każdy pracownik rozwidlony z serwera również otrzyma large_object. W połączeniu ze set_forkserver_preload(modules_names)wszystkimi pracownikami może nawet uzyskać to samo large_object z tego, co widziałem. Powód użycia forkserverjest wyraźnie wyjaśniony w dokumentacjach Pythona i na blogu Bojana:

Gdy program uruchamia się i wybiera metodę uruchamiania serwera forkserver, uruchamiany jest proces serwera. Od tego momentu za każdym razem, gdy potrzebny jest nowy proces, proces nadrzędny łączy się z serwerem i żąda rozwidlenia nowego procesu. Proces serwera fork jest jednowątkowy, więc użycie os.fork () jest bezpieczne. Żadne niepotrzebne zasoby nie są dziedziczone .

Mechanizm forkserver, który składa się z oddzielnego serwera Pythona o stosunkowo prostym stanie, który jest fork () - uruchamiany, gdy potrzebny jest nowy proces. Ta metoda łączy prędkość Fork () z dobrą niezawodnością (ponieważ rozwidlany rodzic jest w prostym stanie) .

Więc tutaj jest bardziej po bezpiecznej stronie zmartwień.

Na marginesie, jeśli używasz forkjako metody początkowej, nie musisz niczego importować, ponieważ wszystkie procesy potomne otrzymują kopię pamięci procesów rodziców (lub odniesienie, jeśli system używa COW- copy-on-write, popraw mnie, jeśli tak źle). W tym przypadku za pomocą global large_objectotrzymasz dostęp do large_objectw worker_funcbezpośrednio.

To forkservermoże nie być dla mnie odpowiednie podejście, ponieważ problemem, z którym się zmagam, jest narzut pamięci. Wszystkie operacje, które mnie large_objectpociągają, zajmują dużo pamięci, więc nie chcę żadnych niepotrzebnych zasobów w moich procesach roboczych.

Jeśli wstawię wszystkie te obliczenia bezpośrednio do tego, inherited.pyco zasugerował Alex, zostanie to wykonane dwukrotnie (raz, gdy zaimportowałem moduł w pliku main i raz, gdy serwer go zaimportuje; może nawet więcej, gdy narodziły się procesy robocze?), Jest to odpowiednie, jeśli ja chcą tylko jednowątkowego bezpiecznego procesu, z którego pracownicy mogą korzystać. Ale ponieważ staram się, aby pracownicy nie odziedziczyli niepotrzebnych zasobów i tylko otrzymywali large_object, to nie zadziała. I umieszczenie w tych obliczeń __main__w inherited.pynie zadziała albo od teraz żaden z procesów będzie ich wykonania, w tym główny i serwer.

Podsumowując, jeśli celem jest sprawienie, aby pracownicy odziedziczyli minimalne zasoby, lepiej podzielę mój kod na 2, zrób calculation.pynajpierw, zalej large_object, wyjdź z interpretera i rozpocznij nowy, aby załadować marynowane large_object. Wtedy mogę po prostu oszaleć z jednym forklub drugim forkserver.