Budowa wysoce skalowalnego potoku strumieniowego przesyłania danych w języku Python
Python ukształtował się jako język do zadań wymagających dużej ilości danych. Widzimy to wszędzie tylko dlatego, że prototypowanie w Pythonie jest naprawdę szybkie, a ludzie je uwielbiają ze względu na jego łatwą składnię, ta fala wylądowała również w branży danych. Inżynierowie danych i analitycy danych również zaczęli używać go w swoich zadaniach wymagających dużej ilości danych. W tym artykule zbudujemy bardzo prosty i wysoce skalowalny potok strumieniowania danych przy użyciu Pythona.
Strumieniowe przesyłanie danych to proces przesyłania ciągłego strumienia danych.
Teraz wiemy, że po jednej stronie potoku mielibyśmy kilku lub przynajmniej jednego producenta danych, który stale wytwarzałby dane, a po drugiej stronie mielibyśmy kilku lub przynajmniej jednego konsumenta danych, który konsumowałby te dane w sposób ciągły.
Architektura
Pierwszą rzeczą jest zaprojektowanie skalowalnej i elastycznej architektury, która uzasadnia twierdzenie. Będziemy używać Redis jako potoku danych i na potrzeby tego artykułu będziemy używać bardzo prostej mikrousługi zbierania danych, używając Scrapy niezależnie jako producenta danych i oddzielnej mikrousługi jako konsumenta danych.
Budowa producenta danych
Najpierw musimy zbudować prosty projekt w Pythonie z aktywowanym środowiskiem wirtualnym. W tym konkretnym artykule użyjemy oficjalnego samouczka Scrapy . Musimy uruchomić podane poniżej polecenie, aby utworzyć pusty projekt Scrapy.
scrapy startproject producer
Teraz musimy stworzyć pająka, który faktycznie może skądś pobierać dane. Utwórzmy nowy plik w katalogu spiders quotes_spider.py i dodajmy do niego poniższy kod.
import scrapy
class QuotesSpider(scrapy.Spider):
name = "quotes"
def start_requests(self):
urls = [
'https://quotes.toscrape.com/page/1/',
'https://quotes.toscrape.com/page/2/',
]
for url in urls:
yield scrapy.Request(url=url, callback=self.parse)
def parse(self, response, **kwargs):
for quote in response.css('.quote .text::text').getall():
yield {
'quote': quote
}
Nasza strona producenta danych jest już gotowa, ale musimy umieścić te dane w potoku danych zamiast w pliku. Przed umieszczeniem danych w potoku danych musimy wcześniej zbudować potok danych.
Budowa potoku danych
Najpierw musimy zainstalować Redis w naszym systemie, aby to zrobić, musimy postępować zgodnie z oficjalnym przewodnikiem instalacji Redis. Po zainstalowaniu i uruchomieniu Redis powinien pokazać coś takiego jak poniżej.
Teraz musimy utworzyć opakowania wokół funkcji Redis, aby uczynić je bardziej ludzkimi. Zacznijmy od utworzenia katalogu w katalogu głównym z potokiem nazw i utworzeniem w tym katalogu nowego pliku reid_client.py.
Teraz musimy dodać poniższy kod do naszego pliku redis-client.py. Kod jest oczywisty, stworzyliśmy moduł pobierający i ustawiający dane. Oba dotyczą danych JSON, ponieważ Redis może przechowywać tylko dane łańcuchowe, a do przechowywania danych łańcuchowych potrzebujemy JSONIFY.
import json
import redis
class RedisClient:
"""
Custom Redis client with all the wrapper funtions. This client implements FIFO for pipeline.
"""
connection = redis.Redis(host='localhost', port=6379, db=0)
key = 'DATA-PIPELINE-KEY'
def _convert_data_to_json(self, data):
try:
return json.dumps(data)
except Exception as e:
print(f'Failed to convert data into json with error: {e}')
raise e
def _convert_data_from_json(self, data):
try:
return json.loads(data)
except Exception as e:
print(f'Failed to convert data from json to dict with error: {e}')
raise e
def send_data_to_pipeline(self, data):
data = self._convert_data_to_json(data)
self.connection.lpush(self.key, data)
def get_data_from_pipeline(self):
try:
data = self.connection.rpop(self.key)
return self._convert_data_from_json(data)
except Exception as e:
print(f'Failed to get more data from pipeline with error: {e}')
Teraz, gdy stworzyliśmy potok, możemy zacząć umieszczać w nim nasze dane od strony producenta danych. W tym celu musimy utworzyć potok w scrapy, który dodaje każdy zeskrobany element do Redis i zużywamy go później. Po prostu dodaj poniższy kod do pliku pipelines.py projektu scrapy.
from pipeline.redis_client import RedisClient
class ProducerPipeline:
redis_client = RedisClient()
def process_item(self, item, spider):
self.redis_client.send_data_to_pipeline(item)
return item
ITEM_PIPELINES = {
'producer.pipelines.ProducerPipeline': 300,
}
Budowanie konsumenta i konsumowanie danych
Ponieważ zbudowaliśmy potok i producenta, który może przekazywać dane do potoku niezależnie od zużycia danych, jesteśmy ponad w połowie wszystkiego, czego potrzebujemy, aby pobrać dane z potoku danych i wykorzystać je zgodnie z naszymi potrzebami, aby nazwać to projektem .
Utwórzmy nowy katalog w katalogu głównym, nazwijmy go konsument i utwórzmy w nim nowy plik o nazwie quotes_consumer.py.
Dodaj poniższy kod do pliku quotes_consumer.py. Kod sprawdza nowe dane, jeśli nie może znaleźć żadnych nowych danych w potoku, a następnie śpi, w przeciwnym razie pobiera te dane, w naszym przypadku zapisujemy cytaty w pliku tekstowym.
import time
from pipeline.redis_client import RedisClient
class QuotesConsumer:
redis_client = RedisClient()
sleep_seconds = 1
def run(self):
while True:
if self.redis_client.get_items_in_pipeline() == 0:
print(f'No new data in pipeline, sleeping for {self.sleep_seconds} seconds...')
time.sleep(self.sleep_seconds)
self.sleep_seconds += 1
continue
self.sleep_seconds = 1
data = self.redis_client.get_data_from_pipeline()
print(f'Obtained data from pipeline, saving to file...')
with open('quotes.txt', 'a+') as file:
file.write(data.get('quote'))
if __name__ == "__main__":
consumer = QuotesConsumer()
consumer.run()
Cały kod można znaleźć w następującym repozytorium GitHub .