Python에서 확장성이 뛰어난 데이터 스트리밍 파이프라인 구축
Python은 데이터 집약적인 작업을 위한 언어로 자리 잡았습니다. 우리는 Python에서 프로토타입을 만드는 것이 정말 빠르고 사람들이 쉬운 구문으로 인해 그것을 좋아하기 때문에 모든 곳에서 그것을 보고 있습니다. 그 물결은 데이터 산업에도 상륙했습니다. 데이터 엔지니어와 데이터 과학자도 데이터 집약적인 작업에서 이를 사용하기 시작했습니다. 이 기사에서는 Python을 사용하여 매우 간단하고 확장성이 뛰어난 데이터 스트리밍 파이프라인을 구축할 것입니다.
데이터 스트리밍은 연속적인 데이터 흐름을 전송하는 프로세스입니다.
이제 우리는 파이프라인의 한쪽에는 지속적으로 데이터를 생성하는 일부 또는 적어도 한 명의 데이터 생산자가 있고 다른 쪽에는 해당 데이터를 지속적으로 소비하는 일부 또는 적어도 한 명의 데이터 소비자가 있다는 것을 알고 있습니다.
건축물
첫 번째는 주장을 정당화하는 확장 가능하고 유연한 아키텍처를 설계하는 것입니다. 우리는 Redis 를 데이터 파이프라인으로 사용하고 이 기사를 위해 Scrapy 를 데이터 생산자로 독립적으로 사용하고 별도의 마이크로 서비스를 데이터 소비자로 사용하는 매우 간단한 데이터 스크래핑 마이크로 서비스를 사용할 것입니다.
데이터 생산자 구축
첫 번째로 필요한 것은 활성화된 가상 환경으로 간단한 Python 프로젝트를 빌드하는 것입니다. 이 특정 기사에서는 Scrapy의 공식 튜토리얼 을 사용할 것 입니다. 빈 Scrapy 프로젝트를 생성하려면 아래 명령을 실행해야 합니다.
scrapy startproject producer
이제 실제로 어딘가에서 데이터를 스크랩할 수 있는 스파이더를 만들어야 합니다. 스파이더 디렉토리 quotes_spider.py에 새 파일을 만들고 아래 코드를 추가해 봅시다.
import scrapy
class QuotesSpider(scrapy.Spider):
name = "quotes"
def start_requests(self):
urls = [
'https://quotes.toscrape.com/page/1/',
'https://quotes.toscrape.com/page/2/',
]
for url in urls:
yield scrapy.Request(url=url, callback=self.parse)
def parse(self, response, **kwargs):
for quote in response.css('.quote .text::text').getall():
yield {
'quote': quote
}
이제 데이터 생산자 쪽이 준비되었지만 해당 데이터를 파일이 아닌 데이터 파이프라인에 넣어야 합니다. 데이터 파이프라인에 데이터를 넣기 전에 먼저 데이터 파이프라인을 구축해야 합니다.
데이터 파이프라인 구축
먼저 Redis의 공식 설치 가이드 를 따라 시스템에 Redis를 설치해야 합니다 . Redis를 설치하고 실행하면 아래와 같은 내용이 표시됩니다.
이제 우리는 Redis 함수를 보다 인간적으로 만들기 위해 래퍼를 만들어야 합니다. 루트에 이름 파이프라인이 있는 디렉터리를 만들고 이 디렉터리에 새 파일 reid_client.py를 만드는 것으로 시작하겠습니다.
이제 아래 코드를 redis-client.py 파일에 추가해야 합니다. 데이터 게터와 데이터 세터를 만든 코드는 자명합니다. Redis는 문자열 데이터만 저장할 수 있고 JSONIFY에 필요한 문자열 데이터를 저장하기 위해 둘 다 JSON 데이터를 처리합니다.
import json
import redis
class RedisClient:
"""
Custom Redis client with all the wrapper funtions. This client implements FIFO for pipeline.
"""
connection = redis.Redis(host='localhost', port=6379, db=0)
key = 'DATA-PIPELINE-KEY'
def _convert_data_to_json(self, data):
try:
return json.dumps(data)
except Exception as e:
print(f'Failed to convert data into json with error: {e}')
raise e
def _convert_data_from_json(self, data):
try:
return json.loads(data)
except Exception as e:
print(f'Failed to convert data from json to dict with error: {e}')
raise e
def send_data_to_pipeline(self, data):
data = self._convert_data_to_json(data)
self.connection.lpush(self.key, data)
def get_data_from_pipeline(self):
try:
data = self.connection.rpop(self.key)
return self._convert_data_from_json(data)
except Exception as e:
print(f'Failed to get more data from pipeline with error: {e}')
이제 파이프라인을 만들었으므로 데이터 생산자 측에서 파이프라인에 데이터를 넣을 수 있습니다. 이를 위해 스크랩한 모든 항목을 Redis에 추가하고 나중에 사용하는 파이프라인을 scrapy에 생성해야 합니다. 아래 코드를 pipelines.py에 스크래피 프로젝트 파일에 추가하기만 하면 됩니다.
from pipeline.redis_client import RedisClient
class ProducerPipeline:
redis_client = RedisClient()
def process_item(self, item, spider):
self.redis_client.send_data_to_pipeline(item)
return item
ITEM_PIPELINES = {
'producer.pipelines.ProducerPipeline': 300,
}
소비자 구축 및 데이터 소비
데이터 소비와 독립적으로 데이터를 파이프라인에 계속 넣을 수 있는 파이프라인과 생산자를 구축했으므로 데이터 파이프라인에서 데이터를 가져와 프로젝트라고 부르는 필요에 따라 소비하는 데 필요한 모든 과정이 절반 이상 진행되었습니다. .
루트에 새 디렉터리를 만들고 이름을 consumer로 지정하고 quotes_consumer.py라는 이름으로 새 파일을 만듭니다.
quotes_consumer.py 파일에 아래 코드를 추가합니다. 코드는 파이프라인에서 새 데이터를 찾을 수 없는 경우 새 데이터를 확인하고 그렇지 않으면 잠자기 상태로 유지하며 우리의 경우 텍스트 파일에 따옴표를 저장합니다.
import time
from pipeline.redis_client import RedisClient
class QuotesConsumer:
redis_client = RedisClient()
sleep_seconds = 1
def run(self):
while True:
if self.redis_client.get_items_in_pipeline() == 0:
print(f'No new data in pipeline, sleeping for {self.sleep_seconds} seconds...')
time.sleep(self.sleep_seconds)
self.sleep_seconds += 1
continue
self.sleep_seconds = 1
data = self.redis_client.get_data_from_pipeline()
print(f'Obtained data from pipeline, saving to file...')
with open('quotes.txt', 'a+') as file:
file.write(data.get('quote'))
if __name__ == "__main__":
consumer = QuotesConsumer()
consumer.run()
다음 GitHub 리포지토리 에서 전체 코드를 찾을 수 있습니다 .