Python'da Yüksek Derecede Ölçeklenebilir Veri Akış Hattı Oluşturma

Nov 27 2022
Python'da yüksek düzeyde ölçeklenebilir bir veri akışı ardışık düzeni oluşturmak için adım adım kılavuz.
Python kendisini veri yoğun işler için bir dil olarak şekillendirdi. Python'da prototip oluşturmanın gerçekten hızlı olması ve kolay sözdizimi nedeniyle insanlar onu sevmesi nedeniyle onu her yerde görüyoruz, bu dalga veri endüstrisine de indi.
Unsplash'ta JJ Ying'in fotoğrafı

Python kendisini veri yoğun işler için bir dil olarak şekillendirdi. Python'da prototip oluşturmanın gerçekten hızlı olması ve kolay sözdizimi nedeniyle insanlar onu sevmesi nedeniyle onu her yerde görüyoruz, bu dalga veri endüstrisine de indi. Veri mühendisleri ve veri bilimcileri de veri yoğun işlerinde kullanmaya başladılar. Bu yazıda, Python kullanarak çok basit ve yüksek düzeyde ölçeklenebilir bir veri akışı boru hattı oluşturacağız.

Veri akışı, sürekli bir veri akışını iletme işlemidir.

Artık boru hattının bir tarafında, sürekli olarak veri üreten bir miktar veya en az bir veri üreticimiz olacağını ve diğer tarafta, bu verileri sürekli olarak tüketecek bir miktar veya en az bir veri tüketicimiz olacağını biliyoruz.

Mimari

Birincisi, iddiayı haklı çıkaran ölçeklenebilir ve esnek bir mimari tasarlamaktır. Redis'i veri boru hattı olarak kullanacağız ve bu makalenin hatırına, veri üreticisi olarak Scrapy'yi bağımsız olarak ve veri tüketicisi olarak ayrı bir mikro hizmeti kullanarak çok basit bir veri kazıma mikro hizmeti kullanacağız.

Veri üreticisini oluşturma

İhtiyacımız olan ilk şey, etkinleştirilmiş bir sanal ortamla basit bir Python projesi oluşturmak. Bu özel makale için, Scrapy'nin resmi eğitimini kullanacağız . Boş bir Scrapy projesi oluşturmak için aşağıda verilen komutu çalıştırmamız gerekiyor.

scrapy startproject producer

Şimdi verileri bir yerden gerçekten kazıyabilecek bir örümcek yaratmamız gerekiyor. Örümcekler dizini alıntı_spider.py içinde yeni bir dosya oluşturalım ve aşağıdaki kodu ona ekleyelim.

import scrapy
class QuotesSpider(scrapy.Spider):
    name = "quotes"
    def start_requests(self):
        urls = [
            'https://quotes.toscrape.com/page/1/',
            'https://quotes.toscrape.com/page/2/',
        ]
        for url in urls:
            yield scrapy.Request(url=url, callback=self.parse)
    def parse(self, response, **kwargs):
        for quote in response.css('.quote .text::text').getall():
            yield {
                'quote': quote
            }

Veri üretici tarafımız artık hazır ancak bu verileri bir dosya yerine bir veri hattına koymamız gerekiyor. Verileri bir veri boru hattına koymadan önce, daha önce bir veri boru hattı oluşturmamız gerekir.

Bir veri ardışık düzeni oluşturma

Öncelikle Redis'i sistemimize kurmamız gerekiyor, bunun için Redis'in resmi kurulum kılavuzunu takip etmemiz gerekiyor . Redis'i kurduktan ve çalıştırdıktan sonra aşağıdaki gibi bir şey göstermelidir.

Şimdi onları daha insancıl hale getirmek için Redis işlevlerinin etrafında sarmalayıcılar oluşturmamız gerekiyor. Kökte bir isim ardışık düzenine sahip bir dizin oluşturmaya başlayalım ve bu dizinde yeni bir reid_client.py dosyası oluşturalım.

