新しいデータがs3バケットにロードされるたびにPythonスクリプトをトリガーするにはどうすればよいですか?

Aug 19 2020

私は秒単位で新しいレコードを取得するs3バケットからデータをプルダウンしようとしています。データは1時間あたり250 + Gで受信されます。新しいデータの読み込みを秒単位でリアルタイムに収集するために継続的に実行されるPythonスクリプトを作成しています。

s3バケットキーの構造は次のとおりです。

o_key=7111/year=2020/month=8/day=11/hour=16/minute=46/second=9/ee9.jsonl.gz
o_key=7111/year=2020/month=8/day=11/hour=16/minute=40/second=1/ee99999.jsonl.gz

私はBoto3を使用してこれを試していますが、これまでのところ次のようになっています。

s3_resource = boto3.resource('s3', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY, verify=False)
s3_bucket = s3_resource.Bucket(BUCKET_NAME)
files = s3_bucket.objects.filter()
files = [obj.key for obj in sorted(files, key=lambda x: x.last_modified, reverse=True)]
for x in files:
    print(x)

これにより、そのバケットにあるすべてのキーが出力され、last_modifiedデータで並べ替えられます。ただし、新しいデータが読み込まれるまでスクリプトを一時停止してから、そのデータなどを2番目までに処理する方法はありますか?新しいデータをロードするときに20秒の遅延が発生する可能性があるため、ロジックを形成するときに問題が発生します。どんなアイデアや提案も役に立ちます。

 s3_resource = boto3.resource('s3', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY, verify=False)
 s3_bucket = s3_resource.Bucket(BUCKET_NAME)

 files = s3_bucket.objects.filter()
 while list(files): #check if the key exists
         if len(objs) > 0 and objs[0].key == key:
                   print("Exists!")
         else:
               time.sleep(.1) #sleep until the next key is there
               continue 

これは私が試した別のアプローチですが、うまく機能していません。次のデータがないときはいつでもスリープ状態にし、ロードされたら新しいデータを処理しようとしています。

回答

r0ck Aug 19 2020 at 20:52

Amazon S3通知機能を使用すると、バケットで特定のイベントが発生したときに通知を受け取ることができます。通知を有効にするには、最初に、AmazonS3で発行するイベントとAmazonS3で通知を送信する宛先を識別する通知設定を追加する必要があります。この構成は、バケットに関連付けられている通知サブリソースに保存します。-通常、ラムダでは...

https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html

これが
r0ckに役立つことを願っています