Облачная функция GCP некорректно принимает / подтверждает сообщения PubSub
У меня есть несколько рабочих процессов обработки данных, настроенных в Google Cloud Platform. Эти местоположения обрабатывают физические адреса и возвращают некоторые показатели о них. В рабочих процессах используются комбинации облачных функций и потоков PubSub.
С одной функцией Google Cloud в рабочем процессе некоторые сообщения не принимаются из потока запуска или принимаются несколько раз. Я знаю, что некоторый уровень этого ожидается. Однако это происходит очень часто. Достаточно того, что для одних мест завышение в 10 раз, а для других - нет.
Я думаю, что callback
функция не распознает сообщения правильно, но я не уверен, что должно отличаться, чтобы получить более надежный прием и подтверждение сообщений. Любые предложения приветствуются.
Моя облачная функция GCP для получения показателей запускается потоком PubSub и выполняет retrieve_location
функцию отправки данных в другой поток PubSub. retrieve_location
Функция выглядит следующим образом :
def retrieve_location(event, context):
auth_flow()
project_id = <my project id>
subscription_name = <my subscription name>
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project_id, subscription_name)
def callback(message):
message.ack()
message_obj = message.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
subscriber.subscribe(subscription_path, callback=callback)
get_metrics
Функция принимает полезную нагрузку от каждого сообщения, извлекают некоторые данные и отправляет его в другой поток. Кажется, эта функция работает должным образом.
def get_metrics(loc):
<... retrieve and process data, my_data is the object that gets sent to the next stream ...>
project_id = <my project id>
topic_name = <my topic name>
topic_id = <my topic id>
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
try:
publisher.publish(topic_path, data=my_data.encode('utf-8'))
except Exception as exc:
print("topic publish failed: ", exc)
Ответы
Вместо того, чтобы настраивать второго подписчика Pub / Subscriber внутри вашей облачной функции, вы должны создать фоновую функцию, которая подписана на эту тему, которая напрямую обрабатывает полезную нагрузку, например:
def get_metrics_background_function(event, context):
message_obj = event.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
Похоже, вы можете объединить использование Cloud Pub / Sub для запуска облачной функции с использованием Pub / Sub напрямую через клиентскую библиотеку Cloud Pub / Sub. Как правило, вам нужно сделать одно или другое.
Если созданная вами подписка была создана с помощью облачных функций, значит, вы на retrieve_location
самом деле не получаете и не обрабатываете сообщения. Вместо этого он subscriber.subscribe
запускает клиента-подписчика и вскоре после этого завершает работу, учитывая тот факт, что он просто завершится, и, следовательно, ваша функция завершит выполнение.
Если эта функция запускает клиента для той же подписки, которая запускает облачную функцию, то на самом деле она ничего не будет делать, потому что подписки на основе облачной функции используют модель push, а клиентскую библиотеку следует использовать с моделью pull. .
Вы либо хотите выполнить действие callback
непосредственно внутри retrieve_location
, используя событие в качестве сообщения (как описывает Дастин), либо вы хотите настроить постоянного подписчика с клиентской библиотекой, например, на GCE, который создает экземпляр подписчика и вызывает subscribe
в теме.