如何解决Beam Kafka 流输入,无输出打印或文本
我正在尝试使用直接运行器来计算 kafka 消息密钥。
如果我将 max_num_records =20 放在 ReadFromKafka 中,我可以看到打印或输出到文本的结果。 喜欢:
('2102',5)
('2706',5)
('2103',5)
('2707',5)
但是没有max_num_records,或者如果max_num_records大于kafka topic中的消息计数,程序会继续运行但没有任何输出。 如果我尝试使用 beam.io.WritetoText 输出,则会创建一个空的临时文件夹,例如: beam-temp-StatOut-d16768eadec511eb8bd897b012f36e97
终端显示:
2.30.0: Pulling from apache/beam_java8_sdk
Digest: sha256:720144b98d9cb2bcb21c2c0741d693b2ed54f85181dbd9963ba0aa9653072c19
Status: Image is up to date for apache/beam_java8_sdk:2.30.0
docker.io/apache/beam_java8_sdk:2.30.0
如果我将 'enable.auto.commit': 'true' 放在 kafka 消费者配置中,消息被提交,来自同一组的其他客户端无法读取它们,所以我认为它读取成功,只是没有处理或输出。
我尝试了固定时间、滑动时间窗口,有或没有不同的触发器,没有任何变化。
尝试了 flink runner,结果与直接 runner 相同。
不知道我做错了什么,有什么帮助吗?
环境: centos 7
蟒蛇
蟒蛇 3.8.8
Java 1.8.0_292
光束 2.30
代码如下:
direct_options = PipelineOptions([
"--runner=DirectRunner","--environment_type=LOOPBACK","--streaming",])
direct_options.view_as(SetupOptions).save_main_session = True
direct_options.view_as(StandardOptions).streaming = True
conf = {'bootstrap.servers': '192.168.75.158:9092','group.id': "g17",'enable.auto.commit': 'false','auto.offset.reset': 'earliest'}
if __name__ == '__main__':
with beam.Pipeline(options = direct_options) as p:
msg_kv_bytes = ( p
| 'ReadKafka' >> ReadFromKafka(consumer_config=conf,topics=['LaneIn']))
messages = msg_kv_bytes | 'Decode' >> beam.MapTuple(lambda k,v: (k.decode('utf-8'),v.decode('utf-8')))
counts = (
messages
| beam.WindowInto(
window.FixedWindows(10),trigger = AfterCount(1),#AfterCount(4),#AfterProcessingTime
# allowed_lateness=3,accumulation_mode = AccumulationMode.ACCUMULATING) #ACCUMULATING #disCARDING
# | 'Windowsing' >> beam.WindowInto(window.FixedWindows(10,5))
| 'TakeKeyPairWithOne' >> beam.MapTuple(lambda k,v: (k,1))
| 'Grouping' >> beam.GroupByKey()
| 'Sum' >> beam.MapTuple(lambda k,sum(v)))
)
output = (
counts
| 'Print' >> beam.ParDo(print)
# | 'WriteText' >> beam.io.WritetoText('/home/StatOut',file_name_suffix='.txt')
)
解决方法
您可能会遇到几个已知问题。
Beam 的便携式 DirectRunner
目前不完全支持流式传输。要遵循的相关 Jira 是 https://issues.apache.org/jira/browse/BEAM-7514
Beam 的便携式运行器(包括 DirectRunner
)有一个已知问题,即流媒体源无法正确发出消息。因此,必须提供 max_num_records
或 max_read_time
参数以将此类源转换为有界源。要关注的相关 Jira 是 https://issues.apache.org/jira/browse/BEAM-11998。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。