파이썬에서 청크로 분류되지 않은 테이블 형식 데이터가있는 대용량 파일을 어떻게 읽습니까?

Dec 21 2020

메모리로 읽고 데이터를 청크로 처리하려는 대용량 CSV 파일 (> 100GB)이 있습니다. 두 가지 제약이 있습니다.

  1. 분명히 전체 파일을 메모리로 읽을 수는 없습니다. 내 컴퓨터에는 약 8GB의 램만 있습니다.
  2. 데이터는 표 형식이며 순서가 없습니다. 그룹으로 데이터를 읽어야합니다.
증권 시세 표시기 데이트 Field1 Field2 Field3
AAPL 20201201 0 0 0
AAPL 20201202 0 0 0
AAPL 20201203 0 0 0
AAPL 20201204 0 0 0
NFLX 20201201 0 0 0
NFLX 20201202 0 0 0
NFLX 20201203 0 0 0
NFLX 20201204 0 0 0

여기서 문제는 데이터를 그룹으로 읽어야한다는 것입니다. 티커 및 날짜별로 그룹화됩니다. 각 배치에서 10,000 개의 레코드를 읽고 싶다고하면. 해당 배치의 경계는 그룹을 분할하지 않아야합니다. 즉, 2020 년 12 월의 모든 AAPL 데이터는 동일한 배치로 끝나야합니다. 해당 데이터는 두 개의 배치로 표시되지 않아야합니다.

대부분의 동료들은 이와 같은 상황에 직면했을 때 일반적으로 awk, cut, sort, uniq를 사용하여 데이터를 그룹으로 나누고 여러 중간 파일을 디스크에 쓰는 bash 스크립트를 만듭니다. 그런 다음 Python을 사용하여 이러한 파일을 처리합니다. 이에 대한 동종의 Python / Pandas / Numpy 솔루션이 있는지 궁금합니다.

답변

genodeftest Dec 21 2020 at 03:11

이건 어때요:

  1. 파일 열기
  2. 읽기 행에 대한 루프 : 각 행에 대해 다음을 읽습니다.
  • 시세 파싱
  • 아직 완료하지 않은 경우 :
    • 해당 티커 ( " 티커 파일 ")에 대한 파일 생성 + 열기
    • key = ticker 및 value = file 핸들 인 일부 dict에 추가
  • 티커 파일에 줄 쓰기
  1. 티커 파일 과 원본 파일을 닫습니다.
  2. 각 단일 티커 파일 처리
Martin Dec 21 2020 at 03:44

두 가지 옵션을 살펴 보겠습니다.

Vaex 와 Dask.

Vaex 는 필요한 것에 정확히 집중하는 것 같습니다. 지연 처리 및 매우 큰 데이터 세트. 그들의 github를 확인하십시오. 그러나 파일을 hdf5로 변환해야하는데 약간의 시간이 소요될 수 있습니다.

Dask에 관한 한, 나는 성공을 믿지 않을 것입니다. Dask는 주로 분산 계산에 중점을두고 있으며 대용량 파일을 느리게 처리 할 수 ​​있는지 확실하지 않습니다. 그러나 당신은 시도하고 볼 수 있습니다.

tgrandje Dec 23 2020 at 03:05

이 접근 방식은 순수한 판다입니다. 두 가지 함수를 사용합니다. 하나는 인덱스를 계산하고 하나는 하나의 청크를 읽는 것입니다. 나는 당신의 그룹 중 하나가 기억에 맞지 않으면 완전히 실패 할 것이라고 말하고 싶습니다 (하지만 그 그룹을 한 번에 하나씩 읽어야한다는 당신의 기준을 감안할 때, 그것이 적합하다고 말할 것입니다).

전체 데이터 프레임을 읽으려면 인덱스 사전 (첫 번째 함수에서 계산 됨)을 반복해야합니다.

도움이 되길 바랍니다 ... (청크 크기의 기본값을 필요에 맞게 조정하는 것을 망설이지 마십시오).

import pandas as pd

def compute_indexes(url, cols_indexes=[], chunksize=100000, **kwargs):
    """
    Returns a dictionnary
    Keys are the pseudo indexes of the dataframe 
    Values are lists of indexes corresponding to this index
    """
    iterator = pd.read_csv(
            url, 
            usecols=cols_indexes, 
            chunksize=chunksize,
            **kwargs)
    
    dict_groups = dict()
    for df in iterator:
        groups_present = df.drop_duplicates(keep="first").values.tolist()
        df.reset_index(drop=False, inplace=True)
        df.set_index(cols_indexes, inplace=True)
        for group in groups_present:
            group = tuple(group)
            if group not in dict_groups:
                dict_groups[group] = []
            try:
                dict_groups[group] += df.loc[group]['index'].tolist()
            except TypeError:
                #only one row
                dict_groups[group] += [df.loc[group]['index']]
                
    return dict_groups

def read_csv_group(url, dict_groups, which_group, **kwargs):
    if isinstance(which_group, list):
        which_group = tuple(which_group)
    rows = dict_groups[which_group]
    def skip_rows(x):
        if x == 0:
            return False
        elif x in {x+1 for x in rows}:
            return False
        else:
            return True
    df = pd.read_csv(url, skiprows=skip_rows, **kwargs)
    return df
    
URL = "./dummy.csv"
indexes = ['Ticker', 'Date']
kwargs = {'dtype':{'Ticker':str, 'Date':int})
dict_groups = compute_indexes(URL, indexes, chunksize=100000, **kwargs)
df_one_group = read_csv_group(URL, dict_groups, ('AAPL', 20201201), **kwargs)