Wie liest man in Python eine große Datei mit unsortierten Tabellendaten in Blöcken?
Ich habe eine große CSV-Datei (> 100 GB), die ich in den Speicher einlesen und die Daten in Blöcken verarbeiten möchte. Ich habe zwei Einschränkungen:
- Natürlich kann ich nicht die gesamte Datei in den Speicher lesen. Ich habe nur ungefähr 8 GB RAM auf meinem Computer.
- Die Daten sind tabellarisch und ungeordnet. Ich muss die Daten in Gruppen lesen.
Ticker | Datum | Feld1 | Feld2 | Feld3 |
---|---|---|---|---|
AAPL | 20201201 | 0 | 0 | 0 |
AAPL | 20201202 | 0 | 0 | 0 |
AAPL | 20201203 | 0 | 0 | 0 |
AAPL | 20201204 | 0 | 0 | 0 |
NFLX | 20201201 | 0 | 0 | 0 |
NFLX | 20201202 | 0 | 0 | 0 |
NFLX | 20201203 | 0 | 0 | 0 |
NFLX | 20201204 | 0 | 0 | 0 |
Hier besteht die Sorge, dass die Daten in Gruppen gelesen werden müssen. Gruppiert nach Ticker und Datum. Wenn ich sage, ich möchte 10.000 Datensätze in jedem Stapel lesen. Die Grenze dieses Stapels sollte keine Gruppen aufteilen. dh Alle AAPL-Daten für den Dezember 2020 sollten in derselben Charge enden. Diese Daten sollten nicht in zwei Stapeln erscheinen.
Die meisten meiner Mitarbeiter erstellen in einer solchen Situation normalerweise ein Bash-Skript, in dem sie awk, cut, sort, uniq verwenden, um Daten in Gruppen aufzuteilen und mehrere Zwischendateien auf die Festplatte zu schreiben. Dann verwenden sie Python, um diese Dateien zu verarbeiten. Ich habe mich gefragt, ob es dafür eine homogene Python / Pandas / Numpy-Lösung gibt.
Antworten
Wie wäre es damit:
- öffne die Datei
- Schleife über Leseleitungen: Für jede gelesene Zeile:
- Analysieren Sie den Ticker
- falls noch nicht geschehen:
- Erstellen + Öffnen einer Datei für diesen Ticker (" Ticker-Datei ")
- an ein Diktat anhängen, in dem key = ticker und value = file handle
- Schreiben Sie die Zeile in die Ticker-Datei
- Schließen Sie die Ticker-Dateien und die Originaldatei
- Verarbeiten Sie jede einzelne Ticker-Datei
Ich würde zwei Optionen prüfen
Vaex und Dask.
Vaex scheint sich genau auf das zu konzentrieren, was Sie brauchen. Faule Verarbeitung und sehr große Datenmengen. Überprüfen Sie ihren Github. Es scheint jedoch, dass Sie Dateien in HDF5 konvertieren müssen, was etwas zeitaufwändig sein kann.
Für Dask würde ich nicht mit Erfolg rechnen. Dask konzentriert sich hauptsächlich auf verteilte Berechnungen und ich bin mir nicht sicher, ob es große Dateien träge verarbeiten kann. Aber Sie können versuchen, zu sehen.
Dieser Ansatz ist reine Pandas. Es würde zwei Funktionen verwenden: eine zum Berechnen der Indizes, eine zum Lesen eines Blocks. Ich würde sagen, es würde völlig scheitern, wenn eine Ihrer Gruppen nicht in den Speicher passt (aber angesichts Ihrer Kriterien, dass diese Gruppe einzeln gelesen werden muss, würde ich sagen, dass es sicher ist, dass es passt).
Sie müssten das Wörterbuch der Indizes (wie aus der ersten Funktion berechnet) durchlaufen, um den gesamten Datenrahmen zu lesen.
Hoffe, das wird helfen ... (Zögern Sie nicht, den Standardwert der Chunksize an Ihre Bedürfnisse anzupassen).
import pandas as pd
def compute_indexes(url, cols_indexes=[], chunksize=100000, **kwargs):
"""
Returns a dictionnary
Keys are the pseudo indexes of the dataframe
Values are lists of indexes corresponding to this index
"""
iterator = pd.read_csv(
url,
usecols=cols_indexes,
chunksize=chunksize,
**kwargs)
dict_groups = dict()
for df in iterator:
groups_present = df.drop_duplicates(keep="first").values.tolist()
df.reset_index(drop=False, inplace=True)
df.set_index(cols_indexes, inplace=True)
for group in groups_present:
group = tuple(group)
if group not in dict_groups:
dict_groups[group] = []
try:
dict_groups[group] += df.loc[group]['index'].tolist()
except TypeError:
#only one row
dict_groups[group] += [df.loc[group]['index']]
return dict_groups
def read_csv_group(url, dict_groups, which_group, **kwargs):
if isinstance(which_group, list):
which_group = tuple(which_group)
rows = dict_groups[which_group]
def skip_rows(x):
if x == 0:
return False
elif x in {x+1 for x in rows}:
return False
else:
return True
df = pd.read_csv(url, skiprows=skip_rows, **kwargs)
return df
URL = "./dummy.csv"
indexes = ['Ticker', 'Date']
kwargs = {'dtype':{'Ticker':str, 'Date':int})
dict_groups = compute_indexes(URL, indexes, chunksize=100000, **kwargs)
df_one_group = read_csv_group(URL, dict_groups, ('AAPL', 20201201), **kwargs)