GCP क्लाउड फ़ंक्शन सही तरीके से PubSub संदेशों को उठा / स्वीकार नहीं कर रहा है
मेरे पास Google क्लाउड प्लेटफ़ॉर्म में स्थापित कुछ डेटा प्रोसेसिंग वर्कफ़्लोज़ हैं। ये स्थान भौतिक पते संसाधित करते हैं और उनके बारे में कुछ मैट्रिक्स लौटाते हैं। वर्कफ़्लोज़ क्लाउड फ़ंक्शंस और PubSub स्ट्रीम के संयोजन का उपयोग करते हैं।
वर्कफ़्लो में एक Google क्लाउड फ़ंक्शन के साथ, कुछ संदेश ट्रिगरिंग स्ट्रीम से नहीं उठाए जाते हैं या कई बार उठाए जाते हैं। मुझे पता है कि इसका कुछ स्तर अपेक्षित है। हालाँकि, यह बहुत हो रहा है। पर्याप्त है कि कुछ स्थानों के लिए 10x overstatements और कई दूसरों के लिए कोई परिणाम नहीं है।
मुझे लगता है कि callback
फ़ंक्शन संदेशों को सही तरीके से स्वीकार नहीं कर रहा है, लेकिन मुझे यकीन नहीं है कि संदेशों की अधिक विश्वसनीय पिक और पावती प्राप्त करने के लिए क्या अलग होना चाहिए। किसी भी सुझाव की सराहना की है।
मैट्रिक्स को पुनः प्राप्त करने के लिए मेरा GCP क्लाउड फ़ंक्शन एक PubSub स्ट्रीम द्वारा ट्रिगर किया गया है और retrieve_location
एक अलग PubSub स्ट्रीम में डेटा भेजने वाले फ़ंक्शन को निष्पादित करता है। retrieve_location
समारोह इस तरह दिखता है:
def retrieve_location(event, context):
auth_flow()
project_id = <my project id>
subscription_name = <my subscription name>
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project_id, subscription_name)
def callback(message):
message.ack()
message_obj = message.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
subscriber.subscribe(subscription_path, callback=callback)
get_metrics
समारोह प्रत्येक संदेश से पेलोड ले जाता है, कुछ डेटा प्राप्त करता है और एक अन्य स्ट्रीम को भेज देता है। यह फ़ंक्शन अपेक्षा के अनुरूप काम करता है।
def get_metrics(loc):
<... retrieve and process data, my_data is the object that gets sent to the next stream ...>
project_id = <my project id>
topic_name = <my topic name>
topic_id = <my topic id>
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
try:
publisher.publish(topic_path, data=my_data.encode('utf-8'))
except Exception as exc:
print("topic publish failed: ", exc)
जवाब
अपने क्लाउड फ़ंक्शन के अंदर एक दूसरा पब / सब सब्सक्राइबर स्थापित करने के बजाय, आपको एक पृष्ठभूमि फ़ंक्शन बनाना चाहिए जो उस विषय को सब्सक्राइब करता है जो सीधे पेलोड को संभालता है, जैसे:
def get_metrics_background_function(event, context):
message_obj = event.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
ऐसा लगता है कि आप क्लाउड पब / सब क्लाइंट क्लाइंट लाइब्रेरी के माध्यम से सीधे पब / सब के उपयोग के साथ क्लाउड फ़ंक्शन को ट्रिगर करने के लिए क्लाउड पब / सब के उपयोग को भ्रमित कर रहे हैं। आम तौर पर, आप एक या दूसरे को करना चाहेंगे।
यदि आपके द्वारा बनाई गई सदस्यता क्लाउड फ़ंक्शंस के माध्यम से की गई थी, तो आप retrieve_location
फ़ंक्शन वास्तव में संदेश प्राप्त और प्रसंस्करण नहीं कर रहे हैं। इसके बजाय, यह क्या कर रहा है एक ग्राहक ग्राहक शुरू कर रहा है और कुछ ही समय बाद इस तथ्य को देखते हुए बंद कर रहा है कि subscriber.subscribe
बस पूरा होने के लिए चलेगा और इसलिए आपका कार्य पूरा होगा।
यदि यह फ़ंक्शन क्लाइंट को उसी सदस्यता के लिए शुरू कर रहा है जो क्लाउड फ़ंक्शन को ट्रिगर करता है, तो यह वास्तव में कुछ भी करने वाला नहीं है क्योंकि क्लाउड-फ़ंक्शन-आधारित सदस्यता पुश मॉडल का उपयोग करते हैं जबकि क्लाइंट लाइब्रेरी का उपयोग पुल मॉडल के साथ किया जाना चाहिए ।
आप या तो callback
सीधे retrieve_location
संदेश के रूप में ईवेंट का उपयोग करना चाहते हैं , संदेश के रूप में घटना का उपयोग कर रहे हैं (जैसा कि डस्टिन का वर्णन है), या आप ग्राहक पुस्तकालय के साथ एक स्थायी ग्राहक स्थापित करना चाहते हैं, उदाहरण के लिए, जीसीई पर, जो सब्सक्राइबर को कॉल और कॉल करता है subscribe
इस पर।