Construindo um Pipeline de Streaming de Dados Altamente Escalável em Python
O Python se moldou como uma linguagem para trabalhos com uso intensivo de dados. Estamos vendo isso em todos os lugares apenas porque é muito rápido prototipar em Python e as pessoas estão adorando devido à sua sintaxe fácil, essa onda também chegou ao setor de dados. Engenheiros de dados e cientistas de dados também começaram a usá-lo em seus trabalhos intensivos em dados. Neste artigo, vamos construir um pipeline de streaming de dados muito simples e altamente escalável usando Python.
O streaming de dados é o processo de transmissão de um fluxo contínuo de dados.
Agora sabemos que de um lado do pipeline teríamos algum ou pelo menos um produtor de dados que produziria dados continuamente e, do outro lado, teríamos algum ou pelo menos um consumidor de dados que consumiria esses dados continuamente.
Arquitetura
A primeira coisa é projetar uma arquitetura escalável e flexível que justifique a reivindicação. Usaremos o Redis como pipeline de dados e, para este artigo, usaremos um microsserviço de raspagem de dados muito simples usando Scrapy independentemente como produtor de dados e um microsserviço separado como consumidor de dados.
Construindo o produtor de dados
O primeiro que precisamos é construir um projeto Python simples com um ambiente virtual ativado. Para este artigo específico, vamos usar o tutorial oficial do Scrapy . Precisamos executar o comando abaixo para criar um projeto Scrapy vazio.
scrapy startproject producer
Agora precisamos criar um spider que possa realmente extrair os dados de algum lugar. Vamos criar um novo arquivo no diretório do spiders quotes_spider.py e adicionar o código abaixo a ele.
import scrapy
class QuotesSpider(scrapy.Spider):
name = "quotes"
def start_requests(self):
urls = [
'https://quotes.toscrape.com/page/1/',
'https://quotes.toscrape.com/page/2/',
]
for url in urls:
yield scrapy.Request(url=url, callback=self.parse)
def parse(self, response, **kwargs):
for quote in response.css('.quote .text::text').getall():
yield {
'quote': quote
}
Nosso lado do produtor de dados está pronto agora, mas precisamos colocar esses dados em um pipeline de dados em vez de em um arquivo. Antes de colocar dados em um pipeline de dados, precisamos criar um pipeline de dados antes.
Como criar um pipeline de dados
Primeiramente precisamos instalar o Redis em nosso sistema, para isso precisamos seguir o guia oficial de instalação do Redis. Depois de instalar o Redis e executá-lo, ele deve mostrar algo como o abaixo.
Agora precisamos criar wrappers em torno das funções do Redis para torná-las mais humanas. Vamos começar criando um diretório na raiz com um pipeline de nome e criar um novo arquivo reid_client.py neste diretório.
Agora precisamos adicionar o código fornecido abaixo ao nosso arquivo redis-client.py. O código é autoexplicativo, criamos um coletor e um configurador de dados. Ambos lidam com dados JSON, pois o Redis só pode armazenar dados de string e, para armazenar dados de string, precisamos JSONIFY.
import json
import redis
class RedisClient:
"""
Custom Redis client with all the wrapper funtions. This client implements FIFO for pipeline.
"""
connection = redis.Redis(host='localhost', port=6379, db=0)
key = 'DATA-PIPELINE-KEY'
def _convert_data_to_json(self, data):
try:
return json.dumps(data)
except Exception as e:
print(f'Failed to convert data into json with error: {e}')
raise e
def _convert_data_from_json(self, data):
try:
return json.loads(data)
except Exception as e:
print(f'Failed to convert data from json to dict with error: {e}')
raise e
def send_data_to_pipeline(self, data):
data = self._convert_data_to_json(data)
self.connection.lpush(self.key, data)
def get_data_from_pipeline(self):
try:
data = self.connection.rpop(self.key)
return self._convert_data_from_json(data)
except Exception as e:
print(f'Failed to get more data from pipeline with error: {e}')
Agora, como criamos um pipeline, podemos começar a colocar nossos dados nele do lado do produtor de dados. Para isso, precisamos criar um pipeline no scrapy que adiciona todos os itens raspados ao Redis e os consumimos posteriormente. Basta adicionar o código abaixo ao seu pipelines.py o arquivo do projeto scrapy.
from pipeline.redis_client import RedisClient
class ProducerPipeline:
redis_client = RedisClient()
def process_item(self, item, spider):
self.redis_client.send_data_to_pipeline(item)
return item
ITEM_PIPELINES = {
'producer.pipelines.ProducerPipeline': 300,
}
Construindo o consumidor e consumindo dados
Como construímos um pipeline e um produtor que pode continuar colocando dados no pipeline independentemente do consumo de dados, já passamos da metade de tudo o que precisamos para obter dados do pipeline de dados e consumi-los de acordo com nossas necessidades para chamá-lo de projeto .
Vamos criar um novo diretório na raiz, nomeá-lo como consumidor e criar um novo arquivo com o nome quotes_consumer.py nele.
Adicione o código abaixo ao arquivo quotes_consumer.py. O código verifica se há novos dados, se não conseguir encontrar novos dados no pipeline, ele fica suspenso, caso contrário, ingere esses dados. Em nosso caso, estamos salvando citações em um arquivo de texto.
import time
from pipeline.redis_client import RedisClient
class QuotesConsumer:
redis_client = RedisClient()
sleep_seconds = 1
def run(self):
while True:
if self.redis_client.get_items_in_pipeline() == 0:
print(f'No new data in pipeline, sleeping for {self.sleep_seconds} seconds...')
time.sleep(self.sleep_seconds)
self.sleep_seconds += 1
continue
self.sleep_seconds = 1
data = self.redis_client.get_data_from_pipeline()
print(f'Obtained data from pipeline, saving to file...')
with open('quotes.txt', 'a+') as file:
file.write(data.get('quote'))
if __name__ == "__main__":
consumer = QuotesConsumer()
consumer.run()
Você pode encontrar o código completo no seguinte repositório GitHub .