Como você lê um arquivo grande com dados tabulares não classificados em blocos no Python?

Dec 21 2020

Tenho um arquivo CSV grande (> 100 GB) que desejo ler na memória e processar os dados em blocos. Existem duas restrições que tenho:

  1. Obviamente, não consigo ler todo o arquivo na memória. Tenho apenas cerca de 8 GB de RAM em minha máquina.
  2. Os dados são tabulares e não ordenados. Eu preciso ler os dados em grupos.
Ticker Data Field1 Field2 Field3
AAPL 20201201 0 0 0
AAPL 20201202 0 0 0
AAPL 20201203 0 0 0
AAPL 20201204 0 0 0
NFLX 20201201 0 0 0
NFLX 20201202 0 0 0
NFLX 20201203 0 0 0
NFLX 20201204 0 0 0

A preocupação aqui é que os dados devem ser lidos em grupos. Agrupado por Ticker e data. Se eu disser que quero ler 10.000 registros em cada lote. O limite desse lote não deve dividir grupos. ou seja, todos os dados AAPL para dezembro de 2020 devem terminar no mesmo lote. Esses dados não devem aparecer em dois lotes.

A maioria dos meus colegas de trabalho, quando enfrentam uma situação como essa, geralmente criam um script bash onde usam awk, cut, sort, uniq para dividir dados em grupos e gravar vários arquivos intermediários no disco. Em seguida, eles usam Python para processar esses arquivos. Eu queria saber se existe uma solução homogênea Python / Pandas / Numpy para isso.

Respostas

genodeftest Dec 21 2020 at 03:11

Que tal agora:

  1. abra o arquivo
  2. loop sobre as linhas de leitura: para cada linha lida:
  • analise o ticker
  • se ainda não foi feito:
    • criar + abrir um arquivo para esse ticker (" arquivo ticker ")
    • anexar a algum dicionário onde key = ticker e value = file handle
  • escreva a linha no arquivo ticker
  1. feche os arquivos do ticker e o arquivo original
  2. processar cada arquivo de ticker único
Martin Dec 21 2020 at 03:44

Eu examinaria duas opções

Vaex e Dask.

Vaex parece estar focado exatamente no que você precisa. Processamento lento e conjuntos de dados muito grandes. Verifique seu github. No entanto, parece que você precisa converter arquivos para hdf5, o que pode ser um pouco demorado.

No que diz respeito a Dask, eu não contaria com sucesso. Dask é focado principalmente em computação distribuída e eu não tenho certeza se ele pode processar arquivos grandes preguiçosamente. Mas você pode tentar e ver.

tgrandje Dec 23 2020 at 03:05

Essa abordagem é pura pandas. Ele usaria duas funções: uma para calcular os índices, outra para ler um pedaço. Eu diria que falharia completamente se qualquer um de seus grupos não se encaixasse na memória (mas dados seus critérios de que esses grupos devem ser lidos um de cada vez, eu diria que seria um palpite certo que se encaixa).

Você precisaria fazer um loop no dicionário de índices (conforme calculado a partir da primeira função) para ler todo o dataframe.

Espero que ajude ... (Não hesite em adaptar o valor padrão do chunksize às suas necessidades).

import pandas as pd

def compute_indexes(url, cols_indexes=[], chunksize=100000, **kwargs):
    """
    Returns a dictionnary
    Keys are the pseudo indexes of the dataframe 
    Values are lists of indexes corresponding to this index
    """
    iterator = pd.read_csv(
            url, 
            usecols=cols_indexes, 
            chunksize=chunksize,
            **kwargs)
    
    dict_groups = dict()
    for df in iterator:
        groups_present = df.drop_duplicates(keep="first").values.tolist()
        df.reset_index(drop=False, inplace=True)
        df.set_index(cols_indexes, inplace=True)
        for group in groups_present:
            group = tuple(group)
            if group not in dict_groups:
                dict_groups[group] = []
            try:
                dict_groups[group] += df.loc[group]['index'].tolist()
            except TypeError:
                #only one row
                dict_groups[group] += [df.loc[group]['index']]
                
    return dict_groups

def read_csv_group(url, dict_groups, which_group, **kwargs):
    if isinstance(which_group, list):
        which_group = tuple(which_group)
    rows = dict_groups[which_group]
    def skip_rows(x):
        if x == 0:
            return False
        elif x in {x+1 for x in rows}:
            return False
        else:
            return True
    df = pd.read_csv(url, skiprows=skip_rows, **kwargs)
    return df
    
URL = "./dummy.csv"
indexes = ['Ticker', 'Date']
kwargs = {'dtype':{'Ticker':str, 'Date':int})
dict_groups = compute_indexes(URL, indexes, chunksize=100000, **kwargs)
df_one_group = read_csv_group(URL, dict_groups, ('AAPL', 20201201), **kwargs)