สร้างไปป์ไลน์การสตรีมข้อมูลที่ปรับขนาดได้สูงใน Python

Nov 27 2022
คำแนะนำทีละขั้นตอนสำหรับการสร้างไปป์ไลน์การสตรีมข้อมูลที่ปรับขนาดได้สูงใน Python
Python ได้สร้างตัวเองให้เป็นภาษาสำหรับงานที่ต้องใช้ข้อมูลมาก เราเห็นมันทุกที่เพียงเพราะมันเร็วมากในการสร้างต้นแบบใน Python และผู้คนก็ชื่นชอบมันเนื่องจากไวยากรณ์ที่ง่าย คลื่นนั้นเข้าสู่อุตสาหกรรมข้อมูลด้วย
ภาพถ่ายโดย JJ Ying บน Unsplash

Python ได้สร้างตัวเองให้เป็นภาษาสำหรับงานที่ต้องใช้ข้อมูลมาก เราเห็นมันทุกที่เพียงเพราะมันเร็วมากในการสร้างต้นแบบใน Python และผู้คนก็ชื่นชอบมันเนื่องจากไวยากรณ์ที่ง่าย คลื่นนั้นเข้าสู่อุตสาหกรรมข้อมูลด้วย วิศวกรข้อมูลและนักวิทยาศาสตร์ข้อมูลก็เริ่มใช้มันในงานที่ต้องใช้ข้อมูลมาก ในบทความนี้ เราจะสร้างไปป์ไลน์การสตรีมข้อมูลที่ง่ายและปรับขนาดได้สูงโดยใช้ Python

การสตรีมข้อมูลเป็นกระบวนการของการส่งข้อมูลอย่างต่อเนื่อง

ตอนนี้เรารู้แล้วว่าในด้านหนึ่งของไปป์ไลน์เราจะมีผู้ผลิตข้อมูลบางส่วนหรืออย่างน้อยหนึ่งรายที่จะผลิตข้อมูลอย่างต่อเนื่อง และอีกด้านหนึ่ง เราจะมีผู้ใช้ข้อมูลบางรายหรืออย่างน้อยหนึ่งรายที่จะใช้ข้อมูลนั้นอย่างต่อเนื่อง

สถาปัตยกรรม

สิ่งแรกคือการออกแบบสถาปัตยกรรมที่ปรับขนาดได้และยืดหยุ่นซึ่งพิสูจน์การอ้างสิทธิ์ เราจะใช้Redisเป็นไปป์ไลน์ข้อมูล และเพื่อประโยชน์ของบทความนี้ เราจะใช้ไมโครเซอร์วิสการขูดข้อมูลอย่างง่าย โดยใช้Scrapyแยกกันในฐานะผู้ผลิตข้อมูล และไมโครเซอร์วิสที่แยกต่างหากในฐานะผู้บริโภคข้อมูล

การสร้างตัวสร้างข้อมูล

สิ่งแรกที่เราต้องการคือสร้างโปรเจ็กต์ Python อย่างง่ายด้วยสภาพแวดล้อมเสมือนที่เปิดใช้งาน สำหรับบทความนี้ เราจะใช้ บทช่วย สอนอย่าง เป็นทางการของ Scrapy เราจำเป็นต้องเรียกใช้คำสั่งที่ระบุด้านล่างเพื่อสร้างโครงการ Scrapy ที่ว่างเปล่า

scrapy startproject producer

ตอนนี้เราต้องสร้างสไปเดอร์ที่สามารถขูดข้อมูลจากที่ใดที่หนึ่งได้ มาสร้างไฟล์ใหม่ในไดเร็กทอรี spiders quotes_spider.py และเพิ่มโค้ดที่ระบุด้านล่างลงไป

import scrapy
class QuotesSpider(scrapy.Spider):
    name = "quotes"
    def start_requests(self):
        urls = [
            'https://quotes.toscrape.com/page/1/',
            'https://quotes.toscrape.com/page/2/',
        ]
        for url in urls:
            yield scrapy.Request(url=url, callback=self.parse)
    def parse(self, response, **kwargs):
        for quote in response.css('.quote .text::text').getall():
            yield {
                'quote': quote
            }

ด้านผู้ผลิตข้อมูลของเราพร้อมแล้ว แต่เราต้องใส่ข้อมูลนั้นลงในท่อส่งข้อมูลแทนในไฟล์ ก่อนที่จะใส่ข้อมูลลงในท่อส่งข้อมูล เราต้องสร้างท่อส่งข้อมูลก่อน

สร้างไปป์ไลน์ข้อมูล

ก่อนอื่นเราต้องติดตั้ง Redis ในระบบของเรา โดยทำตามคู่มือ การติดตั้งอย่างเป็นทางการ ของ Redis หลังจากติดตั้ง Redis และเรียกใช้ควรแสดงข้อมูลด้านล่าง

