GCP Cloud Function tidak mengambil / mengakui pesan PubSub dengan benar
Saya memiliki beberapa alur kerja pemrosesan data yang disiapkan di Google Cloud Platform. Lokasi ini memproses alamat fisik dan mengembalikan beberapa metrik tentangnya. Alur kerja menggunakan kombinasi aliran Cloud Functions dan PubSub.
Dengan satu Google Cloud Function dalam alur kerja, beberapa pesan tidak diambil dari aliran pemicu atau diambil beberapa kali. Saya tahu beberapa tingkat dari ini diharapkan. Namun, ini sering terjadi. Cukup yang menyebabkan 10x penyajian berlebih untuk beberapa lokasi dan tidak ada hasil untuk beberapa lokasi lainnya.
Saya pikir callback
fungsinya tidak mengenali pesan dengan benar tetapi saya tidak yakin apa yang harus berbeda untuk mendapatkan pengambilan dan pengakuan pesan yang lebih andal. Setiap saran dihargai.
Fungsi GCP Cloud saya untuk mengambil metrik dipicu oleh aliran PubSub dan menjalankan retrieve_location
fungsi pengiriman data ke aliran PubSub yang berbeda. The retrieve_location
Fungsi terlihat seperti ini:
def retrieve_location(event, context):
auth_flow()
project_id = <my project id>
subscription_name = <my subscription name>
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project_id, subscription_name)
def callback(message):
message.ack()
message_obj = message.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
subscriber.subscribe(subscription_path, callback=callback)
The get_metrics
Fungsi mengambil muatan dari setiap pesan, mengambil beberapa data dan mengirimkannya ke aliran lain. Fungsi ini tampaknya berfungsi seperti yang diharapkan.
def get_metrics(loc):
<... retrieve and process data, my_data is the object that gets sent to the next stream ...>
project_id = <my project id>
topic_name = <my topic name>
topic_id = <my topic id>
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
try:
publisher.publish(topic_path, data=my_data.encode('utf-8'))
except Exception as exc:
print("topic publish failed: ", exc)
Jawaban
Daripada menyiapkan pelanggan Pub / Sub kedua di dalam Cloud Function, Anda harus membuat fungsi latar belakang yang berlangganan topik tersebut yang menangani payload secara langsung, misalnya:
def get_metrics_background_function(event, context):
message_obj = event.data
message_dcde = message_obj.decode('utf-8')
message_json = json.loads(message_dcde)
get_metrics(message_json)
Sepertinya Anda mungkin menggabungkan penggunaan Cloud Pub / Sub untuk memicu Cloud Function dengan penggunaan Pub / Sub langsung melalui library klien Cloud Pub / Sub. Umumnya, Anda ingin melakukan salah satunya.
Jika langganan yang Anda buat dilakukan melalui Cloud Functions, maka retrieve_location
fungsi Anda tidak benar-benar menerima dan memproses pesan. Alih-alih, yang dilakukannya adalah memulai klien pelanggan dan tidak lama kemudian menutup karena faktanya subscriber.subscribe
hanya akan berjalan hingga selesai dan oleh karena itu fungsi Anda akan menyelesaikan eksekusi.
Jika fungsi ini memulai klien ke langganan yang sama yang memicu Cloud Function, fungsi ini sebenarnya tidak akan melakukan apa pun karena langganan berbasis Cloud-Function menggunakan model push, sedangkan library klien harus digunakan dengan model tarik .
Anda juga ingin melakukan tindakan callback
secara langsung di retrieve_location
, menggunakan acara sebagai pesan (seperti yang dijelaskan oleh Dustin), atau Anda ingin menyiapkan pelanggan tetap dengan perpustakaan klien, misalnya, di GCE, yang memberi contoh pelanggan dan panggilan subscribe
di atasnya.