Wie liest man in Python eine große Datei mit unsortierten Tabellendaten in Blöcken?

Dec 21 2020

Ich habe eine große CSV-Datei (> 100 GB), die ich in den Speicher einlesen und die Daten in Blöcken verarbeiten möchte. Ich habe zwei Einschränkungen:

  1. Natürlich kann ich nicht die gesamte Datei in den Speicher lesen. Ich habe nur ungefähr 8 GB RAM auf meinem Computer.
  2. Die Daten sind tabellarisch und ungeordnet. Ich muss die Daten in Gruppen lesen.
Ticker Datum Feld1 Feld2 Feld3
AAPL 20201201 0 0 0
AAPL 20201202 0 0 0
AAPL 20201203 0 0 0
AAPL 20201204 0 0 0
NFLX 20201201 0 0 0
NFLX 20201202 0 0 0
NFLX 20201203 0 0 0
NFLX 20201204 0 0 0

Hier besteht die Sorge, dass die Daten in Gruppen gelesen werden müssen. Gruppiert nach Ticker und Datum. Wenn ich sage, ich möchte 10.000 Datensätze in jedem Stapel lesen. Die Grenze dieses Stapels sollte keine Gruppen aufteilen. dh Alle AAPL-Daten für den Dezember 2020 sollten in derselben Charge enden. Diese Daten sollten nicht in zwei Stapeln erscheinen.

Die meisten meiner Mitarbeiter erstellen in einer solchen Situation normalerweise ein Bash-Skript, in dem sie awk, cut, sort, uniq verwenden, um Daten in Gruppen aufzuteilen und mehrere Zwischendateien auf die Festplatte zu schreiben. Dann verwenden sie Python, um diese Dateien zu verarbeiten. Ich habe mich gefragt, ob es dafür eine homogene Python / Pandas / Numpy-Lösung gibt.

Antworten

genodeftest Dec 21 2020 at 03:11

Wie wäre es damit:

  1. öffne die Datei
  2. Schleife über Leseleitungen: Für jede gelesene Zeile:
  • Analysieren Sie den Ticker
  • falls noch nicht geschehen:
    • Erstellen + Öffnen einer Datei für diesen Ticker (" Ticker-Datei ")
    • an ein Diktat anhängen, in dem key = ticker und value = file handle
  • Schreiben Sie die Zeile in die Ticker-Datei
  1. Schließen Sie die Ticker-Dateien und die Originaldatei
  2. Verarbeiten Sie jede einzelne Ticker-Datei
Martin Dec 21 2020 at 03:44

Ich würde zwei Optionen prüfen

Vaex und Dask.

Vaex scheint sich genau auf das zu konzentrieren, was Sie brauchen. Faule Verarbeitung und sehr große Datenmengen. Überprüfen Sie ihren Github. Es scheint jedoch, dass Sie Dateien in HDF5 konvertieren müssen, was etwas zeitaufwändig sein kann.

Für Dask würde ich nicht mit Erfolg rechnen. Dask konzentriert sich hauptsächlich auf verteilte Berechnungen und ich bin mir nicht sicher, ob es große Dateien träge verarbeiten kann. Aber Sie können versuchen, zu sehen.

tgrandje Dec 23 2020 at 03:05

Dieser Ansatz ist reine Pandas. Es würde zwei Funktionen verwenden: eine zum Berechnen der Indizes, eine zum Lesen eines Blocks. Ich würde sagen, es würde völlig scheitern, wenn eine Ihrer Gruppen nicht in den Speicher passt (aber angesichts Ihrer Kriterien, dass diese Gruppe einzeln gelesen werden muss, würde ich sagen, dass es sicher ist, dass es passt).

Sie müssten das Wörterbuch der Indizes (wie aus der ersten Funktion berechnet) durchlaufen, um den gesamten Datenrahmen zu lesen.

Hoffe, das wird helfen ... (Zögern Sie nicht, den Standardwert der Chunksize an Ihre Bedürfnisse anzupassen).

import pandas as pd

def compute_indexes(url, cols_indexes=[], chunksize=100000, **kwargs):
    """
    Returns a dictionnary
    Keys are the pseudo indexes of the dataframe 
    Values are lists of indexes corresponding to this index
    """
    iterator = pd.read_csv(
            url, 
            usecols=cols_indexes, 
            chunksize=chunksize,
            **kwargs)
    
    dict_groups = dict()
    for df in iterator:
        groups_present = df.drop_duplicates(keep="first").values.tolist()
        df.reset_index(drop=False, inplace=True)
        df.set_index(cols_indexes, inplace=True)
        for group in groups_present:
            group = tuple(group)
            if group not in dict_groups:
                dict_groups[group] = []
            try:
                dict_groups[group] += df.loc[group]['index'].tolist()
            except TypeError:
                #only one row
                dict_groups[group] += [df.loc[group]['index']]
                
    return dict_groups

def read_csv_group(url, dict_groups, which_group, **kwargs):
    if isinstance(which_group, list):
        which_group = tuple(which_group)
    rows = dict_groups[which_group]
    def skip_rows(x):
        if x == 0:
            return False
        elif x in {x+1 for x in rows}:
            return False
        else:
            return True
    df = pd.read_csv(url, skiprows=skip_rows, **kwargs)
    return df
    
URL = "./dummy.csv"
indexes = ['Ticker', 'Date']
kwargs = {'dtype':{'Ticker':str, 'Date':int})
dict_groups = compute_indexes(URL, indexes, chunksize=100000, **kwargs)
df_one_group = read_csv_group(URL, dict_groups, ('AAPL', 20201201), **kwargs)