สร้างไปป์ไลน์การสตรีมข้อมูลที่ปรับขนาดได้สูงใน Python
Python ได้สร้างตัวเองให้เป็นภาษาสำหรับงานที่ต้องใช้ข้อมูลมาก เราเห็นมันทุกที่เพียงเพราะมันเร็วมากในการสร้างต้นแบบใน Python และผู้คนก็ชื่นชอบมันเนื่องจากไวยากรณ์ที่ง่าย คลื่นนั้นเข้าสู่อุตสาหกรรมข้อมูลด้วย วิศวกรข้อมูลและนักวิทยาศาสตร์ข้อมูลก็เริ่มใช้มันในงานที่ต้องใช้ข้อมูลมาก ในบทความนี้ เราจะสร้างไปป์ไลน์การสตรีมข้อมูลที่ง่ายและปรับขนาดได้สูงโดยใช้ Python
การสตรีมข้อมูลเป็นกระบวนการของการส่งข้อมูลอย่างต่อเนื่อง
ตอนนี้เรารู้แล้วว่าในด้านหนึ่งของไปป์ไลน์เราจะมีผู้ผลิตข้อมูลบางส่วนหรืออย่างน้อยหนึ่งรายที่จะผลิตข้อมูลอย่างต่อเนื่อง และอีกด้านหนึ่ง เราจะมีผู้ใช้ข้อมูลบางรายหรืออย่างน้อยหนึ่งรายที่จะใช้ข้อมูลนั้นอย่างต่อเนื่อง
สถาปัตยกรรม
สิ่งแรกคือการออกแบบสถาปัตยกรรมที่ปรับขนาดได้และยืดหยุ่นซึ่งพิสูจน์การอ้างสิทธิ์ เราจะใช้Redisเป็นไปป์ไลน์ข้อมูล และเพื่อประโยชน์ของบทความนี้ เราจะใช้ไมโครเซอร์วิสการขูดข้อมูลอย่างง่าย โดยใช้Scrapyแยกกันในฐานะผู้ผลิตข้อมูล และไมโครเซอร์วิสที่แยกต่างหากในฐานะผู้บริโภคข้อมูล
การสร้างตัวสร้างข้อมูล
สิ่งแรกที่เราต้องการคือสร้างโปรเจ็กต์ Python อย่างง่ายด้วยสภาพแวดล้อมเสมือนที่เปิดใช้งาน สำหรับบทความนี้ เราจะใช้ บทช่วย สอนอย่าง เป็นทางการของ Scrapy เราจำเป็นต้องเรียกใช้คำสั่งที่ระบุด้านล่างเพื่อสร้างโครงการ Scrapy ที่ว่างเปล่า
scrapy startproject producer
ตอนนี้เราต้องสร้างสไปเดอร์ที่สามารถขูดข้อมูลจากที่ใดที่หนึ่งได้ มาสร้างไฟล์ใหม่ในไดเร็กทอรี spiders quotes_spider.py และเพิ่มโค้ดที่ระบุด้านล่างลงไป
import scrapy
class QuotesSpider(scrapy.Spider):
name = "quotes"
def start_requests(self):
urls = [
'https://quotes.toscrape.com/page/1/',
'https://quotes.toscrape.com/page/2/',
]
for url in urls:
yield scrapy.Request(url=url, callback=self.parse)
def parse(self, response, **kwargs):
for quote in response.css('.quote .text::text').getall():
yield {
'quote': quote
}
ด้านผู้ผลิตข้อมูลของเราพร้อมแล้ว แต่เราต้องใส่ข้อมูลนั้นลงในท่อส่งข้อมูลแทนในไฟล์ ก่อนที่จะใส่ข้อมูลลงในท่อส่งข้อมูล เราต้องสร้างท่อส่งข้อมูลก่อน
สร้างไปป์ไลน์ข้อมูล
ก่อนอื่นเราต้องติดตั้ง Redis ในระบบของเรา โดยทำตามคู่มือ การติดตั้งอย่างเป็นทางการ ของ Redis หลังจากติดตั้ง Redis และเรียกใช้ควรแสดงข้อมูลด้านล่าง
ตอนนี้เราต้องสร้างสิ่งห่อหุ้มรอบฟังก์ชัน Redis เพื่อให้มีมนุษยธรรมมากขึ้น เริ่มจากการสร้างไดเร็กทอรีใน root ด้วยชื่อไปป์ไลน์ และสร้างไฟล์ใหม่ reid_client.py ในไดเร็กทอรีนี้
ตอนนี้เราต้องเพิ่มรหัสที่ระบุด้านล่างในไฟล์ redis-client.py ของเรา รหัสนี้เป็นตัวอธิบายเราได้สร้าง data getter และ data setter ทั้งคู่จัดการกับข้อมูล JSON เนื่องจาก Redis สามารถเก็บข้อมูลสตริงได้เท่านั้น และเพื่อเก็บข้อมูลสตริงเราจำเป็นต้อง JSONIFY
import json
import redis
class RedisClient:
"""
Custom Redis client with all the wrapper funtions. This client implements FIFO for pipeline.
"""
connection = redis.Redis(host='localhost', port=6379, db=0)
key = 'DATA-PIPELINE-KEY'
def _convert_data_to_json(self, data):
try:
return json.dumps(data)
except Exception as e:
print(f'Failed to convert data into json with error: {e}')
raise e
def _convert_data_from_json(self, data):
try:
return json.loads(data)
except Exception as e:
print(f'Failed to convert data from json to dict with error: {e}')
raise e
def send_data_to_pipeline(self, data):
data = self._convert_data_to_json(data)
self.connection.lpush(self.key, data)
def get_data_from_pipeline(self):
try:
data = self.connection.rpop(self.key)
return self._convert_data_from_json(data)
except Exception as e:
print(f'Failed to get more data from pipeline with error: {e}')
เมื่อเราสร้างไปป์ไลน์แล้ว เราสามารถเริ่มใส่ข้อมูลของเราจากฝั่งผู้ผลิตข้อมูล เพื่อสิ่งนั้น เราจำเป็นต้องสร้างไปป์ไลน์ใน scrapy ซึ่งจะเพิ่มทุกรายการที่คัดลอกไปยัง Redis และเราจะใช้มันในภายหลัง เพียงเพิ่มโค้ดด้านล่างลงในไฟล์pipelines.py ของไฟล์โครงการ scrapy
from pipeline.redis_client import RedisClient
class ProducerPipeline:
redis_client = RedisClient()
def process_item(self, item, spider):
self.redis_client.send_data_to_pipeline(item)
return item
ITEM_PIPELINES = {
'producer.pipelines.ProducerPipeline': 300,
}
สร้างข้อมูลผู้บริโภคและการบริโภค
เนื่องจากเราได้สร้างไปป์ไลน์และโปรดิวเซอร์ที่สามารถใส่ข้อมูลลงในไปป์ไลน์โดยไม่ขึ้นกับการใช้ข้อมูล เราจึงมีมากกว่าครึ่งทางของสิ่งที่เราต้องการในการรับข้อมูลจากไปป์ไลน์ข้อมูลและใช้งานตามความต้องการของเราเพื่อเรียกมันว่าโปรเจ็กต์ .
มาสร้างไดเร็กทอรีใหม่ในรูท ตั้งชื่อเป็น consumer และสร้างไฟล์ใหม่ด้วยชื่อ quotes_consumer.py
เพิ่มรหัสที่ระบุด้านล่างในไฟล์ quotes_consumer.py โค้ดจะตรวจสอบข้อมูลใหม่ หากไม่พบข้อมูลใหม่ใดๆ ในไปป์ไลน์ ก็จะเข้าสู่โหมดสลีป มิฉะนั้นจะนำเข้าข้อมูลนั้น ในกรณีของเรา เรากำลังบันทึกเครื่องหมายคำพูดลงในไฟล์ข้อความ
import time
from pipeline.redis_client import RedisClient
class QuotesConsumer:
redis_client = RedisClient()
sleep_seconds = 1
def run(self):
while True:
if self.redis_client.get_items_in_pipeline() == 0:
print(f'No new data in pipeline, sleeping for {self.sleep_seconds} seconds...')
time.sleep(self.sleep_seconds)
self.sleep_seconds += 1
continue
self.sleep_seconds = 1
data = self.redis_client.get_data_from_pipeline()
print(f'Obtained data from pipeline, saving to file...')
with open('quotes.txt', 'a+') as file:
file.write(data.get('quote'))
if __name__ == "__main__":
consumer = QuotesConsumer()
consumer.run()
คุณสามารถค้นหารหัสที่สมบูรณ์ได้จากที่ เก็บ GitHub ต่อไปนี้