ルイージ:リーフタスクにさまざまな引数を渡す方法は?

Nov 23 2020

これは、Luigiの依存関係に引数を渡す方法を理解するための2回目の試みです。最初のものはここにありました。

アイデアは次のとおりです。私はTaskCに依存しTaskB、に依存しTaskA、に依存しTask0ます。このシーケンス全体を常に完全に同じにしたいのですがTask0、ファイルの読み取り元を制御できるようにしたいので、それを呼び出しますpath。Luigiの哲学は、通常、各タスクは、依存するタスクとそのパラメーターについてのみ知る必要があるというものです。これで問題はあるTaskCTaskBTaskAすべての変数を受け入れなければならないだろうpath、その後にそれを渡す唯一の目的のためにTask0

したがって、Luigiがこれに提供するソリューションは、構成クラスと呼ばれます。

次にいくつかのサンプルコードを示します。

from pathlib import Path
import luigi
from luigi import Task, TaskParameter, IntParameter, LocalTarget, Parameter

class config(luigi.Config):
    path = Parameter(default="defaultpath.txt")

class Task0(Task):
    path = Parameter(default=config.path)
    arg = IntParameter(default=0)
    def run(self):
        print(f"READING FROM {self.path}")
        Path(self.output().path).touch()
    def output(self): return LocalTarget(f"task0{self.arg}.txt")

class TaskA(Task):
    arg = IntParameter(default=0)
    def requires(self): return Task0(arg=self.arg)
    def run(self): Path(self.output().path).touch()
    def output(self): return LocalTarget(f"taskA{self.arg}.txt")

class TaskB(Task):
    arg = IntParameter(default=0)
    def requires(self): return TaskA(arg=self.arg)
    def run(self): Path(self.output().path).touch()
    def output(self): return LocalTarget(f"taskB{self.arg}.txt")

class TaskC(Task):
    arg = IntParameter(default=0)
    def requires(self): return TaskB(arg=self.arg)
    def run(self): Path(self.output().path).touch()
    def output(self): return LocalTarget(f"taskC{self.arg}.txt")

(全て無視outputしてrunのものを。彼らはただそこにいるので、例が正常に実行されます。)

上記の例のポイントは、print(f"READING FROM {self.path}")タスクA、B、Cをに依存させずに回線を制御することですpath

実際、構成クラスを使用すると、Task0引数を制御できます。パラメータTask0が渡されない場合path、デフォルト値であるconfig().path。が使用されます。

私の問題は、これがインタプリタが最初にコードをロードする「ビルド時」にのみ機能し、実行時には機能しないように見えることです(詳細は私にはわかりません)。

したがって、これらはどちらも機能しません。

A)

if __name__ == "__main__":
    for i in range(3):
        config.path = f"newpath_{i}"
        luigi.build([TaskC(arg=i)], log_level="INFO")

===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 4 ran successfully:
    - 1 Task0(path=defaultpath.txt, arg=2)
    - 1 TaskA(arg=2)
    - 1 TaskB(arg=2)
    - 1 TaskC(arg=2)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

なぜこれが機能しないのかわかりません。

B)

if __name__ == "__main__":
    for i in range(3):
        luigi.build([TaskC(arg=i), config(path=f"newpath_{i}")], log_level="INFO")

===== Luigi Execution Summary =====

Scheduled 5 tasks of which:
* 5 ran successfully:
    - 1 Task0(path=defaultpath.txt, arg=2)
    - 1 TaskA(arg=2)
    - 1 TaskB(arg=2)
    - 1 TaskC(arg=2)
    - 1 config(path=newpath_2)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

これは実際には理にかなっています。2つのconfigクラスがありpath、そのうちの1つを変更することしかできませんでした。

助けて?

編集:もちろん、pathグローバル変数を参照することは機能しますが、通常のルイージの意味でのパラメーターではありません。

EDIT2:私は以下の答えのポイント1)を試しました:

config 同じ定義を持っています

class config(luigi.Config):
    path = Parameter(default="defaultpath.txt")

Task0は指摘された間違いを修正しました、すなわち今:

class Task0(Task):
    path = Parameter(default=config().path)
    arg = IntParameter(default=0)
    def run(self):
        print(f"READING FROM {self.path}")
        Path(self.output().path).touch()
    def output(self): return LocalTarget(f"task0{self.arg}.txt")

そして最後に私はしました:

if __name__ == "__main__":
    for i in range(3):
        config.path = Parameter(f"file_{i}")
        luigi.build([TaskC(arg=i)], log_level="WARNING")

これは機能しませんが、Task0それでも取得しpath="defaultpath.txt"ます。

回答

iHowell Nov 23 2020 at 22:19

したがって、実行しようとしているのは、これらのパラメータを親クラスに渡さずに、パラメータを使用してタスクを作成することです。それは完全に理解できます、そして私はこれを処理しようとすることに時々イライラしました。

まず、configクラスを誤って使用しています。に記載されているように、Configクラスを使用する場合https://luigi.readthedocs.io/en/stable/configuration.html#configuration-classes、オブジェクトをインスタンス化する必要があります。したがって、代わりに:

class Task0(Task):
    path = Parameter(default=config.path)
    ...

使用するもの:

class Task0(Task):
    path = Parameter(default=config().path)
    ...

これにより、Parameterオブジェクトではなく値を使用していることが保証されますが、それでも問題は解決されません。クラスを作成する場合Task0config().pathしたがって、参照の割り当てではないが、評価されるconfig().pathまでにpath(常にされるが、代わりに呼ばれる値defaultpath.txt)。クラスを正しい方法で使用すると、luigiは、次のように、新しいインスタンスの属性名として属性Taskのみを持つオブジェクトを作成しますluigi.Parameter。https://github.com/spotify/luigi/blob/master/luigi/task.py#L436

ですから、2つの可能な道があります。

1.)1つ目は、次のParameterようなオブジェクトに設定することを除いて、実行時に構成パスを設定することです。

config.path = luigi.Parameter(f"newpath_{i}")

ただし、これを使用しconfig.pathてタスクを機能させるには、パラメーターを異なる方法で取り込む必要があるため、多くの作業が必要になります(クラスの作成時にデフォルトを評価することはできません)。

2.)はるかに簡単な方法は、構成ファイルでクラスの引数を指定することです。あなたが見ればhttps://github.com/spotify/luigi/blob/master/luigi/task.py#L825、ConfigLuigiのクラスは実際には単なるクラスであることがわかります。そのため、クラスでTaskできることは何でもできます。その逆も同様です。したがって、これを設定ファイルに含めることができます。

[Task0]
path = newpath_1
...

3.)しかし、それぞれに異なる引数を使用して複数のタスクを実行したいように思われるので、Luigiが推奨しているように、親に引数を渡すことをお勧めします。次に、次のコマンドですべてを実行できます。

luigi.build([TaskC(arg=i) for i in range(3)])

4.)最後に、渡される依存関係を本当に取り除く必要がある場合は、タスクインスタンスのpickleをオブジェクトとしてParamaterizedTaskParameter拡張luigi.ObjectParameterして使用するを作成できます。

上記の解決策のうち、2または3のいずれかを強くお勧めします。1はプログラミングが難しく、4は非常に醜いパラメーターを作成し、もう少し高度です。

編集:ソリューション1と2は何よりもハックであり、パラメーターをにバンドルすることをお勧めしますDictParameter