Создание масштабируемого конвейера потоковой передачи данных в Python

Nov 27 2022
Пошаговое руководство по созданию масштабируемого конвейера потоковой передачи данных в Python.
Python зарекомендовал себя как язык для работы с большими объемами данных. Мы видим это повсюду только потому, что в Python очень быстро создавать прототипы, и люди любят его из-за его простого синтаксиса, эта волна коснулась и индустрии данных.
Фото Джей Джей Ин на Unsplash

Python зарекомендовал себя как язык для работы с большими объемами данных. Мы видим это повсюду только потому, что в Python очень быстро создавать прототипы, и люди любят его из-за его простого синтаксиса, эта волна коснулась и индустрии данных. Инженеры данных и специалисты по данным также начали использовать его в своей работе с интенсивным использованием данных. В этой статье мы собираемся создать очень простой и хорошо масштабируемый конвейер потоковой передачи данных с использованием Python.

Потоковая передача данных — это процесс передачи непрерывного потока данных.

Теперь мы знаем, что на одной стороне конвейера у нас будет какой-то или по крайней мере один производитель данных, который будет непрерывно производить данные, а на другой стороне у нас будет какой-то или по крайней мере один потребитель данных, который будет постоянно потреблять эти данные.

Архитектура

Прежде всего необходимо разработать масштабируемую и гибкую архитектуру, которая оправдывает требования. Мы будем использовать Redis в качестве конвейера данных, и ради этой статьи мы будем использовать очень простой микросервис очистки данных, используя Scrapy независимо в качестве производителя данных и отдельный микросервис в качестве потребителя данных.

Создание производителя данных

Первое, что нам нужно, это создать простой проект Python с активированной виртуальной средой. Для этой конкретной статьи мы будем использовать официальный учебник Scrapy . Нам нужно запустить приведенную ниже команду, чтобы создать пустой проект Scrapy.

scrapy startproject producer

Теперь нам нужно создать паука, который действительно может собирать данные откуда-то. Давайте создадим новый файл в каталоге пауков quotes_spider.py и добавим в него приведенный ниже код.

import scrapy
class QuotesSpider(scrapy.Spider):
    name = "quotes"
    def start_requests(self):
        urls = [
            'https://quotes.toscrape.com/page/1/',
            'https://quotes.toscrape.com/page/2/',
        ]
        for url in urls:
            yield scrapy.Request(url=url, callback=self.parse)
    def parse(self, response, **kwargs):
        for quote in response.css('.quote .text::text').getall():
            yield {
                'quote': quote
            }

Наша сторона производителя данных готова, но нам нужно поместить эти данные в конвейер данных, а не в файл. Прежде чем помещать данные в конвейер данных, нам нужно создать конвейер данных.

Построение конвейера данных

Во-первых, нам нужно установить Redis в нашей системе, для этого нам нужно следовать официальному руководству по установке Redis. После установки Redis и запуска он должен показать что-то вроде приведенного ниже.

Теперь нам нужно создать обертки вокруг функций Redis, чтобы сделать их более человечными. Давайте начнем с создания каталога в корне с именем конвейера и создадим новый файл reid_client.py в этом каталоге.

Теперь нам нужно добавить приведенный ниже код в наш файл redis-client.py. Код говорит сам за себя, мы создали средство получения и установки данных. Оба имеют дело с данными JSON, поскольку Redis может хранить только строковые данные, а для хранения строковых данных нам нужно их JSONIFY.

import json
import redis

class RedisClient:
    """
    Custom Redis client with all the wrapper funtions. This client implements FIFO for pipeline.
    """
    connection = redis.Redis(host='localhost', port=6379, db=0)
    key = 'DATA-PIPELINE-KEY'
    def _convert_data_to_json(self, data):
        try:
            return json.dumps(data)
        except Exception as e:
            print(f'Failed to convert data into json with error: {e}')
            raise e
    def _convert_data_from_json(self, data):
        try:
            return json.loads(data)
        except Exception as e:
            print(f'Failed to convert data from json to dict with error: {e}')
            raise e
    def send_data_to_pipeline(self, data):
        data = self._convert_data_to_json(data)
        self.connection.lpush(self.key, data)
    def get_data_from_pipeline(self):
        try:
            data = self.connection.rpop(self.key)
            return self._convert_data_from_json(data)
        except Exception as e:
            print(f'Failed to get more data from pipeline with error: {e}')

Теперь, когда мы создали конвейер, мы можем начать помещать в него наши данные со стороны производителя данных. Для этого нам нужно создать конвейер в scrapy, который добавляет каждый очищенный элемент в Redis, а мы используем его позже. Просто добавьте приведенный ниже код в файл pipeps.py проекта scrapy.

from pipeline.redis_client import RedisClient
class ProducerPipeline:
    redis_client = RedisClient()
    def process_item(self, item, spider):
        self.redis_client.send_data_to_pipeline(item)
        return item

ITEM_PIPELINES = {
   'producer.pipelines.ProducerPipeline': 300,
}

Создание потребителя и потребление данных

Поскольку мы построили конвейер и производителя, который может продолжать помещать данные в конвейер независимо от потребления данных, мы прошли более половины всего, что нам нужно, чтобы получить данные из конвейера данных и использовать их в соответствии с нашими потребностями, чтобы назвать это проектом. .

Создадим в корне новую директорию, назовем ее Consumer и создадим в ней новый файл с именем quotes_consumer.py.

Добавьте приведенный ниже код в файл quotes_consumer.py. Код проверяет наличие новых данных, если он не может найти новые данные в конвейере, тогда он спит, в противном случае он принимает эти данные, в нашем случае мы сохраняем кавычки в текстовый файл.

import time
from pipeline.redis_client import RedisClient

class QuotesConsumer:
    redis_client = RedisClient()
    sleep_seconds = 1
    def run(self):
        while True:
            if self.redis_client.get_items_in_pipeline() == 0:
                print(f'No new data in pipeline, sleeping for {self.sleep_seconds} seconds...')
                time.sleep(self.sleep_seconds)
                self.sleep_seconds += 1
                continue
            self.sleep_seconds = 1
            data = self.redis_client.get_data_from_pipeline()
            print(f'Obtained data from pipeline, saving to file...')
            with open('quotes.txt', 'a+') as file:
                file.write(data.get('quote'))

if __name__ == "__main__":
    consumer = QuotesConsumer()
    consumer.run()

Вы можете найти полный код в следующем репозитории GitHub .