O GCP Cloud Function não está captando/reconhecendo corretamente as mensagens do PubSub
Tenho alguns fluxos de trabalho de processamento de dados configurados no Google Cloud Platform. Esses locais processam endereços físicos e retornam algumas métricas sobre eles. Os fluxos de trabalho usam combinações de Cloud Functions e streams PubSub.
Com uma função do Google Cloud no fluxo de trabalho, algumas mensagens não são coletadas do fluxo de acionamento ou são coletadas várias vezes. Eu sei que algum nível disso é esperado. No entanto, isso está acontecendo muito. Chega de causar exageros de 10x para alguns locais e nenhum resultado para vários outros.
Acho que a callback
função não está reconhecendo as mensagens corretamente, mas não tenho certeza do que deveria ser diferente para obter uma coleta e confirmação de mensagens mais confiáveis. Todas as sugestões são apreciadas.
Minha função de nuvem GCP para recuperar métricas é acionada por um fluxo PubSub e executa a retrieve_location
função enviando dados para um fluxo PubSub diferente. A retrieve_location
função fica assim:
def retrieve_location(event, context):
auth_flow()
project_id = <my project id>
subscription_name = <my subscription name>
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project_id, subscription_name)
def callback(message):
message.ack()
message_obj = message.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
subscriber.subscribe(subscription_path, callback=callback)
A get_metrics
função pega a carga útil de cada mensagem, recupera alguns dados e os envia para outro fluxo. Esta função parece funcionar como esperado.
def get_metrics(loc):
<... retrieve and process data, my_data is the object that gets sent to the next stream ...>
project_id = <my project id>
topic_name = <my topic name>
topic_id = <my topic id>
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
try:
publisher.publish(topic_path, data=my_data.encode('utf-8'))
except Exception as exc:
print("topic publish failed: ", exc)
Respostas
Em vez de configurar um segundo assinante do Pub/Sub dentro de sua Função do Cloud, você deve criar uma função em segundo plano inscrita naquele tópico que lida com a carga diretamente, por exemplo:
def get_metrics_background_function(event, context):
message_obj = event.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
Parece que você pode estar confundindo o uso do Cloud Pub/Sub para acionar uma Função do Cloud com o uso do Pub/Sub diretamente por meio da biblioteca cliente do Cloud Pub/Sub. Geralmente, você gostaria de fazer um ou outro.
Se a assinatura que você criou foi feita por meio do Cloud Functions, sua retrieve_location
função não está realmente recebendo e processando mensagens. Em vez disso, o que ele está fazendo é iniciar um cliente de assinante e, logo em seguida, desligar, devido ao fato de que subscriber.subscribe
será executado até a conclusão e, portanto, sua função concluirá a execução.
Se esta função estiver inicializando um cliente para a mesma assinatura que aciona a Função do Cloud, ela não fará nada porque as assinaturas baseadas no Cloud Function usam o modelo push , enquanto a biblioteca do cliente deve ser usada com o modelo pull .
Você quer executar a ação callback
diretamente em retrieve_location
, usando o evento como a mensagem (como Dustin descreve), ou você deseja configurar um assinante persistente com a biblioteca do cliente, por exemplo, no GCE, que instancia o assinante e chama subscribe
nele.