Luigi: Wie kann man verschiedene Argumente an Blattaufgaben übergeben?

Nov 23 2020

Dies ist mein zweiter Versuch zu verstehen, wie Argumente an Abhängigkeiten in Luigi übergeben werden. Der erste war hier .

Die Idee ist: Ich habe TaskCwas davon TaskBabhängt TaskA, was davon abhängt , was davon abhängt Task0. Ich möchte, dass diese gesamte Sequenz immer genau gleich ist, außer ich möchte steuern können, aus welcher Datei Task0gelesen wird, nennen wir es path. Luigis Philosophie ist normalerweise, dass jede Aufgabe nur über die Aufgaben, von denen sie abhängt, und ihre Parameter Bescheid wissen sollte. Das Problem dabei ist , dass TaskC, TaskBund TaskAalle müßten variabel akzeptieren pathfür den alleinigen Zweck der dann es vorbei Task0.

Die Lösung, die Luigi dafür bereitstellt, heißt Konfigurationsklassen

Hier ist ein Beispielcode:

from pathlib import Path
import luigi
from luigi import Task, TaskParameter, IntParameter, LocalTarget, Parameter

class config(luigi.Config):
    path = Parameter(default="defaultpath.txt")

class Task0(Task):
    path = Parameter(default=config.path)
    arg = IntParameter(default=0)
    def run(self):
        print(f"READING FROM {self.path}")
        Path(self.output().path).touch()
    def output(self): return LocalTarget(f"task0{self.arg}.txt")

class TaskA(Task):
    arg = IntParameter(default=0)
    def requires(self): return Task0(arg=self.arg)
    def run(self): Path(self.output().path).touch()
    def output(self): return LocalTarget(f"taskA{self.arg}.txt")

class TaskB(Task):
    arg = IntParameter(default=0)
    def requires(self): return TaskA(arg=self.arg)
    def run(self): Path(self.output().path).touch()
    def output(self): return LocalTarget(f"taskB{self.arg}.txt")

class TaskC(Task):
    arg = IntParameter(default=0)
    def requires(self): return TaskB(arg=self.arg)
    def run(self): Path(self.output().path).touch()
    def output(self): return LocalTarget(f"taskC{self.arg}.txt")

(Ignoriere alles outputund runalles. Sie sind nur da, damit das Beispiel erfolgreich ausgeführt wird.)

Der Punkt des obigen Beispiels ist die Steuerung der Linie, print(f"READING FROM {self.path}")ohne dass die Aufgaben A, B, C davon abhängen path.

In der Tat kann ich mit Konfigurationsklassen das Task0Argument steuern . Wenn Task0kein pathParameter übergeben wird, wird der Standardwert verwendet config().path.

Mein Problem ist jetzt, dass dies für mich nur zur "Erstellungszeit" zu funktionieren scheint, wenn der Interpreter den Code zum ersten Mal lädt, aber nicht zur Laufzeit (die Details sind mir nicht klar).

Also keine dieser Arbeiten:

EIN)

if __name__ == "__main__":
    for i in range(3):
        config.path = f"newpath_{i}"
        luigi.build([TaskC(arg=i)], log_level="INFO")

===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 4 ran successfully:
    - 1 Task0(path=defaultpath.txt, arg=2)
    - 1 TaskA(arg=2)
    - 1 TaskB(arg=2)
    - 1 TaskC(arg=2)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

Ich bin mir nicht sicher, warum das nicht funktioniert.

B)

if __name__ == "__main__":
    for i in range(3):
        luigi.build([TaskC(arg=i), config(path=f"newpath_{i}")], log_level="INFO")

===== Luigi Execution Summary =====

Scheduled 5 tasks of which:
* 5 ran successfully:
    - 1 Task0(path=defaultpath.txt, arg=2)
    - 1 TaskA(arg=2)
    - 1 TaskB(arg=2)
    - 1 TaskC(arg=2)
    - 1 config(path=newpath_2)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

Das macht eigentlich Sinn. Es gibt zwei configKlassen, und ich habe nur patheine von ihnen geändert .

Hilfe?

EDIT: Natürlich pathfunktioniert es, eine globale Variable zu referenzieren, aber dann ist es kein Parameter im üblichen Luigi-Sinne.

EDIT2: Ich habe Punkt 1) der Antwort unten versucht:

