Xây dựng đường ống truyền dữ liệu có khả năng mở rộng cao bằng Python
Python đã tự định hình mình là ngôn ngữ dành cho các công việc cần nhiều dữ liệu. Chúng tôi đang thấy nó ở khắp mọi nơi chỉ vì nó thực sự nhanh chóng để tạo nguyên mẫu bằng Python và mọi người yêu thích nó do cú pháp dễ dàng của nó, làn sóng đó cũng đã đổ bộ vào ngành dữ liệu. Các kỹ sư dữ liệu và nhà khoa học dữ liệu cũng bắt đầu sử dụng nó trong các công việc cần nhiều dữ liệu của họ. Trong bài viết này, chúng ta sẽ xây dựng một đường dẫn truyền dữ liệu rất đơn giản và có khả năng mở rộng cao bằng Python.
Truyền dữ liệu là quá trình truyền một luồng dữ liệu liên tục.
Bây giờ chúng tôi biết ở một bên của đường ống, chúng tôi sẽ có một số hoặc ít nhất một nhà sản xuất dữ liệu sẽ liên tục tạo dữ liệu và ở phía bên kia, chúng tôi sẽ có một số hoặc ít nhất một người tiêu dùng dữ liệu sẽ liên tục sử dụng dữ liệu đó.
Ngành kiến trúc
Điều đầu tiên là thiết kế một kiến trúc có thể mở rộng và linh hoạt để chứng minh yêu cầu. Chúng tôi sẽ sử dụng Redis làm đường dẫn dữ liệu và vì mục đích của bài viết này, chúng tôi sẽ sử dụng một vi dịch vụ thu thập dữ liệu rất đơn giản bằng cách sử dụng Scrapy một cách độc lập với tư cách là nhà sản xuất dữ liệu và một vi dịch vụ riêng biệt với tư cách là người tiêu dùng dữ liệu.
Xây dựng nhà sản xuất dữ liệu
Điều đầu tiên chúng ta cần là xây dựng một dự án Python đơn giản với môi trường ảo được kích hoạt. Đối với bài viết cụ thể này, chúng tôi sẽ sử dụng hướng dẫn chính thức của Scrapy . Chúng ta cần chạy lệnh dưới đây để tạo một dự án Scrapy trống.
scrapy startproject producer
Bây giờ chúng ta cần tạo một con nhện thực sự có thể lấy dữ liệu từ một nơi nào đó. Hãy tạo một tệp mới trong thư mục spiders quote_spider.py và thêm mã được cung cấp bên dưới vào đó.
import scrapy
class QuotesSpider(scrapy.Spider):
name = "quotes"
def start_requests(self):
urls = [
'https://quotes.toscrape.com/page/1/',
'https://quotes.toscrape.com/page/2/',
]
for url in urls:
yield scrapy.Request(url=url, callback=self.parse)
def parse(self, response, **kwargs):
for quote in response.css('.quote .text::text').getall():
yield {
'quote': quote
}
Phía nhà sản xuất dữ liệu của chúng tôi hiện đã sẵn sàng nhưng chúng tôi cần đưa dữ liệu đó vào một đường dẫn dữ liệu thay vì trong một tệp. Trước khi đưa dữ liệu vào đường ống dữ liệu, chúng ta cần xây dựng đường ống dữ liệu trước.
Xây dựng một đường ống dữ liệu
Trước tiên, chúng tôi cần cài đặt Redis trên hệ thống của mình, để làm được điều đó, chúng tôi cần làm theo hướng dẫn cài đặt chính thức của Redis. Sau khi cài đặt Redis và chạy nó sẽ hiển thị như bên dưới.
Bây giờ chúng ta cần tạo các trình bao bọc xung quanh các chức năng của Redis để làm cho chúng trở nên nhân văn hơn. Hãy bắt đầu với việc tạo một thư mục trong thư mục gốc có đường dẫn tên và tạo một tệp mới reid_client.py trong thư mục này.
Bây giờ chúng tôi cần thêm mã được cung cấp bên dưới vào tệp redis-client.py của mình. Mã này tự giải thích, chúng tôi đã tạo một trình thu thập dữ liệu và trình thiết lập dữ liệu. Cả hai đều xử lý dữ liệu JSON vì Redis chỉ có thể lưu trữ dữ liệu chuỗi và để lưu trữ dữ liệu chuỗi, chúng ta cần JSONIFY nó.
import json
import redis
class RedisClient:
"""
Custom Redis client with all the wrapper funtions. This client implements FIFO for pipeline.
"""
connection = redis.Redis(host='localhost', port=6379, db=0)
key = 'DATA-PIPELINE-KEY'
def _convert_data_to_json(self, data):
try:
return json.dumps(data)
except Exception as e:
print(f'Failed to convert data into json with error: {e}')
raise e
def _convert_data_from_json(self, data):
try:
return json.loads(data)
except Exception as e:
print(f'Failed to convert data from json to dict with error: {e}')
raise e
def send_data_to_pipeline(self, data):
data = self._convert_data_to_json(data)
self.connection.lpush(self.key, data)
def get_data_from_pipeline(self):
try:
data = self.connection.rpop(self.key)
return self._convert_data_from_json(data)
except Exception as e:
print(f'Failed to get more data from pipeline with error: {e}')
Bây giờ khi chúng tôi đã tạo một đường dẫn, chúng tôi có thể bắt đầu đưa dữ liệu của mình vào đó từ phía nhà sản xuất dữ liệu. Để làm được điều đó, chúng tôi cần tạo một quy trình trong phế liệu để thêm mọi mục đã loại bỏ vào Redis và chúng tôi sử dụng nó sau. Chỉ cần thêm đoạn mã dưới đây vào tệp tin của dự án phế liệu pipet.py của bạn.
from pipeline.redis_client import RedisClient
class ProducerPipeline:
redis_client = RedisClient()
def process_item(self, item, spider):
self.redis_client.send_data_to_pipeline(item)
return item
ITEM_PIPELINES = {
'producer.pipelines.ProducerPipeline': 300,
}
Xây dựng dữ liệu người tiêu dùng và tiêu dùng
Vì chúng tôi đã xây dựng một đường ống và một nhà sản xuất có thể tiếp tục đưa dữ liệu vào đường ống độc lập với mức tiêu thụ dữ liệu, chúng tôi đã hoàn thành hơn nửa chặng đường tất cả những gì chúng tôi cần để lấy dữ liệu từ đường ống dữ liệu và sử dụng nó theo nhu cầu của chúng tôi để gọi đó là một dự án .
Hãy tạo một thư mục mới trong thư mục gốc, đặt tên là người tiêu dùng và tạo một tệp mới có tên quote_consumer.py trong đó.
Thêm mã được cung cấp bên dưới vào tệp quote_consumer.py. Mã kiểm tra dữ liệu mới nếu nó không thể tìm thấy bất kỳ dữ liệu mới nào trong đường dẫn thì nó sẽ ngủ nếu không nó sẽ nhập dữ liệu đó trong trường hợp của chúng tôi, chúng tôi đang lưu dấu ngoặc kép vào tệp văn bản.
import time
from pipeline.redis_client import RedisClient
class QuotesConsumer:
redis_client = RedisClient()
sleep_seconds = 1
def run(self):
while True:
if self.redis_client.get_items_in_pipeline() == 0:
print(f'No new data in pipeline, sleeping for {self.sleep_seconds} seconds...')
time.sleep(self.sleep_seconds)
self.sleep_seconds += 1
continue
self.sleep_seconds = 1
data = self.redis_client.get_data_from_pipeline()
print(f'Obtained data from pipeline, saving to file...')
with open('quotes.txt', 'a+') as file:
file.write(data.get('quote'))
if __name__ == "__main__":
consumer = QuotesConsumer()
consumer.run()
Bạn có thể tìm thấy mã hoàn chỉnh trên kho lưu trữ GitHub sau .