Использование лямбда-функции с Amazon Kinesis
AWS KinesisСлужба используется для сбора / хранения данных отслеживания в реальном времени, поступающих от кликов на веб-сайтах, журналов, каналов социальных сетей. Мы можем запустить AWS Lambda для выполнения дополнительной обработки этих журналов.
Реквизиты
Основные требования для начала работы с Kinesis и AWS Lambda следующие:
- Создать роль с необходимыми разрешениями
- Создать поток данных в Kinesis
- Создайте функцию AWS Lambda.
- Добавить код в AWS Lambda
- Добавить данные в поток данных Kinesis
пример
Давайте поработаем над примером, в котором мы запускаем AWS Lambda для обработки потока данных от Kinesis и отправки почты с полученными данными.
Простая блок-схема для объяснения процесса показана ниже -
Создать роль с необходимыми разрешениями
Перейдите в консоль AWS и создайте роль.
Создать поток данных в Kinesis
Зайдите в консоль AWS и создайте поток данных в kinesis.
Как показано, есть 4 варианта. В этом примере мы будем работать над созданием потока данных.
Нажмите Create data stream. Введите имя в поле имени потока Kinesis, указанное ниже.
Введите количество сегментов для потока данных.
Детали осколков показаны ниже -
Введите имя и щелкните значок Create Kinesis stream кнопку внизу.
Обратите внимание, что для активации потока требуется определенное время.
Создать функцию AWS Lambda
Перейдите в консоль AWS и щелкните Lambda. Создайте функцию AWS Lambda, как показано -
Нажмите Create functionкнопку в конце экрана. Добавьте Kinesis в качестве триггера в AWS Lambda.
Добавьте детали конфигурации в триггер Kinesis -
Добавьте триггер, а теперь добавьте код в AWS Lambda.
Добавление кода в AWS Lambda
Для этого мы будем использовать nodejs в качестве среды выполнения. Мы отправим письмо, как только AWS Lambda будет запущена с потоком данных kinesis.
const aws = require("aws-sdk");
var ses = new aws.SES({
region: 'us-east-1'
});
exports.handler = function(event, context, callback) {
let payload = "";
event.Records.forEach(function(record) {
// Kinesis data is base64 encoded so decode here
payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
console.log('Decoded payload:', payload);
});
var eParams = {
Destination: {
ToAddresses: ["[email protected]"]
},
Message: {
Body: {
Text: {
Data:payload
}
},
Subject: {
Data: "Kinesis data stream"
}
},
Source: "[email protected]"
};
var email = ses.sendEmail(eParams, function(err, data) {
if (err) console.log(err);
else {
console.log("===EMAIL SENT===");
console.log("EMAIL CODE END");
console.log('EMAIL: ', email);
context.succeed(event);
callback(null, "email is send");
}
});
};
Параметр события содержит данные, введенные в поток данных кинезиса. Приведенный выше лямбда-код aws будет активирован после ввода данных в поток данных kinesis.
Добавить данные в поток данных Kinesis
Здесь мы будем использовать AWS CLI для добавления потока данных кинезиса данных, как показано ниже. Для этого мы можем использовать следующую команду -
aws kinesis put-record --stream-name kinesisdemo --data "hello world" --
partition-key "789675"
Затем активируется AWS Lambda и отправляется письмо.