Şimdi redis-client.py dosyamıza aşağıda verilen kodu eklememiz gerekiyor. Kod kendi kendini açıklayıcıdır, bir veri alıcı ve veri ayarlayıcı oluşturduk. Her ikisi de JSON verileriyle ilgilenir, çünkü Redis yalnızca dize verilerini depolayabilir ve dize verilerini depolamak için JSONIFY yapmamız gerekir.

import json
import redis

class RedisClient:
    """
    Custom Redis client with all the wrapper funtions. This client implements FIFO for pipeline.
    """
    connection = redis.Redis(host='localhost', port=6379, db=0)
    key = 'DATA-PIPELINE-KEY'
    def _convert_data_to_json(self, data):
        try:
            return json.dumps(data)
        except Exception as e:
            print(f'Failed to convert data into json with error: {e}')
            raise e
    def _convert_data_from_json(self, data):
        try:
            return json.loads(data)
        except Exception as e:
            print(f'Failed to convert data from json to dict with error: {e}')
            raise e
    def send_data_to_pipeline(self, data):
        data = self._convert_data_to_json(data)
        self.connection.lpush(self.key, data)
    def get_data_from_pipeline(self):
        try:
            data = self.connection.rpop(self.key)
            return self._convert_data_from_json(data)
        except Exception as e:
            print(f'Failed to get more data from pipeline with error: {e}')

Şimdi bir boru hattı oluşturduğumuza göre, veri üreticisi tarafından verilerimizi buna koymaya başlayabiliriz. Bunun için, scrapy'de her scraped öğesini Redis'e ekleyen bir boru hattı oluşturmamız gerekiyor ve onu daha sonra tüketiyoruz. Aşağıdaki kodu, scrapy projenizin pipelines.py dosyasına eklemeniz yeterlidir.

from pipeline.redis_client import RedisClient
class ProducerPipeline:
    redis_client = RedisClient()
    def process_item(self, item, spider):
        self.redis_client.send_data_to_pipeline(item)
        return item

ITEM_PIPELINES = {
   'producer.pipelines.ProducerPipeline': 300,
}

Tüketiciyi oluşturmak ve verileri tüketmek

Bir boru hattı ve veri tüketiminden bağımsız olarak boru hattına veri koymaya devam edebilen bir üretici oluşturduğumuz için, veri boru hattından veri almak ve onu bir proje olarak adlandırmak için ihtiyaçlarımıza göre tüketmek için ihtiyacımız olan her şeyin yarısından fazlasını yaptık. .

Kökte yeni bir dizin oluşturalım, buna tüketici adını verelim ve içinde quote_consumer.py adında yeni bir dosya oluşturalım.

Quotes_consumer.py dosyasına aşağıdaki kodu ekleyin. Kod, boru hattında herhangi bir yeni veri bulamazsa yeni verileri kontrol eder, sonra uyur, aksi takdirde bu verileri alır, bizim durumumuzda alıntıları bir metin dosyasına kaydediyoruz.

import time
from pipeline.redis_client import RedisClient

class QuotesConsumer:
    redis_client = RedisClient()
    sleep_seconds = 1
    def run(self):
        while True:
            if self.redis_client.get_items_in_pipeline() == 0:
                print(f'No new data in pipeline, sleeping for {self.sleep_seconds} seconds...')
                time.sleep(self.sleep_seconds)
                self.sleep_seconds += 1
                continue
            self.sleep_seconds = 1
            data = self.redis_client.get_data_from_pipeline()
            print(f'Obtained data from pipeline, saving to file...')
            with open('quotes.txt', 'a+') as file:
                file.write(data.get('quote'))

if __name__ == "__main__":
    consumer = QuotesConsumer()
    consumer.run()

Kodun tamamını aşağıdaki GitHub deposunda bulabilirsiniz .