Creación de una canalización de transmisión de datos altamente escalable en Python
Python se ha convertido en un lenguaje para trabajos intensivos en datos. Lo estamos viendo en todas partes solo porque es muy rápido crear prototipos en Python y a la gente le encanta debido a su sintaxis fácil, esa ola también aterrizó en la industria de datos. Los ingenieros de datos y los científicos de datos también comenzaron a usarlo en sus trabajos de uso intensivo de datos. En este artículo, vamos a construir una canalización de transmisión de datos muy simple y altamente escalable usando Python.
La transmisión de datos es el proceso de transmisión de un flujo continuo de datos.
Ahora sabemos que en un lado de la canalización tendríamos al menos un productor de datos que produciría datos continuamente y, en el otro lado, tendríamos al menos un consumidor de datos que consumiría esos datos continuamente.
Arquitectura
Lo primero es diseñar una arquitectura escalable y flexible que justifique la pretensión. Usaremos Redis como la canalización de datos y, por el bien de este artículo, usaremos un microservicio de extracción de datos muy simple usando Scrapy de forma independiente como productor de datos y un microservicio separado como consumidor de datos.
Construyendo el productor de datos
Lo primero que necesitamos es construir un proyecto de Python simple con un entorno virtual activado. Para este artículo específico, vamos a utilizar el tutorial oficial de Scrapy . Necesitamos ejecutar el comando dado a continuación para crear un proyecto Scrapy vacío.
scrapy startproject producer
Ahora necesitamos crear una araña que realmente pueda raspar los datos de algún lugar. Vamos a crear un nuevo archivo en el directorio de arañas quotes_spider.py y agregarle el código que se proporciona a continuación.
import scrapy
class QuotesSpider(scrapy.Spider):
name = "quotes"
def start_requests(self):
urls = [
'https://quotes.toscrape.com/page/1/',
'https://quotes.toscrape.com/page/2/',
]
for url in urls:
yield scrapy.Request(url=url, callback=self.parse)
def parse(self, response, **kwargs):
for quote in response.css('.quote .text::text').getall():
yield {
'quote': quote
}
Nuestro lado del productor de datos está listo ahora, pero debemos colocar esos datos en una canalización de datos en lugar de en un archivo. Antes de poner datos en una tubería de datos, necesitamos construir una tubería de datos antes.
Creación de una canalización de datos
En primer lugar, debemos instalar Redis en nuestro sistema, para hacerlo, debemos seguir la guía de instalación oficial de Redis. Después de instalar Redis y ejecutarlo, debería mostrar algo como lo siguiente.
Ahora necesitamos crear contenedores alrededor de las funciones de Redis para que sean más humanas. Comencemos con la creación de un directorio en la raíz con una tubería de nombre y creemos un nuevo archivo reid_client.py en este directorio.
Ahora debemos agregar el código que se proporciona a continuación a nuestro archivo redis-client.py. El código se explica por sí mismo, creamos un captador de datos y un establecedor de datos. Ambos tratan con datos JSON ya que Redis solo puede almacenar datos de cadenas y para almacenar datos de cadenas necesitamos JSONIFY.
import json
import redis
class RedisClient:
"""
Custom Redis client with all the wrapper funtions. This client implements FIFO for pipeline.
"""
connection = redis.Redis(host='localhost', port=6379, db=0)
key = 'DATA-PIPELINE-KEY'
def _convert_data_to_json(self, data):
try:
return json.dumps(data)
except Exception as e:
print(f'Failed to convert data into json with error: {e}')
raise e
def _convert_data_from_json(self, data):
try:
return json.loads(data)
except Exception as e:
print(f'Failed to convert data from json to dict with error: {e}')
raise e
def send_data_to_pipeline(self, data):
data = self._convert_data_to_json(data)
self.connection.lpush(self.key, data)
def get_data_from_pipeline(self):
try:
data = self.connection.rpop(self.key)
return self._convert_data_from_json(data)
except Exception as e:
print(f'Failed to get more data from pipeline with error: {e}')
Ahora que hemos creado una canalización, podemos comenzar a poner nuestros datos en ella desde el lado del productor de datos. Para eso, necesitamos crear una tubería en scrapy que agregue cada elemento raspado a Redis y lo consumimos más tarde. Simplemente agregue el código a continuación a su pipelines.py el archivo del proyecto scrapy.
from pipeline.redis_client import RedisClient
class ProducerPipeline:
redis_client = RedisClient()
def process_item(self, item, spider):
self.redis_client.send_data_to_pipeline(item)
return item
ITEM_PIPELINES = {
'producer.pipelines.ProducerPipeline': 300,
}
Construyendo el consumidor y consumiendo datos
Como hemos construido una canalización y un productor que puede seguir poniendo datos en la canalización independientemente del consumo de datos, estamos a más de la mitad de todo lo que necesitamos para obtener datos de la canalización de datos y consumirlos de acuerdo con nuestras necesidades para llamarlo un proyecto. .
Vamos a crear un nuevo directorio en la raíz, asígnele el nombre consumidor y cree un nuevo archivo con el nombre quotes_consumer.py.
Agregue el código que se proporciona a continuación al archivo quotes_consumer.py. El código verifica si hay nuevos datos si no pudo encontrar ningún dato nuevo en la canalización, luego duerme; de lo contrario, ingiere esos datos en nuestro caso, estamos guardando comillas en un archivo de texto.
import time
from pipeline.redis_client import RedisClient
class QuotesConsumer:
redis_client = RedisClient()
sleep_seconds = 1
def run(self):
while True:
if self.redis_client.get_items_in_pipeline() == 0:
print(f'No new data in pipeline, sleeping for {self.sleep_seconds} seconds...')
time.sleep(self.sleep_seconds)
self.sleep_seconds += 1
continue
self.sleep_seconds = 1
data = self.redis_client.get_data_from_pipeline()
print(f'Obtained data from pipeline, saving to file...')
with open('quotes.txt', 'a+') as file:
file.write(data.get('quote'))
if __name__ == "__main__":
consumer = QuotesConsumer()
consumer.run()
Puedes encontrar el código completo en el siguiente repositorio de GitHub .