Membangun Pipeline Streaming Data yang Sangat Skalabel dengan Python

Nov 27 2022
Panduan langkah demi langkah untuk membangun pipeline streaming data yang sangat skalabel dengan Python.
Python telah membentuk dirinya sebagai bahasa untuk pekerjaan intensif data. Kami melihatnya di mana-mana hanya karena sangat cepat untuk membuat prototipe dengan Python dan orang-orang menyukainya karena sintaksnya yang mudah, gelombang itu juga mendarat di industri data.
Foto oleh JJ Ying di Unsplash

Python telah membentuk dirinya sebagai bahasa untuk pekerjaan intensif data. Kami melihatnya di mana-mana hanya karena sangat cepat untuk membuat prototipe dengan Python dan orang-orang menyukainya karena sintaksnya yang mudah, gelombang itu juga mendarat di industri data. Insinyur data dan ilmuwan data juga mulai menggunakannya dalam pekerjaan intensif data mereka. Pada artikel ini, kita akan membuat pipa streaming data yang sangat sederhana dan sangat skalabel menggunakan Python.

Streaming data adalah proses mentransmisikan aliran data yang berkelanjutan.

Sekarang kita tahu di satu sisi pipa kita akan memiliki beberapa atau setidaknya satu produsen data yang akan terus menghasilkan data dan di sisi lain, kita akan memiliki beberapa atau setidaknya satu konsumen data yang akan terus mengonsumsi data tersebut.

Arsitektur

Hal pertama adalah merancang arsitektur yang dapat diskalakan dan fleksibel yang membenarkan klaim tersebut. Kami akan menggunakan Redis sebagai pipa data dan demi artikel ini, kami akan menggunakan layanan mikro pengikisan data yang sangat sederhana menggunakan Scrapy secara mandiri sebagai produsen data dan layanan mikro terpisah sebagai konsumen data.

Membangun Produser data

Yang pertama kita butuhkan adalah membangun proyek Python sederhana dengan lingkungan virtual yang diaktifkan. Untuk artikel khusus ini, kami akan menggunakan tutorial resmi Scrapy . Kita perlu menjalankan perintah yang diberikan di bawah ini untuk membuat proyek Scrapy kosong.

scrapy startproject producer

Sekarang kita perlu membuat laba-laba yang benar-benar dapat mengikis data dari suatu tempat. Mari buat file baru di direktori spiders quotes_spider.py dan tambahkan kode yang diberikan di bawah ini.

import scrapy
class QuotesSpider(scrapy.Spider):
    name = "quotes"
    def start_requests(self):
        urls = [
            'https://quotes.toscrape.com/page/1/',
            'https://quotes.toscrape.com/page/2/',
        ]
        for url in urls:
            yield scrapy.Request(url=url, callback=self.parse)
    def parse(self, response, **kwargs):
        for quote in response.css('.quote .text::text').getall():
            yield {
                'quote': quote
            }

Sisi penghasil data kita sudah siap sekarang, tetapi kita perlu memasukkan data itu ke dalam pipa data, bukan di file. Sebelum memasukkan data ke dalam data pipeline kita perlu membangun data pipeline terlebih dahulu.

Membangun saluran data

Pertama kita perlu menginstal Redis di sistem kita, untuk melakukannya kita perlu mengikuti panduan instalasi resmi Redis. Setelah menginstal Redis dan menjalankannya, seharusnya muncul seperti di bawah ini.

Sekarang kita perlu membuat pembungkus di sekitar fungsi Redis untuk membuatnya lebih manusiawi. Mari kita mulai dengan membuat direktori di root dengan nama pipeline dan membuat file baru reid_client.py di direktori ini.

Sekarang kita perlu menambahkan kode yang diberikan di bawah ini ke file redis-client.py kita. Kode ini cukup jelas, kami membuat pengambil data dan penyetel data. Keduanya berurusan dengan data JSON karena Redis hanya dapat menyimpan data string dan untuk menyimpan data string kita perlu JSONIFY.

import json
import redis

class RedisClient:
    """
    Custom Redis client with all the wrapper funtions. This client implements FIFO for pipeline.
    """
    connection = redis.Redis(host='localhost', port=6379, db=0)
    key = 'DATA-PIPELINE-KEY'
    def _convert_data_to_json(self, data):
        try:
            return json.dumps(data)
        except Exception as e:
            print(f'Failed to convert data into json with error: {e}')
            raise e
    def _convert_data_from_json(self, data):
        try:
            return json.loads(data)
        except Exception as e:
            print(f'Failed to convert data from json to dict with error: {e}')
            raise e
    def send_data_to_pipeline(self, data):
        data = self._convert_data_to_json(data)
        self.connection.lpush(self.key, data)
    def get_data_from_pipeline(self):
        try:
            data = self.connection.rpop(self.key)
            return self._convert_data_from_json(data)
        except Exception as e:
            print(f'Failed to get more data from pipeline with error: {e}')

Sekarang karena kami telah membuat saluran pipa, kami dapat mulai memasukkan data kami ke dalamnya dari sisi penghasil data. Untuk itu, kita perlu membuat pipa di scrapy yang menambahkan setiap item yang tergores ke Redis dan kita akan menggunakannya nanti. Cukup tambahkan kode di bawah ini ke pipelines.py file proyek scrapy Anda.

from pipeline.redis_client import RedisClient
class ProducerPipeline:
    redis_client = RedisClient()
    def process_item(self, item, spider):
        self.redis_client.send_data_to_pipeline(item)
        return item

ITEM_PIPELINES = {
   'producer.pipelines.ProducerPipeline': 300,
}

Membangun konsumen dan mengkonsumsi data

Karena kami telah membangun jalur pipa dan produser yang dapat terus memasukkan data ke jalur pipa secara independen dari konsumsi data, kami sudah lebih dari setengah jalan untuk mendapatkan data dari jalur data dan menggunakannya sesuai dengan kebutuhan kami untuk menyebutnya sebagai proyek .

Mari buat direktori baru di root, beri nama konsumen dan buat file baru dengan nama quotes_consumer.py di dalamnya.

Tambahkan kode yang diberikan di bawah ini ke file quotes_consumer.py. Kode memeriksa data baru jika tidak dapat menemukan data baru di dalam pipeline, lalu akan tidur, jika tidak, data tersebut akan diserap dalam kasus kami, kami menyimpan tanda kutip ke file teks.

import time
from pipeline.redis_client import RedisClient

class QuotesConsumer:
    redis_client = RedisClient()
    sleep_seconds = 1
    def run(self):
        while True:
            if self.redis_client.get_items_in_pipeline() == 0:
                print(f'No new data in pipeline, sleeping for {self.sleep_seconds} seconds...')
                time.sleep(self.sleep_seconds)
                self.sleep_seconds += 1
                continue
            self.sleep_seconds = 1
            data = self.redis_client.get_data_from_pipeline()
            print(f'Obtained data from pipeline, saving to file...')
            with open('quotes.txt', 'a+') as file:
                file.write(data.get('quote'))

if __name__ == "__main__":
    consumer = QuotesConsumer()
    consumer.run()

Anda dapat menemukan kode lengkapnya di repositori GitHub berikut .