SNS FIFO topic does not fan out messages to an SQS FIFO queue

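I am exploring SNS FIFO topics together with SQS FIFO queues, and this is what I have tried so far. I created an SNS FIFO topic and an SQS FIFO queue, and subscribed the FIFO queue to the FIFO topic. According to the documentation, with this setup every message published to the SNS FIFO topic should be fanned out to the SQS queue, but that is not happening. PublishResult#getMessageId() returns a value, which means the publish itself succeeds, but no messages arrive in the queue. Because SNS FIFO topics do not support email protocol subscriptions, the only way I can assert on this publish-subscribe setup is to poll messages from the queue. Since no fan-out happens, the queue always appears to be empty.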

Complete code block:

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.CreateTopicRequest;
import com.amazonaws.services.sns.model.CreateTopicResult;
import com.amazonaws.services.sns.model.PublishRequest;
import com.amazonaws.services.sns.model.PublishResult;
import com.amazonaws.services.sns.model.SubscribeRequest;
import com.amazonaws.services.sns.model.SubscribeResult;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.CreateQueueRequest;
import com.amazonaws.services.sqs.model.CreateQueueResult;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.UUID;

class FifoTopicsITest {

    @Test
    void test() {
        final String topicName = UUID.randomUUID().toString().substring(15);
        //creating sns client
        AmazonSNS amazonSNS = AmazonSNSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
                        "<accessKey>","<secretKey>")))
                .withEndpointConfiguration(new AwsClientBuilder
                        .EndpointConfiguration("https://sns.us-west-1.amazonaws.com","us-west-1")).build();
        //creating sqs client
        AmazonSQS amazonSQS = AmazonSQSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
                "<accessKey>","<secretKey>")))
                .withEndpointConfiguration(new AwsClientBuilder
                        .EndpointConfiguration("https://sqs.us-west-1.amazonaws.com","us-west-1")).build();

        //creating SNS topic
        CreateTopicRequest createTopicRequest = new CreateTopicRequest().withName(topicName + ".fifo");
        createTopicRequest
                .addAttributesEntry("FifoTopic","true")
                .addAttributesEntry("ContentBasedDeduplication","false");
        CreateTopicResult topicResult = amazonSNS.createTopic(createTopicRequest);
        String topicArn = topicResult.getTopicArn();

        //creating dead-letter sqs queue
        CreateQueueRequest createDLQQueueRequest = new CreateQueueRequest();
        createDLQQueueRequest.addAttributesEntry("FifoQueue","true");
        createDLQQueueRequest.addAttributesEntry("ContentBasedDeduplication","false");
        createDLQQueueRequest.withQueueName(topicName + "_DLQ_" + ".fifo");
        CreateQueueResult createDeadLetterQueueResult = amazonSQS.createQueue(createDLQQueueRequest);

        //getting ARN value of dead-letter queue
        GetQueueAttributesResult getQueueAttributesResult = amazonSQS.getQueueAttributes(
                new GetQueueAttributesRequest(createDeadLetterQueueResult.getQueueUrl())
                        .withAttributeNames("QueueArn"));
        String deleteQueueArn = getQueueAttributesResult.getAttributes().get("QueueArn");

        //creating sqs queue
        CreateQueueRequest createQueueRequest = new CreateQueueRequest();
        createQueueRequest.addAttributesEntry("FifoQueue","true");
        createQueueRequest.addAttributesEntry("ContentBasedDeduplication","false");
        createQueueRequest.withQueueName(topicName + ".fifo");
        String reDrivePolicy = "{\"maxReceiveCount\":\"5\",\"deadLetterTargetArn\":\""
                + deleteQueueArn + "\"}";
        createQueueRequest.addAttributesEntry("RedrivePolicy",reDrivePolicy);
        CreateQueueResult createQueueResult = amazonSQS.createQueue(createQueueRequest);
        String queueUrl = createQueueResult.getQueueUrl();

        //getting ARN value of queue
        getQueueAttributesResult = amazonSQS.getQueueAttributes(
                new GetQueueAttributesRequest(queueUrl)
                        .withAttributeNames("QueueArn"));
        String queueArn = getQueueAttributesResult.getAttributes().get("QueueArn");

        //Subscribe FIFO queue to FIFO Topic
        SubscribeRequest subscribeRequest = new SubscribeRequest();
        subscribeRequest.withProtocol("sqs")
                .withTopicArn(topicArn)
                .withEndpoint(queueArn);
        SubscribeResult subscribeResult = amazonSNS.subscribe(subscribeRequest);
        Assertions.assertNotNull(subscribeResult.getSubscriptionArn());

        //Publishing 4 sample messages to FIFO SNS Topic
        for (int i = 0; i < 4; i++) {
            PublishRequest publishRequest = new PublishRequest()
                    .withTopicArn(topicArn)
                    .withMessage("Test Message" + i)
                    .withMessageGroupId(topicName)
                    .withMessageDeduplicationId(UUID.randomUUID().toString());
            PublishResult publishResult = amazonSNS.publish(publishRequest);
            Assertions.assertNotNull(publishResult.getMessageId());
        }

        //Getting the approximate number of messages in the FIFO queue
        getQueueAttributesResult = amazonSQS.getQueueAttributes(
                new GetQueueAttributesRequest(queueUrl)
                        .withAttributeNames("All"));
        String approximateNumberOfMessages = getQueueAttributesResult.getAttributes()
                                                     .get("ApproximateNumberOfMessages");

        //My expectation here is that the SNS FIFO topic should have fanned out the 4 published messages to the SQS FIFO queue
        Assertions.assertEquals(4,Integer.parseInt(approximateNumberOfMessages));
    }
}
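
A side note on the final check, independent of why the queue stays empty here: the test reads ApproximateNumberOfMessages once, immediately after publishing, while delivery from SNS to SQS is asynchronous and the attribute is only approximate. A minimal sketch of a retrying read follows. QueueDepthPoller and awaitAtLeast are hypothetical names made up for illustration, not part of the AWS SDK, and the attempt count and sleep interval are arbitrary assumptions.

```java
import java.util.function.IntSupplier;

/** Hypothetical helper (not part of the AWS SDK): re-reads a counter until it reaches an expected value. */
final class QueueDepthPoller {

    private QueueDepthPoller() {
    }

    /**
     * Calls readCount up to maxAttempts times, sleeping sleepMillis between calls,
     * and stops early once the value is at least expected. Returns the last value read.
     */
    static int awaitAtLeast(IntSupplier readCount, int expected, int maxAttempts, long sleepMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        int last = readCount.getAsInt();
        for (int attempt = 1; attempt < maxAttempts && last < expected; attempt++) {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up early with the last value seen.
                Thread.currentThread().interrupt();
                return last;
            }
            last = readCount.getAsInt();
        }
        return last;
    }
}
```

In the test above, the supplier could wrap the existing amazonSQS.getQueueAttributes(...) call and parse the "ApproximateNumberOfMessages" attribute with Integer.parseInt, and the assertion would then compare 4 against the value returned by awaitAtLeast. This only removes a timing concern from the test; it does not make messages appear if the fan-out itself is failing.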

SNS access policy (permissions)

{
  "Version": "2008-10-17",
  "Id": "__default_policy_ID",
  "Statement": [
    {
      "Sid": "__default_statement_ID",
      "Effect": "Allow",
      "Principal": {
        "AWS": "*"
      },
      "Action": [
        "SNS:GetTopicAttributes",
        "SNS:SetTopicAttributes",
        "SNS:AddPermission",
        "SNS:RemovePermission",
        "SNS:DeleteTopic",
        "SNS:Subscribe",
        "SNS:ListSubscriptionsByTopic",
        "SNS:Publish",
        "SNS:Receive"
      ],
      "Resource": "arn:aws:sns:us-west-1:<account>:<topicName>.fifo",
      "Condition": {
        "StringEquals": {
          "AWS:SourceOwner": "<account>"
        }
      }
    }
  ]
}

SQS access policy (permissions)

{
  "Version": "2012-10-17",
  "Id": "arn:aws:sqs:us-west-1:<account>:<topicName>.fifo/SQSDefaultPolicy"
}

What am I missing? Why are the messages not in the SQS queue? Should I set the SQS queue permissions to something like the following?

{
  "Id": "Policy1611770719125",
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Stmt1611770707743",
      "Action": [
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ListQueueTags",
        "sqs:ListQueues",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:SendMessageBatch",
        "sqs:SetQueueAttributes"
      ],
      "Resource": "arn:aws:sqs:us-west-1:<account>:<queueName>.fifo",
      "Principal": {
        "AWS": "*"
      }
    }
  ]
}

Solution

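Sharing my answer for posterity. As suspected, the actual problem was the access policy. When we create a FIFO SNS topic and subscribe an SQS FIFO queue to it using AWS SDK v1, the queue's default access policy looks like this:

{
  "Version": "2012-10-17",
  "Id": "arn:aws:sqs:us-west-1:<account>:<topicName>.fifo/SQSDefaultPolicy"
}

The default policy was the same even when I created the SQS FIFO queue with AWS SDK v2. It contains no statement at all, so nothing grants the topic permission to send messages to the queue, even though the publish call itself succeeds. Once I manually changed the access policy to the following, the problem was resolved and the FIFO SNS topic fanned out as documented:

{
  "Statement": [
    {
      "Action": [
        "sqs:*"
      ],
      "Effect": "Allow",
      "Resource": "arn:aws:sqs:us-west-1:<account>:<queueName>.fifo",
      "Principal": {
        "AWS": "*"
      }
    }
  ]
}

Setting the above access policy on each FIFO queue right after creating it finally solved the problem.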
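
For reference, a minimal sketch of how such a policy could be built in code. Note that this is a narrower variant than the policy in the answer, not the answer's own policy: it allows only sqs:SendMessage, only for the SNS service principal, and only when the message comes from one specific topic. SnsToSqsPolicy and build are hypothetical names chosen for illustration, and the ARNs in any call are placeholders.

```java
/** Hypothetical helper: builds an SQS queue policy that lets one SNS topic send messages to one queue. */
final class SnsToSqsPolicy {

    private SnsToSqsPolicy() {
    }

    /**
     * Returns a policy document as a JSON string. Assumes both ARNs contain no
     * characters that need JSON escaping, which holds for well-formed ARNs.
     */
    static String build(String queueArn, String topicArn) {
        return "{"
                + "\"Version\":\"2012-10-17\","
                + "\"Statement\":[{"
                + "\"Effect\":\"Allow\","
                + "\"Principal\":{\"Service\":\"sns.amazonaws.com\"},"
                + "\"Action\":\"sqs:SendMessage\","
                + "\"Resource\":\"" + queueArn + "\","
                + "\"Condition\":{\"ArnEquals\":{\"aws:SourceArn\":\"" + topicArn + "\"}}"
                + "}]}";
    }

    // Assumed usage with the SDK v1 client from the question (not executed here):
    // amazonSQS.setQueueAttributes(new SetQueueAttributesRequest()
    //         .withQueueUrl(queueUrl)
    //         .addAttributesEntry("Policy", SnsToSqsPolicy.build(queueArn, topicArn)));
}
```

In the test from the question, the commented call would go right after queueArn is read and before the subscribe step, since topicArn, queueUrl and queueArn are all available at that point. Whether to use this narrower statement or the broader sqs:* one from the answer is a design choice; the narrower form simply avoids opening the queue to every principal.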
