DynamoDB - Таблица действий

Потоки DynamoDB позволяют отслеживать изменения элементов таблицы и реагировать на них. Используйте эту функцию для создания приложения, которое реагирует на изменения, обновляя информацию из разных источников. Синхронизируйте данные тысяч пользователей большой многопользовательской системы. Используйте его для отправки пользователям уведомлений об обновлениях. Его приложения оказались разнообразными и содержательными. Потоки DynamoDB служат основным инструментом, используемым для достижения этой функциональности.

Потоки фиксируют упорядоченные по времени последовательности, содержащие модификации элементов в таблице. Они хранят эти данные максимум 24 часа. Приложения используют их для просмотра исходных и измененных элементов почти в реальном времени.

Включенные в таблице потоки фиксируют все изменения. При любой операции CRUD DynamoDB создает запись потока с атрибутами первичного ключа измененных элементов. Вы можете настроить потоки для получения дополнительной информации, например изображений до и после.

Streams несут две гарантии -

  • Каждая запись появляется в потоке один раз и

  • Каждое изменение элемента приводит к появлению записей потока того же порядка, что и у изменений.

Все потоки обрабатываются в режиме реального времени, чтобы вы могли использовать их для связанных функций в приложениях.

Управление потоками

При создании таблицы вы можете включить поток. Существующие таблицы позволяют отключать поток или изменять настройки. Потоки предлагают функцию асинхронной работы, что означает отсутствие влияния на производительность таблицы.

Используйте консоль управления AWS для простого управления потоками. Сначала перейдите к консоли и выберитеTables. На вкладке "Обзор" выберитеManage Stream. Внутри окна выберите информацию, добавляемую в поток при изменении данных таблицы. После ввода всех настроек выберитеEnable.

Если вы хотите отключить любые существующие потоки, выберите Manage Stream, а потом Disable.

Вы также можете использовать API CreateTable и UpdateTable для включения или изменения потока. Используйте параметр StreamSpecification для настройки потока. StreamEnabled указывает статус, означающий «истина» для включения и «ложь» для отключения.

StreamViewType указывает информацию, добавляемую в поток: KEYS_ONLY, NEW_IMAGE, OLD_IMAGE и NEW_AND_OLD_IMAGES.

Потоковое чтение

Чтение и обработка потоков путем подключения к конечной точке и выполнения запросов API. Каждый поток состоит из записей потока, и каждая запись существует как отдельная модификация, которой принадлежит поток. Записи потока включают порядковый номер, раскрывающий порядок публикации. Записи принадлежат к группам, также известным как осколки. Шарды функционируют как контейнеры для нескольких записей, а также содержат информацию, необходимую для доступа к записям и их перемещения. Через 24 часа записи автоматически удаляются.

Эти осколки создаются и удаляются по мере необходимости, и их срок хранения недолгий. Они также автоматически делятся на несколько новых сегментов, обычно в ответ на всплески активности записи. При отключении потока открытые шарды закрываются. Иерархическая связь между шардами означает, что приложения должны отдавать приоритет родительским шардам для правильного порядка обработки. Вы можете использовать адаптер Kinesis, чтобы сделать это автоматически.

Note - Операции, не приводящие к изменению, не записывают записи потока.

Доступ и обработка записей требует выполнения следующих задач:

  • Определите ARN целевого потока.
  • Определите осколки потока, содержащие целевые записи.
  • Получите доступ к осколкам, чтобы получить нужные записи.

Note- Одновременно должно быть не более 2 процессов, читающих осколок. Если он превышает 2 процесса, то источник может быть заблокирован.

Доступные действия API потока включают

  • ListStreams
  • DescribeStream
  • GetShardIterator
  • GetRecords

Вы можете просмотреть следующий пример чтения потока -

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient;

import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;

import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;

import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;

import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.Record;

import com.amazonaws.services.dynamodbv2.model.Shard;
import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
import com.amazonaws.services.dynamodbv2.model.StreamSpecification;
import com.amazonaws.services.dynamodbv2.model.StreamViewType;
import com.amazonaws.services.dynamodbv2.util.Tables;

public class StreamsExample {
   private static AmazonDynamoDBClient dynamoDBClient =  
      new AmazonDynamoDBClient(new ProfileCredentialsProvider());  
   private static AmazonDynamoDBStreamsClient streamsClient =  
      new AmazonDynamoDBStreamsClient(new ProfileCredentialsProvider());  

