Luigi: jak przekazać różne argumenty do liści zadań?
To moja druga próba zrozumienia, jak przekazywać argumenty do zależności w Luigi. Pierwszy był tutaj .
Pomysł jest taki: mam to, TaskC
od czego zależy TaskB
, od czego zależy TaskA
, od czego zależy Task0
. Chcę, aby cała sekwencja była zawsze dokładnie taka sama, z wyjątkiem tego, że chcę mieć możliwość kontrolowania, z jakiego pliku Task0
czyta, nazwijmy to path
. Filozofia Luigiego jest taka, że każde zadanie powinno wiedzieć tylko o zadaniach, od których zależy, i ich parametrach. Problem z tym jest taki TaskC
, że TaskB
i TaskA
wszyscy musieliby zaakceptować zmienną path
wyłącznie w celu przekazania jej do Task0
.
Tak więc rozwiązanie, które zapewnia Luigi, nazywa się Klasy konfiguracji
Oto przykładowy kod:
from pathlib import Path
import luigi
from luigi import Task, TaskParameter, IntParameter, LocalTarget, Parameter
class config(luigi.Config):
path = Parameter(default="defaultpath.txt")
class Task0(Task):
path = Parameter(default=config.path)
arg = IntParameter(default=0)
def run(self):
print(f"READING FROM {self.path}")
Path(self.output().path).touch()
def output(self): return LocalTarget(f"task0{self.arg}.txt")
class TaskA(Task):
arg = IntParameter(default=0)
def requires(self): return Task0(arg=self.arg)
def run(self): Path(self.output().path).touch()
def output(self): return LocalTarget(f"taskA{self.arg}.txt")
class TaskB(Task):
arg = IntParameter(default=0)
def requires(self): return TaskA(arg=self.arg)
def run(self): Path(self.output().path).touch()
def output(self): return LocalTarget(f"taskB{self.arg}.txt")
class TaskC(Task):
arg = IntParameter(default=0)
def requires(self): return TaskB(arg=self.arg)
def run(self): Path(self.output().path).touch()
def output(self): return LocalTarget(f"taskC{self.arg}.txt")
(Zignoruj wszystkie output
i run
inne. Są po prostu tam, więc przykład działa pomyślnie).
Chodzi o powyższym przykładzie sterowania linii print(f"READING FROM {self.path}")
, bez konieczności zadań A, B i C są zależne od path
.
Rzeczywiście, dzięki klasom konfiguracji mogę kontrolować Task0
argument. Jeśli Task0
nie zostanie przekazany path
parametr, przyjmuje wartość domyślną, czyli config().path
.
Mój problem polega na tym, że wydaje mi się, że działa to tylko w „czasie kompilacji”, kiedy interpreter ładuje kod po raz pierwszy, ale nie w czasie wykonywania (szczegóły nie są dla mnie jasne).
Więc żadne z tych nie działa:
ZA)
if __name__ == "__main__":
for i in range(3):
config.path = f"newpath_{i}"
luigi.build([TaskC(arg=i)], log_level="INFO")
===== Luigi Execution Summary =====
Scheduled 4 tasks of which:
* 4 ran successfully:
- 1 Task0(path=defaultpath.txt, arg=2)
- 1 TaskA(arg=2)
- 1 TaskB(arg=2)
- 1 TaskC(arg=2)
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
Nie wiem, dlaczego to nie działa.
B)
if __name__ == "__main__":
for i in range(3):
luigi.build([TaskC(arg=i), config(path=f"newpath_{i}")], log_level="INFO")
===== Luigi Execution Summary =====
Scheduled 5 tasks of which:
* 5 ran successfully:
- 1 Task0(path=defaultpath.txt, arg=2)
- 1 TaskA(arg=2)
- 1 TaskB(arg=2)
- 1 TaskC(arg=2)
- 1 config(path=newpath_2)
This progress looks :) because there were no failed tasks or missing dependencies
===== Luigi Execution Summary =====
To ma sens. Są dwie config
klasy i udało mi się zmienić tylko path
jedną z nich.
Wsparcie?
EDYCJA: Oczywiście posiadanie path
odniesienia do zmiennej globalnej działa, ale wtedy nie jest to parametr w zwykłym sensie Luigi.
EDIT2: wypróbowałem punkt 1) odpowiedzi poniżej:
config
ma tę samą definicję
class config(luigi.Config):
path = Parameter(default="defaultpath.txt")
Naprawiłem wskazany błąd, czyli Task0
teraz jest:
class Task0(Task):
path = Parameter(default=config().path)
arg = IntParameter(default=0)
def run(self):
print(f"READING FROM {self.path}")
Path(self.output().path).touch()
def output(self): return LocalTarget(f"task0{self.arg}.txt")
iw końcu zrobiłem:
if __name__ == "__main__":
for i in range(3):
config.path = Parameter(f"file_{i}")
luigi.build([TaskC(arg=i)], log_level="WARNING")
To nie działa, Task0
nadal działa path="defaultpath.txt"
.
Odpowiedzi
Więc to, co próbujesz zrobić, to utworzyć zadania z parametrami bez przekazywania tych parametrów do klasy nadrzędnej. Jest to całkowicie zrozumiałe i czasami denerwowałem się, próbując sobie z tym poradzić.
Po pierwsze, config
niepoprawnie używasz klasy. Podczas korzystania z klasy Config, jak opisano whttps://luigi.readthedocs.io/en/stable/configuration.html#configuration-classes, musisz utworzyć wystąpienie obiektu. Więc zamiast:
class Task0(Task):
path = Parameter(default=config.path)
...
użyłbyś:
class Task0(Task):
path = Parameter(default=config().path)
...
Chociaż zapewnia to teraz, że używasz wartości, a nie Parameter
obiektu, nadal nie rozwiązuje problemu. Tworząc klasę Task0
, config().path
będzie oceniane, dlatego nie jest to przypisanie odniesienie config().path
do path
, lecz gdy wartość nazwie (które zawsze będzie defaultpath.txt
). Używając klasy w poprawny sposób, luigi skonstruuje Task
obiekt z tylko luigi.Parameter
atrybutami jako nazwami atrybutów w nowej instancji, jak widać tutaj:https://github.com/spotify/luigi/blob/master/luigi/task.py#L436
Widzę więc dwie możliwe ścieżki naprzód.
1.) Pierwszym jest ustawienie ścieżki konfiguracyjnej w czasie wykonywania, tak jak to robiłeś, z wyjątkiem ustawienia takiego Parameter
obiektu:
config.path = luigi.Parameter(f"newpath_{i}")
Jednak wymagałoby to dużo pracy, aby Twoje zadania config.path
działały, ponieważ teraz muszą one inaczej przyjmować swoje parametry (nie można ich oceniać pod kątem wartości domyślnych podczas tworzenia klasy).
2.) O wiele łatwiejszym sposobem jest po prostu określenie argumentów dla twoich klas w pliku konfiguracyjnym. Jeśli spojrzysz nahttps://github.com/spotify/luigi/blob/master/luigi/task.py#L825, zobaczysz, że Config
klasa w Luigi jest właściwie tylko Task
klasą, więc możesz z nią zrobić wszystko, co możesz zrobić z klasą i odwrotnie. Dlatego możesz po prostu mieć to w swoim pliku konfiguracyjnym:
[Task0]
path = newpath_1
...
3.) Ale ponieważ wydaje się, że chcesz wykonać wiele zadań z różnymi argumentami dla każdego, zalecałbym po prostu przekazywanie argumentów przez rodziców, jak zachęca cię do tego Luigi. Wtedy możesz uruchomić wszystko za pomocą:
luigi.build([TaskC(arg=i) for i in range(3)])
4.) Wreszcie, jeśli naprawdę chcesz pozbyć się przekazywanych zależności, możesz utworzyć obiekt, ParamaterizedTaskParameter
który rozszerza luigi.ObjectParameter
i używa pikle instancji zadania jako obiektu.
Z powyższych rozwiązań zdecydowanie sugeruję, że 2 lub 3. 1 byłoby trudne do zaprogramowania, a 4 stworzyłoby bardzo brzydkie parametry i jest nieco bardziej zaawansowane.
Edycja: Rozwiązania 1 i 2 to bardziej hacki niż cokolwiek innego i zaleca się po prostu pakowanie parametrów w DictParameter
.