Luigi: jak przekazać różne argumenty do liści zadań?

Nov 23 2020

To moja druga próba zrozumienia, jak przekazywać argumenty do zależności w Luigi. Pierwszy był tutaj .

Pomysł jest taki: mam to, TaskCod czego zależy TaskB, od czego zależy TaskA, od czego zależy Task0. Chcę, aby cała sekwencja była zawsze dokładnie taka sama, z wyjątkiem tego, że chcę mieć możliwość kontrolowania, z jakiego pliku Task0czyta, nazwijmy to path. Filozofia Luigiego jest taka, że ​​każde zadanie powinno wiedzieć tylko o zadaniach, od których zależy, i ich parametrach. Problem z tym jest taki TaskC, że TaskBi TaskAwszyscy musieliby zaakceptować zmienną pathwyłącznie w celu przekazania jej do Task0.

Tak więc rozwiązanie, które zapewnia Luigi, nazywa się Klasy konfiguracji

Oto przykładowy kod:

from pathlib import Path
import luigi
from luigi import Task, TaskParameter, IntParameter, LocalTarget, Parameter

class config(luigi.Config):
    path = Parameter(default="defaultpath.txt")

class Task0(Task):
    path = Parameter(default=config.path)
    arg = IntParameter(default=0)
    def run(self):
        print(f"READING FROM {self.path}")
        Path(self.output().path).touch()
    def output(self): return LocalTarget(f"task0{self.arg}.txt")

class TaskA(Task):
    arg = IntParameter(default=0)
    def requires(self): return Task0(arg=self.arg)
    def run(self): Path(self.output().path).touch()
    def output(self): return LocalTarget(f"taskA{self.arg}.txt")

class TaskB(Task):
    arg = IntParameter(default=0)
    def requires(self): return TaskA(arg=self.arg)
    def run(self): Path(self.output().path).touch()
    def output(self): return LocalTarget(f"taskB{self.arg}.txt")

class TaskC(Task):
    arg = IntParameter(default=0)
    def requires(self): return TaskB(arg=self.arg)
    def run(self): Path(self.output().path).touch()
    def output(self): return LocalTarget(f"taskC{self.arg}.txt")

(Zignoruj ​​wszystkie outputi runinne. Są po prostu tam, więc przykład działa pomyślnie).

Chodzi o powyższym przykładzie sterowania linii print(f"READING FROM {self.path}"), bez konieczności zadań A, B i C są zależne od path.

Rzeczywiście, dzięki klasom konfiguracji mogę kontrolować Task0argument. Jeśli Task0nie zostanie przekazany pathparametr, przyjmuje wartość domyślną, czyli config().path.

Mój problem polega na tym, że wydaje mi się, że działa to tylko w „czasie kompilacji”, kiedy interpreter ładuje kod po raz pierwszy, ale nie w czasie wykonywania (szczegóły nie są dla mnie jasne).

Więc żadne z tych nie działa:

ZA)

if __name__ == "__main__":
    for i in range(3):
        config.path = f"newpath_{i}"
        luigi.build([TaskC(arg=i)], log_level="INFO")

===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 4 ran successfully:
    - 1 Task0(path=defaultpath.txt, arg=2)
    - 1 TaskA(arg=2)
    - 1 TaskB(arg=2)
    - 1 TaskC(arg=2)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

Nie wiem, dlaczego to nie działa.

B)

if __name__ == "__main__":
    for i in range(3):
        luigi.build([TaskC(arg=i), config(path=f"newpath_{i}")], log_level="INFO")

===== Luigi Execution Summary =====

Scheduled 5 tasks of which:
* 5 ran successfully:
    - 1 Task0(path=defaultpath.txt, arg=2)
    - 1 TaskA(arg=2)
    - 1 TaskB(arg=2)
    - 1 TaskC(arg=2)
    - 1 config(path=newpath_2)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

To ma sens. Są dwie configklasy i udało mi się zmienić tylko pathjedną z nich.

Wsparcie?

EDYCJA: Oczywiście posiadanie pathodniesienia do zmiennej globalnej działa, ale wtedy nie jest to parametr w zwykłym sensie Luigi.

EDIT2: wypróbowałem punkt 1) odpowiedzi poniżej:

