微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Testcontainers/LocalStack 是否适用于 DynamoDb Streams KCL 1.x?

如何解决Testcontainers/LocalStack 是否适用于 DynamoDb Streams KCL 1.x?

问题说明

我编写了一个程序,它利用 DynamoDb Streams 在发生更新时获得通知,此代码在使用 AWS 服务时运行良好,但在我的集成测试中使用 Testcontainers/Localstack 似乎不起作用。

做了什么

尝试恢复到旧版本的 testcontainer/localstack

Read this article,它在末尾有一个注释,暗示 KCL 1.x 缺少一些阻止它与 Localstack 一起工作的 API

但是,Kinesis Client Library-1.x 没有提供将 AWS CloudWatch 服务端点 URL 作为配置参数的功能

我相信 DynamoDb Kinesis 适配器在内部使用 KCL 1.x,因此,我认为我不能切换到使用 KCL 2.x。顺便说一句,似乎 DynamoDb Kinesis Adapter 已被存档,但 Amazon Docs 仍然引用它,并且该 git 存储库中没有任何内容表明它被存档的原因或使用什么来代替。

发生了什么?

我的程序运行良好,我没有看到任何错误,但我也没有从任何分片中获取任何更新信息

应用设计

基本上集成测试开始,它在 LocalStack 中创建所需的表(确认这是通过 AWS CLI 完成的),然后在 DynamoDb 中放置 3 个项目。 Spring Boot 应用程序启动并将数据从 DynamoDb 读取到一个列表中。集成测试然后调用删除端点,删除端点简单地调用 CrudRepository.delete 函数(通过 Spring Data DynamoDb 实现)。我已确认实际的 DynamoDb 已从 3 个元素变为 2 个元素,并删除了已删除的元素。但是,当我的应用程序从 KCL 获取更新的记录时,应用程序缓存应该会更新;这种情况在 TreatContainers/localStack 中永远不会发生。

应用程序正在使用 InitialPositionInStream.LATEST 读取分片。

应用输出

您可以看到应用程序在 13:42:17.468 启动并准备就绪,项目在 13:42:24.768 被“删除”。然后测试调用 Thread.sleep(1000) 给 KCL 时间来处理任何更改,然后在 13:42:25.793 测试调用服务器以查看该元素是否仍然存在于缓存中,它是什么。

认为这可能是时间问题,LocalStack 处理缓慢,我在测试中添加一个 @Afterall 函数,该函数等待 2 分钟。我使用 curl 调用 13:44:38.723 处的 get 端点并且元素仍然存在,这应该有足够的时间让 Localstack 处理分片的更新。 >

