डायनेमोडीबी - टेबल गतिविधि

DynamoDB स्ट्रीम आपको टेबल आइटम परिवर्तनों पर नज़र रखने और प्रतिक्रिया करने में सक्षम बनाती हैं। एक एप्लिकेशन बनाने के लिए इस कार्यक्षमता पर काम करें जो स्रोतों में जानकारी को अपडेट करके परिवर्तनों का जवाब देती है। एक बड़े, बहु-उपयोगकर्ता प्रणाली के हजारों उपयोगकर्ताओं के लिए डेटा सिंक्रनाइज़ करें। अपडेट पर उपयोगकर्ताओं को सूचनाएं भेजने के लिए इसका उपयोग करें। इसके अनुप्रयोग विविध और पर्याप्त साबित होते हैं। डायनामोबीडी धारा इस कार्यक्षमता को प्राप्त करने के लिए मुख्य उपकरण के रूप में काम करती है।

धाराएँ तालिका के भीतर आइटम संशोधनों वाले समय-क्रम वाले अनुक्रमों को पकड़ती हैं। वे अधिकतम 24 घंटे के लिए इस डेटा को रखते हैं। एप्लिकेशन उन्हें वास्तविक समय में लगभग मूल और संशोधित आइटम देखने के लिए उपयोग करते हैं।

तालिका में सक्षम स्ट्रीम सभी संशोधनों को कैप्चर करती हैं। किसी भी CRUD ऑपरेशन पर, डायनेमोडीबी संशोधित वस्तुओं की प्राथमिक प्रमुख विशेषताओं के साथ एक स्ट्रीम रिकॉर्ड बनाता है। आप पहले और बाद की छवियों जैसे अतिरिक्त जानकारी के लिए स्ट्रीम कॉन्फ़िगर कर सकते हैं।

धाराएं दो गारंटी लेती हैं -

  • प्रत्येक रिकॉर्ड स्ट्रीम में एक बार दिखाई देता है और

  • प्रत्येक आइटम संशोधन संशोधनों के समान क्रम के स्ट्रीम रिकॉर्ड में परिणाम करता है।

सभी स्ट्रीम वास्तविक समय में प्रक्रिया करते हैं ताकि आप उन्हें अनुप्रयोगों में संबंधित कार्यक्षमता के लिए नियोजित कर सकें।

धाराओं का प्रबंधन

टेबल निर्माण पर, आप एक स्ट्रीम को सक्षम कर सकते हैं। मौजूदा तालिकाएँ स्ट्रीम को अक्षम करने या सेटिंग बदलने की अनुमति देती हैं। धाराएँ अतुल्यकालिक ऑपरेशन की सुविधा प्रदान करती हैं, जिसका अर्थ है तालिका प्रदर्शन प्रभाव नहीं।

सरल धारा प्रबंधन के लिए AWS प्रबंधन कंसोल का उपयोग करें। सबसे पहले, कंसोल पर नेविगेट करें, और चुनेंTables। अवलोकन टैब में, चुनेंManage Stream। विंडो के अंदर, टेबल डेटा संशोधनों पर एक स्ट्रीम में जोड़ी गई जानकारी का चयन करें। सभी सेटिंग्स दर्ज करने के बाद, चयन करेंEnable

यदि आप किसी भी मौजूदा स्ट्रीम को अक्षम करना चाहते हैं, तो चुनें Manage Stream, और फिर Disable

किसी स्ट्रीम को सक्षम या परिवर्तित करने के लिए आप APIs CreateTable और UpdateTable का भी उपयोग कर सकते हैं। स्ट्रीम को कॉन्फ़िगर करने के लिए पैरामीटर StreamSpecification का उपयोग करें। StreamEnabled स्थिति निर्दिष्ट करता है, जिसका अर्थ है सक्षम के लिए सत्य और अक्षम के लिए असत्य।

StreamViewType स्ट्रीम में जोड़ी गई जानकारी को निर्दिष्ट करता है: KEYS_ONLY, NEW_IMAGE, OLD_IMAGE और NEW_AND_OLD_IMAGES।

स्ट्रीम पढ़ना

