GCP Cloud Function no detecta o reconoce correctamente los mensajes de PubSub

Aug 18 2020

Tengo algunos flujos de trabajo de procesamiento de datos configurados en Google Cloud Platform. Estas ubicaciones procesan direcciones físicas y devuelven algunas métricas sobre ellas. Los flujos de trabajo usan combinaciones de flujos de Cloud Functions y PubSub.

Con una función de Google Cloud en el flujo de trabajo, algunos mensajes no se recogen del flujo de activación o se recogen varias veces. Sé que se espera cierto nivel de esto. Sin embargo, esto está sucediendo mucho. Suficiente que está causando sobrestimaciones 10x para algunas ubicaciones y ningún resultado para varias otras.

Creo que la callbackfunción no reconoce los mensajes correctamente, pero no estoy seguro de qué debería ser diferente para obtener una recepción y un reconocimiento de mensajes más confiables. Cualquier sugerencia es apreciada.

My GCP Cloud Function para recuperar métricas se activa mediante una transmisión de PubSub y ejecuta la retrieve_locationfunción que envía datos a una transmisión de PubSub diferente. La retrieve_locationfunción se ve así:

def retrieve_location(event, context):
    auth_flow()

    project_id = <my project id>
    subscription_name = <my subscription name>

    subscriber = pubsub_v1.SubscriberClient()

    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    def callback(message):
        message.ack()
        message_obj = message.data
        message_dcde = message_obj.decode('utf-8')
        message_json = json.loads(message_dcde)

        get_metrics(message_json)


    subscriber.subscribe(subscription_path, callback=callback)

La get_metricsfunción toma la carga útil de cada mensaje, recupera algunos datos y los envía a otro flujo. Esta función parece funcionar como se esperaba.

def get_metrics(loc):
    <... retrieve and process data, my_data is the object that gets sent to the next stream ...>
          project_id = <my project id>
          topic_name = <my topic name>
          topic_id = <my topic id>

          publisher = pubsub_v1.PublisherClient()
          topic_path = publisher.topic_path(project_id, topic_name)

            try:
                publisher.publish(topic_path, data=my_data.encode('utf-8'))
            except Exception as exc:
                    print("topic publish failed: ", exc)

Respuestas

1 DustinIngram Aug 18 2020 at 01:43

En lugar de configurar un segundo suscriptor de Pub/Sub dentro de su Cloud Function, debe crear una función en segundo plano que esté suscrita a ese tema que maneje la carga útil directamente, por ejemplo:

def get_metrics_background_function(event, context):
    message_obj = event.data
    message_dcde = message_obj.decode('utf-8')
    message_json = json.loads(message_dcde)

    get_metrics(message_json)
1 KamalAboul-Hosn Aug 18 2020 at 02:53

Parece que podría estar combinando el uso de Cloud Pub/Sub para activar una función en la nube con el uso de Pub/Sub directamente a través de la biblioteca cliente de Cloud Pub/Sub. En general, querrás hacer una cosa o la otra.

Si la suscripción que creó se realizó a través de Cloud Functions, entonces su retrieve_locationfunción realmente no está recibiendo ni procesando mensajes. En cambio, lo que está haciendo es iniciar un cliente suscriptor y, poco después, cerrarlo dado el hecho de que subscriber.subscribese ejecutará hasta completarse y, por lo tanto, su función completará la ejecución.

Si esta función está iniciando un cliente en la misma suscripción que activa Cloud Function, entonces en realidad no hará nada porque las suscripciones basadas en Cloud Function usan el modelo de inserción , mientras que la biblioteca del cliente debe usarse con el modelo de extracción . .

O desea realizar la acción callbackdirectamente en retrieve_location, utilizando el evento como mensaje (como describe Dustin), o desea configurar un suscriptor persistente con la biblioteca del cliente, por ejemplo, en GCE, que crea una instancia del suscriptor y llama subscribeen eso.