Python'da yığınlar halinde sıralanmamış tablo verileri içeren büyük bir dosyayı nasıl okursunuz?

Dec 21 2020

Belleğe okumak ve verileri parçalar halinde işlemek istediğim büyük bir CSV dosyam (> 100 GB) var. Sahip olduğum iki kısıtlama var:

  1. Açıkçası tüm dosyayı belleğe okuyamıyorum. Makinemde sadece yaklaşık 8GB ram var.
  2. Veriler tablo halindedir ve sıralanmamıştır. Verileri gruplar halinde okumam gerekiyor.
Ticker Tarih (değiştir | kaynağı değiştir) Alan1 Alan2 Alan3
AAPL 20201201 0 0 0
AAPL 20201202 0 0 0
AAPL 20201203 0 0 0
AAPL 20201204 0 0 0
NFLX 20201201 0 0 0
NFLX 20201202 0 0 0
NFLX 20201203 0 0 0
NFLX 20201204 0 0 0

Buradaki endişe, verilerin gruplar halinde okunması gerektiğidir. Kayan yazı ve tarihe göre gruplandırılmıştır. Her grupta 10.000 kayıt okumak istiyorum dersem. Bu partinin sınırı grupları bölmemelidir. Örneğin, Aralık 2020 için tüm AAPL verileri aynı grupta yer almalıdır. Bu veriler iki grup halinde görünmemelidir.

İş arkadaşlarımın çoğu böyle bir durumla karşılaştıklarında, verileri gruplara ayırmak ve diske birden çok ara dosya yazmak için genellikle awk, cut, sort, uniq kullandıkları bir bash betiği oluştururlar. Daha sonra bu dosyaları işlemek için Python kullanırlar. Bunun homojen bir Python / Pandas / Numpy çözümü olup olmadığını merak ediyordum.

Yanıtlar

genodeftest Dec 21 2020 at 03:11

Buna ne dersin:

  1. dosyayı aç
  2. okuma satırları üzerinden döngü: Her satır için şunu okuyun:
  • şeridi ayrıştır
  • zaten yapılmadıysa:
    • o hisse senedi için bir dosya oluştur + aç (" ticker dosyası ")
    • anahtar = ticker ve değer = dosya tutacağı olduğu bazı dikte ekleyin
  • satırı ticker dosyasına yaz
  1. ticker dosyalarını ve orijinal dosyayı kapatın
  2. her bir kayan yazı dosyasını işle
Martin Dec 21 2020 at 03:44

İki seçeneğe bakardım

Vaex ve Dask.

Vaex tam olarak ihtiyacınız olan şeye odaklanmış görünüyor. Tembel işleme ve çok büyük veri kümeleri. Github'larını kontrol edin. Ancak görünen o ki, dosyaları hdf5'e dönüştürmeniz gerekiyor ki bu biraz zaman alıcı olabilir.

Dask söz konusu olduğunda, başarıya güvenmem. Dask öncelikle dağıtılmış hesaplamaya odaklanmıştır ve büyük dosyaları tembel olarak işleyip işleyemeyeceğinden emin değilim. Ama deneyebilir ve görebilirsin.

tgrandje Dec 23 2020 at 03:05

Bu yaklaşım saf pandalardır. İki işlevi kullanır: biri indeksleri hesaplamak için, diğeri bir öbeği okumak için. Gruplarınızdan herhangi biri hafızaya sığmazsa tamamen başarısız olacağını söyleyebilirim (ancak bu grubun birer birer okunması gerektiğine dair kriterleriniz göz önüne alındığında, bunun kesin bir tahmin olacağını söyleyebilirim).

Veri çerçevesinin tamamını okumak için dizin sözlüğü üzerinden (ilk işlevden hesaplandığı gibi) döngü yapmanız gerekir.

Bunun yardımcı olacağını umuyoruz ... (Büyük boyutun varsayılan değerini ihtiyaçlarınıza göre uyarlamaktan çekinmeyin).

import pandas as pd

def compute_indexes(url, cols_indexes=[], chunksize=100000, **kwargs):
    """
    Returns a dictionnary
    Keys are the pseudo indexes of the dataframe 
    Values are lists of indexes corresponding to this index
    """
    iterator = pd.read_csv(
            url, 
            usecols=cols_indexes, 
            chunksize=chunksize,
            **kwargs)
    
    dict_groups = dict()
    for df in iterator:
        groups_present = df.drop_duplicates(keep="first").values.tolist()
        df.reset_index(drop=False, inplace=True)
        df.set_index(cols_indexes, inplace=True)
        for group in groups_present:
            group = tuple(group)
            if group not in dict_groups:
                dict_groups[group] = []
            try:
                dict_groups[group] += df.loc[group]['index'].tolist()
            except TypeError:
                #only one row
                dict_groups[group] += [df.loc[group]['index']]
                
    return dict_groups

def read_csv_group(url, dict_groups, which_group, **kwargs):
    if isinstance(which_group, list):
        which_group = tuple(which_group)
    rows = dict_groups[which_group]
    def skip_rows(x):
        if x == 0:
            return False
        elif x in {x+1 for x in rows}:
            return False
        else:
            return True
    df = pd.read_csv(url, skiprows=skip_rows, **kwargs)
    return df
    
URL = "./dummy.csv"
indexes = ['Ticker', 'Date']
kwargs = {'dtype':{'Ticker':str, 'Date':int})
dict_groups = compute_indexes(URL, indexes, chunksize=100000, **kwargs)
df_one_group = read_csv_group(URL, dict_groups, ('AAPL', 20201201), **kwargs)