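How to fix an SNS FIFO topic that does not fan out messages to an SQS FIFO queue
I'm exploring SNS FIFO topics with SQS FIFO queues, and here is what I just tried. I created an SNS FIFO topic and an SQS FIFO queue, and subscribed the FIFO queue to the FIFO topic. According to the documentation, with this setup every message published to the SNS FIFO topic should be fanned out to the SQS queue, but that is not happening. I do get a value from PublishResult#getMessageId(), which means the publish step succeeds, but no messages arrive in the queue. Because SNS FIFO topics do not support email protocol subscriptions, the only way I can verify this pub-sub architecture is by polling messages from the queue. Since no fan-out happens, the queue always appears empty.
Full code: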
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.CreateTopicRequest;
import com.amazonaws.services.sns.model.CreateTopicResult;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;
import com.amazonaws.services.sns.model.SubscribeRequest;
import com.amazonaws.services.sns.model.SubscribeResult;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.UUID;
class FifoTopicsITest {

    @Test
    void test() {
        final String topicName = UUID.randomUUID().toString().substring(15);

        // Create the SNS client
        AmazonSNS amazonSNS = AmazonSNSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(
                        new BasicAWSCredentials("<accessKey>", "<secretKey>")))
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
                        "https://sns.us-west-1.amazonaws.com", "us-west-1"))
                .build();

        // Create the SQS client
        AmazonSQS amazonSQS = AmazonSQSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(
                        new BasicAWSCredentials("<accessKey>", "<secretKey>")))
                .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
                        "https://sqs.us-west-1.amazonaws.com", "us-west-1"))
                .build();

        // Create the SNS FIFO topic (FIFO topic names must end in ".fifo")
        CreateTopicRequest createTopicRequest = new CreateTopicRequest().withName(topicName + ".fifo");
        createTopicRequest
                .addAttributesEntry("FifoTopic", "true")
                .addAttributesEntry("ContentBasedDeduplication", "false");
        CreateTopicResult topicResult = amazonSNS.createTopic(createTopicRequest);
        String topicArn = topicResult.getTopicArn();

        // Create the dead-letter SQS FIFO queue
        CreateQueueRequest createDLQQueueRequest = new CreateQueueRequest();
        createDLQQueueRequest.addAttributesEntry("FifoQueue", "true");
        createDLQQueueRequest.addAttributesEntry("ContentBasedDeduplication", "false");
        createDLQQueueRequest.withQueueName(topicName + "_DLQ_" + ".fifo");
        CreateQueueResult createDeadLetterQueueResult = amazonSQS.createQueue(createDLQQueueRequest);

        // Look up the ARN of the dead-letter queue
        GetQueueAttributesResult getQueueAttributesResult = amazonSQS.getQueueAttributes(
                new GetQueueAttributesRequest(createDeadLetterQueueResult.getQueueUrl())
                        .withAttributeNames("QueueArn"));
        String deadLetterQueueArn = getQueueAttributesResult.getAttributes().get("QueueArn");

        // Create the main SQS FIFO queue with a redrive policy pointing at the dead-letter queue
        CreateQueueRequest createQueueRequest = new CreateQueueRequest();
        createQueueRequest.addAttributesEntry("FifoQueue", "true");
        createQueueRequest.addAttributesEntry("ContentBasedDeduplication", "false");
        createQueueRequest.withQueueName(topicName + ".fifo");
        String redrivePolicy = "{\"maxReceiveCount\":\"5\",\"deadLetterTargetArn\":\""
                + deadLetterQueueArn + "\"}";
        createQueueRequest.addAttributesEntry("RedrivePolicy", redrivePolicy);
        CreateQueueResult createQueueResult = amazonSQS.createQueue(createQueueRequest);
        String queueUrl = createQueueResult.getQueueUrl();

        // Look up the ARN of the main queue
        getQueueAttributesResult = amazonSQS.getQueueAttributes(
                new GetQueueAttributesRequest(queueUrl)
                        .withAttributeNames("QueueArn"));
        String queueArn = getQueueAttributesResult.getAttributes().get("QueueArn");

        // Subscribe the FIFO queue to the FIFO topic
        SubscribeRequest subscribeRequest = new SubscribeRequest();
        subscribeRequest.withProtocol("sqs")
                .withTopicArn(topicArn)
                .withEndpoint(queueArn);
        SubscribeResult subscribeResult = amazonSNS.subscribe(subscribeRequest);
        Assertions.assertNotNull(subscribeResult.getSubscriptionArn());

        // Publish 4 sample messages to the FIFO SNS topic
        for (int i = 0; i < 4; i++) {
            PublishRequest publishRequest = new PublishRequest()
                    .withTopicArn(topicArn)
                    .withMessage("Test Message" + i)
                    .withMessageGroupId(topicName)
                    .withMessageDeduplicationId(UUID.randomUUID().toString());
            PublishResult publishResult = amazonSNS.publish(publishRequest);
            Assertions.assertNotNull(publishResult.getMessageId());
        }

        // Read ApproximateNumberOfMessages from the FIFO queue
        getQueueAttributesResult = amazonSQS.getQueueAttributes(
                new GetQueueAttributesRequest(queueUrl)
                        .withAttributeNames("All"));
        String approximateNumberOfMessages = getQueueAttributesResult.getAttributes()
                .get("ApproximateNumberOfMessages");

        // Expectation: the SNS FIFO topic has fanned out the 4 published messages to the SQS FIFO queue
        Assertions.assertEquals(4, Integer.parseInt(approximateNumberOfMessages));
    }
}
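A side note on the final check in the test: ApproximateNumberOfMessages is, as the name says, approximate, so reading it once right after publishing can be flaky even when fan-out works. The following is a minimal sketch of a polling helper that retries a counter until it reaches the expected value or a timeout expires. The class name QueueDepthPoller and the method awaitCount are hypothetical and not part of the AWS SDK. This would not fix a missing permission; it only removes one unrelated source of test failures.

```java
import java.time.Duration;
import java.util.function.IntSupplier;

// Hypothetical helper, not part of the AWS SDK.
public final class QueueDepthPoller {
    private QueueDepthPoller() {}

    /**
     * Reads the counter repeatedly until it reaches at least `expected`
     * or the timeout expires, then returns the last observed value.
     */
    public static int awaitCount(IntSupplier counter, int expected,
                                 Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        int observed = counter.getAsInt();
        while (observed < expected && System.nanoTime() < deadline) {
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                return observed;
            }
            observed = counter.getAsInt();
        }
        return observed;
    }
}
```

In the test above, the counter could be a lambda that calls amazonSQS.getQueueAttributes(...) for queueUrl and parses the ApproximateNumberOfMessages attribute; the assertion would then compare 4 against the value returned by awaitCount.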
SNS access policy (permissions)
{
  "Version": "2008-10-17",
  "Id": "__default_policy_ID",
  "Statement": [
    {
      "Sid": "__default_statement_ID",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": [
        "SNS:GetTopicAttributes",
        "SNS:SetTopicAttributes",
        "SNS:AddPermission",
        "SNS:RemovePermission",
        "SNS:DeleteTopic",
        "SNS:Subscribe",
        "SNS:ListSubscriptionsByTopic",
        "SNS:Publish",
        "SNS:Receive"
      ],
      "Resource": "arn:aws:sns:us-west-1:<account>:<topicName>.fifo",
      "Condition": {
        "StringEquals": {
          "AWS:SourceOwner": "<account>"
        }
      }
    }
  ]
}
SQS access policy (permissions)
{
  "Version": "2012-10-17",
  "Id": "arn:aws:sqs:us-west-1:<account>:<topicName>.fifo/SQSDefaultPolicy"
}
What am I missing? Why are the messages not in the SQS queue? Do I need to change the SQS queue permissions to something like the following?
{
  "Id": "Policy1611770719125",
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Stmt1611770707743",
      "Action": [
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ListQueueTags",
        "sqs:ListQueues",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:SendMessageBatch",
        "sqs:SetQueueAttributes"
      ],
      "Resource": "arn:aws:sqs:us-west-1:<account>:<queueName>.fifo",
      "Principal": {
        "AWS": "*"
      }
    }
  ]
}
Solution
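Sharing my answer for posterity. As suspected, the actual problem was with the SQS access policy. When we create a FIFO SNS topic and subscribe an SQS FIFO queue to it using AWS SDK v1, the queue's default access policy looks like this:
{
  "Version": "2012-10-17",
  "Id": "arn:aws:sqs:us-west-1:<account>:<topicName>.fifo/SQSDefaultPolicy"
}
The default access policy was the same even when I created the SQS FIFO queue with AWS SDK v2. It contains no statement, so nothing grants the SNS topic permission to send messages to the queue. Once I manually changed the access policy to include the statement below, the problem was resolved and the FIFO SNS topic fanned out messages as specified:
{
  "Statement": [
    {
      "Action": [
        "sqs:*"
      ],
      "Effect": "Allow",
      "Resource": "arn:aws:sqs:us-west-1:<account>:<queueName>.fifo",
      "Principal": {
        "AWS": "*"
      }
    }
  ]
}
Applying the above access policy in code to every FIFO queue, right after the SQS FIFO queue is created, ultimately fixed the problem.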
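The policy in the answer allows every SQS action for every principal, which is broader than fan-out needs. As a hedged alternative, here is a minimal sketch that builds a narrower queue policy, following the commonly documented pattern of allowing only sqs:SendMessage from the SNS service principal, restricted to one topic ARN. This is an assumption about what is sufficient, not the policy the answer used, and the class name SnsToSqsPolicy is hypothetical.

```java
// Hypothetical helper, not part of the AWS SDK.
public final class SnsToSqsPolicy {
    private SnsToSqsPolicy() {}

    /**
     * Builds a queue access policy (as a JSON string) that lets the given
     * SNS topic, and only that topic, send messages to the given SQS queue.
     */
    public static String build(String queueArn, String topicArn) {
        return "{"
                + "\"Version\":\"2012-10-17\","
                + "\"Statement\":[{"
                + "\"Effect\":\"Allow\","
                + "\"Principal\":{\"Service\":\"sns.amazonaws.com\"},"
                + "\"Action\":\"sqs:SendMessage\","
                + "\"Resource\":\"" + queueArn + "\","
                + "\"Condition\":{\"ArnEquals\":{\"aws:SourceArn\":\"" + topicArn + "\"}}"
                + "}]}";
    }
}
```

With the SDK v1 client from the question, the resulting string could be applied right after queue creation by setting the queue's Policy attribute, for example amazonSQS.setQueueAttributes(queueUrl, Collections.singletonMap("Policy", SnsToSqsPolicy.build(queueArn, topicArn))). The topic must exist before the policy is built, since its ARN goes into the condition.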