Sử dụng Hàm Lambda với Amazon Kinesis

AWS Kinesisdịch vụ được sử dụng để nắm bắt / lưu trữ dữ liệu theo dõi thời gian thực đến từ các lần nhấp vào trang web, nhật ký, nguồn cấp dữ liệu truyền thông xã hội. Chúng tôi có thể kích hoạt AWS Lambda để thực hiện xử lý bổ sung trên các nhật ký này.

Yêu cầu

Các yêu cầu cơ bản để bắt đầu với Kinesis và AWS Lambda như sau:

  • Tạo vai trò với các quyền cần thiết
  • Tạo luồng dữ liệu trong Kinesis
  • Tạo hàm AWS Lambda.
  • Thêm mã vào AWS Lambda
  • Thêm dữ liệu vào luồng dữ liệu Kinesis

Thí dụ

Hãy để chúng tôi làm việc trên một ví dụ, trong đó chúng tôi sẽ kích hoạt AWS Lambda để xử lý luồng dữ liệu từ Kinesis và gửi thư với dữ liệu nhận được.

Dưới đây là một sơ đồ khối đơn giản để giải thích quy trình:

Tạo vai trò với các quyền bắt buộc

Đi tới bảng điều khiển AWS và tạo một vai trò.

Tạo luồng dữ liệu trong Kinesis

Đi tới bảng điều khiển AWS và tạo luồng dữ liệu trong kinesis.

Có 4 lựa chọn như hình. Chúng tôi sẽ làm việc về Tạo luồng dữ liệu trong ví dụ này.

Nhấp chuột Create data stream. Nhập tên trong tên luồng Kinesis được cung cấp bên dưới.

Nhập số phân đoạn cho luồng dữ liệu.

Các chi tiết của Shards như được hiển thị bên dưới -

Nhập tên và nhấp vào Create Kinesis stream ở dưới cùng.

Lưu ý rằng phải mất một thời gian nhất định để luồng hoạt động.

Tạo hàm AWS Lambda

Đi tới bảng điều khiển AWS và nhấp vào Lambda. Tạo hàm AWS Lambda như hình -

Nhấp chuột Create functionở cuối màn hình. Thêm Kinesis làm trình kích hoạt cho AWS Lambda.

Thêm chi tiết cấu hình vào trình kích hoạt Kinesis -

Thêm trình kích hoạt và bây giờ thêm mã vào AWS Lambda.

Thêm mã vào AWS Lambda

Với mục đích này, chúng tôi sẽ sử dụng nodejs làm thời gian chạy. Chúng tôi sẽ gửi thư sau khi AWS Lambda được kích hoạt với luồng dữ liệu kinesis.

const aws =  require("aws-sdk");
var ses = new aws.SES({
   region: 'us-east-1'
});
exports.handler = function(event, context, callback) {
   let payload = "";
   event.Records.forEach(function(record) {
      // Kinesis data is base64 encoded so decode here
      payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
      console.log('Decoded payload:', payload);
   });
   var eParams = {
      Destination: {
         ToAddresses: ["[email protected]"]
      },
      Message: {
         Body: {
            Text: {
               Data:payload
            }
         },
         Subject: {
            Data: "Kinesis data stream"
         }
      },
      Source: "[email protected]"
   };    
   var email = ses.sendEmail(eParams, function(err, data) {
      if (err) console.log(err);
      else {
         console.log("===EMAIL SENT===");
         console.log("EMAIL CODE END");
         console.log('EMAIL: ', email);
         context.succeed(event);
         callback(null, "email is send");
      }
   });
};

Tham số sự kiện có dữ liệu được nhập vào luồng dữ liệu kinesis. Mã aws lambda ở trên sẽ được kích hoạt khi dữ liệu được nhập vào luồng dữ liệu kinesis.

Thêm dữ liệu vào luồng dữ liệu Kinesis

Ở đây chúng ta sẽ sử dụng AWS CLI để thêm luồng dữ liệu data kinesis như hình dưới đây. Với mục đích này, chúng ta có thể sử dụng lệnh sau:

aws kinesis put-record --stream-name kinesisdemo  --data "hello world" --
partition-key "789675"

Sau đó, AWS Lambda được kích hoạt và thư được gửi đi.