Как вы читаете большой файл с несортированными табличными данными по частям в Python?

Dec 21 2020

У меня есть большой CSV-файл (> 100 ГБ), который я хочу прочитать в память и обработать данные по частям. У меня есть два ограничения:

  1. Очевидно, я не могу прочитать весь файл в памяти. У меня на машине всего около 8 ГБ оперативной памяти.
  2. Данные являются табличными и неупорядоченными. Мне нужно читать данные группами.
Бегущая строка Дата Поле1 Поле2 Поле3
AAPL 20201201 0 0 0
AAPL 20201202 0 0 0
AAPL 20201203 0 0 0
AAPL 20201204 0 0 0
NFLX 20201201 0 0 0
NFLX 20201202 0 0 0
NFLX 20201203 0 0 0
NFLX 20201204 0 0 0

Проблема здесь в том, что данные должны считываться группами. Сгруппированы по тикеру и дате. Если я скажу, что хочу прочитать 10 000 записей в каждом пакете. Граница этого пакета не должна разделять группы. т.е. все данные AAPL за декабрь 2020 г. должны попасть в один пакет. Эти данные не должны появляться в двух пакетах.

Большинство моих коллег, когда они сталкиваются с подобной ситуацией, обычно создают сценарий bash, в котором они используют awk, cut, sort, uniq для разделения данных на группы и записи нескольких промежуточных файлов на диск. Затем они используют Python для обработки этих файлов. Мне было интересно, есть ли для этого однородное решение Python / Pandas / Numpy.

Ответы

genodeftest Dec 21 2020 at 03:11

Как насчет этого:

  1. открыть файл
  2. цикл по строкам чтения: Для каждой строки прочтите:
  • проанализировать тикер
  • если еще не сделано:
    • создать + открыть файл для этого тикера (" файл тикера ")
    • добавить к некоторому dict, где ключ = тикер и значение = дескриптор файла
  • записать строку в файл тикера
  1. закройте файлы тикера и исходный файл
  2. обрабатывать каждый отдельный файл тикера
Martin Dec 21 2020 at 03:44

Я бы рассмотрел два варианта

Вэкс и Даск.

Кажется, Vaex сосредоточен именно на том, что вам нужно. Ленивая обработка и очень большие наборы данных. Проверьте их github. Однако кажется, что вам нужно конвертировать файлы в hdf5, что может занять немного времени.

Что касается Даска, то я бы не стал рассчитывать на успех. Dask в первую очередь ориентирован на распределенные вычисления, и я не совсем уверен, может ли он лениво обрабатывать большие файлы. Но вы можете попробовать и посмотреть.

tgrandje Dec 23 2020 at 03:05

Этот подход - чистые панды. Он будет использовать две функции: одну для вычисления индексов, другую для чтения одного фрагмента. Я бы сказал, что он полностью потерпит неудачу, если какая-либо из ваших групп не умещается в памяти (но, учитывая ваши критерии, согласно которым эти группы должны читаться по одной, я бы сказал, что наверняка это подходит).

Вам нужно будет перебрать словарь индексов (вычисленных из первой функции), чтобы прочитать весь фрейм данных.

Надеюсь, что это поможет ... (Не сомневайтесь, измените значение chunksize по умолчанию под свои нужды).

import pandas as pd

def compute_indexes(url, cols_indexes=[], chunksize=100000, **kwargs):
    """
    Returns a dictionnary
    Keys are the pseudo indexes of the dataframe 
    Values are lists of indexes corresponding to this index
    """
    iterator = pd.read_csv(
            url, 
            usecols=cols_indexes, 
            chunksize=chunksize,
            **kwargs)
    
    dict_groups = dict()
    for df in iterator:
        groups_present = df.drop_duplicates(keep="first").values.tolist()
        df.reset_index(drop=False, inplace=True)
        df.set_index(cols_indexes, inplace=True)
        for group in groups_present:
            group = tuple(group)
            if group not in dict_groups:
                dict_groups[group] = []
            try:
                dict_groups[group] += df.loc[group]['index'].tolist()
            except TypeError:
                #only one row
                dict_groups[group] += [df.loc[group]['index']]
                
    return dict_groups

def read_csv_group(url, dict_groups, which_group, **kwargs):
    if isinstance(which_group, list):
        which_group = tuple(which_group)
    rows = dict_groups[which_group]
    def skip_rows(x):
        if x == 0:
            return False
        elif x in {x+1 for x in rows}:
            return False
        else:
            return True
    df = pd.read_csv(url, skiprows=skip_rows, **kwargs)
    return df
    
URL = "./dummy.csv"
indexes = ['Ticker', 'Date']
kwargs = {'dtype':{'Ticker':str, 'Date':int})
dict_groups = compute_indexes(URL, indexes, chunksize=100000, **kwargs)
df_one_group = read_csv_group(URL, dict_groups, ('AAPL', 20201201), **kwargs)