¿Cómo se lee un archivo grande con datos tabulares sin clasificar en fragmentos en Python?
Tengo un archivo CSV grande (> 100 GB) que quiero leer en la memoria y procesar los datos en trozos. Hay dos limitaciones que tengo:
- Obviamente, no puedo leer todo el archivo en la memoria. Solo tengo unos 8 GB de RAM en mi máquina.
- Los datos son tabulares y desordenados. Necesito leer los datos en grupos.
Corazón | Fecha | Campo1 | Campo2 | Campo3 |
---|---|---|---|---|
AAPL | 20201201 | 0 | 0 | 0 |
AAPL | 20201202 | 0 | 0 | 0 |
AAPL | 20201203 | 0 | 0 | 0 |
AAPL | 20201204 | 0 | 0 | 0 |
NFLX | 20201201 | 0 | 0 | 0 |
NFLX | 20201202 | 0 | 0 | 0 |
NFLX | 20201203 | 0 | 0 | 0 |
NFLX | 20201204 | 0 | 0 | 0 |
La preocupación aquí es que los datos deben leerse en grupos. Agrupados por Ticker y fecha. Si digo que quiero leer 10,000 registros en cada lote. El límite de ese lote no debe dividir grupos. es decir, todos los datos de AAPL para diciembre de 2020 deberían terminar en el mismo lote. Esos datos no deben aparecer en dos lotes.
La mayoría de mis compañeros de trabajo, cuando se enfrentan a una situación como esta, suelen crear un script bash en el que utilizan awk, cut, sort, uniq para dividir los datos en grupos y escribir varios archivos intermedios en el disco. Luego usan Python para procesar estos archivos. Me preguntaba si hay una solución Python / Pandas / Numpy homogénea para esto.
Respuestas
Qué tal esto:
- abre el archivo
- bucle sobre líneas de lectura: Para cada línea, lea:
- analizar el ticker
- si no lo ha hecho ya:
- crear + abrir un archivo para ese ticker (" archivo ticker ")
- anexar a algún dictado donde clave = ticker y valor = identificador de archivo
- escribe la línea en el archivo ticker
- cerrar los archivos de ticker y el archivo original
- procesar cada archivo de ticker
Buscaría dos opciones
Vaex y Dask.
Vaex parece estar enfocado exactamente en lo que necesitas. Procesamiento lento y conjuntos de datos muy grandes. Compruebe su github. Sin embargo, parece que necesita convertir archivos a hdf5, lo que puede llevar un poco de tiempo.
En lo que respecta a Dask, no contaré con el éxito. Dask se centra principalmente en la computación distribuida y no estoy realmente seguro de si puede procesar archivos grandes con pereza. Pero puedes probar y ver.
Este enfoque es puro pandas. Utilizaría dos funciones: una para calcular los índices y otra para leer un fragmento. Yo diría que fallaría por completo si alguno de sus grupos no encaja en la memoria (pero dado su criterio de que esos grupos deben leerse uno a la vez, diría que sería una suposición segura que encaja).
Debería recorrer el diccionario de índices (calculado a partir de la primera función) para leer el marco de datos completo.
Espero que le ayude ... (No dude en adaptar el valor predeterminado de chunksize a sus necesidades).
import pandas as pd
def compute_indexes(url, cols_indexes=[], chunksize=100000, **kwargs):
"""
Returns a dictionnary
Keys are the pseudo indexes of the dataframe
Values are lists of indexes corresponding to this index
"""
iterator = pd.read_csv(
url,
usecols=cols_indexes,
chunksize=chunksize,
**kwargs)
dict_groups = dict()
for df in iterator:
groups_present = df.drop_duplicates(keep="first").values.tolist()
df.reset_index(drop=False, inplace=True)
df.set_index(cols_indexes, inplace=True)
for group in groups_present:
group = tuple(group)
if group not in dict_groups:
dict_groups[group] = []
try:
dict_groups[group] += df.loc[group]['index'].tolist()
except TypeError:
#only one row
dict_groups[group] += [df.loc[group]['index']]
return dict_groups
def read_csv_group(url, dict_groups, which_group, **kwargs):
if isinstance(which_group, list):
which_group = tuple(which_group)
rows = dict_groups[which_group]
def skip_rows(x):
if x == 0:
return False
elif x in {x+1 for x in rows}:
return False
else:
return True
df = pd.read_csv(url, skiprows=skip_rows, **kwargs)
return df
URL = "./dummy.csv"
indexes = ['Ticker', 'Date']
kwargs = {'dtype':{'Ticker':str, 'Date':int})
dict_groups = compute_indexes(URL, indexes, chunksize=100000, **kwargs)
df_one_group = read_csv_group(URL, dict_groups, ('AAPL', 20201201), **kwargs)