¿Cómo se lee un archivo grande con datos tabulares sin clasificar en fragmentos en Python?

Dec 21 2020

Tengo un archivo CSV grande (> 100 GB) que quiero leer en la memoria y procesar los datos en trozos. Hay dos limitaciones que tengo:

  1. Obviamente, no puedo leer todo el archivo en la memoria. Solo tengo unos 8 GB de RAM en mi máquina.
  2. Los datos son tabulares y desordenados. Necesito leer los datos en grupos.
Corazón Fecha Campo1 Campo2 Campo3
AAPL 20201201 0 0 0
AAPL 20201202 0 0 0
AAPL 20201203 0 0 0
AAPL 20201204 0 0 0
NFLX 20201201 0 0 0
NFLX 20201202 0 0 0
NFLX 20201203 0 0 0
NFLX 20201204 0 0 0

La preocupación aquí es que los datos deben leerse en grupos. Agrupados por Ticker y fecha. Si digo que quiero leer 10,000 registros en cada lote. El límite de ese lote no debe dividir grupos. es decir, todos los datos de AAPL para diciembre de 2020 deberían terminar en el mismo lote. Esos datos no deben aparecer en dos lotes.

La mayoría de mis compañeros de trabajo, cuando se enfrentan a una situación como esta, suelen crear un script bash en el que utilizan awk, cut, sort, uniq para dividir los datos en grupos y escribir varios archivos intermedios en el disco. Luego usan Python para procesar estos archivos. Me preguntaba si hay una solución Python / Pandas / Numpy homogénea para esto.

Respuestas

genodeftest Dec 21 2020 at 03:11

Qué tal esto:

  1. abre el archivo
  2. bucle sobre líneas de lectura: Para cada línea, lea:
  • analizar el ticker
  • si no lo ha hecho ya:
    • crear + abrir un archivo para ese ticker (" archivo ticker ")
    • anexar a algún dictado donde clave = ticker y valor = identificador de archivo
  • escribe la línea en el archivo ticker
  1. cerrar los archivos de ticker y el archivo original
  2. procesar cada archivo de ticker
Martin Dec 21 2020 at 03:44

Buscaría dos opciones

Vaex y Dask.

Vaex parece estar enfocado exactamente en lo que necesitas. Procesamiento lento y conjuntos de datos muy grandes. Compruebe su github. Sin embargo, parece que necesita convertir archivos a hdf5, lo que puede llevar un poco de tiempo.

En lo que respecta a Dask, no contaré con el éxito. Dask se centra principalmente en la computación distribuida y no estoy realmente seguro de si puede procesar archivos grandes con pereza. Pero puedes probar y ver.

tgrandje Dec 23 2020 at 03:05

Este enfoque es puro pandas. Utilizaría dos funciones: una para calcular los índices y otra para leer un fragmento. Yo diría que fallaría por completo si alguno de sus grupos no encaja en la memoria (pero dado su criterio de que esos grupos deben leerse uno a la vez, diría que sería una suposición segura que encaja).

Debería recorrer el diccionario de índices (calculado a partir de la primera función) para leer el marco de datos completo.

Espero que le ayude ... (No dude en adaptar el valor predeterminado de chunksize a sus necesidades).

import pandas as pd

def compute_indexes(url, cols_indexes=[], chunksize=100000, **kwargs):
    """
    Returns a dictionnary
    Keys are the pseudo indexes of the dataframe 
    Values are lists of indexes corresponding to this index
    """
    iterator = pd.read_csv(
            url, 
            usecols=cols_indexes, 
            chunksize=chunksize,
            **kwargs)
    
    dict_groups = dict()
    for df in iterator:
        groups_present = df.drop_duplicates(keep="first").values.tolist()
        df.reset_index(drop=False, inplace=True)
        df.set_index(cols_indexes, inplace=True)
        for group in groups_present:
            group = tuple(group)
            if group not in dict_groups:
                dict_groups[group] = []
            try:
                dict_groups[group] += df.loc[group]['index'].tolist()
            except TypeError:
                #only one row
                dict_groups[group] += [df.loc[group]['index']]
                
    return dict_groups

def read_csv_group(url, dict_groups, which_group, **kwargs):
    if isinstance(which_group, list):
        which_group = tuple(which_group)
    rows = dict_groups[which_group]
    def skip_rows(x):
        if x == 0:
            return False
        elif x in {x+1 for x in rows}:
            return False
        else:
            return True
    df = pd.read_csv(url, skiprows=skip_rows, **kwargs)
    return df
    
URL = "./dummy.csv"
indexes = ['Ticker', 'Date']
kwargs = {'dtype':{'Ticker':str, 'Date':int})
dict_groups = compute_indexes(URL, indexes, chunksize=100000, **kwargs)
df_one_group = read_csv_group(URL, dict_groups, ('AAPL', 20201201), **kwargs)