DynamoDB - aktywność tabeli

Strumienie DynamoDB umożliwiają śledzenie i reagowanie na zmiany elementów tabeli. Użyj tej funkcji, aby stworzyć aplikację, która reaguje na zmiany, aktualizując informacje z różnych źródeł. Synchronizuj dane tysięcy użytkowników dużego systemu z wieloma użytkownikami. Użyj go, aby wysyłać użytkownikom powiadomienia o aktualizacjach. Jego zastosowania są różnorodne i znaczące. Strumienie DynamoDB służą jako główne narzędzie służące do osiągnięcia tej funkcjonalności.

Strumienie przechwytują uporządkowane w czasie sekwencje zawierające modyfikacje elementów w tabeli. Przechowują te dane przez maksymalnie 24 godziny. Aplikacje używają ich do przeglądania oryginalnych i zmodyfikowanych elementów niemal w czasie rzeczywistym.

Strumienie włączone w tabeli przechwytują wszystkie modyfikacje. W każdej operacji CRUD DynamoDB tworzy rekord strumienia z atrybutami klucza podstawowego zmodyfikowanych elementów. Możesz skonfigurować strumienie, aby uzyskać dodatkowe informacje, takie jak obrazy przed i po.

Strumienie mają dwie gwarancje -

  • Każdy rekord pojawia się jeden raz w strumieniu i

  • Każda modyfikacja pozycji skutkuje rekordami strumieni w tej samej kolejności co modyfikacje.

Wszystkie strumienie są przetwarzane w czasie rzeczywistym, aby umożliwić ich wykorzystanie do powiązanych funkcji w aplikacjach.

Zarządzanie strumieniami

Podczas tworzenia tabeli możesz włączyć strumień. Istniejące tabele umożliwiają wyłączenie strumienia lub zmianę ustawień. Strumienie oferują funkcję operacji asynchronicznej, co oznacza brak wpływu na wydajność tabeli.

Wykorzystaj konsolę AWS Management do prostego zarządzania strumieniami. Najpierw przejdź do konsoli i wybierzTables. Na karcie Przegląd wybierzManage Stream. W oknie wybierz informacje dodane do strumienia w przypadku modyfikacji danych tabeli. Po wprowadzeniu wszystkich ustawień wybierzEnable.

Jeśli chcesz wyłączyć istniejące strumienie, wybierz Manage Stream, i wtedy Disable.

Możesz również użyć interfejsów API CreateTable i UpdateTable, aby włączyć lub zmienić strumień. Użyj parametru StreamSpecification, aby skonfigurować strumień. StreamEnabled określa status, co oznacza true dla włączonego i false dla wyłączonego.

StreamViewType określa informacje dodane do strumienia: KEYS_ONLY, NEW_IMAGE, OLD_IMAGE i NEW_AND_OLD_IMAGES.

Czytanie strumieniowe

Odczytuj i przetwarzaj strumienie, łącząc się z punktem końcowym i wysyłając żądania API. Każdy strumień składa się z rekordów strumienia, a każdy rekord istnieje jako pojedyncza modyfikacja, która jest właścicielem strumienia. Rekordy strumieniowe zawierają numer kolejny ujawniający kolejność publikacji. Rekordy należą do grup znanych również jako shards. Fragmenty działają jako pojemniki na kilka rekordów, a także zawierają informacje potrzebne do uzyskiwania dostępu do rekordów i przechodzenia przez nie. Po 24 godzinach rekordy są automatycznie usuwane.

Te fragmenty są generowane i usuwane w razie potrzeby i nie trwają długo. Dzielą się również automatycznie na wiele nowych fragmentów, zwykle w odpowiedzi na nagłe wzrosty aktywności. Po wyłączeniu transmisji otwarte fragmenty zamykają się. Hierarchiczna relacja między fragmentami oznacza, że ​​aplikacje muszą nadawać priorytety fragmentom nadrzędnym w celu poprawnej kolejności przetwarzania. Możesz użyć adaptera Kinesis, aby zrobić to automatycznie.

Note - Operacje powodujące brak zmian nie zapisują rekordów strumieniowych.

Dostęp do rekordów i ich przetwarzanie wymaga wykonania następujących zadań -

  • Określ ARN strumienia docelowego.
  • Określ fragment (y) strumienia zawierającego rekordy celu.
  • Uzyskaj dostęp do fragmentu (ów), aby pobrać żądane rekordy.

Note- Maksymalnie 2 procesy powinny odczytywać fragment na raz. Jeśli przekroczy 2 procesy, może zdławić źródło.

Dostępne akcje interfejsu API strumienia obejmują

  • ListStreams
  • DescribeStream
  • GetShardIterator
  • GetRecords

Możesz przejrzeć następujący przykład odczytu strumienia -

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient;

import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;

import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;

import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;

import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.Record;

import com.amazonaws.services.dynamodbv2.model.Shard;
import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
import com.amazonaws.services.dynamodbv2.model.StreamSpecification;
import com.amazonaws.services.dynamodbv2.model.StreamViewType;
import com.amazonaws.services.dynamodbv2.util.Tables;

