GCP Cloud Function non raccoglie/riconosce correttamente i messaggi PubSub
Ho alcuni flussi di lavoro per l'elaborazione dei dati configurati in Google Cloud Platform. Queste posizioni elaborano gli indirizzi fisici e restituiscono alcune metriche su di essi. I flussi di lavoro utilizzano combinazioni di Cloud Functions e flussi PubSub.
Con una funzione Google Cloud nel flusso di lavoro, alcuni messaggi non vengono prelevati dal flusso di attivazione o vengono prelevati più volte. So che un certo livello di questo è previsto. Tuttavia, questo sta accadendo molto. Abbastanza che sta causando sopravvalutazioni 10 volte per alcune località e nessun risultato per molte altre.
Penso che la callback
funzione non riconosca correttamente i messaggi, ma non sono sicuro di cosa dovrebbe essere diverso per ottenere una raccolta e un riconoscimento dei messaggi più affidabili. Tutti i suggerimenti sono apprezzati.
La mia funzione GCP Cloud per recuperare le metriche viene attivata da un flusso PubSub ed esegue la retrieve_location
funzione inviando i dati a un flusso PubSub diverso. La retrieve_location
funzione è simile a questa:
def retrieve_location(event, context):
auth_flow()
project_id = <my project id>
subscription_name = <my subscription name>
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project_id, subscription_name)
def callback(message):
message.ack()
message_obj = message.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
subscriber.subscribe(subscription_path, callback=callback)
La get_metrics
funzione prende il payload da ciascun messaggio, recupera alcuni dati e li invia a un altro flusso. Questa funzione sembra funzionare come previsto.
def get_metrics(loc):
<... retrieve and process data, my_data is the object that gets sent to the next stream ...>
project_id = <my project id>
topic_name = <my topic name>
topic_id = <my topic id>
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
try:
publisher.publish(topic_path, data=my_data.encode('utf-8'))
except Exception as exc:
print("topic publish failed: ", exc)
Risposte
Invece di configurare un secondo abbonato Pub/Sub all'interno della tua Funzione Cloud, dovresti creare una funzione in background che sia iscritta a quell'argomento che gestisca direttamente il payload, ad esempio:
def get_metrics_background_function(event, context):
message_obj = event.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
Sembra che tu stia confondendo l'uso di Cloud Pub/Sub per attivare una funzione Cloud con l'uso di Pub/Sub direttamente tramite la libreria del client Cloud Pub/Sub. In genere, vorresti fare l'uno o l'altro.
Se l'abbonamento che hai creato è stato effettuato tramite Cloud Functions, allora la tua retrieve_location
funzione non sta realmente ricevendo ed elaborando messaggi. Invece, ciò che sta facendo è avviare un client abbonato e poco dopo chiuderlo dato che subscriber.subscribe
verrà eseguito fino al completamento e quindi la tua funzione completerà l'esecuzione.
Se questa funzione sta avviando un client alla stessa sottoscrizione che attiva la funzione cloud, in realtà non farà nulla perché le sottoscrizioni basate sulla funzione cloud utilizzano il modello push mentre la libreria client deve essere utilizzata con il modello pull .
O vuoi eseguire l'azione callback
direttamente in retrieve_location
, usando l'evento come messaggio (come descrive Dustin), o vorrai configurare un abbonato persistente con la libreria client, ad esempio, su GCE, che istanzia l'abbonato e chiama subscribe
su di esso.