O GCP Cloud Function não está captando/reconhecendo corretamente as mensagens do PubSub

Aug 18 2020

Tenho alguns fluxos de trabalho de processamento de dados configurados no Google Cloud Platform. Esses locais processam endereços físicos e retornam algumas métricas sobre eles. Os fluxos de trabalho usam combinações de Cloud Functions e streams PubSub.

Com uma função do Google Cloud no fluxo de trabalho, algumas mensagens não são coletadas do fluxo de acionamento ou são coletadas várias vezes. Eu sei que algum nível disso é esperado. No entanto, isso está acontecendo muito. Chega de causar exageros de 10x para alguns locais e nenhum resultado para vários outros.

Acho que a callbackfunção não está reconhecendo as mensagens corretamente, mas não tenho certeza do que deveria ser diferente para obter uma coleta e confirmação de mensagens mais confiáveis. Todas as sugestões são apreciadas.

Minha função de nuvem GCP para recuperar métricas é acionada por um fluxo PubSub e executa a retrieve_locationfunção enviando dados para um fluxo PubSub diferente. A retrieve_locationfunção fica assim:

def retrieve_location(event, context):
    auth_flow()

    project_id = <my project id>
    subscription_name = <my subscription name>

    subscriber = pubsub_v1.SubscriberClient()

    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    def callback(message):
        message.ack()
        message_obj = message.data
        message_dcde = message_obj.decode('utf-8')
        message_json = json.loads(message_dcde)

        get_metrics(message_json)


    subscriber.subscribe(subscription_path, callback=callback)

A get_metricsfunção pega a carga útil de cada mensagem, recupera alguns dados e os envia para outro fluxo. Esta função parece funcionar como esperado.

def get_metrics(loc):
    <... retrieve and process data, my_data is the object that gets sent to the next stream ...>
          project_id = <my project id>
          topic_name = <my topic name>
          topic_id = <my topic id>

          publisher = pubsub_v1.PublisherClient()
          topic_path = publisher.topic_path(project_id, topic_name)

            try:
                publisher.publish(topic_path, data=my_data.encode('utf-8'))
            except Exception as exc:
                    print("topic publish failed: ", exc)

Respostas

1 DustinIngram Aug 18 2020 at 01:43

Em vez de configurar um segundo assinante do Pub/Sub dentro de sua Função do Cloud, você deve criar uma função em segundo plano inscrita naquele tópico que lida com a carga diretamente, por exemplo:

def get_metrics_background_function(event, context):
    message_obj = event.data
    message_dcde = message_obj.decode('utf-8')
    message_json = json.loads(message_dcde)

    get_metrics(message_json)
1 KamalAboul-Hosn Aug 18 2020 at 02:53

Parece que você pode estar confundindo o uso do Cloud Pub/Sub para acionar uma Função do Cloud com o uso do Pub/Sub diretamente por meio da biblioteca cliente do Cloud Pub/Sub. Geralmente, você gostaria de fazer um ou outro.

Se a assinatura que você criou foi feita por meio do Cloud Functions, sua retrieve_locationfunção não está realmente recebendo e processando mensagens. Em vez disso, o que ele está fazendo é iniciar um cliente de assinante e, logo em seguida, desligar, devido ao fato de que subscriber.subscribeserá executado até a conclusão e, portanto, sua função concluirá a execução.

Se esta função estiver inicializando um cliente para a mesma assinatura que aciona a Função do Cloud, ela não fará nada porque as assinaturas baseadas no Cloud Function usam o modelo push , enquanto a biblioteca do cliente deve ser usada com o modelo pull .

Você quer executar a ação callbackdiretamente em retrieve_location, usando o evento como a mensagem (como Dustin descreve), ou você deseja configurar um assinante persistente com a biblioteca do cliente, por exemplo, no GCE, que instancia o assinante e chama subscribenele.