एक समापन बिंदु से कनेक्ट करके और एपीआई अनुरोध बनाकर स्ट्रीम पढ़ें और प्रक्रिया करें। प्रत्येक स्ट्रीम में स्ट्रीम रिकॉर्ड होते हैं, और प्रत्येक रिकॉर्ड एकल संशोधन के रूप में मौजूद होता है जो स्ट्रीम का मालिक होता है। स्ट्रीम रिकॉर्ड में प्रकाशन क्रम का एक क्रम संख्या शामिल है। अभिलेख समूहों से संबंधित हैं जिन्हें शार्क के रूप में भी जाना जाता है। कई रिकॉर्ड के लिए कंटेनर के रूप में कार्य करता है, और रिकॉर्ड तक पहुंचने और ट्रैवर्स करने के लिए आवश्यक जानकारी भी रखता है। 24 घंटों के बाद, रिकॉर्ड स्वचालित रूप से हटा देते हैं।

ये Shards आवश्यकतानुसार उत्पन्न और हटाए जाते हैं, और लंबे समय तक नहीं रहते हैं। वे स्वचालित रूप से कई नए शार्क में विभाजित होते हैं, आमतौर पर गतिविधि स्पाइक लिखने के लिए। स्ट्रीम डिसएबल करने पर, ओपन शार्ड्स करीब। शार्क के बीच पदानुक्रमित संबंध का मतलब है कि अनुप्रयोगों को सही प्रसंस्करण क्रम के लिए माता-पिता की शार्क को प्राथमिकता देनी चाहिए। स्वचालित रूप से ऐसा करने के लिए आप Kinesis Adapter का उपयोग कर सकते हैं।

Note - बिना किसी बदलाव के किए गए ऑपरेशन स्ट्रीम रिकॉर्ड नहीं लिखते हैं।

एक्सेस और प्रोसेसिंग रिकॉर्ड के लिए निम्नलिखित कार्य करने की आवश्यकता होती है -

  • लक्ष्य स्ट्रीम के ARN को निर्धारित करें।
  • लक्ष्य रिकॉर्ड रखने वाली धारा के शार्ड (एस) को निर्धारित करें।
  • वांछित रिकॉर्ड्स को पुनः प्राप्त करने के लिए शार्क (एस) पर पहुँचें।

Note- एक बार में एक शार्क पढ़ने की अधिकतम 2 प्रक्रियाएं होनी चाहिए। यदि यह 2 प्रक्रियाओं से अधिक है, तो यह स्रोत को कुचलना कर सकता है।

उपलब्ध स्ट्रीम कार्रवाई में शामिल हैं

  • ListStreams
  • DescribeStream
  • GetShardIterator
  • GetRecords

आप स्ट्रीम पढ़ने के निम्नलिखित उदाहरण की समीक्षा कर सकते हैं -

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient;

import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;

import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;

import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;

import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.Record;

import com.amazonaws.services.dynamodbv2.model.Shard;
import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
import com.amazonaws.services.dynamodbv2.model.StreamSpecification;
import com.amazonaws.services.dynamodbv2.model.StreamViewType;
import com.amazonaws.services.dynamodbv2.util.Tables;

public class StreamsExample {
   private static AmazonDynamoDBClient dynamoDBClient =  
      new AmazonDynamoDBClient(new ProfileCredentialsProvider());  
   private static AmazonDynamoDBStreamsClient streamsClient =  
      new AmazonDynamoDBStreamsClient(new ProfileCredentialsProvider());  

   public static void main(String args[]) {  
      dynamoDBClient.setEndpoint("InsertDbEndpointHere");   
      streamsClient.setEndpoint("InsertStreamEndpointHere");    
      
      // table creation 
      String tableName = "MyTestingTable";  
      ArrayList<AttributeDefinition> attributeDefinitions =  
         new ArrayList<AttributeDefinition>();  
      
      attributeDefinitions.add(new AttributeDefinition()
         .withAttributeName("ID") 
         .withAttributeType("N"));
         
      ArrayList<KeySchemaElement> keySchema = new 
         ArrayList<KeySchemaElement>(); 
      
      keySchema.add(new KeySchemaElement() 
         .withAttributeName("ID") 
         .withKeyType(KeyType.HASH));                       //Partition key

      StreamSpecification streamSpecification = new StreamSpecification(); 
      streamSpecification.setStreamEnabled(true); 
      streamSpecification.setStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES);  
      CreateTableRequest createTableRequest = new CreateTableRequest() 
         .withTableName(tableName) 
         .withKeySchema(keySchema) 
         .withAttributeDefinitions(attributeDefinitions) 
         .withProvisionedThroughput(new ProvisionedThroughput() 
         .withReadCapacityUnits(1L) 
         .withWriteCapacityUnits(1L))
         .withStreamSpecification(streamSpecification);  
      
