ฟังก์ชัน GCP Cloud ไม่สามารถรับ / รับทราบข้อความ PubSub ได้อย่างถูกต้อง

Aug 18 2020

ฉันตั้งค่าเวิร์กโฟลว์การประมวลผลข้อมูลไว้ใน Google Cloud Platform สถานที่เหล่านี้ประมวลผลที่อยู่ทางกายภาพและส่งคืนเมตริกบางอย่างเกี่ยวกับพวกเขา เวิร์กโฟลว์ใช้การผสมผสานระหว่าง Cloud Functions และ PubSub

ด้วยฟังก์ชัน Google Cloud หนึ่งฟังก์ชันในเวิร์กโฟลว์ข้อความบางข้อความจะไม่ถูกดึงขึ้นมาจากสตรีมที่ทริกเกอร์หรือถูกหยิบขึ้นมาหลายครั้ง ฉันรู้ระดับหนึ่งของสิ่งนี้คาดหวัง อย่างไรก็ตามสิ่งนี้เกิดขึ้นมากมาย เพียงพอที่จะทำให้เกิดการพูดเกินจริง 10 เท่าสำหรับบางสถานที่และไม่มีผลลัพธ์สำหรับบางพื้นที่

ฉันคิดว่าcallbackฟังก์ชั่นนี้ไม่ยอมรับข้อความอย่างถูกต้อง แต่ฉันไม่แน่ใจว่าอะไรควรจะแตกต่างกันเพื่อให้ได้รับข้อมูลที่เชื่อถือได้มากขึ้นและรับทราบข้อความ ข้อเสนอแนะใด ๆ จะได้รับการชื่นชม

ฟังก์ชัน GCP Cloud ของฉันในการดึงข้อมูลเมตริกจะถูกทริกเกอร์โดยสตรีม PubSub และเรียกใช้retrieve_locationฟังก์ชันที่ส่งข้อมูลไปยังสตรีม PubSub อื่น retrieve_locationฟังก์ชั่นมีลักษณะเช่นนี้

def retrieve_location(event, context):
    auth_flow()

    project_id = <my project id>
    subscription_name = <my subscription name>

    subscriber = pubsub_v1.SubscriberClient()

    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    def callback(message):
        message.ack()
        message_obj = message.data
        message_dcde = message_obj.decode('utf-8')
        message_json = json.loads(message_dcde)

        get_metrics(message_json)


    subscriber.subscribe(subscription_path, callback=callback)

get_metricsฟังก์ชั่นใช้เวลาส่วนของข้อมูลจากแต่ละข้อความดึงข้อมูลบางส่วนและส่งไปยังกระแสอื่น ดูเหมือนว่าฟังก์ชันนี้จะทำงานได้ตามที่คาดไว้

def get_metrics(loc):
    <... retrieve and process data, my_data is the object that gets sent to the next stream ...>
          project_id = <my project id>
          topic_name = <my topic name>
          topic_id = <my topic id>

          publisher = pubsub_v1.PublisherClient()
          topic_path = publisher.topic_path(project_id, topic_name)

            try:
                publisher.publish(topic_path, data=my_data.encode('utf-8'))
            except Exception as exc:
                    print("topic publish failed: ", exc)

คำตอบ

1 DustinIngram Aug 18 2020 at 01:43

แทนที่จะตั้งค่าสมาชิก Pub / Sub คนที่สองภายในฟังก์ชันคลาวด์ของคุณคุณควรสร้างฟังก์ชันพื้นหลังที่สมัครเป็นสมาชิกของหัวข้อนั้นซึ่งจัดการเพย์โหลดโดยตรงเช่น:

def get_metrics_background_function(event, context):
    message_obj = event.data
    message_dcde = message_obj.decode('utf-8')
    message_json = json.loads(message_dcde)

    get_metrics(message_json)
1 KamalAboul-Hosn Aug 18 2020 at 02:53

ดูเหมือนว่าคุณอาจรวมการใช้ Cloud Pub / Sub เพื่อทริกเกอร์ฟังก์ชันระบบคลาวด์ด้วยการใช้ Pub / Sub โดยตรงผ่านไลบรารีไคลเอ็นต์ Cloud Pub / Sub โดยทั่วไปคุณจะต้องทำอย่างใดอย่างหนึ่ง

หากการสมัครสมาชิกที่คุณสร้างเสร็จสิ้นผ่าน Cloud Functions แสดงว่าretrieve_locationฟังก์ชันของคุณไม่ได้รับและประมวลผลข้อความจริงๆ แต่สิ่งที่กำลังทำคือการเริ่มต้นไคลเอนต์สมาชิกและหลังจากนั้นไม่นานก็ปิดตัวลงเนื่องจากข้อเท็จจริงที่ว่าsubscriber.subscribeจะทำงานจนเสร็จสมบูรณ์ดังนั้นฟังก์ชันของคุณจะเสร็จสิ้นการดำเนินการ

หากฟังก์ชั่นนี้เริ่มต้นไคลเอนต์ไปยังการสมัครสมาชิกเดียวกันกับที่ทริกเกอร์ฟังก์ชันคลาวด์มันจะไม่ทำอะไรเลยเพราะการสมัครใช้งานบนคลาวด์ฟังก์ชั่นใช้โมเดลพุชในขณะที่ไลบรารีไคลเอ็นต์ควรใช้กับโมเดลดึง .

คุณต้องการดำเนินการในcallbackโดยตรงretrieve_locationโดยใช้เหตุการณ์เป็นข้อความ (ตามที่ดัสตินอธิบาย) หรือคุณต้องการตั้งค่าสมาชิกถาวรด้วยไลบรารีไคลเอ็นต์เช่นบน GCE ที่สร้างอินสแตนซ์ผู้สมัครสมาชิกและการโทรsubscribeกับมัน