GCP CloudFunctionがPubSubメッセージを正しく取得/確認しない

Aug 18 2020

Google CloudPlatformでいくつかのデータ処理ワークフローを設定しています。これらの場所は物理アドレスを処理し、それらに関するいくつかのメトリックを返します。ワークフローは、CloudFunctionsとPubSubストリームの組み合わせを使用します。

ワークフローに1つのGoogleCloud関数がある場合、一部のメッセージはトリガーストリームから取得されないか、複数回取得されます。私はこれのある程度が期待されることを知っています。しかし、これはたくさん起こっています。それだけで、一部の場所では10倍の誇張表現が発生し、他のいくつかの場所では結果が得られません。

このcallback関数はメッセージを正しく認識していないと思いますが、メッセージのより信頼性の高いピックアップと確認を取得するために何を変えるべきかわかりません。任意の提案をいただければ幸いです。

メトリックを取得するためのGCPCloud Functionは、PubSubストリームによってトリガーさretrieve_locationれ、別のPubSubストリームにデータを送信する関数を実行します。retrieve_locationこの関数は次のようになります。

def retrieve_location(event, context):
    auth_flow()

    project_id = <my project id>
    subscription_name = <my subscription name>

    subscriber = pubsub_v1.SubscriberClient()

    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    def callback(message):
        message.ack()
        message_obj = message.data
        message_dcde = message_obj.decode('utf-8')
        message_json = json.loads(message_dcde)

        get_metrics(message_json)


    subscriber.subscribe(subscription_path, callback=callback)

このget_metrics関数は、各メッセージからペイロードを取得し、データを取得して別のストリームに送信します。この機能は期待通りに機能しているようです。

def get_metrics(loc):
    <... retrieve and process data, my_data is the object that gets sent to the next stream ...>
          project_id = <my project id>
          topic_name = <my topic name>
          topic_id = <my topic id>

          publisher = pubsub_v1.PublisherClient()
          topic_path = publisher.topic_path(project_id, topic_name)

            try:
                publisher.publish(topic_path, data=my_data.encode('utf-8'))
            except Exception as exc:
                    print("topic publish failed: ", exc)

回答

1 DustinIngram Aug 18 2020 at 01:43

CloudFunction内に2番目のPub / Subサブスクライバーを設定する代わりに、ペイロードを直接処理するトピックにサブスクライブするバックグラウンド関数を作成する必要があります。例:

def get_metrics_background_function(event, context):
    message_obj = event.data
    message_dcde = message_obj.decode('utf-8')
    message_json = json.loads(message_dcde)

    get_metrics(message_json)
1 KamalAboul-Hosn Aug 18 2020 at 02:53

Cloud Pub / Subクライアントライブラリを介して直接Pub / Subを使用することで、Cloud Pub / Subを使用してCloudFunctionをトリガーすることを混同しているようです。一般的には、どちらか一方を実行する必要があります。

作成したサブスクリプションがCloudFunctionsを介して行われた場合、retrieve_location関数は実際にはメッセージを受信および処理していません。代わりに、サブスクライバークライアントを起動し、その後すぐにシャットダウンしsubscriber.subscribeます。これは、実行が完了するだけで、関数の実行が完了するためです。

この機能は、クラウド機能をトリガーと同じサブスクリプションへのクライアントを起動している場合、実際にクラウド・ファンクション・ベースのサブスクリプションが使用しているので何もするつもりはないプッシュクライアントライブラリをして使用する必要がありながらモデルをプルモデル。

(Dustinが説明しているように)イベントをメッセージとして使用callbackしてretrieve_location、で直接アクションを実行するか、GCEなどのクライアントライブラリを使用して永続サブスクライバーをセットアップし、サブスクライバーと呼び出しをインスタンス化します。subscribeその上に。