Luigi: comment passer différents arguments aux tâches feuille?

Nov 23 2020

C'est ma deuxième tentative pour comprendre comment passer des arguments aux dépendances dans Luigi. Le premier était ici .

L'idée est: j'ai TaskCqui dépend de TaskB, qui dépend de TaskA, qui dépend de Task0. Je veux que toute cette séquence soit toujours exactement la même, sauf que je veux pouvoir contrôler quel fichier Task0lit, appelons-le path. La philosophie de Luigi est normalement que chaque tâche ne doit connaître que les tâches dont elle dépend et leurs paramètres. Le problème avec ceci est que TaskC, TaskBet TaskAtous devraient accepter la variable pathdans le seul but de la passer ensuite à Task0.

Ainsi, la solution fournie par Luigi pour cela s'appelle les classes de configuration

Voici un exemple de code:

from pathlib import Path
import luigi
from luigi import Task, TaskParameter, IntParameter, LocalTarget, Parameter

class config(luigi.Config):
    path = Parameter(default="defaultpath.txt")

class Task0(Task):
    path = Parameter(default=config.path)
    arg = IntParameter(default=0)
    def run(self):
        print(f"READING FROM {self.path}")
        Path(self.output().path).touch()
    def output(self): return LocalTarget(f"task0{self.arg}.txt")

class TaskA(Task):
    arg = IntParameter(default=0)
    def requires(self): return Task0(arg=self.arg)
    def run(self): Path(self.output().path).touch()
    def output(self): return LocalTarget(f"taskA{self.arg}.txt")

class TaskB(Task):
    arg = IntParameter(default=0)
    def requires(self): return TaskA(arg=self.arg)
    def run(self): Path(self.output().path).touch()
    def output(self): return LocalTarget(f"taskB{self.arg}.txt")

class TaskC(Task):
    arg = IntParameter(default=0)
    def requires(self): return TaskB(arg=self.arg)
    def run(self): Path(self.output().path).touch()
    def output(self): return LocalTarget(f"taskC{self.arg}.txt")

(Ignorez tous les outputet les runtrucs. Ils sont juste là pour que l'exemple s'exécute avec succès.)

Le point de l'exemple ci-dessus est de contrôler la ligne print(f"READING FROM {self.path}")sans que les tâches A, B, C ne dépendent path.

En effet, avec les classes de configuration, je peux contrôler l' Task0argument. Si Task0aucun pathparamètre n'est passé , il prend sa valeur par défaut, qui est config().path.

Mon problème maintenant est que cela me semble fonctionner uniquement au "moment de la construction", lorsque l'interpréteur charge le code pour la première fois, mais pas au moment de l'exécution (les détails ne sont pas clairs pour moi).

Donc, aucun de ces deux ne fonctionne:

UNE)

if __name__ == "__main__":
    for i in range(3):
        config.path = f"newpath_{i}"
        luigi.build([TaskC(arg=i)], log_level="INFO")

===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 4 ran successfully:
    - 1 Task0(path=defaultpath.txt, arg=2)
    - 1 TaskA(arg=2)
    - 1 TaskB(arg=2)
    - 1 TaskC(arg=2)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

Je ne sais pas pourquoi cela ne fonctionne pas.

B)

if __name__ == "__main__":
    for i in range(3):
        luigi.build([TaskC(arg=i), config(path=f"newpath_{i}")], log_level="INFO")

===== Luigi Execution Summary =====

Scheduled 5 tasks of which:
* 5 ran successfully:
    - 1 Task0(path=defaultpath.txt, arg=2)
    - 1 TaskA(arg=2)
    - 1 TaskB(arg=2)
    - 1 TaskC(arg=2)
    - 1 config(path=newpath_2)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

Cela a du sens. Il y a deux configclasses, et je n'ai réussi à changer que l' pathune d'entre elles.

Aidez-moi?

EDIT: Bien sûr, avoir pathune référence à une variable globale fonctionne, mais ce n'est pas un paramètre au sens habituel de Luigi.