config ma tę samą definicję

class config(luigi.Config):
    path = Parameter(default="defaultpath.txt")

Naprawiłem wskazany błąd, czyli Task0teraz jest:

class Task0(Task):
    path = Parameter(default=config().path)
    arg = IntParameter(default=0)
    def run(self):
        print(f"READING FROM {self.path}")
        Path(self.output().path).touch()
    def output(self): return LocalTarget(f"task0{self.arg}.txt")

iw końcu zrobiłem:

if __name__ == "__main__":
    for i in range(3):
        config.path = Parameter(f"file_{i}")
        luigi.build([TaskC(arg=i)], log_level="WARNING")

To nie działa, Task0nadal działa path="defaultpath.txt".

Odpowiedzi

iHowell Nov 23 2020 at 22:19

Więc to, co próbujesz zrobić, to utworzyć zadania z parametrami bez przekazywania tych parametrów do klasy nadrzędnej. Jest to całkowicie zrozumiałe i czasami denerwowałem się, próbując sobie z tym poradzić.

Po pierwsze, configniepoprawnie używasz klasy. Podczas korzystania z klasy Config, jak opisano whttps://luigi.readthedocs.io/en/stable/configuration.html#configuration-classes, musisz utworzyć wystąpienie obiektu. Więc zamiast:

class Task0(Task):
    path = Parameter(default=config.path)
    ...

użyłbyś:

class Task0(Task):
    path = Parameter(default=config().path)
    ...

Chociaż zapewnia to teraz, że używasz wartości, a nie Parameterobiektu, nadal nie rozwiązuje problemu. Tworząc klasę Task0, config().pathbędzie oceniane, dlatego nie jest to przypisanie odniesienie config().pathdo path, lecz gdy wartość nazwie (które zawsze będzie defaultpath.txt). Używając klasy w poprawny sposób, luigi skonstruuje Taskobiekt z tylko luigi.Parameteratrybutami jako nazwami atrybutów w nowej instancji, jak widać tutaj:https://github.com/spotify/luigi/blob/master/luigi/task.py#L436

Widzę więc dwie możliwe ścieżki naprzód.

1.) Pierwszym jest ustawienie ścieżki konfiguracyjnej w czasie wykonywania, tak jak to robiłeś, z wyjątkiem ustawienia takiego Parameterobiektu:

config.path = luigi.Parameter(f"newpath_{i}")

Jednak wymagałoby to dużo pracy, aby Twoje zadania config.pathdziałały, ponieważ teraz muszą one inaczej przyjmować swoje parametry (nie można ich oceniać pod kątem wartości domyślnych podczas tworzenia klasy).

2.) O wiele łatwiejszym sposobem jest po prostu określenie argumentów dla twoich klas w pliku konfiguracyjnym. Jeśli spojrzysz nahttps://github.com/spotify/luigi/blob/master/luigi/task.py#L825, zobaczysz, że Configklasa w Luigi jest właściwie tylko Taskklasą, więc możesz z nią zrobić wszystko, co możesz zrobić z klasą i odwrotnie. Dlatego możesz po prostu mieć to w swoim pliku konfiguracyjnym:

[Task0]
path = newpath_1
...

3.) Ale ponieważ wydaje się, że chcesz wykonać wiele zadań z różnymi argumentami dla każdego, zalecałbym po prostu przekazywanie argumentów przez rodziców, jak zachęca cię do tego Luigi. Wtedy możesz uruchomić wszystko za pomocą:

luigi.build([TaskC(arg=i) for i in range(3)])

4.) Wreszcie, jeśli naprawdę chcesz pozbyć się przekazywanych zależności, możesz utworzyć obiekt, ParamaterizedTaskParameterktóry rozszerza luigi.ObjectParameteri używa pikle instancji zadania jako obiektu.

Z powyższych rozwiązań zdecydowanie sugeruję, że 2 lub 3. 1 byłoby trudne do zaprogramowania, a 4 stworzyłoby bardzo brzydkie parametry i jest nieco bardziej zaawansowane.

Edycja: Rozwiązania 1 i 2 to bardziej hacki niż cokolwiek innego i zaleca się po prostu pakowanie parametrów w DictParameter.