public class StreamsExample {
   private static AmazonDynamoDBClient dynamoDBClient =  
      new AmazonDynamoDBClient(new ProfileCredentialsProvider());  
   private static AmazonDynamoDBStreamsClient streamsClient =  
      new AmazonDynamoDBStreamsClient(new ProfileCredentialsProvider());  

   public static void main(String args[]) {  
      dynamoDBClient.setEndpoint("InsertDbEndpointHere");   
      streamsClient.setEndpoint("InsertStreamEndpointHere");    
      
      // table creation 
      String tableName = "MyTestingTable";  
      ArrayList<AttributeDefinition> attributeDefinitions =  
         new ArrayList<AttributeDefinition>();  
      
      attributeDefinitions.add(new AttributeDefinition()
         .withAttributeName("ID") 
         .withAttributeType("N"));
         
      ArrayList<KeySchemaElement> keySchema = new 
         ArrayList<KeySchemaElement>(); 
      
      keySchema.add(new KeySchemaElement() 
         .withAttributeName("ID") 
         .withKeyType(KeyType.HASH));                       //Partition key

      StreamSpecification streamSpecification = new StreamSpecification(); 
      streamSpecification.setStreamEnabled(true); 
      streamSpecification.setStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES);  
      CreateTableRequest createTableRequest = new CreateTableRequest() 
         .withTableName(tableName) 
         .withKeySchema(keySchema) 
         .withAttributeDefinitions(attributeDefinitions) 
         .withProvisionedThroughput(new ProvisionedThroughput() 
         .withReadCapacityUnits(1L) 
         .withWriteCapacityUnits(1L))
         .withStreamSpecification(streamSpecification);  
      
      System.out.println("Executing CreateTable for " + tableName); 
      dynamoDBClient.createTable(createTableRequest);  
      System.out.println("Creating " + tableName); 
      
      try { 
         Tables.awaitTableToBecomeActive(dynamoDBClient, tableName); 
      } catch (InterruptedException e) { 
         e.printStackTrace(); 
      } 
         
      // Get the table's stream settings 
      DescribeTableResult describeTableResult =
         dynamoDBClient.describeTable(tableName);  
      
      String myStreamArn = describeTableResult.getTable().getLatestStreamArn(); 
      StreamSpecification myStreamSpec =  
         describeTableResult.getTable().getStreamSpecification();  
      
      System.out.println("Current stream ARN for " + tableName + ": "+ myStreamArn);
      System.out.println("Stream enabled: "+ myStreamSpec.getStreamEnabled()); 
      System.out.println("Update view type: "+ myStreamSpec.getStreamViewType());  
      
      // Add an item 
      int numChanges = 0; 
      System.out.println("Making some changes to table data"); 
      Map<String, AttributeValue> item = new HashMap<String, AttributeValue>(); 
      item.put("ID", new AttributeValue().withN("222")); 
      item.put("Alert", new AttributeValue().withS("item!")); 
      dynamoDBClient.putItem(tableName, item); 
      numChanges++;  
      
      // Update the item         
      Map<String, AttributeValue> key = new HashMap<String, AttributeValue>(); 
      key.put("ID", new AttributeValue().withN("222")); 
      Map<String, AttributeValueUpdate> attributeUpdates =  
      new HashMap<String, AttributeValueUpdate>(); 
      
      attributeUpdates.put("Alert", new AttributeValueUpdate() 
         .withAction(AttributeAction.PUT) 
         .withValue(new AttributeValue().withS("modified item"))); 
      
      dynamoDBClient.updateItem(tableName, key, attributeUpdates); 
      numChanges++;   
      
      // Delete the item         
      dynamoDBClient.deleteItem(tableName, key);  
      numChanges++;
      
      // Get stream shards         
      DescribeStreamResult describeStreamResult =  
      streamsClient.describeStream(new DescribeStreamRequest() 
         .withStreamArn(myStreamArn)); 
      String streamArn =  
         describeStreamResult.getStreamDescription().getStreamArn(); 
      List<Shard> shards =  
         describeStreamResult.getStreamDescription().getShards();  
      
      // Process shards 
      for (Shard shard : shards) { 
         String shardId = shard.getShardId(); 
         System.out.println("Processing " + shardId + " in "+ streamArn);  
         
         // Get shard iterator 
         GetShardIteratorRequest getShardIteratorRequest = new 
            GetShardIteratorRequest() 
            .withStreamArn(myStreamArn) 
            .withShardId(shardId) 
            .withShardIteratorType(ShardIteratorType.TRIM_HORIZON); 
         
         GetShardIteratorResult getShardIteratorResult =  
            streamsClient.getShardIterator(getShardIteratorRequest); 
         String nextItr = getShardIteratorResult.getShardIterator();  
         
         while (nextItr != null && numChanges > 0) { 
            // Read data records with iterator                 
            GetRecordsResult getRecordsResult =  
               streamsClient.getRecords(new GetRecordsRequest(). 
               withShardIterator(nextItr));
               
            List<Record> records = getRecordsResult.getRecords(); 
            System.out.println("Pulling records...");  
               
            for (Record record : records) { 
               System.out.println(record); 
               numChanges--;
            } 
            nextItr = getRecordsResult.getNextShardIterator(); 
         } 
      } 
   } 
}