微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

嵌套记录的融合 Kafka 生产者消息格式

如何解决嵌套记录的融合 Kafka 生产者消息格式

我在 kafka 主题注册一个 AVRO 架构,并且正在尝试向其发送数据。该架构具有嵌套记录,我不确定如何使用 confluent_kafka python 正确地向其发送数据。

示例架构: *忽略架构中的任何拼写错误(真实的非常大,只是一个示例)

 {
 "namespace": "company__name","name": "our_data","type": "record","fields": [
           {
            "name": "datatype1","type": ["null",{
                 "type": "record","name": "datatype1_1","fields": [ 
                     {"name": "site","type": "string"},{"name": "units","type": "string"}
                  ]
             }]
             "default": null
            }
            {
            "name": "datatype2","name": "datatype2_1","type": "string"}
                  ]
             }]
             "default": null
            }
           ]
          }

我正在尝试使用 confluent_kafka python 版本将数据发送到此模式。当我之前完成此操作时,记录没有嵌套,我将使用典型的字典 key: value 对并将其序列化。如何发送嵌套数据以使用架构。

到目前为止我尝试过的...

message = {'datatype1': 
            {'site': 'sitename','units': 'm'
            }
           }

此版本不会导致任何 kafka 错误,但所有列都显示为空

还有……

message = {'datatype1': 
            {'datatype1_1':
              {'site': 'sitename','units': 'm'
              }
            }
           }

此版本产生了架构的 kafka 错误

解决方法

如果使用命名空间,则不必担心命名冲突,并且可以正确构建可选记录: 例如,两者

{
  "meta": {
    "instanceID": "something"
  }
}

{}

是以下的有效实例:

{
  "doc": "Survey","name": "Survey","type": "record","fields": [
    {
      "name": "meta","type": [
        "null",{
          "name": "meta","fields": [
            {
              "name": "instanceID","type": [
                "null","string"
              ],"namespace": "Survey.meta"
            }
          ],"namespace": "Survey"
        }
      ],"namespace": "Survey"
    }
  ]
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。