Использование лямбда-функции с Amazon Kinesis

AWS KinesisСлужба используется для сбора / хранения данных отслеживания в реальном времени, поступающих от кликов на веб-сайтах, журналов, каналов социальных сетей. Мы можем запустить AWS Lambda для выполнения дополнительной обработки этих журналов.

Реквизиты

Основные требования для начала работы с Kinesis и AWS Lambda следующие:

  • Создать роль с необходимыми разрешениями
  • Создать поток данных в Kinesis
  • Создайте функцию AWS Lambda.
  • Добавить код в AWS Lambda
  • Добавить данные в поток данных Kinesis

пример

Давайте поработаем над примером, в котором мы запускаем AWS Lambda для обработки потока данных от Kinesis и отправки почты с полученными данными.

Простая блок-схема для объяснения процесса показана ниже -

Создать роль с необходимыми разрешениями

Перейдите в консоль AWS и создайте роль.

Создать поток данных в Kinesis

Зайдите в консоль AWS и создайте поток данных в kinesis.

Как показано, есть 4 варианта. В этом примере мы будем работать над созданием потока данных.

Нажмите Create data stream. Введите имя в поле имени потока Kinesis, указанное ниже.

Введите количество сегментов для потока данных.

Детали осколков показаны ниже -

Введите имя и щелкните значок Create Kinesis stream кнопку внизу.

Обратите внимание, что для активации потока требуется определенное время.

Создать функцию AWS Lambda

Перейдите в консоль AWS и щелкните Lambda. Создайте функцию AWS Lambda, как показано -

Нажмите Create functionкнопку в конце экрана. Добавьте Kinesis в качестве триггера в AWS Lambda.

Добавьте детали конфигурации в триггер Kinesis -

Добавьте триггер, а теперь добавьте код в AWS Lambda.

Добавление кода в AWS Lambda

Для этого мы будем использовать nodejs в качестве среды выполнения. Мы отправим письмо, как только AWS Lambda будет запущена с потоком данных kinesis.

const aws =  require("aws-sdk");
var ses = new aws.SES({
   region: 'us-east-1'
});
exports.handler = function(event, context, callback) {
   let payload = "";
   event.Records.forEach(function(record) {
      // Kinesis data is base64 encoded so decode here
      payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
      console.log('Decoded payload:', payload);
   });
   var eParams = {
      Destination: {
         ToAddresses: ["[email protected]"]
      },
      Message: {
         Body: {
            Text: {
               Data:payload
            }
         },
         Subject: {
            Data: "Kinesis data stream"
         }
      },
      Source: "[email protected]"
   };    
   var email = ses.sendEmail(eParams, function(err, data) {
      if (err) console.log(err);
      else {
         console.log("===EMAIL SENT===");
         console.log("EMAIL CODE END");
         console.log('EMAIL: ', email);
         context.succeed(event);
         callback(null, "email is send");
      }
   });
};

Параметр события содержит данные, введенные в поток данных кинезиса. Приведенный выше лямбда-код aws будет активирован после ввода данных в поток данных kinesis.

Добавить данные в поток данных Kinesis

Здесь мы будем использовать AWS CLI для добавления потока данных кинезиса данных, как показано ниже. Для этого мы можем использовать следующую команду -

aws kinesis put-record --stream-name kinesisdemo  --data "hello world" --
partition-key "789675"

Затем активируется AWS Lambda и отправляется письмо.