Creazione di una pipeline di streaming di dati altamente scalabile in Python

Nov 27 2022
Una guida passo passo per la creazione di una pipeline di streaming di dati altamente scalabile in Python.
Python si è modellato come un linguaggio per lavori ad alta intensità di dati. Lo stiamo vedendo ovunque solo perché è molto veloce creare prototipi in Python e le persone lo adorano per la sua sintassi semplice, quell'onda è arrivata anche nel settore dei dati.
Foto di JJ Ying su Unsplash

Python si è modellato come un linguaggio per lavori ad alta intensità di dati. Lo stiamo vedendo ovunque solo perché è molto veloce creare prototipi in Python e le persone lo adorano per la sua sintassi semplice, quell'onda è arrivata anche nel settore dei dati. Anche i data engineer e i data scientist hanno iniziato a usarlo nei loro lavori ad alta intensità di dati. In questo articolo, costruiremo una pipeline di streaming di dati molto semplice e altamente scalabile utilizzando Python.

Lo streaming di dati è il processo di trasmissione di un flusso continuo di dati.

Ora sappiamo che da un lato della pipeline avremmo alcuni o almeno un produttore di dati che produrrebbe continuamente dati e dall'altro avremmo alcuni o almeno un consumatore di dati che consumerebbe quei dati continuamente.

Architettura

La prima cosa è progettare un'architettura scalabile e flessibile che giustifichi l'affermazione. Useremo Redis come pipeline di dati e per il bene di questo articolo, useremo un microservizio di scraping dei dati molto semplice usando Scrapy in modo indipendente come produttore di dati e un microservizio separato come consumatore di dati.

Costruire il produttore di dati

Il primo di cui abbiamo bisogno è creare un semplice progetto Python con un ambiente virtuale attivato. Per questo articolo specifico, utilizzeremo il tutorial ufficiale di Scrapy . Dobbiamo eseguire il comando indicato di seguito per creare un progetto Scrapy vuoto.

scrapy startproject producer

Ora dobbiamo creare uno spider che possa effettivamente raschiare i dati da qualche parte. Creiamo un nuovo file nella directory degli spider quotes_spider.py e aggiungiamo ad esso il codice indicato di seguito.

import scrapy
class QuotesSpider(scrapy.Spider):
    name = "quotes"
    def start_requests(self):
        urls = [
            'https://quotes.toscrape.com/page/1/',
            'https://quotes.toscrape.com/page/2/',
        ]
        for url in urls:
            yield scrapy.Request(url=url, callback=self.parse)
    def parse(self, response, **kwargs):
        for quote in response.css('.quote .text::text').getall():
            yield {
                'quote': quote
            }

Il nostro lato produttore di dati è pronto ora, ma dobbiamo inserire quei dati in una pipeline di dati anziché in un file. Prima di inserire i dati in una pipeline di dati, è necessario creare prima una pipeline di dati.

Creazione di una pipeline di dati

Per prima cosa dobbiamo installare Redis sul nostro sistema, per farlo dobbiamo seguire la guida ufficiale all'installazione di Redis. Dopo aver installato Redis e averlo eseguito, dovrebbe mostrare qualcosa di simile al seguente.

Ora dobbiamo creare wrapper attorno alle funzioni Redis per renderle più umane. Iniziamo con la creazione di una directory in root con un nome pipeline e creiamo un nuovo file reid_client.py in questa directory.

Ora dobbiamo aggiungere il codice fornito di seguito al nostro file redis-client.py. Il codice è autoesplicativo, abbiamo creato un data getter e un data setter. Entrambi gestiscono i dati JSON poiché Redis può archiviare solo i dati delle stringhe e per archiviare i dati delle stringhe è necessario JSONIFY.

import json
import redis

class RedisClient:
    """
    Custom Redis client with all the wrapper funtions. This client implements FIFO for pipeline.
    """
    connection = redis.Redis(host='localhost', port=6379, db=0)
    key = 'DATA-PIPELINE-KEY'
    def _convert_data_to_json(self, data):
        try:
            return json.dumps(data)
        except Exception as e:
            print(f'Failed to convert data into json with error: {e}')
            raise e
    def _convert_data_from_json(self, data):
        try:
            return json.loads(data)
        except Exception as e:
            print(f'Failed to convert data from json to dict with error: {e}')
            raise e
    def send_data_to_pipeline(self, data):
        data = self._convert_data_to_json(data)
        self.connection.lpush(self.key, data)
    def get_data_from_pipeline(self):
        try:
            data = self.connection.rpop(self.key)
            return self._convert_data_from_json(data)
        except Exception as e:
            print(f'Failed to get more data from pipeline with error: {e}')

Ora che abbiamo creato una pipeline, possiamo iniziare a inserire i nostri dati in quella dal lato del produttore di dati. Per questo, dobbiamo creare una pipeline in scrapy che aggiunga ogni elemento raschiato a Redis e lo consumiamo in seguito. Aggiungi semplicemente il codice qui sotto al tuo pipelines.py il file del progetto scrapy.

from pipeline.redis_client import RedisClient
class ProducerPipeline:
    redis_client = RedisClient()
    def process_item(self, item, spider):
        self.redis_client.send_data_to_pipeline(item)
        return item

ITEM_PIPELINES = {
   'producer.pipelines.ProducerPipeline': 300,
}

Costruire il consumatore e consumare dati

Poiché abbiamo creato una pipeline e un produttore che possono continuare a inserire dati nella pipeline indipendentemente dal consumo di dati, siamo più che a metà di tutto ciò di cui abbiamo bisogno per ottenere dati dalla pipeline di dati e consumarli in base alle nostre esigenze per chiamarlo progetto .

Creiamo una nuova directory nella root, chiamiamola consumer e creiamo un nuovo file con il nome quotes_consumer.py al suo interno.

Aggiungi il codice indicato di seguito al file quotes_consumer.py. Il codice verifica la presenza di nuovi dati se non riesce a trovare nuovi dati nella pipeline, quindi dorme altrimenti ingerisce quei dati nel nostro caso stiamo salvando le virgolette in un file di testo.

import time
from pipeline.redis_client import RedisClient

class QuotesConsumer:
    redis_client = RedisClient()
    sleep_seconds = 1
    def run(self):
        while True:
            if self.redis_client.get_items_in_pipeline() == 0:
                print(f'No new data in pipeline, sleeping for {self.sleep_seconds} seconds...')
                time.sleep(self.sleep_seconds)
                self.sleep_seconds += 1
                continue
            self.sleep_seconds = 1
            data = self.redis_client.get_data_from_pipeline()
            print(f'Obtained data from pipeline, saving to file...')
            with open('quotes.txt', 'a+') as file:
                file.write(data.get('quote'))

if __name__ == "__main__":
    consumer = QuotesConsumer()
    consumer.run()

Puoi trovare il codice completo nel seguente repository GitHub .