Как вы читаете большой файл с несортированными табличными данными по частям в Python?
У меня есть большой CSV-файл (> 100 ГБ), который я хочу прочитать в память и обработать данные по частям. У меня есть два ограничения:
- Очевидно, я не могу прочитать весь файл в памяти. У меня на машине всего около 8 ГБ оперативной памяти.
- Данные являются табличными и неупорядоченными. Мне нужно читать данные группами.
Бегущая строка | Дата | Поле1 | Поле2 | Поле3 |
---|---|---|---|---|
AAPL | 20201201 | 0 | 0 | 0 |
AAPL | 20201202 | 0 | 0 | 0 |
AAPL | 20201203 | 0 | 0 | 0 |
AAPL | 20201204 | 0 | 0 | 0 |
NFLX | 20201201 | 0 | 0 | 0 |
NFLX | 20201202 | 0 | 0 | 0 |
NFLX | 20201203 | 0 | 0 | 0 |
NFLX | 20201204 | 0 | 0 | 0 |
Проблема здесь в том, что данные должны считываться группами. Сгруппированы по тикеру и дате. Если я скажу, что хочу прочитать 10 000 записей в каждом пакете. Граница этого пакета не должна разделять группы. т.е. все данные AAPL за декабрь 2020 г. должны попасть в один пакет. Эти данные не должны появляться в двух пакетах.
Большинство моих коллег, когда они сталкиваются с подобной ситуацией, обычно создают сценарий bash, в котором они используют awk, cut, sort, uniq для разделения данных на группы и записи нескольких промежуточных файлов на диск. Затем они используют Python для обработки этих файлов. Мне было интересно, есть ли для этого однородное решение Python / Pandas / Numpy.
Ответы
Как насчет этого:
- открыть файл
- цикл по строкам чтения: Для каждой строки прочтите:
- проанализировать тикер
- если еще не сделано:
- создать + открыть файл для этого тикера (" файл тикера ")
- добавить к некоторому dict, где ключ = тикер и значение = дескриптор файла
- записать строку в файл тикера
- закройте файлы тикера и исходный файл
- обрабатывать каждый отдельный файл тикера
Я бы рассмотрел два варианта
Вэкс и Даск.
Кажется, Vaex сосредоточен именно на том, что вам нужно. Ленивая обработка и очень большие наборы данных. Проверьте их github. Однако кажется, что вам нужно конвертировать файлы в hdf5, что может занять немного времени.
Что касается Даска, то я бы не стал рассчитывать на успех. Dask в первую очередь ориентирован на распределенные вычисления, и я не совсем уверен, может ли он лениво обрабатывать большие файлы. Но вы можете попробовать и посмотреть.
Этот подход - чистые панды. Он будет использовать две функции: одну для вычисления индексов, другую для чтения одного фрагмента. Я бы сказал, что он полностью потерпит неудачу, если какая-либо из ваших групп не умещается в памяти (но, учитывая ваши критерии, согласно которым эти группы должны читаться по одной, я бы сказал, что наверняка это подходит).
Вам нужно будет перебрать словарь индексов (вычисленных из первой функции), чтобы прочитать весь фрейм данных.
Надеюсь, что это поможет ... (Не сомневайтесь, измените значение chunksize по умолчанию под свои нужды).
import pandas as pd
def compute_indexes(url, cols_indexes=[], chunksize=100000, **kwargs):
"""
Returns a dictionnary
Keys are the pseudo indexes of the dataframe
Values are lists of indexes corresponding to this index
"""
iterator = pd.read_csv(
url,
usecols=cols_indexes,
chunksize=chunksize,
**kwargs)
dict_groups = dict()
for df in iterator:
groups_present = df.drop_duplicates(keep="first").values.tolist()
df.reset_index(drop=False, inplace=True)
df.set_index(cols_indexes, inplace=True)
for group in groups_present:
group = tuple(group)
if group not in dict_groups:
dict_groups[group] = []
try:
dict_groups[group] += df.loc[group]['index'].tolist()
except TypeError:
#only one row
dict_groups[group] += [df.loc[group]['index']]
return dict_groups
def read_csv_group(url, dict_groups, which_group, **kwargs):
if isinstance(which_group, list):
which_group = tuple(which_group)
rows = dict_groups[which_group]
def skip_rows(x):
if x == 0:
return False
elif x in {x+1 for x in rows}:
return False
else:
return True
df = pd.read_csv(url, skiprows=skip_rows, **kwargs)
return df
URL = "./dummy.csv"
indexes = ['Ticker', 'Date']
kwargs = {'dtype':{'Ticker':str, 'Date':int})
dict_groups = compute_indexes(URL, indexes, chunksize=100000, **kwargs)
df_one_group = read_csv_group(URL, dict_groups, ('AAPL', 20201201), **kwargs)