微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用 librdkafka 发送 json 数据?

如何解决如何使用 librdkafka 发送 json 数据?

我正在尝试使用 librdkafka c api 发送 json 有效负载。我现在要做的是

#include <jansson.h>
#include <librdkafka/rdkafka.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

typedef struct my_data {
    char *id;
    char *value;
    unsigned long timestamp;
} my_data;

char * my_data_to_json(const my_data *ev);
void rk_dr_callback(rd_kafka_t *rk,const rd_kafka_message_t *msg,void *opaque);

int main(int argc,char * argv[])
{
    // configure producer
    rd_kafka_conf_t *conf = rd_kafka_conf_new();
    char errstr[512];

    rd_kafka_conf_set(conf,"bootstrap.servers","k1.example.com:9093",errstr,sizeof(errstr));    
    
    rd_kafka_conf_set_dr_msg_cb(conf,rk_dr_callback);

    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER,conf,sizeof(errstr));    

    if (!rk) {
        fprintf(stderr,"Failed to create kafka producer: %s\n",errstr);
        return -1;
    }

    // create json
    my_data ev = {
        .id = "test-id",.value = "test-value",.timestamp = (unsigned long) time(NULL)
    };

    char *json = my_data_to_json(&ev);
    printf("json dump: %s\n",json);

    // publish data
    rd_kafka_resp_err_t err = rd_kafka_producev(rk,RD_KAFKA_V_TOPIC("topic"),RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_copY),RD_KAFKA_V_VALUE(json,strlen(json)),RD_KAFKA_V_KEY("key-1",strlen("key-1")),RD_KAFKA_V_OPAQUE(NULL),RD_KAFKA_V_END);

    // cleanup
    rd_kafka_flush(rk,5 * 1000);
    
    if (rd_kafka_outq_len(rk) > 0) {
        fprintf(stderr,"%d message(s) were not delivered\n",rd_kafka_outq_len(rk));
    }

    rd_kafka_destroy(rk);
    free(json);

    return 0;
}

void rk_dr_callback(rd_kafka_t *rk,void *opaque)
{
    if (msg->err) {
        printf("Failed to send message: %s\n",rd_kafka_err2str(msg->err));
    }
    else {
        printf("delivered %zd bytes to partition %d\n",msg->len,msg->partition);
    }
}

char * my_data_to_json(const my_data *ev)
{
    /* build the JSON object {"id": "id","value": "value","timestamp": 12345678} */
    json_t *json = json_pack("{sssssi}","id",ev->id,"value",ev->value,"timestamp",ev->timestamp);

    if (!json) {
        fprintf(stderr,"Failed to construct json from data\n");
    }

    char *str = json_dumps(json,JSON_COMPACT);

    if (!str) {
        fprintf(stderr,"Failed to encode json object\n");
    }

    return str;
}

使用上面的代码,我设法将字节获取到代理。但是 json 负载似乎格式不正确。消费者(消费者是使用 Newtonsoft json lib 反序列化的 C# 客户端)抛出以下错误

Newtonsoft.Json.JsonReaderException: Unexpected character encountered while parsing value: . Path '',line 0,position 0.\n at Newtonsoft.Json.JsonTextReader.ParseValue()\n at ...

我不太清楚我的错误是因为我构造 json 对象的方式、将其编码为字符串的方式还是我使用 librdkafka 发布 json 字符串的方式。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?