Jak odczytujesz duży plik z nieposortowanymi danymi tabelarycznymi w fragmentach w Pythonie?

Dec 21 2020

Mam duży plik CSV (> 100 GB), który chcę wczytać do pamięci i przetwarzać dane w fragmentach. Mam dwa ograniczenia:

  1. Oczywiście nie mogę wczytać całego pliku do pamięci. Mam tylko około 8 GB pamięci RAM na moim komputerze.
  2. Dane są tabelaryczne i nieuporządkowane. Muszę czytać dane w grupach.
Serce Data Pole 1 Pole 2 Pole3
AAPL 20201201 0 0 0
AAPL 20201202 0 0 0
AAPL 20201203 0 0 0
AAPL 20201204 0 0 0
NFLX 20201201 0 0 0
NFLX 20201202 0 0 0
NFLX 20201203 0 0 0
NFLX 20201204 0 0 0

Problem polega na tym, że dane muszą być odczytywane w grupach. Pogrupowane według Ticker i daty. Jeśli powiem, że chcę przeczytać 10 000 rekordów w każdej partii. Granica tej partii nie powinna dzielić grup. tzn. wszystkie dane AAPL za grudzień 2020 r. powinny znaleźć się w tej samej partii. Te dane nie powinny pojawiać się w dwóch partiach.

Większość moich współpracowników, gdy napotykają taką sytuację, zwykle tworzy skrypt bash, w którym używają awk, cut, sort, uniq do dzielenia danych na grupy i zapisywania wielu plików pośrednich na dysk. Następnie używają Pythona do przetwarzania tych plików. Zastanawiałem się, czy istnieje jednorodne rozwiązanie tego problemu w Pythonie / Pandas / Numpy.

Odpowiedzi

genodeftest Dec 21 2020 at 03:11

Co powiesz na to:

  1. otwórz plik
  2. pętla nad wierszami czytania: Dla każdego wiersza czytaj:
  • przeanalizuj ticker
  • jeśli jeszcze tego nie zrobiono:
    • utwórz + otwórz plik dla tego tickera („ plik tickera ”)
    • dołącz do jakiegoś dyktu, gdzie klucz = pasek i wartość = uchwyt pliku
  • zapisz linię do pliku giełdowego
  1. zamknij pliki giełdowe i oryginalny plik
  2. przetwarzać każdy pojedynczy plik tickera
Martin Dec 21 2020 at 03:44

Spojrzałbym na dwie opcje

Vaex i Dask.

Wydaje się, że Vaex koncentruje się dokładnie na tym, czego potrzebujesz. Leniwe przetwarzanie i bardzo duże zbiory danych. Sprawdź ich github. Wygląda jednak na to, że musisz przekonwertować pliki do hdf5, co może być trochę czasochłonne.

Jeśli chodzi o Dask, nie liczyłbym na sukces. Dask koncentruje się głównie na obliczeniach rozproszonych i nie jestem pewien, czy może leniwie przetwarzać duże pliki. Ale możesz spróbować i zobaczyć.

tgrandje Dec 23 2020 at 03:05

Takie podejście to czyste pandy. Używałby dwóch funkcji: jednej do obliczania indeksów, drugiej do odczytu jednej porcji. Powiedziałbym, że kompletnie się nie uda, jeśli którakolwiek z twoich grup nie zmieści się w pamięci (ale biorąc pod uwagę twoje kryteria, że ​​te grupy muszą być czytane pojedynczo, powiedziałbym, że z pewnością pasuje).

Aby odczytać całą ramkę danych, należałoby przejrzeć słownik indeksów (zgodnie z obliczeniami z pierwszej funkcji).

Mam nadzieję, że to pomoże ... (Nie wahaj się i dostosuj domyślną wartość rozmiaru części do swoich potrzeb).

import pandas as pd

def compute_indexes(url, cols_indexes=[], chunksize=100000, **kwargs):
    """
    Returns a dictionnary
    Keys are the pseudo indexes of the dataframe 
    Values are lists of indexes corresponding to this index
    """
    iterator = pd.read_csv(
            url, 
            usecols=cols_indexes, 
            chunksize=chunksize,
            **kwargs)
    
    dict_groups = dict()
    for df in iterator:
        groups_present = df.drop_duplicates(keep="first").values.tolist()
        df.reset_index(drop=False, inplace=True)
        df.set_index(cols_indexes, inplace=True)
        for group in groups_present:
            group = tuple(group)
            if group not in dict_groups:
                dict_groups[group] = []
            try:
                dict_groups[group] += df.loc[group]['index'].tolist()
            except TypeError:
                #only one row
                dict_groups[group] += [df.loc[group]['index']]
                
    return dict_groups

def read_csv_group(url, dict_groups, which_group, **kwargs):
    if isinstance(which_group, list):
        which_group = tuple(which_group)
    rows = dict_groups[which_group]
    def skip_rows(x):
        if x == 0:
            return False
        elif x in {x+1 for x in rows}:
            return False
        else:
            return True
    df = pd.read_csv(url, skiprows=skip_rows, **kwargs)
    return df
    
URL = "./dummy.csv"
indexes = ['Ticker', 'Date']
kwargs = {'dtype':{'Ticker':str, 'Date':int})
dict_groups = compute_indexes(URL, indexes, chunksize=100000, **kwargs)
df_one_group = read_csv_group(URL, dict_groups, ('AAPL', 20201201), **kwargs)