Pythonで、並べ替えられていない表形式のデータがチャンクになっている大きなファイルをどのように読み取りますか?

Dec 21 2020

メモリに読み込んでデータをチャンクで処理したい大きなCSVファイル(> 100 GB)があります。私には2つの制約があります。

  1. 明らかに、ファイル全体をメモリに読み込むことはできません。私のマシンには約8GBのRAMしかありません。
  2. データは表形式で順序付けられていません。グループでデータを読み取る必要があります。
ティッカー 日付 フィールド1 フィールド2 フィールド3
AAPL 20201201 0 0 0
AAPL 20201202 0 0 0
AAPL 20201203 0 0 0
AAPL 20201204 0 0 0
NFLX 20201201 0 0 0
NFLX 20201202 0 0 0
NFLX 20201203 0 0 0
NFLX 20201204 0 0 0

ここでの懸念は、データをグループで読み取る必要があることです。ティッカーと日付でグループ化。各バッチで10,000レコードを読み取りたいと言った場合。そのバッチの境界は、グループを分割するべきではありません。つまり、2020年12月のすべてのAAPLデータは同じバッチになるはずです。そのデータは2つのバッチで表示されるべきではありません。

私の同僚のほとんどは、このような状況に直面すると、通常、awk、cut、sort、uniqを使用してデータをグループに分割し、複数の中間ファイルをディスクに書き込むbashスクリプトを作成します。次に、Pythonを使用してこれらのファイルを処理します。これに対して同種のPython / Pandas / Numpyソリューションがあるかどうか疑問に思いました。

回答

genodeftest Dec 21 2020 at 03:11

これはどう:

  1. ファイルを開く
  2. 読み取り行のループ:読み取り行ごとに:
  • ティッカーを解析します
  • まだ行っていない場合:
    • そのティッカーのファイルを作成して開く(「ティッカーファイル」)
    • key = tickerおよびvalue = fileハンドルの辞書に追加します
  • ティッカーファイルに行を書き込む
  1. ティッカーファイルと元のファイルを閉じます
  2. 各単一のティッカーファイルを処理します
Martin Dec 21 2020 at 03:44

私は2つのオプションを検討します

VaexDask

Vaexはあなたが必要としているものに正確に焦点を合わせているようです。遅延処理と非常に大きなデータセット。彼らのgithubをチェックしてください。ただし、ファイルをhdf5に変換する必要があるようですが、これには少し時間がかかる場合があります。

Daskに関する限り、私は成功を期待していません。Daskは主に分散計算に重点を置いており、大きなファイルを遅延処理できるかどうかはよくわかりません。しかし、あなたは試して見ることができます。

tgrandje Dec 23 2020 at 03:05

このアプローチは純粋なパンダです。2つの関数を使用します。1つはインデックスを計算するためのもので、もう1つは1つのチャンクを読み取るためのものです。グループのいずれかがメモリに収まらない場合は完全に失敗すると思います(ただし、それらのグループを一度に1つずつ読み取る必要があるという基準を考えると、確実に収まると思います)。

データフレーム全体を読み取るには、(最初​​の関数から計算された)インデックスの辞書をループする必要があります。

それがお役に立てば幸いです...(チャンクサイズのデフォルト値をニーズに合わせて調整することを躊躇しないでください)。

import pandas as pd

def compute_indexes(url, cols_indexes=[], chunksize=100000, **kwargs):
    """
    Returns a dictionnary
    Keys are the pseudo indexes of the dataframe 
    Values are lists of indexes corresponding to this index
    """
    iterator = pd.read_csv(
            url, 
            usecols=cols_indexes, 
            chunksize=chunksize,
            **kwargs)
    
    dict_groups = dict()
    for df in iterator:
        groups_present = df.drop_duplicates(keep="first").values.tolist()
        df.reset_index(drop=False, inplace=True)
        df.set_index(cols_indexes, inplace=True)
        for group in groups_present:
            group = tuple(group)
            if group not in dict_groups:
                dict_groups[group] = []
            try:
                dict_groups[group] += df.loc[group]['index'].tolist()
            except TypeError:
                #only one row
                dict_groups[group] += [df.loc[group]['index']]
                
    return dict_groups

def read_csv_group(url, dict_groups, which_group, **kwargs):
    if isinstance(which_group, list):
        which_group = tuple(which_group)
    rows = dict_groups[which_group]
    def skip_rows(x):
        if x == 0:
            return False
        elif x in {x+1 for x in rows}:
            return False
        else:
            return True
    df = pd.read_csv(url, skiprows=skip_rows, **kwargs)
    return df
    
URL = "./dummy.csv"
indexes = ['Ticker', 'Date']
kwargs = {'dtype':{'Ticker':str, 'Date':int})
dict_groups = compute_indexes(URL, indexes, chunksize=100000, **kwargs)
df_one_group = read_csv_group(URL, dict_groups, ('AAPL', 20201201), **kwargs)