微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Json列作为kafka生产者中的key,并根据key推入不同的分区

如何解决Json列作为kafka生产者中的key,并根据key推入不同的分区

正如我们所知,我们可以使用 kafka 生产者发送一个密钥,该密钥在内部进行散列以查找主题数据中的哪个分区。我有一个生产者,我在其中以 JSON 格式发送数据。

[
  {
    "DATE": 20200723,"SOURCETYPE": "WIFI","deviceid": "24:6f:28:99:9e:dc","EVENTTIME": "2020-07-23 07:50:42","TIME": 75042,},{
    "DATE": 20200723,"EVENTTIME": "2020-07-23 08:02:26","TIME": 80226
  },"EVENTTIME": "2020-07-23 08:39:55","TIME": 83955
  },"EVENTTIME": "2020-07-23 08:43:26","TIME": 84326
},"EVENTTIME": "2020-07-23 08:44:22","TIME": 84422
  },"EVENTTIME": "2020-07-23 08:45:09","TIME": 84509
  },"EVENTTIME": "2020-07-23 08:45:58","TIME": 84558
  },"deviceid": "24:6f:28:99","TIME": 84558
          },"TIME": 84558
  }
]

我想根据key(deviceid)推送主题中但不同分区的数据。 我创建了带有两个分区 0 和 1 的主题。但是它将所有数据存储在分区 0 中。我希望所有唯一密钥(deviceid)都存储在不同的分区中。代码

object Producer extends App{
    val props = new Properties()
    props.put("bootstrap.servers","localhost:9092")
    props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer","org.apache.kafka.connect.json.JsonSerializer")
    val producer = new KafkaProducer[String,JsonNode](props)
    println("inside prducer")
    val mapper = (new ObjectMapper() with ScalaObjectMapper).
        registerModule(DefaultScalaModule).
        configure(DeserializationFeature.FAIL_ON_UNKNowN_PROPERTIES,false).
        findAndRegisterModules(). // register joda and java-time modules automatically
        asInstanceOf[ObjectMapper with ScalaObjectMapper] 
     val filename = "/Users/rishunigam/Documents/devicd.json"
     val jsonNode: JsonNode=  mapper.readTree(new File(filename))
     val s = jsonNode.size()
     for(i <- 0 to jsonNode.size()-1) {
     val js = jsonNode.get(i)
       val keys = jsonNode.get(i).findValue("deviceid").toString
       println(keys)
       println(js)
     val record = new ProducerRecord[String,JsonNode]( "tpch.devices_logs",keys,js)
   println(record)
  producer.send(record)
}
    println("producer complete")
    producer.close()
}

解决方法

它将所有数据存储在 partition-0

这并不意味着它不起作用。只是意味着键的散列最终在同一个分区中。

如果要覆盖默认的partitioner,需要定义自己的Partitioner类来解析消息并分配合适的partition,然后在Producer属性中设置partitioner.class

我希望所有唯一的密钥(deviceID)都存储在不同的分区中

然后您必须提前知道您的竞争数据集才能为 N 个设备创建 N 个分区。当您添加全新设备时会发生什么?

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。