DynamoDB - กิจกรรมบนโต๊ะ

สตรีม DynamoDB ช่วยให้คุณติดตามและตอบสนองต่อการเปลี่ยนแปลงรายการตาราง ใช้ฟังก์ชันนี้เพื่อสร้างแอปพลิเคชันที่ตอบสนองต่อการเปลี่ยนแปลงโดยการอัปเดตข้อมูลจากแหล่งต่างๆ ซิงโครไนซ์ข้อมูลสำหรับผู้ใช้หลายพันคนในระบบผู้ใช้หลายคนขนาดใหญ่ ใช้เพื่อส่งการแจ้งเตือนไปยังผู้ใช้เกี่ยวกับการอัปเดต การใช้งานพิสูจน์ได้ว่ามีความหลากหลายและมีสาระสำคัญ สตรีม DynamoDB ทำหน้าที่เป็นเครื่องมือหลักที่ใช้ในการทำงานนี้

สตรีมรวบรวมลำดับเวลาที่มีการปรับเปลี่ยนรายการภายในตาราง พวกเขาเก็บข้อมูลนี้ไว้สูงสุด 24 ชั่วโมง แอปพลิเคชั่นใช้เพื่อดูรายการต้นฉบับและรายการที่แก้ไขเกือบแบบเรียลไทม์

สตรีมที่เปิดใช้งานบนตารางจะจับการแก้ไขทั้งหมด ในการดำเนินการ CRUD ใด ๆ DynamoDB จะสร้างสตรีมเร็กคอร์ดด้วยแอ็ตทริบิวต์คีย์หลักของไอเท็มที่แก้ไข คุณสามารถกำหนดค่าสตรีมสำหรับข้อมูลเพิ่มเติมเช่นภาพก่อนและหลัง

สตรีมมีการค้ำประกันสองครั้ง -

  • แต่ละระเบียนจะปรากฏหนึ่งครั้งในสตรีมและ

  • การปรับเปลี่ยนแต่ละรายการจะส่งผลให้สตรีมเร็กคอร์ดมีลำดับเดียวกันกับการปรับเปลี่ยน

สตรีมทั้งหมดประมวลผลแบบเรียลไทม์เพื่อให้คุณใช้งานได้สำหรับฟังก์ชันที่เกี่ยวข้องในแอปพลิเคชัน

การจัดการสตรีม

ในการสร้างตารางคุณสามารถเปิดใช้งานสตรีมได้ ตารางที่มีอยู่อนุญาตให้ปิดสตรีมหรือเปลี่ยนแปลงการตั้งค่า สตรีมมีคุณลักษณะของการทำงานแบบอะซิงโครนัสซึ่งหมายความว่าไม่มีผลกระทบต่อประสิทธิภาพของตาราง

ใช้คอนโซล AWS Management สำหรับการจัดการสตรีมอย่างง่าย ขั้นแรกไปที่คอนโซลและเลือกTables. ในแท็บภาพรวมให้เลือกManage Stream. ภายในหน้าต่างเลือกข้อมูลที่เพิ่มลงในสตรีมเกี่ยวกับการแก้ไขข้อมูลตาราง หลังจากเข้าสู่การตั้งค่าทั้งหมดแล้วให้เลือกEnable.

หากคุณต้องการปิดสตรีมที่มีอยู่ให้เลือก Manage Streamและจากนั้น Disable.

คุณยังสามารถใช้ APIs CreateTable และ UpdateTable เพื่อเปิดใช้งานหรือเปลี่ยนสตรีม ใช้พารามิเตอร์ StreamSpecification เพื่อกำหนดค่าสตรีม StreamEnabled ระบุสถานะซึ่งหมายถึงจริงสำหรับเปิดใช้งานและเท็จสำหรับปิดใช้งาน

StreamViewType ระบุข้อมูลที่เพิ่มในสตรีม: KEYS_ONLY, NEW_IMAGE, OLD_IMAGE และ NEW_AND_OLD_IMAGES

การอ่านสตรีม