      System.out.println("Executing CreateTable for " + tableName); 
      dynamoDBClient.createTable(createTableRequest);  
      System.out.println("Creating " + tableName); 
      
      try { 
         Tables.awaitTableToBecomeActive(dynamoDBClient, tableName); 
      } catch (InterruptedException e) { 
         e.printStackTrace(); 
      } 
         
      // Get the table's stream settings 
      DescribeTableResult describeTableResult =
         dynamoDBClient.describeTable(tableName);  
      
      String myStreamArn = describeTableResult.getTable().getLatestStreamArn(); 
      StreamSpecification myStreamSpec =  
         describeTableResult.getTable().getStreamSpecification();  
      
      System.out.println("Current stream ARN for " + tableName + ": "+ myStreamArn);
      System.out.println("Stream enabled: "+ myStreamSpec.getStreamEnabled()); 
      System.out.println("Update view type: "+ myStreamSpec.getStreamViewType());  
      
      // Add an item 
      int numChanges = 0; 
      System.out.println("Making some changes to table data"); 
      Map<String, AttributeValue> item = new HashMap<String, AttributeValue>(); 
      item.put("ID", new AttributeValue().withN("222")); 
      item.put("Alert", new AttributeValue().withS("item!")); 
      dynamoDBClient.putItem(tableName, item); 
      numChanges++;  
      
      // Update the item         
      Map<String, AttributeValue> key = new HashMap<String, AttributeValue>(); 
      key.put("ID", new AttributeValue().withN("222")); 
      Map<String, AttributeValueUpdate> attributeUpdates =  
      new HashMap<String, AttributeValueUpdate>(); 
      
      attributeUpdates.put("Alert", new AttributeValueUpdate() 
         .withAction(AttributeAction.PUT) 
         .withValue(new AttributeValue().withS("modified item"))); 
      
      dynamoDBClient.updateItem(tableName, key, attributeUpdates); 
      numChanges++;   
      
      // Delete the item         
      dynamoDBClient.deleteItem(tableName, key);  
      numChanges++;
      
      // Get stream shards         
      DescribeStreamResult describeStreamResult =  
      streamsClient.describeStream(new DescribeStreamRequest() 
         .withStreamArn(myStreamArn)); 
      String streamArn =  
         describeStreamResult.getStreamDescription().getStreamArn(); 
      List<Shard> shards =  
         describeStreamResult.getStreamDescription().getShards();  
      
      // Process shards 
      for (Shard shard : shards) { 
         String shardId = shard.getShardId(); 
         System.out.println("Processing " + shardId + " in "+ streamArn);  
         
         // Get shard iterator 
         GetShardIteratorRequest getShardIteratorRequest = new 
            GetShardIteratorRequest() 
            .withStreamArn(myStreamArn) 
            .withShardId(shardId) 
            .withShardIteratorType(ShardIteratorType.TRIM_HORIZON); 
         
         GetShardIteratorResult getShardIteratorResult =  
            streamsClient.getShardIterator(getShardIteratorRequest); 
         String nextItr = getShardIteratorResult.getShardIterator();  
         
         while (nextItr != null && numChanges > 0) { 
            // Read data records with iterator                 
            GetRecordsResult getRecordsResult =  
               streamsClient.getRecords(new GetRecordsRequest(). 
               withShardIterator(nextItr));
               
            List<Record> records = getRecordsResult.getRecords(); 
            System.out.println("Pulling records...");  
               
            for (Record record : records) { 
               System.out.println(record); 
               numChanges--;
            } 
            nextItr = getRecordsResult.getNextShardIterator(); 
         } 
      } 
   } 
}