पायथन में एक अत्यधिक स्केलेबल डेटा स्ट्रीमिंग पाइपलाइन का निर्माण
पायथन ने डेटा-गहन नौकरियों के लिए खुद को एक भाषा के रूप में आकार दिया है। हम इसे हर जगह सिर्फ इसलिए देख रहे हैं क्योंकि यह वास्तव में पायथन में प्रोटोटाइप के लिए तेज़ है और लोग इसे इसके आसान सिंटैक्स के कारण पसंद कर रहे हैं, यह लहर डेटा उद्योग में भी उतरी। डेटा इंजीनियरों और डेटा वैज्ञानिकों ने भी अपने डेटा-गहन कार्यों में इसका उपयोग करना शुरू कर दिया है। इस लेख में, हम पायथन का उपयोग करके एक बहुत ही सरल और अत्यधिक स्केलेबल डेटा स्ट्रीमिंग पाइपलाइन बनाने जा रहे हैं।
डेटा स्ट्रीमिंग डेटा के निरंतर प्रवाह को प्रसारित करने की प्रक्रिया है।
अब हम जानते हैं कि पाइपलाइन के एक तरफ हमारे पास कुछ या कम से कम एक डेटा निर्माता होगा जो लगातार डेटा का उत्पादन करेगा और दूसरी तरफ, हमारे पास कुछ या कम से कम एक डेटा उपभोक्ता होगा जो उस डेटा का लगातार उपभोग करेगा।
आर्किटेक्चर
पहली बात यह है कि एक स्केलेबल और फ्लेक्सिबल आर्किटेक्चर डिजाइन करना है जो दावे को सही ठहराता है। हम Redis को डेटा पाइपलाइन के रूप में उपयोग करेंगे और इस लेख के लिए, हम डेटा निर्माता के रूप में स्वतंत्र रूप से Scrapy का उपयोग करके और डेटा उपभोक्ता के रूप में एक अलग microservice का उपयोग करके एक बहुत ही सरल डेटा स्क्रैपिंग microservice का उपयोग करेंगे।
डेटा निर्माता का निर्माण
सबसे पहले हमें एक सक्रिय आभासी वातावरण के साथ एक साधारण पायथन परियोजना का निर्माण करना है। इस विशिष्ट लेख के लिए, हम स्क्रेपी के आधिकारिक ट्यूटोरियल का उपयोग करने जा रहे हैं । खाली स्क्रेपी प्रोजेक्ट बनाने के लिए हमें नीचे दी गई कमांड को चलाने की जरूरत है।
scrapy startproject producer
अब हमें एक मकड़ी बनाने की जरूरत है जो वास्तव में डेटा को कहीं से परिमार्जन कर सके। स्पाइडर्स डायरेक्टरी Quotes_spider.py में एक नई फाइल बनाते हैं और उसमें नीचे दिए गए कोड को जोड़ते हैं।
import scrapy
class QuotesSpider(scrapy.Spider):
name = "quotes"
def start_requests(self):
urls = [
'https://quotes.toscrape.com/page/1/',
'https://quotes.toscrape.com/page/2/',
]
for url in urls:
yield scrapy.Request(url=url, callback=self.parse)
def parse(self, response, **kwargs):
for quote in response.css('.quote .text::text').getall():
yield {
'quote': quote
}
हमारा डेटा निर्माता पक्ष अब तैयार है लेकिन हमें उस डेटा को फ़ाइल के बजाय डेटा पाइपलाइन में डालने की आवश्यकता है। डेटा पाइपलाइन में डेटा डालने से पहले हमें पहले डेटा पाइपलाइन बनाने की आवश्यकता होती है।
डेटा पाइपलाइन का निर्माण
सबसे पहले हमें Redis को अपने सिस्टम पर इंस्टॉल करना होगा, ऐसा करने के लिए हमें Redis के आधिकारिक इंस्टॉलेशन गाइड का पालन करना होगा। रेडिस को स्थापित करने और चलाने के बाद इसे नीचे जैसा कुछ दिखाना चाहिए।
अब हमें उन्हें और अधिक मानवीय बनाने के लिए रेडिस कार्यों के चारों ओर रैपर बनाने की जरूरत है। आइए नाम पाइपलाइन के साथ रूट में एक निर्देशिका बनाने के साथ शुरू करें और इस निर्देशिका में एक नई फ़ाइल reid_client.py बनाएं।
अब हमें नीचे दिए गए कोड को हमारी redis-client.py फ़ाइल में जोड़ना होगा। कोड स्व-व्याख्यात्मक है हमने एक डेटा गेट्टर और डेटा सेटर बनाया है। दोनों JSON डेटा से निपटते हैं क्योंकि रेडिस केवल स्ट्रिंग डेटा स्टोर कर सकता है और स्ट्रिंग डेटा स्टोर करने के लिए हमें इसे JSONIFY करने की आवश्यकता है।
import json
import redis
class RedisClient:
"""
Custom Redis client with all the wrapper funtions. This client implements FIFO for pipeline.
"""
connection = redis.Redis(host='localhost', port=6379, db=0)
key = 'DATA-PIPELINE-KEY'
def _convert_data_to_json(self, data):
try:
return json.dumps(data)
except Exception as e:
print(f'Failed to convert data into json with error: {e}')
raise e
def _convert_data_from_json(self, data):
try:
return json.loads(data)
except Exception as e:
print(f'Failed to convert data from json to dict with error: {e}')
raise e
def send_data_to_pipeline(self, data):
data = self._convert_data_to_json(data)
self.connection.lpush(self.key, data)
def get_data_from_pipeline(self):
try:
data = self.connection.rpop(self.key)
return self._convert_data_from_json(data)
except Exception as e:
print(f'Failed to get more data from pipeline with error: {e}')
अब जब हमने एक पाइपलाइन बना ली है तो हम डेटा प्रोड्यूसर की तरफ से उसमें अपना डेटा डालना शुरू कर सकते हैं। उसके लिए, हमें स्क्रैपी में एक पाइपलाइन बनाने की आवश्यकता है जो प्रत्येक स्क्रैप की गई वस्तु को रेडिस में जोड़ती है और हम बाद में इसका उपभोग करते हैं। बस नीचे दिए गए कोड को अपनी स्केपर प्रोजेक्ट की पाइपलाइन्स.py फ़ाइल में जोड़ें।
from pipeline.redis_client import RedisClient
class ProducerPipeline:
redis_client = RedisClient()
def process_item(self, item, spider):
self.redis_client.send_data_to_pipeline(item)
return item
ITEM_PIPELINES = {
'producer.pipelines.ProducerPipeline': 300,
}
उपभोक्ता और उपभोक्ता डेटा का निर्माण
जैसा कि हमने एक पाइपलाइन और एक निर्माता का निर्माण किया है, जो डेटा खपत से स्वतंत्र पाइपलाइन में डेटा डाल सकता है, हम आधे से अधिक रास्ते में हैं, हमें डेटा पाइपलाइन से डेटा प्राप्त करने की आवश्यकता है और इसे एक परियोजना कहने के लिए हमारी आवश्यकताओं के अनुसार इसका उपभोग करें। .
चलिए रूट में एक नई डायरेक्टरी बनाते हैं, इसे कंज्यूमर नाम देते हैं और इसमें एक नई फाइल बनाते हैं नाम के साथ quotes_consumer.py।
नीचे दिए गए कोड कोquots_consumer.py फ़ाइल में जोड़ें। कोड नए डेटा के लिए जांच करता है अगर उसे पाइपलाइन में कोई नया डेटा नहीं मिला तो यह सो जाता है अन्यथा यह उस डेटा को निगला जाता है हमारे मामले में हम टेक्स्ट फ़ाइल में उद्धरण सहेज रहे हैं।
import time
from pipeline.redis_client import RedisClient
class QuotesConsumer:
redis_client = RedisClient()
sleep_seconds = 1
def run(self):
while True:
if self.redis_client.get_items_in_pipeline() == 0:
print(f'No new data in pipeline, sleeping for {self.sleep_seconds} seconds...')
time.sleep(self.sleep_seconds)
self.sleep_seconds += 1
continue
self.sleep_seconds = 1
data = self.redis_client.get_data_from_pipeline()
print(f'Obtained data from pipeline, saving to file...')
with open('quotes.txt', 'a+') as file:
file.write(data.get('quote'))
if __name__ == "__main__":
consumer = QuotesConsumer()
consumer.run()
आप निम्नलिखित GitHub रिपॉजिटरी पर पूरा कोड पा सकते हैं ।