Como você lê um arquivo grande com dados tabulares não classificados em blocos no Python?
Tenho um arquivo CSV grande (> 100 GB) que desejo ler na memória e processar os dados em blocos. Existem duas restrições que tenho:
- Obviamente, não consigo ler todo o arquivo na memória. Tenho apenas cerca de 8 GB de RAM em minha máquina.
- Os dados são tabulares e não ordenados. Eu preciso ler os dados em grupos.
Ticker | Data | Field1 | Field2 | Field3 |
---|---|---|---|---|
AAPL | 20201201 | 0 | 0 | 0 |
AAPL | 20201202 | 0 | 0 | 0 |
AAPL | 20201203 | 0 | 0 | 0 |
AAPL | 20201204 | 0 | 0 | 0 |
NFLX | 20201201 | 0 | 0 | 0 |
NFLX | 20201202 | 0 | 0 | 0 |
NFLX | 20201203 | 0 | 0 | 0 |
NFLX | 20201204 | 0 | 0 | 0 |
A preocupação aqui é que os dados devem ser lidos em grupos. Agrupado por Ticker e data. Se eu disser que quero ler 10.000 registros em cada lote. O limite desse lote não deve dividir grupos. ou seja, todos os dados AAPL para dezembro de 2020 devem terminar no mesmo lote. Esses dados não devem aparecer em dois lotes.
A maioria dos meus colegas de trabalho, quando enfrentam uma situação como essa, geralmente criam um script bash onde usam awk, cut, sort, uniq para dividir dados em grupos e gravar vários arquivos intermediários no disco. Em seguida, eles usam Python para processar esses arquivos. Eu queria saber se existe uma solução homogênea Python / Pandas / Numpy para isso.
Respostas
Que tal agora:
- abra o arquivo
- loop sobre as linhas de leitura: para cada linha lida:
- analise o ticker
- se ainda não foi feito:
- criar + abrir um arquivo para esse ticker (" arquivo ticker ")
- anexar a algum dicionário onde key = ticker e value = file handle
- escreva a linha no arquivo ticker
- feche os arquivos do ticker e o arquivo original
- processar cada arquivo de ticker único
Eu examinaria duas opções
Vaex e Dask.
Vaex parece estar focado exatamente no que você precisa. Processamento lento e conjuntos de dados muito grandes. Verifique seu github. No entanto, parece que você precisa converter arquivos para hdf5, o que pode ser um pouco demorado.
No que diz respeito a Dask, eu não contaria com sucesso. Dask é focado principalmente em computação distribuída e eu não tenho certeza se ele pode processar arquivos grandes preguiçosamente. Mas você pode tentar e ver.
Essa abordagem é pura pandas. Ele usaria duas funções: uma para calcular os índices, outra para ler um pedaço. Eu diria que falharia completamente se qualquer um de seus grupos não se encaixasse na memória (mas dados seus critérios de que esses grupos devem ser lidos um de cada vez, eu diria que seria um palpite certo que se encaixa).
Você precisaria fazer um loop no dicionário de índices (conforme calculado a partir da primeira função) para ler todo o dataframe.
Espero que ajude ... (Não hesite em adaptar o valor padrão do chunksize às suas necessidades).
import pandas as pd
def compute_indexes(url, cols_indexes=[], chunksize=100000, **kwargs):
"""
Returns a dictionnary
Keys are the pseudo indexes of the dataframe
Values are lists of indexes corresponding to this index
"""
iterator = pd.read_csv(
url,
usecols=cols_indexes,
chunksize=chunksize,
**kwargs)
dict_groups = dict()
for df in iterator:
groups_present = df.drop_duplicates(keep="first").values.tolist()
df.reset_index(drop=False, inplace=True)
df.set_index(cols_indexes, inplace=True)
for group in groups_present:
group = tuple(group)
if group not in dict_groups:
dict_groups[group] = []
try:
dict_groups[group] += df.loc[group]['index'].tolist()
except TypeError:
#only one row
dict_groups[group] += [df.loc[group]['index']]
return dict_groups
def read_csv_group(url, dict_groups, which_group, **kwargs):
if isinstance(which_group, list):
which_group = tuple(which_group)
rows = dict_groups[which_group]
def skip_rows(x):
if x == 0:
return False
elif x in {x+1 for x in rows}:
return False
else:
return True
df = pd.read_csv(url, skiprows=skip_rows, **kwargs)
return df
URL = "./dummy.csv"
indexes = ['Ticker', 'Date']
kwargs = {'dtype':{'Ticker':str, 'Date':int})
dict_groups = compute_indexes(URL, indexes, chunksize=100000, **kwargs)
df_one_group = read_csv_group(URL, dict_groups, ('AAPL', 20201201), **kwargs)