GCP Bulut İşlevi, PubSub mesajlarını doğru şekilde almıyor / onaylamıyor

Aug 18 2020

Google Cloud Platform'da oluşturulmuş bazı veri işleme iş akışlarım var. Bu konumlar fiziksel adresleri işler ve bunlarla ilgili bazı ölçümler döndürür. İş akışları, Cloud Functions ve PubSub akışlarının kombinasyonlarını kullanır.

İş akışındaki bir Google Cloud İşlevi ile bazı mesajlar tetikleyici akıştan alınmaz veya birden çok kez alınır. Bunun bir miktar beklendiğini biliyorum. Ancak, bu çok oluyor. Yeterince bu, bazı konumlar için 10 kat fazla ifadelere neden olurken, diğerleri için sonuç alınamıyor.

callbackİşlevin mesajları doğru bir şekilde kabul etmediğini düşünüyorum, ancak mesajların daha güvenilir bir şekilde alınması ve onaylanması için neyin farklı olması gerektiğinden emin değilim. Herhangi bir öneri takdir edilmektedir.

Metrikleri almaya yönelik GCP Bulut İşlevim, bir PubSub akışı tarafından tetiklenir ve retrieve_locationfarklı bir PubSub akışına veri gönderen işlevi çalıştırır . retrieve_locationFonksiyonu aşağıdaki gibidir:

def retrieve_location(event, context):
    auth_flow()

    project_id = <my project id>
    subscription_name = <my subscription name>

    subscriber = pubsub_v1.SubscriberClient()

    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    def callback(message):
        message.ack()
        message_obj = message.data
        message_dcde = message_obj.decode('utf-8')
        message_json = json.loads(message_dcde)

        get_metrics(message_json)


    subscriber.subscribe(subscription_path, callback=callback)

get_metricsFonksiyon bazı verileri alır, her bir mesajın gelen yük alır ve başka bir akışa gönderir. Bu işlev beklendiği gibi çalışıyor gibi görünüyor.

def get_metrics(loc):
    <... retrieve and process data, my_data is the object that gets sent to the next stream ...>
          project_id = <my project id>
          topic_name = <my topic name>
          topic_id = <my topic id>

          publisher = pubsub_v1.PublisherClient()
          topic_path = publisher.topic_path(project_id, topic_name)

            try:
                publisher.publish(topic_path, data=my_data.encode('utf-8'))
            except Exception as exc:
                    print("topic publish failed: ", exc)

Yanıtlar

1 DustinIngram Aug 18 2020 at 01:43

Bulut İşlevinizin içinde ikinci bir Pub / Sub abonesi kurmak yerine, bu konuya abone olan ve yükü doğrudan işleyen bir arka plan işlevi oluşturmalısınız , örneğin:

def get_metrics_background_function(event, context):
    message_obj = event.data
    message_dcde = message_obj.decode('utf-8')
    message_json = json.loads(message_dcde)

    get_metrics(message_json)
1 KamalAboul-Hosn Aug 18 2020 at 02:53

Bir Cloud Function'ı tetiklemek için Cloud Pub / Sub kullanımını, doğrudan Cloud Pub / Sub istemci kitaplığı aracılığıyla Pub / Sub kullanımıyla karıştırıyor olabilirsiniz. Genellikle birini ya da diğerini yapmak istersiniz.

Oluşturduğunuz abonelik Cloud Functions aracılığıyla yapıldıysa, retrieve_locationişleviniz gerçekten mesaj almıyor ve işlemiyor demektir. Bunun yerine, yaptığı şey, bir abone istemcisi başlatmak ve kısa bir süre sonra subscriber.subscribe, sadece tamamlanana kadar çalışacağı ve bu nedenle işlevinizin yürütmeyi tamamlayacağı gerçeği göz önüne alındığında, kapatılmasıdır .

Bu işlev bir istemciyi Bulut İşlevini tetikleyen aynı aboneliğe başlatıyorsa, aslında hiçbir şey yapmayacaktır çünkü Bulut İşlevine dayalı abonelikler itme modelini kullanırken istemci kitaplığı çekme modeliyle birlikte kullanılmalıdır .

İşlemi callbackdoğrudan içinde retrieve_location, olayı mesaj olarak kullanarak (Dustin'in açıkladığı gibi) içinde gerçekleştirmek veya müşteri kitaplığıyla, örneğin GCE'de aboneyi ve çağrıları başlatan kalıcı bir abone oluşturmak isteyeceksiniz. subscribeüstünde.