Создание масштабируемого конвейера потоковой передачи данных в Python
Python зарекомендовал себя как язык для работы с большими объемами данных. Мы видим это повсюду только потому, что в Python очень быстро создавать прототипы, и люди любят его из-за его простого синтаксиса, эта волна коснулась и индустрии данных. Инженеры данных и специалисты по данным также начали использовать его в своей работе с интенсивным использованием данных. В этой статье мы собираемся создать очень простой и хорошо масштабируемый конвейер потоковой передачи данных с использованием Python.
Потоковая передача данных — это процесс передачи непрерывного потока данных.
Теперь мы знаем, что на одной стороне конвейера у нас будет какой-то или по крайней мере один производитель данных, который будет непрерывно производить данные, а на другой стороне у нас будет какой-то или по крайней мере один потребитель данных, который будет постоянно потреблять эти данные.
Архитектура
Прежде всего необходимо разработать масштабируемую и гибкую архитектуру, которая оправдывает требования. Мы будем использовать Redis в качестве конвейера данных, и ради этой статьи мы будем использовать очень простой микросервис очистки данных, используя Scrapy независимо в качестве производителя данных и отдельный микросервис в качестве потребителя данных.
Создание производителя данных
Первое, что нам нужно, это создать простой проект Python с активированной виртуальной средой. Для этой конкретной статьи мы будем использовать официальный учебник Scrapy . Нам нужно запустить приведенную ниже команду, чтобы создать пустой проект Scrapy.
scrapy startproject producer
Теперь нам нужно создать паука, который действительно может собирать данные откуда-то. Давайте создадим новый файл в каталоге пауков quotes_spider.py и добавим в него приведенный ниже код.
import scrapy
class QuotesSpider(scrapy.Spider):
name = "quotes"
def start_requests(self):
urls = [
'https://quotes.toscrape.com/page/1/',
'https://quotes.toscrape.com/page/2/',
]
for url in urls:
yield scrapy.Request(url=url, callback=self.parse)
def parse(self, response, **kwargs):
for quote in response.css('.quote .text::text').getall():
yield {
'quote': quote
}
Наша сторона производителя данных готова, но нам нужно поместить эти данные в конвейер данных, а не в файл. Прежде чем помещать данные в конвейер данных, нам нужно создать конвейер данных.
Построение конвейера данных
Во-первых, нам нужно установить Redis в нашей системе, для этого нам нужно следовать официальному руководству по установке Redis. После установки Redis и запуска он должен показать что-то вроде приведенного ниже.
Теперь нам нужно создать обертки вокруг функций Redis, чтобы сделать их более человечными. Давайте начнем с создания каталога в корне с именем конвейера и создадим новый файл reid_client.py в этом каталоге.
Теперь нам нужно добавить приведенный ниже код в наш файл redis-client.py. Код говорит сам за себя, мы создали средство получения и установки данных. Оба имеют дело с данными JSON, поскольку Redis может хранить только строковые данные, а для хранения строковых данных нам нужно их JSONIFY.
import json
import redis
class RedisClient:
"""
Custom Redis client with all the wrapper funtions. This client implements FIFO for pipeline.
"""
connection = redis.Redis(host='localhost', port=6379, db=0)
key = 'DATA-PIPELINE-KEY'
def _convert_data_to_json(self, data):
try:
return json.dumps(data)
except Exception as e:
print(f'Failed to convert data into json with error: {e}')
raise e
def _convert_data_from_json(self, data):
try:
return json.loads(data)
except Exception as e:
print(f'Failed to convert data from json to dict with error: {e}')
raise e
def send_data_to_pipeline(self, data):
data = self._convert_data_to_json(data)
self.connection.lpush(self.key, data)
def get_data_from_pipeline(self):
try:
data = self.connection.rpop(self.key)
return self._convert_data_from_json(data)
except Exception as e:
print(f'Failed to get more data from pipeline with error: {e}')
Теперь, когда мы создали конвейер, мы можем начать помещать в него наши данные со стороны производителя данных. Для этого нам нужно создать конвейер в scrapy, который добавляет каждый очищенный элемент в Redis, а мы используем его позже. Просто добавьте приведенный ниже код в файл pipeps.py проекта scrapy.
from pipeline.redis_client import RedisClient
class ProducerPipeline:
redis_client = RedisClient()
def process_item(self, item, spider):
self.redis_client.send_data_to_pipeline(item)
return item
ITEM_PIPELINES = {
'producer.pipelines.ProducerPipeline': 300,
}
Создание потребителя и потребление данных
Поскольку мы построили конвейер и производителя, который может продолжать помещать данные в конвейер независимо от потребления данных, мы прошли более половины всего, что нам нужно, чтобы получить данные из конвейера данных и использовать их в соответствии с нашими потребностями, чтобы назвать это проектом. .
Создадим в корне новую директорию, назовем ее Consumer и создадим в ней новый файл с именем quotes_consumer.py.
Добавьте приведенный ниже код в файл quotes_consumer.py. Код проверяет наличие новых данных, если он не может найти новые данные в конвейере, тогда он спит, в противном случае он принимает эти данные, в нашем случае мы сохраняем кавычки в текстовый файл.
import time
from pipeline.redis_client import RedisClient
class QuotesConsumer:
redis_client = RedisClient()
sleep_seconds = 1
def run(self):
while True:
if self.redis_client.get_items_in_pipeline() == 0:
print(f'No new data in pipeline, sleeping for {self.sleep_seconds} seconds...')
time.sleep(self.sleep_seconds)
self.sleep_seconds += 1
continue
self.sleep_seconds = 1
data = self.redis_client.get_data_from_pipeline()
print(f'Obtained data from pipeline, saving to file...')
with open('quotes.txt', 'a+') as file:
file.write(data.get('quote'))
if __name__ == "__main__":
consumer = QuotesConsumer()
consumer.run()
Вы можете найти полный код в следующем репозитории GitHub .