config hat die gleiche Definition

class config(luigi.Config):
    path = Parameter(default="defaultpath.txt")

Ich habe den Fehler behoben, auf den hingewiesen wurde, dh Task0jetzt:

class Task0(Task):
    path = Parameter(default=config().path)
    arg = IntParameter(default=0)
    def run(self):
        print(f"READING FROM {self.path}")
        Path(self.output().path).touch()
    def output(self): return LocalTarget(f"task0{self.arg}.txt")

und schließlich tat ich:

if __name__ == "__main__":
    for i in range(3):
        config.path = Parameter(f"file_{i}")
        luigi.build([TaskC(arg=i)], log_level="WARNING")

Das funktioniert nicht, wird Task0immer noch path="defaultpath.txt".

Antworten

iHowell Nov 23 2020 at 22:19

Sie versuchen also, Aufgaben mit Parametern zu erstellen, ohne diese Parameter an die übergeordnete Klasse zu übergeben. Das ist völlig verständlich, und ich habe mich manchmal darüber geärgert, damit umzugehen.

Erstens verwenden Sie die configKlasse falsch. Bei Verwendung einer Config-Klasse, wie in angegebenhttps://luigi.readthedocs.io/en/stable/configuration.html#configuration-classesmüssen Sie das Objekt instanziieren. Also statt:

class Task0(Task):
    path = Parameter(default=config.path)
    ...

Sie würden verwenden:

class Task0(Task):
    path = Parameter(default=config().path)
    ...

Dies stellt zwar sicher, dass Sie einen Wert und kein ParameterObjekt verwenden, löst Ihr Problem jedoch nicht. Beim Erstellen der Klasse Task0wird config().pathausgewertet, daher wird nicht die Referenz von config().pathzugewiesen path, sondern der Wert beim Aufruf (der immer sein wird defaultpath.txt). Bei korrekter Verwendung der Klasse erstellt luigi ein TaskObjekt mit nur luigi.ParameterAttributen als Attributnamen für die neue Instanz, wie hier dargestellt:https://github.com/spotify/luigi/blob/master/luigi/task.py#L436

Ich sehe also zwei mögliche Wege vorwärts.

1.) Der erste besteht darin, den Konfigurationspfad zur Laufzeit wie zuvor festzulegen, außer dass er ein ParameterObjekt wie das folgende ist:

config.path = luigi.Parameter(f"newpath_{i}")

Dies würde jedoch viel Arbeit config.patherfordern , um Ihre Aufgaben zum Laufen zu bringen , da sie jetzt ihre Parameter anders aufnehmen müssen (kann beim Erstellen der Klasse nicht auf Standardeinstellungen überprüft werden).

2.) Der viel einfachere Weg besteht darin, die Argumente für Ihre Klassen einfach in der Konfigurationsdatei anzugeben. Wenn du siehsthttps://github.com/spotify/luigi/blob/master/luigi/task.py#L825Sie werden sehen, dass die ConfigKlasse in Luigi eigentlich nur eine TaskKlasse ist, also können Sie alles damit machen, was Sie mit einer Klasse machen könnten und umgekehrt. Daher können Sie dies einfach in Ihrer Konfigurationsdatei haben:

[Task0]
path = newpath_1
...

3.) Da Sie jedoch anscheinend mehrere Aufgaben mit jeweils unterschiedlichen Argumenten ausführen möchten, würde ich nur empfehlen, Argumente durch die Eltern weiterzugeben, wie Luigi Sie dazu ermutigt. Dann könnten Sie alles ausführen mit:

luigi.build([TaskC(arg=i) for i in range(3)])

4.) Wenn Sie die Übergabe von Abhängigkeiten wirklich beseitigen müssen, können Sie eine erstellen ParamaterizedTaskParameter, luigi.ObjectParameterdie das Pickle einer Task-Instanz erweitert und als Objekt verwendet.

Von den oben genannten Lösungen empfehle ich dringend, entweder 2 oder 3 zu verwenden. 1 wäre schwer zu programmieren, und 4 würde einige sehr hässliche Parameter erzeugen und ist etwas fortgeschrittener.

Bearbeiten: Die Lösungen 1 und 2 sind mehr Hacks als alles andere, und es wird nur empfohlen, Parameter in zu bündeln DictParameter.