如何解决Kinesis - 使用 Python Kinesis 聚合生成并使用 Spring Cloud Stream
由于 KPL 在 Python 中不可用,我尝试在我的 Python 项目中通过使用 Python Kinesis Aggregation 在发送记录之前聚合记录来模仿 KPL 之类的功能到 Kinesis。我的使用者是一个多实例 Java Spring 应用程序,它使用 Spring Cloud Stream 来监听传入的消息,并启用了 KCL/KPL。
我能够验证 Spring 应用程序可以接收和处理来自另一个使用实际 KPL Java 库 的生产者的消息,并且这些消息在实例中均匀分布。但是,当我使用 Python Aggregation 时,只有一个 Spring 应用程序实例获取了一部分消息,并且它实际上能够处理它们,但其他消息消失了。在 Python 方面,我能够验证我的 Kinesis put 请求是否会发送到不同的分片,因此消息应该分布在 Spring 应用程序实例中。
由于 Spring 应用程序能够处理一部分记录,我认为/希望可能存在我没有在 Spring Cloud Stream 端设置的属性。我知道这是生产者/消费者的奇怪组合,但我希望其他人以前遇到过这个问题,他们可以指出我正确的方向。下面是我的 Python 聚合代码以及 Spring Cloud Stream 属性。
Python 聚合代码
from aws_kinesis_agg.aggregator import RecordAggregator
def write_records_to_stream(stream_name,records):
record_aggregator = RecordAggregator()
partition_key = 1
for record in records:
aggregated_records = record_aggregator.add_user_record(
partition_key=str(partition_key),data=record,explicit_hash_key=None)
if aggregated_records:
_write_records_to_stream(aggregated_records,stream_name)
partition_key += 1
aggregated_records = record_aggregator.clear_and_get()
if aggregated_records:
_write_records_to_stream(aggregated_records,stream_name)
def _write_records_to_stream(aggregated_records,stream_name):
partition_key,hash_key,data = aggregated_records.get_contents()
kinesis_client.put_record(StreamName=stream_name,Data=data,PartitionKey=partition_key)
Spring Cloud Stream 属性
spring:
cloud:
stream:
bindings:
my-stream:
consumer:
autoRebalanceEnabled: true
partitioned: false
content-type: application/*+avro
destination: my-stream-name
group: my-group
kinesis:
binder:
kpl-kcl-enabled: true
bindings:
my-stream:
consumer:
checkpointMode: manual
recordsLimit: 10
idleBetweenPolls: 10000
listenerMode: batch
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。