微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何解析 Kafka 中的 Avro 数据

如何解决如何解析 Kafka 中的 Avro 数据

我为 Kafka 记录写了一个 Avro 模式

ProducerRecord<String,TestSchema> record = new ProducerRecord<String,TestSchema>("TestTopic0409",testSchema)

我想把它加载到 Druid。我本地启动Druid,连接Kafka数据时,结果显示乱码:

[在此处输入图片说明][1]

然后我使用如下规范:

{
"type": "kafka","spec": {
"dataSchema": {
"dataSource": "new-data-source","timestampSpec": null,"dimensionsspec": null,"metricsspec": [],"granularitySpec": {
"type": "uniform","segmentGranularity": "DAY","queryGranularity": {
"type": "none"
},"rollup": true,"intervals": null
},"transformSpec": {
"filter": null,"transforms": []
},"parser": {
"type": "avro_stream","avroBytesDecoder": {
"type": "schema_inline","schema": {
"namespace": "com.airebroker.data","name": "Test","type": "record","fields": [
{
"name": "id","type": "int"
},{
"name": "name","type": "string"
},{
"name": "timestamp","type": "long"
}
]
}
},"parseSpec": {
"format": "avro","timestampSpec": {},"dimensionsspec": {}
}
}
},"ioConfig": {
"topic": "TestTopic0409","inputFormat": {
"type": "avro_ocf","flattenSpec": {
"useFielddiscovery": true,"fields": []
},"binaryAsstring": false
},"replicas": 1,"taskCount": 1,"taskDuration": "PT3600S","consumerProperties": {
"bootstrap.servers": "localhost:9092"
},"pollTimeout": 100,"startDelay": "PT5S","period": "PT30S","useEarliestOffset": false,"completionTimeout": "PT1800S","lateMessageRejectionPeriod": null,"earlyMessageRejectionPeriod": null,"lateMessageRejectionStartDateTime": null,"stream": "TestTopic0409","useEarliestSequenceNumber": false,"type": "kafka"
},"tuningConfig": {
"type": "kafka","maxRowsInMemory": 1000000,"maxBytesInMemory": 0,"maxRowsPerSegment": 5000000,"maxTotalRows": null,"intermediatePersistPeriod": "PT10M","basePersistDirectory": "/home/zhangjh/apache-druid-0.20.2/var/tmp/druid-realtime-persist7289903804951562243","maxPendingPersists": 0,"indexSpec": {
"bitmap": {
"type": "roaring","compressRunOnSerialization": true
},"dimensionCompression": "lz4","metricCompression": "lz4","longEncoding": "longs","segmentLoader": null
},"indexSpecForIntermediatePersists": {
"bitmap": {
"type": "roaring","buildV9Directly": true,"reportParseExceptions": false,"handoffConditionTimeout": 0,"resetoffsetAutomatically": false,"segmentWriteOutMediumFactory": null,"workerThreads": null,"chatThreads": null,"chatRetries": 8,"httpTimeout": "PT10S","shutdownTimeout": "PT80S","offsetFetchPeriod": "PT30S","intermediateHandoffPeriod": "P2147483647D","logParseExceptions": false,"maxParseExceptions": 2147483647,"maxSavedParseExceptions": 0,"skipSequenceNumberAvailabilityCheck": false,"repartitionTransitionDuration": "PT120S"
}
}
}

然后它给了我一个结果:错误:未定义

我还以为是我的spec文件格式不对,结果在官网上试了各种方法解析Kafka中的Avro数据。所有返回我:错误:未定义。

然后我继续尝试手动解析Avro数据并通过扩展将其拼接成JSON数据。我定义了一个类:public class ExampleByteBufferInputRowParser implements ByteBufferInputRowParser。在parseBatch函数中我写了一个txt到tmp路径,但是当我解析数据时,方法没有通过这里。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。