อ่านและประมวลผลสตรีมโดยเชื่อมต่อกับปลายทางและสร้างคำขอ API แต่ละสตรีมประกอบด้วยสตรีมเร็กคอร์ดและทุกเร็กคอร์ดจะมีการปรับเปลี่ยนเพียงครั้งเดียวซึ่งเป็นเจ้าของสตรีม ระเบียนสตรีมประกอบด้วยหมายเลขลำดับที่เปิดเผยลำดับการเผยแพร่ ระเบียนอยู่ในกลุ่มหรือที่เรียกว่าเศษ Shards ทำหน้าที่เป็นภาชนะบรรจุสำหรับบันทึกหลายรายการและยังเก็บข้อมูลที่จำเป็นสำหรับการเข้าถึงและการสำรวจบันทึก หลังจาก 24 ชั่วโมงบันทึกจะถูกลบโดยอัตโนมัติ

Shards เหล่านี้สร้างและลบได้ตามต้องการและใช้เวลาไม่นาน นอกจากนี้ยังแบ่งออกเป็นเศษใหม่หลาย ๆ ชิ้นโดยอัตโนมัติโดยทั่วไปจะตอบสนองต่อการเขียนกิจกรรมที่เพิ่มขึ้นอย่างรวดเร็ว เมื่อปิดการสตรีมให้เปิดชาร์ดปิด ความสัมพันธ์ตามลำดับชั้นระหว่างชาร์ดหมายถึงแอปพลิเคชันต้องจัดลำดับความสำคัญของชาร์ดหลักเพื่อให้ลำดับการประมวลผลถูกต้อง คุณสามารถใช้ Kinesis Adapter เพื่อดำเนินการนี้โดยอัตโนมัติ

Note - การดำเนินการที่ไม่มีการเปลี่ยนแปลงอย่าเขียนบันทึกสตรีม

การเข้าถึงและการประมวลผลบันทึกจำเป็นต้องปฏิบัติงานต่อไปนี้ -

  • กำหนด ARN ของสตรีมเป้าหมาย
  • กำหนดชาร์ดของสตรีมที่เก็บบันทึกเป้าหมาย
  • เข้าถึงชาร์ดเพื่อดึงข้อมูลที่ต้องการ

Note- ควรมีสูงสุด 2 กระบวนการอ่านชาร์ดพร้อมกัน หากเกิน 2 กระบวนการก็สามารถเค้นแหล่งที่มาได้

การดำเนินการสตรีม API ที่มีอยู่ ได้แก่

  • ListStreams
  • DescribeStream
  • GetShardIterator
  • GetRecords

คุณสามารถดูตัวอย่างการอ่านสตรีมต่อไปนี้ -

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient;

import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;

import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;

import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;

import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.Record;

import com.amazonaws.services.dynamodbv2.model.Shard;
import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
import com.amazonaws.services.dynamodbv2.model.StreamSpecification;
import com.amazonaws.services.dynamodbv2.model.StreamViewType;
import com.amazonaws.services.dynamodbv2.util.Tables;

public class StreamsExample {
   private static AmazonDynamoDBClient dynamoDBClient =  
      new AmazonDynamoDBClient(new ProfileCredentialsProvider());  
   private static AmazonDynamoDBStreamsClient streamsClient =  
      new AmazonDynamoDBStreamsClient(new ProfileCredentialsProvider());  

   public static void main(String args[]) {  
      dynamoDBClient.setEndpoint("InsertDbEndpointHere");   
      streamsClient.setEndpoint("InsertStreamEndpointHere");    
      
      // table creation 
      String tableName = "MyTestingTable";  
      ArrayList<AttributeDefinition> attributeDefinitions =  
         new ArrayList<AttributeDefinition>();  
      
      attributeDefinitions.add(new AttributeDefinition()
         .withAttributeName("ID") 
         .withAttributeType("N"));
         
      ArrayList<KeySchemaElement> keySchema = new 
         ArrayList<KeySchemaElement>(); 
      
      keySchema.add(new KeySchemaElement() 
         .withAttributeName("ID") 
         .withKeyType(KeyType.HASH));                       //Partition key

      StreamSpecification streamSpecification = new StreamSpecification(); 
      streamSpecification.setStreamEnabled(true); 
      streamSpecification.setStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES);  
      CreateTableRequest createTableRequest = new CreateTableRequest() 
         .withTableName(tableName) 
         .withKeySchema(keySchema) 
         .withAttributeDefinitions(attributeDefinitions) 
         .withProvisionedThroughput(new ProvisionedThroughput() 
         .withReadCapacityUnits(1L) 
         .withWriteCapacityUnits(1L))
         .withStreamSpecification(streamSpecification);  
      