2021-04-05 13:42:07.712  INFO 11414 --- [    Test worker] c.c.u.d.s.processor.MyKclProcessor    : KCL shard lease table,test-table,exists and is ACTIVE
2021-04-05 13:42:07.770  INFO 11414 --- [    Test worker] c.c.u.d.s.processor.MyKclProcessor    : KCL shard lease table,kcl-shard-lease-lock,exists and is ACTIVE
2021-04-05 13:42:07.772  INFO 11414 --- [    Test worker] c.c.u.d.s.processor.MyKclProcessor    : Describing table=test-table
2021-04-05 13:42:07.821  INFO 11414 --- [    Test worker] c.c.u.d.s.processor.MyKclProcessor    : Got description for table=test-table
2021-04-05 13:42:07.822  INFO 11414 --- [    Test worker] c.c.u.d.s.processor.MyKclProcessor    : Got stream arn (arn:aws:dynamodb:us-east-1:000000000000:table/test-table/stream/2021-04-05T17:41:21.860) for table=test-table with tableArn=arn:aws:dynamodb:us-east-1:000000000000:table/test-table
2021-04-05 13:42:07.904  INFO 11414 --- [    Test worker] c.c.u.d.s.processor.MyKclProcessor    : Creating KCL worker
2021-04-05 13:42:07.938  INFO 11414 --- [    Test worker] c.a.s.k.leases.impl.LeaseCoordinator     : With failover time 10000 ms and epsilon 25 ms,LeaseCoordinator will renew leases every 3308 ms,takeleases every 20050 ms,process maximum of 2147483647 leases and steal 1 lease(s) at a time.
2021-04-05 13:42:07.941  INFO 11414 --- [    Test worker] c.a.s.k.clientlibrary.lib.worker.Worker  : Shard sync strategy determined as SHARD_END.
2021-04-05 13:42:07.941  INFO 11414 --- [    Test worker] c.c.u.d.s.processor.MyKclProcessor    : KCL Worker created!
2021-04-05 13:42:07.944  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Initialization attempt 1
2021-04-05 13:42:07.945  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Initializing LeaseCoordinator
2021-04-05 13:42:14.525  INFO 11414 --- [    Test worker] c.c.u.d.s.controller.TestItemController  : *** TestItemController Started ***
2021-04-05 13:42:14.650  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Syncing Kinesis shard info
2021-04-05 13:42:14.656  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : syncShardLeases: begin
2021-04-05 13:42:14.656  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : getShardList: begin
2021-04-05 13:42:14.757  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : getShardList: done
2021-04-05 13:42:14.779  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : determineNewLeasesToCreate: begin
2021-04-05 13:42:14.781  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : determineNewLeasesToCreate: done
2021-04-05 13:42:14.863  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : cleanupGarbageLeases: begin
2021-04-05 13:42:14.863  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : cleanupGarbageLeases: done
2021-04-05 13:42:14.863  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : cleanupLeasesOfFinishedShards: begin
2021-04-05 13:42:14.863  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : cleanupLeasesOfFinishedShards: done
2021-04-05 13:42:14.863  INFO 11414 --- [cessingThread-0] c.a.s.d.s.DynamoDBStreamsShardSyncer     : syncShardLeases: done
2021-04-05 13:42:14.866  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Starting LeaseCoordinator
2021-04-05 13:42:14.900  INFO 11414 --- [cessingThread-0] c.a.s.kinesis.leases.impl.LeaseRenewer   :  Worker Test-application-lMtQuWkzefmq+ found lease {
  "leaseKey" : "shardId-00000001617600000000-000000000000","leaSEOwner" : "Test-application-lMtQuWkzefmq+","leaseCounter" : 0,"concurrencyToken" : null,"lastCounterIncrementNanos" : null,"checkpoint" : {
    "sequenceNumber" : "LATEST","subSequenceNumber" : 0
  },"pendingCheckpoint" : null,"ownerSwitchesSinceCheckpoint" : 0,"parentShardIds" : [ ]
}
2021-04-05 13:42:14.949  WARN 11414 --- [cessingThread-0] c.a.s.k.metrics.impl.MetricsHelper       : No metrics scope set in thread KCLProcessingThread-0,getMetricsScope returning NullMetricsScope.
2021-04-05 13:42:15.011  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkzefmq+ saw 1 total leases,0 expired leases,1 workers.Unfinished lease target: 1 leases,I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:42:15.011  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:42:16.709  INFO 11414 --- [    Test worker] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 'applicationTaskExecutor'
2021-04-05 13:42:17.309  INFO 11414 --- [    Test worker] o.s.b.a.e.web.EndpointLinksResolver      : Exposing 2 endpoint(s) beneath base path '/actuator'
2021-04-05 13:42:17.451  INFO 11414 --- [    Test worker] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 50376 (http) with context path ''
2021-04-05 13:42:17.464  INFO 11414 --- [    Test worker] d.d.r.u.Entity2DynamoDBTableSynchronizer : Checking repository classes with DynamoDB tables test-table for ContextRefreshedEvent
2021-04-05 13:42:17.468  INFO 11414 --- [    Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp    : Started DynamoDbStreamsTestApp in 54.763 seconds (JVM running for 77.176)
2021-04-05 13:42:17.872  INFO 11414 --- [    Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp    : *** ASK SERVER TO DELETE ITEM 23456 ***
2021-04-05 13:42:17.996  INFO 11414 --- [o-auto-1-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring dispatcherServlet 'dispatcherServlet'
2021-04-05 13:42:17.996  INFO 11414 --- [o-auto-1-exec-1] o.s.web.servlet.dispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2021-04-05 13:42:18.019  INFO 11414 --- [o-auto-1-exec-1] o.s.web.servlet.dispatcherServlet        : Completed initialization in 23 ms
2021-04-05 13:42:18.068  INFO 11414 --- [o-auto-1-exec-1] c.c.u.d.s.controller.TestItemController  : Deleting itemNumber=23456
2021-04-05 13:42:21.670  INFO 11414 --- [o-auto-1-exec-1] c.c.u.d.s.controller.TestItemController  : Found inventory item to delete
2021-04-05 13:42:24.768  INFO 11414 --- [o-auto-1-exec-1] c.c.u.d.s.controller.TestItemController  : Item deleted
2021-04-05 13:42:24.789  INFO 11414 --- [    Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp    : *** SERVER SAYS ITEM 23456 DELETED ***
2021-04-05 13:42:24.954  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Initialization complete. Starting worker loop.
2021-04-05 13:42:24.970  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Created new shardConsumer for : ShardInfo [shardId=shardId-00000001617600000000-000000000000,concurrencyToken=b773dd9e-d385-44cd-8189-3cf330b94351,parentShardIds=[],checkpoint={SequenceNumber: LATEST,SubsequenceNumber: 0}]
2021-04-05 13:42:24.972  INFO 11414 --- [dProcessor-0000] c.a.s.k.c.l.w.BlockOnParentShardTask     : No need to block on parents [] of shard shardId-00000001617600000000-000000000000
2021-04-05 13:42:25.793  INFO 11414 --- [    Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp    : *** VERIFY ITEM 23456 WAS DELETED OR NOT ***
2021-04-05 13:42:31.304  INFO 11414 --- [o-auto-1-exec-2] c.c.u.d.s.controller.TestItemController  : Getting itemNumber=23456
2021-04-05 13:42:34.586  INFO 11414 --- [o-auto-1-exec-2] c.c.u.d.s.controller.TestItemController  : Item=TestItem{itemNumber='23456',description='A doo',price=10.99}
2021-04-05 13:42:34.589  INFO 11414 --- [dProcessor-0000] c.a.s.k.c.lib.worker.KinesisDataFetcher  : Initializing shard shardId-00000001617600000000-000000000000 with LATEST
2021-04-05 13:42:34.611 ERROR 11414 --- [    Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp    : *** SERVER STILL HAS ITEM 23456 ***
2021-04-05 13:42:34.663  WARN 11414 --- [    Test worker] c.c.u.dynamodb.DynamoDbStreamsTestApp    : Giving system 2 MINUTES before shut down
2021-04-05 13:42:35.088  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkzefmq+ saw 1 total leases,I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:42:35.088  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:42:55.171  INFO 11414 --- [oordinator-0001] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkzefmq+ saw 1 total leases,I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:42:55.171  INFO 11414 --- [oordinator-0001] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:43:08.731  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Current stream shard assignments: shardId-00000001617600000000-000000000000
2021-04-05 13:43:08.732  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Sleeping ...
2021-04-05 13:43:15.248  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkzefmq+ saw 1 total leases,I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:43:15.248  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:43:35.322  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkzefmq+ saw 1 total leases,I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:43:35.323  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:43:55.399  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkzefmq+ saw 1 total leases,I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:43:55.399  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:44:09.937  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Current stream shard assignments: shardId-00000001617600000000-000000000000
2021-04-05 13:44:09.937  INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker  : Sleeping ...
2021-04-05 13:44:15.470  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkzefmq+ saw 1 total leases,I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:44:15.470  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:44:35.550  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : Worker Test-application-lMtQuWkzefmq+ saw 1 total leases,I have 1 unfinished leases. Finished leases target is 1 and I have 0 finished leases. I will take 0 leases in total.
2021-04-05 13:44:35.551  INFO 11414 --- [oordinator-0000] c.a.s.d.s.leases.StreamsLeaseTaker       : TakeLeases took 0 seconds.
2021-04-05 13:44:38.723  INFO 11414 --- [o-auto-1-exec-5] c.c.u.d.s.controller.TestItemController  : Getting itemNumber=23456
2021-04-05 13:44:41.374  INFO 11414 --- [o-auto-1-exec-5] c.c.u.d.s.controller.TestItemController  : Item=TestItem{itemNumber='23456',price=10.99}

配置信息

  • dynamodb-streams-kinesis-适配器:1.5.2
  • amazon-kinesis-client:1.13.3
  • junit-jupiter-api:5.6.0
  • junit-jupiter:1.15.2
  • 本地堆栈:1.15.2
  • 本地堆栈:0.12.9
  • localstack-utils:0.2.10
  • aws-java-sdk-dynamodb:1.11.858
  • spring-boot-starter-web: 2.3.3.RELEASE
  • spring 数据 dynamodb(来自 boostchicken fork):5.2.5
  • MacOS Catalina:10.15.7
  • Java:15.0.2
  • 码头工人:
Client: Docker Engine - Community
 Cloud integration: 1.0.9
 Version:           20.10.5
 API version:       1.41
 Go version:        go1.13.15
 Git commit:        55c4c88
 Built:             Tue Mar  2 20:13:00 2021
 OS/Arch:           darwin/amd64
 Context:           default
 Experimental:      true

Server: Docker Engine - Community
 Engine:
  Version:          20.10.5
  API version:      1.41 (minimum version 1.12)
  Go version:       go1.13.15
  Git commit:       363e9a8
  Built:            Tue Mar  2 20:15:47 2021
  OS/Arch:          linux/amd64
  Experimental:     true
 containerd:
  Version:          1.4.3
  GitCommit:        269548fa27e0089a8b8278fc4fc781d7f65a939b
 runc:
  Version:          1.0.0-rc92
  GitCommit:        ff819c7e9184c13b7c2607fe6c30ae19403a7aff
 docker-init:
  Version:          0.19.0
  GitCommit:        de40ad0
 Kubernetes:
  Version:          UnkNown
  StackAPI:         UnkNown

解决方法

解决方案

玩了好几个小时后,我终于注意到这个消息

2021-04-05 13:42:24.954 INFO 11414 --- [cessingThread-0] c.a.s.k.clientlibrary.lib.worker.Worker:初始化完成。启动工作循环。

经过一段时间的观察,很明显,当使用 Testcontainer/Localstack 时,worker 需要 10 秒才能初始化并“准备好”。这很容易解决,因为 com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker 有一个状态更改监听器,所以我可以设置一个监听器来确定 Localstack 何时准备就绪,然后允许代码继续。

最终问题

遗憾的是,这并没有完全解决问题,事实证明,Testcontainer/Localstack 中的 DynamoDb 流实现非常慢。我的意思是创建/删除或修改一条记录,分片处理器需要 7 秒以上的时间才能获取更新。所以在这里,我最终在我删除项目和检查以确认项目实际删除之前的时间之间休眠了 10 秒。

结论

结果证明这是一个时间问题,Testconatiners/Localstack 将数据推送到分片的速度非常慢(至少与针对 DynamoDb 和 DynamoDb Streams 运行的相同代码相比)。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。