Planificateur Dask vide / graphique non affiché

Nov 19 2020

J'ai une configuration comme suit:

# etl.py
from dask.distributed import Client
import dask
from tasks import task1, task2, task3

def runall(**kwargs):
    print("done")


def etl():
    client = Client()

    tasks = {}
    tasks['task1'] = dask.delayed(task)(*args)
    tasks['task2'] = dask.delayed(task)(*args)
    tasks['task3'] = dask.delayed(task)(*args)

     out = dask.delayed(runall)(**tasks)
     out.compute()

Cette logique a été empruntée à luigi et fonctionne bien avec les instructions if pour contrôler les tâches à exécuter.

Cependant, certaines des tâches chargent de grandes quantités de données à partir de SQL et provoquent des avertissements de gel GIL (au moins, c'est ce que je soupçonne car il est difficile de diagnostiquer quelle ligne cause exactement le problème). Parfois, le graphique / la surveillance montrés sur 8787 ne montre rien juste scheduler empty, je soupçonne que ceux-ci sont causés par le gel de l'application. Quelle est la meilleure façon de charger de grandes quantités de données à partir de SQL dans dask. (MSSQL et oracle). Pour le moment, cela ne concerne sqlalchemyque les paramètres réglés. Est-ce que l'ajout asyncet l' awaitaide?

Cependant, certaines tâches sont un peu lentes et j'aimerais utiliser des trucs comme dask.dataframeou en baginterne. Les documents déconseillent d'appeler retardé à l'intérieur retardé. Cela vaut-il également pour dataframeet bag. L'ensemble du script est exécuté sur une seule machine à 40 cœurs.

En utilisant bag.starmapj'obtiens un graphique comme celui-ci:

où les lignes droites supérieures sont ajoutées / découvertes une fois que le calcul atteint cette tâche et que le calcul est appelé à l'intérieur.

Réponses

ic_fl2 Nov 24 2020 at 10:15

Il ne semble y avoir aucune rime ou raison autre que la machine était / est très occupée et a du mal à afficher les mises à jour d'état et les tracés de bokeh comme souhaité.