      System.out.println("Executing CreateTable for " + tableName); 
      dynamoDBClient.createTable(createTableRequest);  
      System.out.println("Creating " + tableName); 
      
      try { 
         Tables.awaitTableToBecomeActive(dynamoDBClient, tableName); 
      } catch (InterruptedException e) { 
         e.printStackTrace(); 
      } 
         
      // Get the table's stream settings 
      DescribeTableResult describeTableResult =
         dynamoDBClient.describeTable(tableName);  
      
      String myStreamArn = describeTableResult.getTable().getLatestStreamArn(); 
      StreamSpecification myStreamSpec =  
         describeTableResult.getTable().getStreamSpecification();  
      
      System.out.println("Current stream ARN for " + tableName + ": "+ myStreamArn);
      System.out.println("Stream enabled: "+ myStreamSpec.getStreamEnabled()); 
      System.out.println("Update view type: "+ myStreamSpec.getStreamViewType());  
      
      // Add an item 
      int numChanges = 0; 
      System.out.println("Making some changes to table data"); 
      Map<String, AttributeValue> item = new HashMap<String, AttributeValue>(); 
      item.put("ID", new AttributeValue().withN("222")); 
      item.put("Alert", new AttributeValue().withS("item!")); 
      dynamoDBClient.putItem(tableName, item); 
      numChanges++;  
      
      // Update the item         
      Map<String, AttributeValue> key = new HashMap<String, AttributeValue>(); 
      key.put("ID", new AttributeValue().withN("222")); 
      Map<String, AttributeValueUpdate> attributeUpdates =  
      new HashMap<String, AttributeValueUpdate>(); 
      
      attributeUpdates.put("Alert", new AttributeValueUpdate() 
         .withAction(AttributeAction.PUT) 
         .withValue(new AttributeValue().withS("modified item"))); 
      
      dynamoDBClient.updateItem(tableName, key, attributeUpdates); 
      numChanges++;   
      
      // Delete the item         
      dynamoDBClient.deleteItem(tableName, key);  
      numChanges++;
      
      // Get stream shards         
      DescribeStreamResult describeStreamResult =  
      streamsClient.describeStream(new DescribeStreamRequest() 
         .withStreamArn(myStreamArn)); 
      String streamArn =  
         describeStreamResult.getStreamDescription().getStreamArn(); 
      List<Shard> shards =  
         describeStreamResult.getStreamDescription().getShards();  
      
      // Process shards 
      for (Shard shard : shards) { 
         String shardId = shard.getShardId(); 
         System.out.println("Processing " + shardId + " in "+ streamArn);  
         
         // Get shard iterator 
         GetShardIteratorRequest getShardIteratorRequest = new 
            GetShardIteratorRequest() 
            .withStreamArn(myStreamArn) 
            .withShardId(shardId) 
            .withShardIteratorType(ShardIteratorType.TRIM_HORIZON); 
         
         GetShardIteratorResult getShardIteratorResult =  
            streamsClient.getShardIterator(getShardIteratorRequest); 
         String nextItr = getShardIteratorResult.getShardIterator();  
         
         while (nextItr != null && numChanges > 0) { 
            // Read data records with iterator                 
            GetRecordsResult getRecordsResult =  
               streamsClient.getRecords(new GetRecordsRequest(). 
               withShardIterator(nextItr));
               
            List<Record> records = getRecordsResult.getRecords(); 
            System.out.println("Pulling records...");  
               
            for (Record record : records) { 
               System.out.println(record); 
               numChanges--;
            } 
            nextItr = getRecordsResult.getNextShardIterator(); 
         } 
      } 
   } 
}