微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何访问 SCDF 流中的源原生标头?

如何解决如何访问 SCDF 流中的源原生标头?

我正在使用 SCDF 开发一个流,允许将所有 MQTT 消息持久化到 sql 数据库

这是用于创建流的代码

stream create --name mqtt-to-jdbc  --deFinition "mqtt --qos=2 --topics='#' --username=admin --password=******** --url='tcp://192.168.1.153:60065' | jdbc --username=sa --password=******** --driver-class-name=com.microsoft.sqlserver.jdbc.sqlServerDriver --url='jdbc:sqlserver://192.168.1.18;databaseName=test_db;schema=dbo' --table-name=mqtt_message --columns=\"headers:headers.toString(),payload:payload.toString(),created_at:new java.sql.Timestamp(T(System).currentTimeMillis()).toString()\"" --deploy

mqtt_message 表包含几列,其中包括 headers、payload、receive_topic。 流已成功部署并且数据被持久化,但是: headers 列是使用 SpEL headers.toString() 提取的:

{b3=d4840635cb8c968c-381e88a613735a05-1,nativeHeaders={},errorChannel=,id=e34b08d5-eafe-decd-4aa1-634cb187889a,timestamp=1622532820049}

payload 列使用 SpEL payload.toString() 很好地提取

Test payload

标题中的值不包括假定的标题包括消息的主题 (mqtt_receivedTopic)。

如果我为生产者和接收器提供实现,我可以访问以下消息头:

Headers:{
    mqtt_id=0,deliveryAttempt=1,kafka_timestampType=CREATE_TIME,kafka_receivedTopic=reaper.reaper-source,mqtt_receivedRetained=false,kafka_offset=31,mqtt_duplicate=false,scst_nativeHeadersPresent=true,kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@19da7b97,id=cc423e5f-af1e-173a-a9ac-c229ab544738,kafka_receivedPartitionId=0,mqtt_receivedTopic=test/topic,contentType=application/json,kafka_receivedTimestamp=1622453609998,mqtt_receivedQos=0,kafka_groupId=reaper,timestamp=1622453610004
}

我还测试了以下属性,但它们都没有改变结果:

制作人

  • spring.cloud.stream.bindings.output.producer.headerMode=embeddedHeaders
  • spring.cloud.stream.bindings.output.producer.useNativeEncoding=true

消费者

  • spring.cloud.stream.default.consumer.headerMode=embeddedHeaders

有没有办法在生产者和接收器之间传递本机标头并将它们写入目的地列(从接收到的主题提取值)。

谢谢。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。