GCP Cloud 함수가 PubSub 메시지를 올바르게 선택 / 승인하지 않음
Google Cloud Platform에 몇 가지 데이터 처리 워크 플로가 설정되어 있습니다. 이러한 위치는 물리적 주소를 처리하고 이에 대한 일부 메트릭을 반환합니다. 워크 플로는 Cloud Functions와 PubSub 스트림의 조합을 사용합니다.
워크 플로에 Google Cloud 함수가 하나 있으면 일부 메시지가 트리거되는 스트림에서 선택되지 않거나 여러 번 선택됩니다. 나는 이것이 어느 정도 예상된다는 것을 알고 있습니다. 그러나 이것은 많이 일어나고 있습니다. 이로 인해 일부 위치에서는 10 배의 과장이 발생하고 다른 위치에서는 결과가 없습니다.
나는 생각 callback
기능이 올바르게 메시지를 인정하지 않고, 나는 확실히 메시지보다 안정적인 픽업 및 승인을 얻을 다를 수해야하는지 모르겠어요. 어떤 제안이라도 감사합니다.
측정 항목을 검색하는 내 GCP Cloud 함수는 PubSub 스트림에 의해 트리거되고 retrieve_location
다른 PubSub 스트림에 데이터를 전송 하는 함수를 실행합니다 . retrieve_location
기능은 다음과 같습니다 :
def retrieve_location(event, context):
auth_flow()
project_id = <my project id>
subscription_name = <my subscription name>
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project_id, subscription_name)
def callback(message):
message.ack()
message_obj = message.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
subscriber.subscribe(subscription_path, callback=callback)
이 get_metrics
함수는 각 메시지에서 페이로드를 가져 와서 일부 데이터를 검색하여 다른 스트림으로 보냅니다. 이 기능은 예상대로 작동하는 것 같습니다.
def get_metrics(loc):
<... retrieve and process data, my_data is the object that gets sent to the next stream ...>
project_id = <my project id>
topic_name = <my topic name>
topic_id = <my topic id>
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
try:
publisher.publish(topic_path, data=my_data.encode('utf-8'))
except Exception as exc:
print("topic publish failed: ", exc)
답변
Cloud 함수 내에 두 번째 Pub / Sub 구독자를 설정하는 대신 페이로드를 직접 처리하는 해당 주제를 구독 하는 백그라운드 함수 를 만들어야합니다 . 예 :
def get_metrics_background_function(event, context):
message_obj = event.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
Cloud Pub / Sub 클라이언트 라이브러리를 통해 직접 Pub / Sub를 사용하여 Cloud 함수를 트리거하기 위해 Cloud Pub / Sub 사용을 병합하는 것 같습니다. 일반적으로 둘 중 하나를 수행 할 수 있습니다.
생성 한 구독이 Cloud Functions를 통해 완료된 경우 retrieve_location
함수가 실제로 메시지를 수신하고 처리하지 않습니다. 대신, 구독자 클라이언트를 시작하고 subscriber.subscribe
완료 될 때까지 실행되고 따라서 함수가 실행을 완료 한다는 사실을 감안할 때 곧 종료됩니다 .
이 함수가 Cloud 함수를 트리거하는 동일한 구독에 대해 클라이언트를 시작하는 경우 Cloud-Function 기반 구독은 푸시 모델을 사용하고 클라이언트 라이브러리는 가져 오기 모델 과 함께 사용해야 하기 때문에 실제로 아무 작업도 수행하지 않습니다. .
이벤트를 메시지로 사용하여 에서 callback
직접 작업을 수행하거나 retrieve_location
(Dustin이 설명하는대로) 클라이언트 라이브러리 (예 : GCE에서 구독자를 인스턴스화하고 호출)를 사용하여 영구 구독자를 설정하려고합니다. subscribe
그 위에.