Làm cách nào để bạn đọc một tệp lớn với dữ liệu dạng bảng không được sắp xếp thành nhiều phần trong Python?
Tôi có một tệp CSV lớn (> 100 GB) mà tôi muốn đọc vào bộ nhớ và xử lý dữ liệu theo từng phần. Tôi có hai ràng buộc:
- Rõ ràng là tôi không thể đọc toàn bộ tệp vào bộ nhớ. Tôi chỉ có khoảng 8GB ram trên máy của tôi.
- Dữ liệu là dạng bảng và không có thứ tự. Tôi cần đọc dữ liệu theo nhóm.
| Ticker | Ngày | Field1 | Field2 | Field3 |
|---|---|---|---|---|
| AAPL | 20201201 | 0 | 0 | 0 |
| AAPL | 20201202 | 0 | 0 | 0 |
| AAPL | 20201203 | 0 | 0 | 0 |
| AAPL | 20201204 | 0 | 0 | 0 |
| NFLX | 20201201 | 0 | 0 | 0 |
| NFLX | 20201202 | 0 | 0 | 0 |
| NFLX | 20201203 | 0 | 0 | 0 |
| NFLX | 20201204 | 0 | 0 | 0 |
Mối quan tâm ở đây là dữ liệu phải được đọc theo nhóm. Được nhóm theo Mã và ngày. Nếu tôi nói tôi muốn đọc 10.000 bản ghi trong mỗi đợt. Ranh giới của lô đó không được chia nhóm. tức là tất cả dữ liệu AAPL cho tháng 12 năm 2020 sẽ kết thúc trong cùng một đợt. Dữ liệu đó không được xuất hiện trong hai đợt.
Hầu hết các đồng nghiệp của tôi khi gặp trường hợp như thế này, họ thường tạo một tập lệnh bash trong đó họ sử dụng awk, cut, sort, uniq để chia dữ liệu thành các nhóm và ghi ra nhiều tệp trung gian vào đĩa. Sau đó, họ sử dụng Python để xử lý các tệp này. Tôi đã tự hỏi liệu có giải pháp Python / Pandas / Numpy đồng nhất cho vấn đề này hay không.
Trả lời
Còn cái này thì sao:
- Mở tập tin
- lặp qua các dòng đọc: Đối với mỗi dòng đọc:
- phân tích cú pháp
- nếu chưa hoàn thành:
- tạo + mở tệp cho mã đó (" tệp mã ")
- thêm vào một số dict trong đó key = ticker và value = tệp xử lý
- ghi dòng vào tệp biểu ngữ
- đóng tệp biểu ngữ và tệp gốc
- xử lý từng tệp mã đơn lẻ
Tôi sẽ xem xét hai lựa chọn
Vaex và Dask.
Vaex dường như đang tập trung chính xác vào những gì bạn cần. Xử lý lười biếng và bộ dữ liệu rất lớn. Kiểm tra github của họ. Tuy nhiên, có vẻ như bạn cần chuyển đổi tệp sang hdf5, điều này có thể hơi tốn thời gian.
Theo như Dask có liên quan, tôi sẽ không tin tưởng vào thành công. Dask chủ yếu tập trung vào tính toán phân tán và tôi không thực sự chắc chắn liệu nó có thể xử lý các tệp lớn một cách lười biếng hay không. Nhưng bạn có thể thử và xem.
Cách tiếp cận này là những con gấu trúc thuần chủng. Nó sẽ sử dụng hai chức năng: một để tính toán các chỉ mục, một để đọc một đoạn. Tôi muốn nói rằng nó sẽ hoàn toàn thất bại nếu bất kỳ nhóm nào của bạn không phù hợp với bộ nhớ (nhưng với tiêu chí của bạn rằng nhóm đó phải được đọc từng người một, tôi chắc chắn rằng nó sẽ phù hợp).
Bạn sẽ cần lặp qua nhị nguyên của các chỉ mục (như được tính từ hàm đầu tiên) để đọc toàn bộ khung dữ liệu.
Hy vọng điều đó sẽ giúp ... (Đừng ngần ngại điều chỉnh giá trị mặc định của chunksize theo nhu cầu của bạn).
import pandas as pd
def compute_indexes(url, cols_indexes=[], chunksize=100000, **kwargs):
"""
Returns a dictionnary
Keys are the pseudo indexes of the dataframe
Values are lists of indexes corresponding to this index
"""
iterator = pd.read_csv(
url,
usecols=cols_indexes,
chunksize=chunksize,
**kwargs)
dict_groups = dict()
for df in iterator:
groups_present = df.drop_duplicates(keep="first").values.tolist()
df.reset_index(drop=False, inplace=True)
df.set_index(cols_indexes, inplace=True)
for group in groups_present:
group = tuple(group)
if group not in dict_groups:
dict_groups[group] = []
try:
dict_groups[group] += df.loc[group]['index'].tolist()
except TypeError:
#only one row
dict_groups[group] += [df.loc[group]['index']]
return dict_groups
def read_csv_group(url, dict_groups, which_group, **kwargs):
if isinstance(which_group, list):
which_group = tuple(which_group)
rows = dict_groups[which_group]
def skip_rows(x):
if x == 0:
return False
elif x in {x+1 for x in rows}:
return False
else:
return True
df = pd.read_csv(url, skiprows=skip_rows, **kwargs)
return df
URL = "./dummy.csv"
indexes = ['Ticker', 'Date']
kwargs = {'dtype':{'Ticker':str, 'Date':int})
dict_groups = compute_indexes(URL, indexes, chunksize=100000, **kwargs)
df_one_group = read_csv_group(URL, dict_groups, ('AAPL', 20201201), **kwargs)