GCP Cloud Function non raccoglie/riconosce correttamente i messaggi PubSub

Aug 18 2020

Ho alcuni flussi di lavoro per l'elaborazione dei dati configurati in Google Cloud Platform. Queste posizioni elaborano gli indirizzi fisici e restituiscono alcune metriche su di essi. I flussi di lavoro utilizzano combinazioni di Cloud Functions e flussi PubSub.

Con una funzione Google Cloud nel flusso di lavoro, alcuni messaggi non vengono prelevati dal flusso di attivazione o vengono prelevati più volte. So che un certo livello di questo è previsto. Tuttavia, questo sta accadendo molto. Abbastanza che sta causando sopravvalutazioni 10 volte per alcune località e nessun risultato per molte altre.

Penso che la callbackfunzione non riconosca correttamente i messaggi, ma non sono sicuro di cosa dovrebbe essere diverso per ottenere una raccolta e un riconoscimento dei messaggi più affidabili. Tutti i suggerimenti sono apprezzati.

La mia funzione GCP Cloud per recuperare le metriche viene attivata da un flusso PubSub ed esegue la retrieve_locationfunzione inviando i dati a un flusso PubSub diverso. La retrieve_locationfunzione è simile a questa:

def retrieve_location(event, context):
    auth_flow()

    project_id = <my project id>
    subscription_name = <my subscription name>

    subscriber = pubsub_v1.SubscriberClient()

    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    def callback(message):
        message.ack()
        message_obj = message.data
        message_dcde = message_obj.decode('utf-8')
        message_json = json.loads(message_dcde)

        get_metrics(message_json)


    subscriber.subscribe(subscription_path, callback=callback)

La get_metricsfunzione prende il payload da ciascun messaggio, recupera alcuni dati e li invia a un altro flusso. Questa funzione sembra funzionare come previsto.

def get_metrics(loc):
    <... retrieve and process data, my_data is the object that gets sent to the next stream ...>
          project_id = <my project id>
          topic_name = <my topic name>
          topic_id = <my topic id>

          publisher = pubsub_v1.PublisherClient()
          topic_path = publisher.topic_path(project_id, topic_name)

            try:
                publisher.publish(topic_path, data=my_data.encode('utf-8'))
            except Exception as exc:
                    print("topic publish failed: ", exc)

Risposte

1 DustinIngram Aug 18 2020 at 01:43

Invece di configurare un secondo abbonato Pub/Sub all'interno della tua Funzione Cloud, dovresti creare una funzione in background che sia iscritta a quell'argomento che gestisca direttamente il payload, ad esempio:

def get_metrics_background_function(event, context):
    message_obj = event.data
    message_dcde = message_obj.decode('utf-8')
    message_json = json.loads(message_dcde)

    get_metrics(message_json)
1 KamalAboul-Hosn Aug 18 2020 at 02:53

Sembra che tu stia confondendo l'uso di Cloud Pub/Sub per attivare una funzione Cloud con l'uso di Pub/Sub direttamente tramite la libreria del client Cloud Pub/Sub. In genere, vorresti fare l'uno o l'altro.

Se l'abbonamento che hai creato è stato effettuato tramite Cloud Functions, allora la tua retrieve_locationfunzione non sta realmente ricevendo ed elaborando messaggi. Invece, ciò che sta facendo è avviare un client abbonato e poco dopo chiuderlo dato che subscriber.subscribeverrà eseguito fino al completamento e quindi la tua funzione completerà l'esecuzione.

Se questa funzione sta avviando un client alla stessa sottoscrizione che attiva la funzione cloud, in realtà non farà nulla perché le sottoscrizioni basate sulla funzione cloud utilizzano il modello push mentre la libreria client deve essere utilizzata con il modello pull .

O vuoi eseguire l'azione callbackdirettamente in retrieve_location, usando l'evento come messaggio (come descrive Dustin), o vorrai configurare un abbonato persistente con la libreria client, ad esempio, su GCE, che istanzia l'abbonato e chiama subscribesu di esso.