EDIT2: j'ai essayé le point 1) de la réponse ci-dessous:

config a la même définition

class config(luigi.Config):
    path = Parameter(default="defaultpath.txt")

J'ai corrigé l'erreur signalée, c'est Task0-à- dire maintenant:

class Task0(Task):
    path = Parameter(default=config().path)
    arg = IntParameter(default=0)
    def run(self):
        print(f"READING FROM {self.path}")
        Path(self.output().path).touch()
    def output(self): return LocalTarget(f"task0{self.arg}.txt")

et finalement j'ai fait:

if __name__ == "__main__":
    for i in range(3):
        config.path = Parameter(f"file_{i}")
        luigi.build([TaskC(arg=i)], log_level="WARNING")

Cela ne fonctionne pas, ça Task0continue path="defaultpath.txt".

Réponses

iHowell Nov 23 2020 at 22:19

Donc, ce que vous essayez de faire est de créer des tâches avec des paramètres sans passer ces paramètres à la classe parente. C'est tout à fait compréhensible, et j'ai parfois été ennuyé d'essayer de gérer cela.

Premièrement, vous n'utilisez configpas correctement la classe. Lors de l'utilisation d'une classe Config, comme indiqué danshttps://luigi.readthedocs.io/en/stable/configuration.html#configuration-classes, vous devez instancier l'objet. Donc, au lieu de:

class Task0(Task):
    path = Parameter(default=config.path)
    ...

vous utiliseriez:

class Task0(Task):
    path = Parameter(default=config().path)
    ...

Bien que cela garantisse maintenant que vous utilisez une valeur et non un Parameterobjet, cela ne résout toujours pas votre problème. Lors de la création de la classe Task0, config().pathserait évaluée, donc il n'attribue pas la référence de config().pathà path, mais plutôt la valeur lorsqu'elle est appelée (ce qui sera toujours defaultpath.txt). Lors de l'utilisation correcte de la classe, luigi construira un Taskobjet avec uniquement des luigi.Parameterattributs comme noms d'attributs sur la nouvelle instance, comme indiqué ici:https://github.com/spotify/luigi/blob/master/luigi/task.py#L436

Donc, je vois deux voies possibles.

1.) La première consiste à définir le chemin de configuration au moment de l'exécution comme vous l'aviez fait, sauf à le définir comme un Parameterobjet comme celui-ci:

config.path = luigi.Parameter(f"newpath_{i}")

Cependant, cela prendrait beaucoup de travail pour que vos tâches config.pathfonctionnent, car elles doivent maintenant prendre leurs paramètres différemment (ne peuvent pas être évaluées par défaut lors de la création de la classe).

2.) Le moyen le plus simple est de simplement spécifier les arguments de vos classes dans le fichier de configuration. Si vous regardezhttps://github.com/spotify/luigi/blob/master/luigi/task.py#L825, vous verrez que la Configclasse dans Luigi, est en fait juste une Taskclasse, donc vous pouvez tout ce que vous pouvez faire avec une classe et vice-versa. Par conséquent, vous pouvez simplement avoir ceci dans votre fichier de configuration:

[Task0]
path = newpath_1
...

3.) Mais, puisque vous semblez vouloir exécuter plusieurs tâches avec les différents arguments pour chacune, je recommanderais simplement de passer des arguments par les parents comme Luigi vous encourage à le faire. Ensuite, vous pouvez tout exécuter avec:

luigi.build([TaskC(arg=i) for i in range(3)])

4.) Enfin, si vous avez vraiment besoin de vous débarrasser des dépendances qui passent, vous pouvez créer un ParamaterizedTaskParameterqui étend luigi.ObjectParameteret utilise le pickle d'une instance de tâche comme objet.

Parmi les solutions ci-dessus, je suggère fortement que 2 ou 3. 1 serait difficile à programmer, et 4 créerait des paramètres très laids et est un peu plus avancé.

Edit: Les solutions 1 et 2 sont plus des hacks qu'autre chose, et il est simplement recommandé de regrouper les paramètres dans DictParameter.