La fonction GCP Cloud ne capte pas/n'accuse pas correctement les messages PubSub
Certains workflows de traitement de données sont configurés dans Google Cloud Platform. Ces emplacements traitent les adresses physiques et renvoient des métriques à leur sujet. Les workflows utilisent des combinaisons de flux Cloud Functions et PubSub.
Avec une fonction Google Cloud dans le workflow, certains messages ne sont pas récupérés à partir du flux de déclenchement ou sont récupérés plusieurs fois. Je sais qu'un certain niveau de cela est attendu. Cependant, cela se produit beaucoup. Assez qui cause des surestimations 10x pour certains endroits et aucun résultat pour plusieurs autres.
Je pense que la callback
fonction ne reconnaît pas correctement les messages, mais je ne sais pas ce qui devrait être différent pour obtenir une détection et une reconnaissance des messages plus fiables. Toutes les suggestions sont appréciées.
Ma fonction GCP Cloud pour récupérer des métriques est déclenchée par un flux PubSub et exécute la retrieve_location
fonction en envoyant des données à un autre flux PubSub. La retrieve_location
fonction ressemble à ceci :
def retrieve_location(event, context):
auth_flow()
project_id = <my project id>
subscription_name = <my subscription name>
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project_id, subscription_name)
def callback(message):
message.ack()
message_obj = message.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
subscriber.subscribe(subscription_path, callback=callback)
La get_metrics
fonction prend la charge utile de chaque message, récupère certaines données et les envoie à un autre flux. Cette fonction semble fonctionner comme prévu.
def get_metrics(loc):
<... retrieve and process data, my_data is the object that gets sent to the next stream ...>
project_id = <my project id>
topic_name = <my topic name>
topic_id = <my topic id>
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
try:
publisher.publish(topic_path, data=my_data.encode('utf-8'))
except Exception as exc:
print("topic publish failed: ", exc)
Réponses
Au lieu de configurer un deuxième abonné Pub/Sub dans votre fonction Cloud, vous devez créer une fonction d'arrière -plan abonnée à ce sujet qui gère directement la charge utile, par exemple :
def get_metrics_background_function(event, context):
message_obj = event.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
Il semble que vous confondiez l'utilisation de Cloud Pub/Sub pour déclencher une fonction Cloud avec l'utilisation de Pub/Sub directement via la bibliothèque cliente Cloud Pub/Sub. Généralement, vous voudriez faire l'un ou l'autre.
Si l'abonnement que vous avez créé a été effectué via Cloud Functions, votre retrieve_location
fonction ne reçoit et ne traite pas vraiment les messages. Au lieu de cela, ce qu'il fait est de démarrer un client abonné et de s'arrêter peu de temps après étant donné qu'il subscriber.subscribe
s'exécutera jusqu'à la fin et que votre fonction terminera donc son exécution.
Si cette fonction démarre un client sur le même abonnement qui déclenche la fonction Cloud, elle ne fera rien, car les abonnements basés sur Cloud-Function utilisent le modèle push alors que la bibliothèque cliente doit être utilisée avec le modèle pull . .
Soit vous souhaitez effectuer l'action callback
directement dans retrieve_location
, en utilisant l'événement comme message (comme le décrit Dustin), soit vous souhaitez configurer un abonné persistant avec la bibliothèque cliente, par exemple sur GCE, qui instancie l'abonné et appelle subscribe
dessus.