ตอนนี้เราต้องสร้างสิ่งห่อหุ้มรอบฟังก์ชัน Redis เพื่อให้มีมนุษยธรรมมากขึ้น เริ่มจากการสร้างไดเร็กทอรีใน root ด้วยชื่อไปป์ไลน์ และสร้างไฟล์ใหม่ reid_client.py ในไดเร็กทอรีนี้

ตอนนี้เราต้องเพิ่มรหัสที่ระบุด้านล่างในไฟล์ redis-client.py ของเรา รหัสนี้เป็นตัวอธิบายเราได้สร้าง data getter และ data setter ทั้งคู่จัดการกับข้อมูล JSON เนื่องจาก Redis สามารถเก็บข้อมูลสตริงได้เท่านั้น และเพื่อเก็บข้อมูลสตริงเราจำเป็นต้อง JSONIFY

import json
import redis

class RedisClient:
    """
    Custom Redis client with all the wrapper funtions. This client implements FIFO for pipeline.
    """
    connection = redis.Redis(host='localhost', port=6379, db=0)
    key = 'DATA-PIPELINE-KEY'
    def _convert_data_to_json(self, data):
        try:
            return json.dumps(data)
        except Exception as e:
            print(f'Failed to convert data into json with error: {e}')
            raise e
    def _convert_data_from_json(self, data):
        try:
            return json.loads(data)
        except Exception as e:
            print(f'Failed to convert data from json to dict with error: {e}')
            raise e
    def send_data_to_pipeline(self, data):
        data = self._convert_data_to_json(data)
        self.connection.lpush(self.key, data)
    def get_data_from_pipeline(self):
        try:
            data = self.connection.rpop(self.key)
            return self._convert_data_from_json(data)
        except Exception as e:
            print(f'Failed to get more data from pipeline with error: {e}')

เมื่อเราสร้างไปป์ไลน์แล้ว เราสามารถเริ่มใส่ข้อมูลของเราจากฝั่งผู้ผลิตข้อมูล เพื่อสิ่งนั้น เราจำเป็นต้องสร้างไปป์ไลน์ใน scrapy ซึ่งจะเพิ่มทุกรายการที่คัดลอกไปยัง Redis และเราจะใช้มันในภายหลัง เพียงเพิ่มโค้ดด้านล่างลงในไฟล์pipelines.py ของไฟล์โครงการ scrapy

from pipeline.redis_client import RedisClient
class ProducerPipeline:
    redis_client = RedisClient()
    def process_item(self, item, spider):
        self.redis_client.send_data_to_pipeline(item)
        return item

ITEM_PIPELINES = {
   'producer.pipelines.ProducerPipeline': 300,
}

สร้างข้อมูลผู้บริโภคและการบริโภค

เนื่องจากเราได้สร้างไปป์ไลน์และโปรดิวเซอร์ที่สามารถใส่ข้อมูลลงในไปป์ไลน์โดยไม่ขึ้นกับการใช้ข้อมูล เราจึงมีมากกว่าครึ่งทางของสิ่งที่เราต้องการในการรับข้อมูลจากไปป์ไลน์ข้อมูลและใช้งานตามความต้องการของเราเพื่อเรียกมันว่าโปรเจ็กต์ .

มาสร้างไดเร็กทอรีใหม่ในรูท ตั้งชื่อเป็น consumer และสร้างไฟล์ใหม่ด้วยชื่อ quotes_consumer.py

เพิ่มรหัสที่ระบุด้านล่างในไฟล์ quotes_consumer.py โค้ดจะตรวจสอบข้อมูลใหม่ หากไม่พบข้อมูลใหม่ใดๆ ในไปป์ไลน์ ก็จะเข้าสู่โหมดสลีป มิฉะนั้นจะนำเข้าข้อมูลนั้น ในกรณีของเรา เรากำลังบันทึกเครื่องหมายคำพูดลงในไฟล์ข้อความ

import time
from pipeline.redis_client import RedisClient

class QuotesConsumer:
    redis_client = RedisClient()
    sleep_seconds = 1
    def run(self):
        while True:
            if self.redis_client.get_items_in_pipeline() == 0:
                print(f'No new data in pipeline, sleeping for {self.sleep_seconds} seconds...')
                time.sleep(self.sleep_seconds)
                self.sleep_seconds += 1
                continue
            self.sleep_seconds = 1
            data = self.redis_client.get_data_from_pipeline()
            print(f'Obtained data from pipeline, saving to file...')
            with open('quotes.txt', 'a+') as file:
                file.write(data.get('quote'))

if __name__ == "__main__":
    consumer = QuotesConsumer()
    consumer.run()

คุณสามารถค้นหารหัสที่สมบูรณ์ได้จากที่ เก็บ GitHub ต่อไปนี้