如何解决Kafka 消息包含控制字符MongoDB 源连接器
我有一个 Kafka Connect MongoDB 源连接器(都通过 Confluent 平台)工作,但它创建的消息在开始时包含一个控制字符,这使得下游解析(到 JSON)这条消息比我想象的更难
正在运行的源连接器:
{
"name": "mongo-source-connector","config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector","connection.uri": "mongodb://myUsername:myPassword@my-mongodb-host-address:27017","database": "myDatabase","collection": "myCollection","change.stream.full.document": "updateLookup","errors.log.enable": true
}
}
这个Source连接器在Kafka主题中创建的消息如下(注意前导控制字符):
�{"_id": {"_data": "82609E8726000000012B022C0100296E5A1004BE208B099BCF4106822DE274B0B9D39A46645F69640064609E87267125D17D12D180620004"},"operationType": "insert","clusterTime": {"$timestamp": {"t": 1621002022,"i": 1}},"fullDocument": {"_id": {"$oid": "609e87267125d17d12d18062"},"uuid": "23534a5c-ad82-431c-a821-6b4aed4f59a1","endingNumber": 10},"ns": {"db": "myDatabase","coll": "myCollection"},"documentKey": {"_id": {"$oid": "609e87267125d17d12d18062"}}}
控制字符使下游解析为 JSON 变得困难,因为它使原本有效的 JSON 无效。我不知道它为什么会在那里或如何摆脱它。
我想,在像 JSON 一样处理它之前,我可以解析出像这个控制字符这样的垃圾,但这似乎是我想避免的创可贴。
我现在处理消息的方式(我认为这是无关紧要的,因为我已经测试过它可以在没有控制字符的情况下使用有效的 JSON),以防万一:
data class MyChangesetMessageId (
@JsonProperty("_data")
val data: String
)
data class MyChangesetMessageTimestamp (
val t: Long,val i: Int
)
data class MyChangesetMessageClusterTime (
@JsonProperty("\$timestamp")
val timestamp: MyChangesetMessageTimestamp
)
data class MyChangesetoid (
@JsonProperty("\$oid")
val oid: String
)
data class MyChangesetMessageFullDocument (
@JsonProperty("_id")
val id: MyChangesetoid,val uuid: String,val endingNumber: Int
)
data class MyChangesetMessageNS (
val db: String,val coll: String
)
data class MyChangesetDocumentKey (
@JsonProperty("_id")
val id: MyChangesetoid
)
data class MyChangesetMessage (
@JsonProperty("_id")
val id: MyChangesetMessageId,val operationType: String,val clusterTime: MyChangesetMessageClusterTime,val fullDocument: MyChangesetMessageFullDocument,val ns: MyChangesetMessageNS,val documentKey: MyChangesetDocumentKey
)
...
val objectMapper = jacksonObjectMapper()
val changesetMessage = objectMapper.readValue(message,MyChangesetMessage::class.java)
感谢任何想法。
解决方法
您所指的字符通常与已解码为字符串的 Avro 序列化数据常见。
检查 Connect 工作器中的键/值转换器设置,因为您尚未在连接器中定义它。
如果您想解析为 JSON,请使用 JSONConverter,否则如果您想跳过数据类定义并从 Avro 模式生成它,Avro 也可以正常工作
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。