GCP क्लाउड फ़ंक्शन सही तरीके से PubSub संदेशों को उठा / स्वीकार नहीं कर रहा है

Aug 18 2020

मेरे पास Google क्लाउड प्लेटफ़ॉर्म में स्थापित कुछ डेटा प्रोसेसिंग वर्कफ़्लोज़ हैं। ये स्थान भौतिक पते संसाधित करते हैं और उनके बारे में कुछ मैट्रिक्स लौटाते हैं। वर्कफ़्लोज़ क्लाउड फ़ंक्शंस और PubSub स्ट्रीम के संयोजन का उपयोग करते हैं।

वर्कफ़्लो में एक Google क्लाउड फ़ंक्शन के साथ, कुछ संदेश ट्रिगरिंग स्ट्रीम से नहीं उठाए जाते हैं या कई बार उठाए जाते हैं। मुझे पता है कि इसका कुछ स्तर अपेक्षित है। हालाँकि, यह बहुत हो रहा है। पर्याप्त है कि कुछ स्थानों के लिए 10x overstatements और कई दूसरों के लिए कोई परिणाम नहीं है।

मुझे लगता है कि callbackफ़ंक्शन संदेशों को सही तरीके से स्वीकार नहीं कर रहा है, लेकिन मुझे यकीन नहीं है कि संदेशों की अधिक विश्वसनीय पिक और पावती प्राप्त करने के लिए क्या अलग होना चाहिए। किसी भी सुझाव की सराहना की है।

मैट्रिक्स को पुनः प्राप्त करने के लिए मेरा GCP क्लाउड फ़ंक्शन एक PubSub स्ट्रीम द्वारा ट्रिगर किया गया है और retrieve_locationएक अलग PubSub स्ट्रीम में डेटा भेजने वाले फ़ंक्शन को निष्पादित करता है। retrieve_locationसमारोह इस तरह दिखता है:

def retrieve_location(event, context):
    auth_flow()

    project_id = <my project id>
    subscription_name = <my subscription name>

    subscriber = pubsub_v1.SubscriberClient()

    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    def callback(message):
        message.ack()
        message_obj = message.data
        message_dcde = message_obj.decode('utf-8')
        message_json = json.loads(message_dcde)

        get_metrics(message_json)


    subscriber.subscribe(subscription_path, callback=callback)

get_metricsसमारोह प्रत्येक संदेश से पेलोड ले जाता है, कुछ डेटा प्राप्त करता है और एक अन्य स्ट्रीम को भेज देता है। यह फ़ंक्शन अपेक्षा के अनुरूप काम करता है।

def get_metrics(loc):
    <... retrieve and process data, my_data is the object that gets sent to the next stream ...>
          project_id = <my project id>
          topic_name = <my topic name>
          topic_id = <my topic id>

          publisher = pubsub_v1.PublisherClient()
          topic_path = publisher.topic_path(project_id, topic_name)

            try:
                publisher.publish(topic_path, data=my_data.encode('utf-8'))
            except Exception as exc:
                    print("topic publish failed: ", exc)

जवाब

1 DustinIngram Aug 18 2020 at 01:43

अपने क्लाउड फ़ंक्शन के अंदर एक दूसरा पब / सब सब्सक्राइबर स्थापित करने के बजाय, आपको एक पृष्ठभूमि फ़ंक्शन बनाना चाहिए जो उस विषय को सब्सक्राइब करता है जो सीधे पेलोड को संभालता है, जैसे:

def get_metrics_background_function(event, context):
    message_obj = event.data
    message_dcde = message_obj.decode('utf-8')
    message_json = json.loads(message_dcde)

    get_metrics(message_json)
1 KamalAboul-Hosn Aug 18 2020 at 02:53

ऐसा लगता है कि आप क्लाउड पब / सब क्लाइंट क्लाइंट लाइब्रेरी के माध्यम से सीधे पब / सब के उपयोग के साथ क्लाउड फ़ंक्शन को ट्रिगर करने के लिए क्लाउड पब / सब के उपयोग को भ्रमित कर रहे हैं। आम तौर पर, आप एक या दूसरे को करना चाहेंगे।

यदि आपके द्वारा बनाई गई सदस्यता क्लाउड फ़ंक्शंस के माध्यम से की गई थी, तो आप retrieve_locationफ़ंक्शन वास्तव में संदेश प्राप्त और प्रसंस्करण नहीं कर रहे हैं। इसके बजाय, यह क्या कर रहा है एक ग्राहक ग्राहक शुरू कर रहा है और कुछ ही समय बाद इस तथ्य को देखते हुए बंद कर रहा है कि subscriber.subscribeबस पूरा होने के लिए चलेगा और इसलिए आपका कार्य पूरा होगा।

यदि यह फ़ंक्शन क्लाइंट को उसी सदस्यता के लिए शुरू कर रहा है जो क्लाउड फ़ंक्शन को ट्रिगर करता है, तो यह वास्तव में कुछ भी करने वाला नहीं है क्योंकि क्लाउड-फ़ंक्शन-आधारित सदस्यता पुश मॉडल का उपयोग करते हैं जबकि क्लाइंट लाइब्रेरी का उपयोग पुल मॉडल के साथ किया जाना चाहिए ।

आप या तो callbackसीधे retrieve_locationसंदेश के रूप में ईवेंट का उपयोग करना चाहते हैं , संदेश के रूप में घटना का उपयोग कर रहे हैं (जैसा कि डस्टिन का वर्णन है), या आप ग्राहक पुस्तकालय के साथ एक स्थायी ग्राहक स्थापित करना चाहते हैं, उदाहरण के लिए, जीसीई पर, जो सब्सक्राइबर को कॉल और कॉल करता है subscribeइस पर।