Construire un pipeline de streaming de données hautement évolutif en Python

Nov 27 2022
Un guide étape par étape pour créer un pipeline de streaming de données hautement évolutif en Python.
Python s'est imposé comme un langage pour les travaux gourmands en données. Nous le voyons partout simplement parce qu'il est très rapide à prototyper en Python et que les gens l'adorent en raison de sa syntaxe simple, cette vague a également atterri dans l'industrie des données.
Photo de JJ Ying sur Unsplash

Python s'est imposé comme un langage pour les travaux gourmands en données. Nous le voyons partout simplement parce qu'il est très rapide à prototyper en Python et que les gens l'adorent en raison de sa syntaxe simple, cette vague a également atterri dans l'industrie des données. Les ingénieurs de données et les scientifiques de données ont également commencé à l'utiliser dans leurs travaux à forte intensité de données. Dans cet article, nous allons créer un pipeline de streaming de données très simple et hautement évolutif à l'aide de Python.

Le streaming de données est le processus de transmission d'un flux continu de données.

Maintenant, nous savons que d'un côté du pipeline, nous aurions un ou au moins un producteur de données qui produirait des données en continu et de l'autre côté, nous aurions un ou au moins un consommateur de données qui consommerait ces données en continu.

Architecture

La première chose est de concevoir une architecture évolutive et flexible qui justifie la demande. Nous utiliserons Redis comme pipeline de données et pour les besoins de cet article, nous utiliserons un microservice de grattage de données très simple utilisant Scrapy indépendamment en tant que producteur de données et un microservice distinct en tant que consommateur de données.

Construire le producteur de données

Le premier dont nous avons besoin est de construire un projet Python simple avec un environnement virtuel activé. Pour cet article spécifique, nous allons utiliser le tutoriel officiel de Scrapy . Nous devons exécuter la commande ci-dessous pour créer un projet Scrapy vide.

scrapy startproject producer

Nous devons maintenant créer une araignée qui peut réellement récupérer les données quelque part. Créons un nouveau fichier dans le répertoire spiders quotes_spider.py et ajoutons-y le code ci-dessous.

import scrapy
class QuotesSpider(scrapy.Spider):
    name = "quotes"
    def start_requests(self):
        urls = [
            'https://quotes.toscrape.com/page/1/',
            'https://quotes.toscrape.com/page/2/',
        ]
        for url in urls:
            yield scrapy.Request(url=url, callback=self.parse)
    def parse(self, response, **kwargs):
        for quote in response.css('.quote .text::text').getall():
            yield {
                'quote': quote
            }

Notre côté producteur de données est maintenant prêt, mais nous devons placer ces données dans un pipeline de données plutôt que dans un fichier. Avant de mettre des données dans un pipeline de données, nous devons d'abord créer un pipeline de données.

Construire un pipeline de données

Tout d'abord, nous devons installer Redis sur notre système, pour ce faire, nous devons suivre le guide d'installation officiel de Redis. Après avoir installé et exécuté Redis, il devrait afficher quelque chose comme ci-dessous.

Nous devons maintenant créer des wrappers autour des fonctions Redis pour les rendre plus humaines. Commençons par créer un répertoire à la racine avec un nom pipeline et créons un nouveau fichier reid_client.py dans ce répertoire.

Nous devons maintenant ajouter le code ci-dessous à notre fichier redis-client.py. Le code est explicite, nous avons créé un getter de données et un setter de données. Les deux traitent des données JSON car Redis ne peut stocker que des données de chaîne et pour stocker des données de chaîne, nous devons les JSONIFIER.

import json
import redis

class RedisClient:
    """
    Custom Redis client with all the wrapper funtions. This client implements FIFO for pipeline.
    """
    connection = redis.Redis(host='localhost', port=6379, db=0)
    key = 'DATA-PIPELINE-KEY'
    def _convert_data_to_json(self, data):
        try:
            return json.dumps(data)
        except Exception as e:
            print(f'Failed to convert data into json with error: {e}')
            raise e
    def _convert_data_from_json(self, data):
        try:
            return json.loads(data)
        except Exception as e:
            print(f'Failed to convert data from json to dict with error: {e}')
            raise e
    def send_data_to_pipeline(self, data):
        data = self._convert_data_to_json(data)
        self.connection.lpush(self.key, data)
    def get_data_from_pipeline(self):
        try:
            data = self.connection.rpop(self.key)
            return self._convert_data_from_json(data)
        except Exception as e:
            print(f'Failed to get more data from pipeline with error: {e}')

Maintenant que nous avons créé un pipeline, nous pouvons commencer à y mettre nos données du côté du producteur de données. Pour cela, nous devons créer un pipeline dans scrapy qui ajoute chaque élément scrapé à Redis et nous le consommons plus tard. Ajoutez simplement le code ci-dessous à votre pipelines.py le fichier du projet scrapy.

from pipeline.redis_client import RedisClient
class ProducerPipeline:
    redis_client = RedisClient()
    def process_item(self, item, spider):
        self.redis_client.send_data_to_pipeline(item)
        return item

ITEM_PIPELINES = {
   'producer.pipelines.ProducerPipeline': 300,
}

Construire le consommateur et consommer des données

Comme nous avons construit un pipeline et un producteur qui peut continuer à mettre des données dans le pipeline indépendamment de la consommation de données, nous sommes à plus de la moitié de tout ce dont nous avons besoin pour obtenir des données du pipeline de données et les consommer en fonction de nos besoins pour appeler cela un projet .

Créons un nouveau répertoire à la racine, nommons-le consommateur et créons un nouveau fichier avec le nom quotes_consumer.py.

Ajoutez le code ci-dessous au fichier quotes_consumer.py. Le code vérifie les nouvelles données s'il ne trouve aucune nouvelle donnée dans le pipeline, puis il dort, sinon il ingère ces données dans notre cas, nous enregistrons les citations dans un fichier texte.

import time
from pipeline.redis_client import RedisClient

class QuotesConsumer:
    redis_client = RedisClient()
    sleep_seconds = 1
    def run(self):
        while True:
            if self.redis_client.get_items_in_pipeline() == 0:
                print(f'No new data in pipeline, sleeping for {self.sleep_seconds} seconds...')
                time.sleep(self.sleep_seconds)
                self.sleep_seconds += 1
                continue
            self.sleep_seconds = 1
            data = self.redis_client.get_data_from_pipeline()
            print(f'Obtained data from pipeline, saving to file...')
            with open('quotes.txt', 'a+') as file:
                file.write(data.get('quote'))

if __name__ == "__main__":
    consumer = QuotesConsumer()
    consumer.run()

Vous pouvez trouver le code complet sur le dépôt GitHub suivant .