📅  最后修改于: 2020-11-28 14:09:10             🧑  作者: Mango
通过DynamoDB流,您可以跟踪并响应表项的更改。使用此功能来创建一个应用程序,该应用程序通过更新跨源的信息来响应更改。为大型多用户系统的数千个用户同步数据。使用它可以将更新通知发送给用户。它的应用证明是多种多样的。 DynamoDB流用作实现此功能的主要工具。
流捕获表中包含项修改的按时间顺序排列的序列。他们最多保留此数据24小时。应用程序使用它们几乎实时地查看原始和修改的项目。
在表上启用的流将捕获所有修改。在任何CRUD操作上,DynamoDB都会使用修改后的项目的主键属性创建一个流记录。您可以配置流以获取其他信息,例如图像之前和之后。
流有两个保证-
每条记录在流中出现一次,并且
每个项目修改都会导致流记录的顺序与修改顺序相同。
所有流均实时处理,以允许您将其用于应用程序中的相关功能。
创建表时,可以启用流。现有表允许禁用流或更改设置。流提供异步操作的功能,这意味着不影响表性能。
利用AWS管理控制台进行简单的流管理。首先,导航到控制台,然后选择Tables 。在总览标签中,选择管理流。在窗口内,选择添加到有关表数据修改的流中的信息。输入所有设置后,选择启用。
如果要禁用任何现有流,请选择“管理流” ,然后选择“禁用” 。
您还可以利用API CreateTable和UpdateTable启用或更改流。使用参数StreamSpecification来配置流。 StreamEnabled指定状态,含义为true表示启用,值为false表示禁用。
StreamViewType指定添加到流的信息:KEYS_ONLY,NEW_IMAGE,OLD_IMAGE和NEW_AND_OLD_IMAGES。
通过连接到端点并发出API请求来读取和处理流。每个流都包含流记录,每个记录都作为拥有该流的单个修改而存在。流记录包括揭示发布顺序的序列号。记录属于也称为分片的组。碎片函数几个记录的容器,并且还举办需要访问和遍历记录的信息。 24小时后,记录将自动删除。
这些碎片会根据需要生成和删除,并且不会持续很长时间。它们通常会根据写入活动峰值自动自动划分为多个新分片。禁用流时,打开的碎片关闭。分片之间的层次关系意味着应用程序必须为父分片确定优先级,以确保正确的处理顺序。您可以使用Kinesis Adapter自动执行此操作。
注–导致不变的操作不会写入流记录。
访问和处理记录需要执行以下任务-
注–一次读取一个碎片最多应有2个进程。如果超过2个进程,则可以限制源。
可用的流API操作包括
您可以查看以下流读取示例-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreamsClient;
import com.amazonaws.services.dynamodbv2.model.AttributeAction;
import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
import com.amazonaws.services.dynamodbv2.model.DescribeTableResult;
import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;
import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
import com.amazonaws.services.dynamodbv2.model.KeyType;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.Record;
import com.amazonaws.services.dynamodbv2.model.Shard;
import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
import com.amazonaws.services.dynamodbv2.model.StreamSpecification;
import com.amazonaws.services.dynamodbv2.model.StreamViewType;
import com.amazonaws.services.dynamodbv2.util.Tables;
public class StreamsExample {
private static AmazonDynamoDBClient dynamoDBClient =
new AmazonDynamoDBClient(new ProfileCredentialsProvider());
private static AmazonDynamoDBStreamsClient streamsClient =
new AmazonDynamoDBStreamsClient(new ProfileCredentialsProvider());
public static void main(String args[]) {
dynamoDBClient.setEndpoint("InsertDbEndpointHere");
streamsClient.setEndpoint("InsertStreamEndpointHere");
// table creation
String tableName = "MyTestingTable";
ArrayList attributeDefinitions =
new ArrayList();
attributeDefinitions.add(new AttributeDefinition()
.withAttributeName("ID")
.withAttributeType("N"));
ArrayList keySchema = new
ArrayList();
keySchema.add(new KeySchemaElement()
.withAttributeName("ID")
.withKeyType(KeyType.HASH)); //Partition key
StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES);
CreateTableRequest createTableRequest = new CreateTableRequest()
.withTableName(tableName)
.withKeySchema(keySchema)
.withAttributeDefinitions(attributeDefinitions)
.withProvisionedThroughput(new ProvisionedThroughput()
.withReadCapacityUnits(1L)
.withWriteCapacityUnits(1L))
.withStreamSpecification(streamSpecification);
System.out.println("Executing CreateTable for " + tableName);
dynamoDBClient.createTable(createTableRequest);
System.out.println("Creating " + tableName);
try {
Tables.awaitTableToBecomeActive(dynamoDBClient, tableName);
} catch (InterruptedException e) {
e.printStackTrace();
}
// Get the table's stream settings
DescribeTableResult describeTableResult =
dynamoDBClient.describeTable(tableName);
String myStreamArn = describeTableResult.getTable().getLatestStreamArn();
StreamSpecification myStreamSpec =
describeTableResult.getTable().getStreamSpecification();
System.out.println("Current stream ARN for " + tableName + ": "+ myStreamArn);
System.out.println("Stream enabled: "+ myStreamSpec.getStreamEnabled());
System.out.println("Update view type: "+ myStreamSpec.getStreamViewType());
// Add an item
int numChanges = 0;
System.out.println("Making some changes to table data");
Map item = new HashMap();
item.put("ID", new AttributeValue().withN("222"));
item.put("Alert", new AttributeValue().withS("item!"));
dynamoDBClient.putItem(tableName, item);
numChanges++;
// Update the item
Map key = new HashMap();
key.put("ID", new AttributeValue().withN("222"));
Map attributeUpdates =
new HashMap();
attributeUpdates.put("Alert", new AttributeValueUpdate()
.withAction(AttributeAction.PUT)
.withValue(new AttributeValue().withS("modified item")));
dynamoDBClient.updateItem(tableName, key, attributeUpdates);
numChanges++;
// Delete the item
dynamoDBClient.deleteItem(tableName, key);
numChanges++;
// Get stream shards
DescribeStreamResult describeStreamResult =
streamsClient.describeStream(new DescribeStreamRequest()
.withStreamArn(myStreamArn));
String streamArn =
describeStreamResult.getStreamDescription().getStreamArn();
List shards =
describeStreamResult.getStreamDescription().getShards();
// Process shards
for (Shard shard : shards) {
String shardId = shard.getShardId();
System.out.println("Processing " + shardId + " in "+ streamArn);
// Get shard iterator
GetShardIteratorRequest getShardIteratorRequest = new
GetShardIteratorRequest()
.withStreamArn(myStreamArn)
.withShardId(shardId)
.withShardIteratorType(ShardIteratorType.TRIM_HORIZON);
GetShardIteratorResult getShardIteratorResult =
streamsClient.getShardIterator(getShardIteratorRequest);
String nextItr = getShardIteratorResult.getShardIterator();
while (nextItr != null && numChanges > 0) {
// Read data records with iterator
GetRecordsResult getRecordsResult =
streamsClient.getRecords(new GetRecordsRequest().
withShardIterator(nextItr));
List records = getRecordsResult.getRecords();
System.out.println("Pulling records...");
for (Record record : records) {
System.out.println(record);
numChanges--;
}
nextItr = getRecordsResult.getNextShardIterator();
}
}
}
}