Jak odczytujesz duży plik z nieposortowanymi danymi tabelarycznymi w fragmentach w Pythonie?
Mam duży plik CSV (> 100 GB), który chcę wczytać do pamięci i przetwarzać dane w fragmentach. Mam dwa ograniczenia:
- Oczywiście nie mogę wczytać całego pliku do pamięci. Mam tylko około 8 GB pamięci RAM na moim komputerze.
- Dane są tabelaryczne i nieuporządkowane. Muszę czytać dane w grupach.
Serce | Data | Pole 1 | Pole 2 | Pole3 |
---|---|---|---|---|
AAPL | 20201201 | 0 | 0 | 0 |
AAPL | 20201202 | 0 | 0 | 0 |
AAPL | 20201203 | 0 | 0 | 0 |
AAPL | 20201204 | 0 | 0 | 0 |
NFLX | 20201201 | 0 | 0 | 0 |
NFLX | 20201202 | 0 | 0 | 0 |
NFLX | 20201203 | 0 | 0 | 0 |
NFLX | 20201204 | 0 | 0 | 0 |
Problem polega na tym, że dane muszą być odczytywane w grupach. Pogrupowane według Ticker i daty. Jeśli powiem, że chcę przeczytać 10 000 rekordów w każdej partii. Granica tej partii nie powinna dzielić grup. tzn. wszystkie dane AAPL za grudzień 2020 r. powinny znaleźć się w tej samej partii. Te dane nie powinny pojawiać się w dwóch partiach.
Większość moich współpracowników, gdy napotykają taką sytuację, zwykle tworzy skrypt bash, w którym używają awk, cut, sort, uniq do dzielenia danych na grupy i zapisywania wielu plików pośrednich na dysk. Następnie używają Pythona do przetwarzania tych plików. Zastanawiałem się, czy istnieje jednorodne rozwiązanie tego problemu w Pythonie / Pandas / Numpy.
Odpowiedzi
Co powiesz na to:
- otwórz plik
- pętla nad wierszami czytania: Dla każdego wiersza czytaj:
- przeanalizuj ticker
- jeśli jeszcze tego nie zrobiono:
- utwórz + otwórz plik dla tego tickera („ plik tickera ”)
- dołącz do jakiegoś dyktu, gdzie klucz = pasek i wartość = uchwyt pliku
- zapisz linię do pliku giełdowego
- zamknij pliki giełdowe i oryginalny plik
- przetwarzać każdy pojedynczy plik tickera
Spojrzałbym na dwie opcje
Vaex i Dask.
Wydaje się, że Vaex koncentruje się dokładnie na tym, czego potrzebujesz. Leniwe przetwarzanie i bardzo duże zbiory danych. Sprawdź ich github. Wygląda jednak na to, że musisz przekonwertować pliki do hdf5, co może być trochę czasochłonne.
Jeśli chodzi o Dask, nie liczyłbym na sukces. Dask koncentruje się głównie na obliczeniach rozproszonych i nie jestem pewien, czy może leniwie przetwarzać duże pliki. Ale możesz spróbować i zobaczyć.
Takie podejście to czyste pandy. Używałby dwóch funkcji: jednej do obliczania indeksów, drugiej do odczytu jednej porcji. Powiedziałbym, że kompletnie się nie uda, jeśli którakolwiek z twoich grup nie zmieści się w pamięci (ale biorąc pod uwagę twoje kryteria, że te grupy muszą być czytane pojedynczo, powiedziałbym, że z pewnością pasuje).
Aby odczytać całą ramkę danych, należałoby przejrzeć słownik indeksów (zgodnie z obliczeniami z pierwszej funkcji).
Mam nadzieję, że to pomoże ... (Nie wahaj się i dostosuj domyślną wartość rozmiaru części do swoich potrzeb).
import pandas as pd
def compute_indexes(url, cols_indexes=[], chunksize=100000, **kwargs):
"""
Returns a dictionnary
Keys are the pseudo indexes of the dataframe
Values are lists of indexes corresponding to this index
"""
iterator = pd.read_csv(
url,
usecols=cols_indexes,
chunksize=chunksize,
**kwargs)
dict_groups = dict()
for df in iterator:
groups_present = df.drop_duplicates(keep="first").values.tolist()
df.reset_index(drop=False, inplace=True)
df.set_index(cols_indexes, inplace=True)
for group in groups_present:
group = tuple(group)
if group not in dict_groups:
dict_groups[group] = []
try:
dict_groups[group] += df.loc[group]['index'].tolist()
except TypeError:
#only one row
dict_groups[group] += [df.loc[group]['index']]
return dict_groups
def read_csv_group(url, dict_groups, which_group, **kwargs):
if isinstance(which_group, list):
which_group = tuple(which_group)
rows = dict_groups[which_group]
def skip_rows(x):
if x == 0:
return False
elif x in {x+1 for x in rows}:
return False
else:
return True
df = pd.read_csv(url, skiprows=skip_rows, **kwargs)
return df
URL = "./dummy.csv"
indexes = ['Ticker', 'Date']
kwargs = {'dtype':{'Ticker':str, 'Date':int})
dict_groups = compute_indexes(URL, indexes, chunksize=100000, **kwargs)
df_one_group = read_csv_group(URL, dict_groups, ('AAPL', 20201201), **kwargs)