微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何将librdkafka有效载荷转换为avro以获取参数值

如何解决如何将librdkafka有效载荷转换为avro以获取参数值

我正在尝试使用librdkafka库将消息发布/订阅到kafka。我无法反序列化到Avro的消息。有人可以帮助我了解如何从有效负载(rkm-> payload)构造avro_datum_t对象。

生产者代码

const char  PERSON_SCHEMA[] =
"{\"type\":\"record\",\
  \"name\":\"Person\",\
  \"fields\":[\
     {\"name\": \"ID\",\"type\": \"long\"},\
     {\"name\": \"First\",\"type\": \"string\"},\
     {\"name\": \"Last\",\
     {\"name\": \"Phone\",\
     {\"name\": \"Age\",\"type\": \"int\"}]}";

void init_schema(void)
{
        if (avro_schema_from_json_literal(PERSON_SCHEMA,&person_schema)) {
                fprintf(stderr,"Unable to parse person schema\n");
                exit(EXIT_FAILURE);
        }
}

avro_datum_t add_person(const char *first,const char *last,const char *phone,int32_t age)
{
        avro_datum_t person = avro_record(person_schema);

        avro_datum_t id_datum = avro_int64(++id);
        avro_datum_t first_datum = avro_string(first);
        avro_datum_t last_datum = avro_string(last);
        avro_datum_t age_datum = avro_int32(age);
        avro_datum_t phone_datum = avro_string(phone);

        if (avro_record_set(person,"ID",id_datum)
            || avro_record_set(person,"First",first_datum)
            || avro_record_set(person,"Last",last_datum)
            || avro_record_set(person,"Age",age_datum)
            || avro_record_set(person,"Phone",phone_datum)) {
                fprintf(stderr,"Unable to create Person datum structure\n");
                exit(EXIT_FAILURE);
        }
        return person;
}

/* Asynchronous produce */
err = rd_kafka_producev(
      rk,RD_KAFKA_V_TOPIC(topic),RD_KAFKA_V_KEY(user,strlen(key)),RD_KAFKA_V_VALUE(person,sizeof(person)),RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_copY),RD_KAFKA_V_OPAQUE(&delivery_counter),RD_KAFKA_V_END);


在消费者方面,如何将rkm-> payload反序列化为avro_datum_t对象并提取参数。

感谢对此的任何帮助或指示。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。