Làm cách nào để bạn đọc một tệp lớn với dữ liệu dạng bảng không được sắp xếp thành nhiều phần trong Python?

Dec 21 2020

Tôi có một tệp CSV lớn (> 100 GB) mà tôi muốn đọc vào bộ nhớ và xử lý dữ liệu theo từng phần. Tôi có hai ràng buộc:

  1. Rõ ràng là tôi không thể đọc toàn bộ tệp vào bộ nhớ. Tôi chỉ có khoảng 8GB ram trên máy của tôi.
  2. Dữ liệu là dạng bảng và không có thứ tự. Tôi cần đọc dữ liệu theo nhóm.
Ticker Ngày Field1 Field2 Field3
AAPL 20201201 0 0 0
AAPL 20201202 0 0 0
AAPL 20201203 0 0 0
AAPL 20201204 0 0 0
NFLX 20201201 0 0 0
NFLX 20201202 0 0 0
NFLX 20201203 0 0 0
NFLX 20201204 0 0 0

Mối quan tâm ở đây là dữ liệu phải được đọc theo nhóm. Được nhóm theo Mã và ngày. Nếu tôi nói tôi muốn đọc 10.000 bản ghi trong mỗi đợt. Ranh giới của lô đó không được chia nhóm. tức là tất cả dữ liệu AAPL cho tháng 12 năm 2020 sẽ kết thúc trong cùng một đợt. Dữ liệu đó không được xuất hiện trong hai đợt.

Hầu hết các đồng nghiệp của tôi khi gặp trường hợp như thế này, họ thường tạo một tập lệnh bash trong đó họ sử dụng awk, cut, sort, uniq để chia dữ liệu thành các nhóm và ghi ra nhiều tệp trung gian vào đĩa. Sau đó, họ sử dụng Python để xử lý các tệp này. Tôi đã tự hỏi liệu có giải pháp Python / Pandas / Numpy đồng nhất cho vấn đề này hay không.

Trả lời

genodeftest Dec 21 2020 at 03:11

Còn cái này thì sao:

  1. Mở tập tin
  2. lặp qua các dòng đọc: Đối với mỗi dòng đọc:
  • phân tích cú pháp
  • nếu chưa hoàn thành:
    • tạo + mở tệp cho mã đó (" tệp mã ")
    • thêm vào một số dict trong đó key = ticker và value = tệp xử lý
  • ghi dòng vào tệp biểu ngữ
  1. đóng tệp biểu ngữ và tệp gốc
  2. xử lý từng tệp mã đơn lẻ
Martin Dec 21 2020 at 03:44

Tôi sẽ xem xét hai lựa chọn

Vaex và Dask.

Vaex dường như đang tập trung chính xác vào những gì bạn cần. Xử lý lười biếng và bộ dữ liệu rất lớn. Kiểm tra github của họ. Tuy nhiên, có vẻ như bạn cần chuyển đổi tệp sang hdf5, điều này có thể hơi tốn thời gian.

Theo như Dask có liên quan, tôi sẽ không tin tưởng vào thành công. Dask chủ yếu tập trung vào tính toán phân tán và tôi không thực sự chắc chắn liệu nó có thể xử lý các tệp lớn một cách lười biếng hay không. Nhưng bạn có thể thử và xem.

tgrandje Dec 23 2020 at 03:05

Cách tiếp cận này là những con gấu trúc thuần chủng. Nó sẽ sử dụng hai chức năng: một để tính toán các chỉ mục, một để đọc một đoạn. Tôi muốn nói rằng nó sẽ hoàn toàn thất bại nếu bất kỳ nhóm nào của bạn không phù hợp với bộ nhớ (nhưng với tiêu chí của bạn rằng nhóm đó phải được đọc từng người một, tôi chắc chắn rằng nó sẽ phù hợp).

Bạn sẽ cần lặp qua nhị nguyên của các chỉ mục (như được tính từ hàm đầu tiên) để đọc toàn bộ khung dữ liệu.

Hy vọng điều đó sẽ giúp ... (Đừng ngần ngại điều chỉnh giá trị mặc định của chunksize theo nhu cầu của bạn).

import pandas as pd

def compute_indexes(url, cols_indexes=[], chunksize=100000, **kwargs):
    """
    Returns a dictionnary
    Keys are the pseudo indexes of the dataframe 
    Values are lists of indexes corresponding to this index
    """
    iterator = pd.read_csv(
            url, 
            usecols=cols_indexes, 
            chunksize=chunksize,
            **kwargs)
    
    dict_groups = dict()
    for df in iterator:
        groups_present = df.drop_duplicates(keep="first").values.tolist()
        df.reset_index(drop=False, inplace=True)
        df.set_index(cols_indexes, inplace=True)
        for group in groups_present:
            group = tuple(group)
            if group not in dict_groups:
                dict_groups[group] = []
            try:
                dict_groups[group] += df.loc[group]['index'].tolist()
            except TypeError:
                #only one row
                dict_groups[group] += [df.loc[group]['index']]
                
    return dict_groups

def read_csv_group(url, dict_groups, which_group, **kwargs):
    if isinstance(which_group, list):
        which_group = tuple(which_group)
    rows = dict_groups[which_group]
    def skip_rows(x):
        if x == 0:
            return False
        elif x in {x+1 for x in rows}:
            return False
        else:
            return True
    df = pd.read_csv(url, skiprows=skip_rows, **kwargs)
    return df
    
URL = "./dummy.csv"
indexes = ['Ticker', 'Date']
kwargs = {'dtype':{'Ticker':str, 'Date':int})
dict_groups = compute_indexes(URL, indexes, chunksize=100000, **kwargs)
df_one_group = read_csv_group(URL, dict_groups, ('AAPL', 20201201), **kwargs)