Pythonで、並べ替えられていない表形式のデータがチャンクになっている大きなファイルをどのように読み取りますか?
メモリに読み込んでデータをチャンクで処理したい大きなCSVファイル(> 100 GB)があります。私には2つの制約があります。
- 明らかに、ファイル全体をメモリに読み込むことはできません。私のマシンには約8GBのRAMしかありません。
- データは表形式で順序付けられていません。グループでデータを読み取る必要があります。
ティッカー | 日付 | フィールド1 | フィールド2 | フィールド3 |
---|---|---|---|---|
AAPL | 20201201 | 0 | 0 | 0 |
AAPL | 20201202 | 0 | 0 | 0 |
AAPL | 20201203 | 0 | 0 | 0 |
AAPL | 20201204 | 0 | 0 | 0 |
NFLX | 20201201 | 0 | 0 | 0 |
NFLX | 20201202 | 0 | 0 | 0 |
NFLX | 20201203 | 0 | 0 | 0 |
NFLX | 20201204 | 0 | 0 | 0 |
ここでの懸念は、データをグループで読み取る必要があることです。ティッカーと日付でグループ化。各バッチで10,000レコードを読み取りたいと言った場合。そのバッチの境界は、グループを分割するべきではありません。つまり、2020年12月のすべてのAAPLデータは同じバッチになるはずです。そのデータは2つのバッチで表示されるべきではありません。
私の同僚のほとんどは、このような状況に直面すると、通常、awk、cut、sort、uniqを使用してデータをグループに分割し、複数の中間ファイルをディスクに書き込むbashスクリプトを作成します。次に、Pythonを使用してこれらのファイルを処理します。これに対して同種のPython / Pandas / Numpyソリューションがあるかどうか疑問に思いました。
回答
これはどう:
- ファイルを開く
- 読み取り行のループ:読み取り行ごとに:
- ティッカーを解析します
- まだ行っていない場合:
- そのティッカーのファイルを作成して開く(「ティッカーファイル」)
- key = tickerおよびvalue = fileハンドルの辞書に追加します
- ティッカーファイルに行を書き込む
- ティッカーファイルと元のファイルを閉じます
- 各単一のティッカーファイルを処理します
私は2つのオプションを検討します
VaexとDask。
Vaexはあなたが必要としているものに正確に焦点を合わせているようです。遅延処理と非常に大きなデータセット。彼らのgithubをチェックしてください。ただし、ファイルをhdf5に変換する必要があるようですが、これには少し時間がかかる場合があります。
Daskに関する限り、私は成功を期待していません。Daskは主に分散計算に重点を置いており、大きなファイルを遅延処理できるかどうかはよくわかりません。しかし、あなたは試して見ることができます。
このアプローチは純粋なパンダです。2つの関数を使用します。1つはインデックスを計算するためのもので、もう1つは1つのチャンクを読み取るためのものです。グループのいずれかがメモリに収まらない場合は完全に失敗すると思います(ただし、それらのグループを一度に1つずつ読み取る必要があるという基準を考えると、確実に収まると思います)。
データフレーム全体を読み取るには、(最初の関数から計算された)インデックスの辞書をループする必要があります。
それがお役に立てば幸いです...(チャンクサイズのデフォルト値をニーズに合わせて調整することを躊躇しないでください)。
import pandas as pd
def compute_indexes(url, cols_indexes=[], chunksize=100000, **kwargs):
"""
Returns a dictionnary
Keys are the pseudo indexes of the dataframe
Values are lists of indexes corresponding to this index
"""
iterator = pd.read_csv(
url,
usecols=cols_indexes,
chunksize=chunksize,
**kwargs)
dict_groups = dict()
for df in iterator:
groups_present = df.drop_duplicates(keep="first").values.tolist()
df.reset_index(drop=False, inplace=True)
df.set_index(cols_indexes, inplace=True)
for group in groups_present:
group = tuple(group)
if group not in dict_groups:
dict_groups[group] = []
try:
dict_groups[group] += df.loc[group]['index'].tolist()
except TypeError:
#only one row
dict_groups[group] += [df.loc[group]['index']]
return dict_groups
def read_csv_group(url, dict_groups, which_group, **kwargs):
if isinstance(which_group, list):
which_group = tuple(which_group)
rows = dict_groups[which_group]
def skip_rows(x):
if x == 0:
return False
elif x in {x+1 for x in rows}:
return False
else:
return True
df = pd.read_csv(url, skiprows=skip_rows, **kwargs)
return df
URL = "./dummy.csv"
indexes = ['Ticker', 'Date']
kwargs = {'dtype':{'Ticker':str, 'Date':int})
dict_groups = compute_indexes(URL, indexes, chunksize=100000, **kwargs)
df_one_group = read_csv_group(URL, dict_groups, ('AAPL', 20201201), **kwargs)