   public static void main(String args[]) {  
      dynamoDBClient.setEndpoint("InsertDbEndpointHere");   
      streamsClient.setEndpoint("InsertStreamEndpointHere");    
      
      // table creation 
      String tableName = "MyTestingTable";  
      ArrayList<AttributeDefinition> attributeDefinitions =  
         new ArrayList<AttributeDefinition>();  
      
      attributeDefinitions.add(new AttributeDefinition()
         .withAttributeName("ID") 
         .withAttributeType("N"));
         
      ArrayList<KeySchemaElement> keySchema = new 
         ArrayList<KeySchemaElement>(); 
      
      keySchema.add(new KeySchemaElement() 
         .withAttributeName("ID") 
         .withKeyType(KeyType.HASH));                       //Partition key

      StreamSpecification streamSpecification = new StreamSpecification(); 
      streamSpecification.setStreamEnabled(true); 
      streamSpecification.setStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES);  
      CreateTableRequest createTableRequest = new CreateTableRequest() 
         .withTableName(tableName) 
         .withKeySchema(keySchema) 
         .withAttributeDefinitions(attributeDefinitions) 
         .withProvisionedThroughput(new ProvisionedThroughput() 
         .withReadCapacityUnits(1L) 
         .withWriteCapacityUnits(1L))
         .withStreamSpecification(streamSpecification);  
      
      System.out.println("Executing CreateTable for " + tableName); 
      dynamoDBClient.createTable(createTableRequest);  
      System.out.println("Creating " + tableName); 
      
      try { 
         Tables.awaitTableToBecomeActive(dynamoDBClient, tableName); 
      } catch (InterruptedException e) { 
         e.printStackTrace(); 
      } 
         
      // Get the table's stream settings 
      DescribeTableResult describeTableResult =
         dynamoDBClient.describeTable(tableName);  
      
      String myStreamArn = describeTableResult.getTable().getLatestStreamArn(); 
      StreamSpecification myStreamSpec =  
         describeTableResult.getTable().getStreamSpecification();  
      
      System.out.println("Current stream ARN for " + tableName + ": "+ myStreamArn);
      System.out.println("Stream enabled: "+ myStreamSpec.getStreamEnabled()); 
      System.out.println("Update view type: "+ myStreamSpec.getStreamViewType());  
      
      // Add an item 
      int numChanges = 0; 
      System.out.println("Making some changes to table data"); 
      Map<String, AttributeValue> item = new HashMap<String, AttributeValue>(); 
      item.put("ID", new AttributeValue().withN("222")); 
      item.put("Alert", new AttributeValue().withS("item!")); 
      dynamoDBClient.putItem(tableName, item); 
      numChanges++;  
      
      // Update the item         
      Map<String, AttributeValue> key = new HashMap<String, AttributeValue>(); 
      key.put("ID", new AttributeValue().withN("222")); 
      Map<String, AttributeValueUpdate> attributeUpdates =  
      new HashMap<String, AttributeValueUpdate>(); 
      
      attributeUpdates.put("Alert", new AttributeValueUpdate() 
         .withAction(AttributeAction.PUT) 
         .withValue(new AttributeValue().withS("modified item"))); 
      
      dynamoDBClient.updateItem(tableName, key, attributeUpdates); 
      numChanges++;   
      
      // Delete the item         
      dynamoDBClient.deleteItem(tableName, key);  
      numChanges++;
      
      // Get stream shards         
      DescribeStreamResult describeStreamResult =  
      streamsClient.describeStream(new DescribeStreamRequest() 
         .withStreamArn(myStreamArn)); 
      String streamArn =  
         describeStreamResult.getStreamDescription().getStreamArn(); 
      List<Shard> shards =  
         describeStreamResult.getStreamDescription().getShards();  
      
      // Process shards 
      for (Shard shard : shards) { 
         String shardId = shard.getShardId(); 
         System.out.println("Processing " + shardId + " in "+ streamArn);  
         
         // Get shard iterator 
         GetShardIteratorRequest getShardIteratorRequest = new 
            GetShardIteratorRequest() 
            .withStreamArn(myStreamArn) 
            .withShardId(shardId) 
            .withShardIteratorType(ShardIteratorType.TRIM_HORIZON); 
         
         GetShardIteratorResult getShardIteratorResult =  
            streamsClient.getShardIterator(getShardIteratorRequest); 
         String nextItr = getShardIteratorResult.getShardIterator();  
         
         while (nextItr != null && numChanges > 0) { 
            // Read data records with iterator                 
            GetRecordsResult getRecordsResult =  
               streamsClient.getRecords(new GetRecordsRequest(). 
               withShardIterator(nextItr));
               
            List<Record> records = getRecordsResult.getRecords(); 
            System.out.println("Pulling records...");  
               
            for (Record record : records) { 
               System.out.println(record); 
               numChanges--;
            } 
            nextItr = getRecordsResult.getNextShardIterator(); 
         } 
      } 
   } 
}