Comment lire un gros fichier avec des données tabulaires non triées en morceaux en Python?

Dec 21 2020

J'ai un gros fichier CSV (> 100 Go) que je veux lire en mémoire et traiter les données en morceaux. J'ai deux contraintes:

  1. Évidemment, je ne peux pas lire l'intégralité du fichier en mémoire. Je n'ai que 8 Go de RAM sur ma machine.
  2. Les données sont tabulaires et non ordonnées. J'ai besoin de lire les données en groupes.
Téléscripteur Date Champ1 Champ2 Champ3
AAPL 20201201 0 0 0
AAPL 20201202 0 0 0
AAPL 20201203 0 0 0
AAPL 20201204 0 0 0
NFLX 20201201 0 0 0
NFLX 20201202 0 0 0
NFLX 20201203 0 0 0
NFLX 20201204 0 0 0

Le problème ici est que les données doivent être lues en groupes. Regroupés par ticker et date. Si je dis que je veux lire 10 000 enregistrements dans chaque lot. La limite de ce lot ne doit pas diviser les groupes. c'est-à-dire que toutes les données AAPL pour décembre 2020 devraient se retrouver dans le même lot. Ces données ne doivent pas apparaître en deux lots.

La plupart de mes collègues lorsqu'ils sont confrontés à une situation comme celle-ci, ils créent généralement un script bash dans lequel ils utilisent awk, cut, sort, uniq pour diviser les données en groupes et écrire plusieurs fichiers intermédiaires sur le disque. Ensuite, ils utilisent Python pour traiter ces fichiers. Je me demandais s'il existe une solution homogène Python / Pandas / Numpy à cela.

Réponses

genodeftest Dec 21 2020 at 03:11

Que dis-tu de ça:

  1. ouvrir le dossier
  2. boucle sur les lignes de lecture: Pour chaque ligne, lisez:
  • analyser le ticker
  • si ce n'est déjà fait:
    • créer + ouvrir un fichier pour ce ticker (" ticker file ")
    • ajouter à un dict où key = ticker et value = file handle
  • écrire la ligne dans le fichier ticker
  1. fermez les fichiers ticker et le fichier original
  2. traiter chaque fichier de téléscripteur
Martin Dec 21 2020 at 03:44

Je regarderais dans deux options

Vaex et Dask .

Vaex semble se concentrer exactement sur ce dont vous avez besoin. Traitement paresseux et très grands ensembles de données. Vérifiez leur github. Cependant, il semble que vous deviez convertir des fichiers en hdf5, ce qui peut prendre un peu de temps.

En ce qui concerne Dask, je ne compterais pas sur le succès. Dask se concentre principalement sur le calcul distribué et je ne suis pas vraiment sûr qu'il puisse traiter de gros fichiers paresseusement. Mais vous pouvez essayer de voir.

tgrandje Dec 23 2020 at 03:05

Cette approche est de purs pandas. Il utiliserait deux fonctions: une pour calculer les index, une pour lire un morceau. Je dirais que cela échouerait complètement si l'un de vos groupes ne rentre pas dans la mémoire (mais étant donné vos critères selon lesquels ces groupes doivent être lus un à la fois, je dirais que ce serait une supposition sûre que cela correspond).

Vous auriez besoin de boucler sur le dictionnaire des index (tel que calculé à partir de la première fonction) pour lire la totalité de la trame de données.

J'espère que cela vous aidera ... (N'hésitez pas à adapter la valeur par défaut du chunksize à vos besoins).

import pandas as pd

def compute_indexes(url, cols_indexes=[], chunksize=100000, **kwargs):
    """
    Returns a dictionnary
    Keys are the pseudo indexes of the dataframe 
    Values are lists of indexes corresponding to this index
    """
    iterator = pd.read_csv(
            url, 
            usecols=cols_indexes, 
            chunksize=chunksize,
            **kwargs)
    
    dict_groups = dict()
    for df in iterator:
        groups_present = df.drop_duplicates(keep="first").values.tolist()
        df.reset_index(drop=False, inplace=True)
        df.set_index(cols_indexes, inplace=True)
        for group in groups_present:
            group = tuple(group)
            if group not in dict_groups:
                dict_groups[group] = []
            try:
                dict_groups[group] += df.loc[group]['index'].tolist()
            except TypeError:
                #only one row
                dict_groups[group] += [df.loc[group]['index']]
                
    return dict_groups

def read_csv_group(url, dict_groups, which_group, **kwargs):
    if isinstance(which_group, list):
        which_group = tuple(which_group)
    rows = dict_groups[which_group]
    def skip_rows(x):
        if x == 0:
            return False
        elif x in {x+1 for x in rows}:
            return False
        else:
            return True
    df = pd.read_csv(url, skiprows=skip_rows, **kwargs)
    return df
    
URL = "./dummy.csv"
indexes = ['Ticker', 'Date']
kwargs = {'dtype':{'Ticker':str, 'Date':int})
dict_groups = compute_indexes(URL, indexes, chunksize=100000, **kwargs)
df_one_group = read_csv_group(URL, dict_groups, ('AAPL', 20201201), **kwargs)