如何解决使用 librdkafka 在 kafka 生产者中将结构作为输入而不是字符串
我需要在生产者中将输入作为结构而不是字符串,并使用 librdkafka 库在消费者中打印它。我在生产者代码中进行了更改,甚至能够获得结构,但在消费者中它没有显示请帮助如果还需要一些改动
struct date
{
int d,m,y;
};
int main(int argc,char** argv) {
rd_kafka_t* rk; /* Producer instance handle */
rd_kafka_conf_t* conf; /* Temporary configuration object */
char errstr[512]; /* librdkafka API error reporting buffer */
//char buf[512]; /* Message value temporary buffer */
const char* brokers; /* Argument: broker list */
const char* topic; /* Argument: topic to produce to */
struct date d1;
struct date* ptr ;
ptr = &d1;
int i;
brokers ="localhost:9092";
topic = "test";
/*
* Create Kafka client configuration place-holder
*/
conf = rd_kafka_conf_new();
/* Set bootstrap broker(s) as a comma-separated list of
* host or host:port (default port 9092).
* librdkafka will use the bootstrap brokers to acquire the full
* set of brokers from the cluster. */
if (rd_kafka_conf_set(conf,"bootstrap.servers",brokers,errstr,sizeof(errstr)) != RD_KAFKA_CONF_OK) {
fprintf(stderr,"%s\n",errstr);
return 1;
}
/* Set the delivery report callback.
* This callback will be called once per message to inform
* the application if delivery succeeded or Failed.
* See dr_msg_cb() above.
* The callback is only triggered from rd_kafka_poll() and
* rd_kafka_flush(). */
rd_kafka_conf_set_dr_msg_cb(conf,dr_msg_cb);
/*
* Create producer instance.
*
* NOTE: rd_kafka_new() takes ownership of the conf object
* and the application must not reference it again after
* this call.
*/
rk = rd_kafka_new(RD_KAFKA_PRODUCER,conf,sizeof(errstr));
if (!rk) {
fprintf(stderr,"%% Failed to create new producer: %s\n",errstr);
return 1;
}
/* Signal handler for clean shutdown */
signal(SIGINT,stop);
fprintf(stderr,"%% enter \n");
/*fprintf(stderr,"%% Type some text and hit enter to produce message\n"
"%% Or just hit enter to only serve delivery reports\n"
"%% Press Ctrl-C or Ctrl-D to exit\n");*/
while (run && fscanf_s(stdin,"%d/%d/%d",&d1.d,&d1.m,&d1.y))
{
size_t len = sizeof(d1);
rd_kafka_resp_err_t err;
fprintf(stdout,"%d/%d/%d \n",d1.d,d1.m,d1.y);
//if ([len - 1] == '\n') /* Remove newline */
// d1[--len] = '\0';
if (len == 0) {
/* Empty line: only serve delivery reports */
rd_kafka_poll(rk,0/*non-blocking */);
continue;
}
retry:
err = rd_kafka_producev(
/* Producer handle */
rk,/* Topic name */
RD_KAFKA_V_TOPIC(topic),/* Make a copy of the payload. */
RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_copY),/* Message value and length */
RD_KAFKA_V_VALUE(ptr,len),/* Per-Message opaque,provided in
* delivery report callback as
* msg_opaque. */
RD_KAFKA_V_OPAQUE(NULL),/* End sentinel */
RD_KAFKA_V_END);
if (err) {
/*
* Failed to *enqueue* message for producing.
*/
fprintf(stderr,"%% Failed to produce to topic %s: %s\n",topic,rd_kafka_err2str(err));
if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
/* If the internal queue is full,wait for
* messages to be delivered and then retry.
* The internal queue represents both
* messages to be sent and messages that have
* been sent or Failed,awaiting their
* delivery report callback to be called.
*
* The internal queue is limited by the
* configuration property
* queue.buffering.max.messages */
rd_kafka_poll(rk,1000/*block for max 1000ms*/);
goto retry;
}
}
else {
fprintf(stderr,"%% Enqueued message (%zd bytes) "
"for topic %s\n",len,topic);
}
/* A producer application should continually serve
* the delivery report queue by calling rd_kafka_poll()
* at frequent intervals.
* Either put the poll call in your main loop,or in a
* dedicated thread,or call it after every
* rd_kafka_produce() call.
* Just make sure that rd_kafka_poll() is still called
* during periods where you are not producing any messages
* to make sure prevIoUsly produced messages have their
* delivery report callback served (and any other callbacks
* you register). */
rd_kafka_poll(rk,0/*non-blocking*/);
}
/* Wait for final messages to be delivered or fail.
* rd_kafka_flush() is an abstraction over rd_kafka_poll() which
* waits for all messages to be delivered. */
fprintf(stderr,"%% Flushing final messages..\n");
rd_kafka_flush(rk,10 * 1000 /* wait for max 10 seconds */);
/* If the output queue is still not empty there is an issue
* with producing messages to the clusters. */
if (rd_kafka_outq_len(rk) > 0)
fprintf(stderr,"%% %d message(s) were not delivered\n",rd_kafka_outq_len(rk));
/* Destroy the producer instance */
rd_kafka_destroy(rk);
return 0;
}
消费者代码:
#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <ctype.h>
/* Typical include path would be <librdkafka/rdkafka.h>,but this program
* is builtin from within the librdkafka source tree and thus differs. */
//#include <librdkafka/rdkafka.h>
#include "rdkafka.h"
static volatile sig_atomic_t run = 1;
/**
* @brief Signal termination of program
*/
static void stop(int sig) {
run = 0;
}
struct date
{
int d,y;
};
/**
* @returns 1 if all bytes are printable,else 0.
*/
static int is_printable(struct date* ptr,size_t len)
{
size_t i;
for (i = 0; i < len; i++)
if (!isdigit(d1.d && d1.m && d1.y))
return 0;
return 1;
}
int main(int argc,char **argv) {
rd_kafka_t* rk; /* Consumer instance handle */
rd_kafka_conf_t* conf; /* Temporary configuration object */
rd_kafka_resp_err_t err; /* librdkafka API error code */
char errstr[512]; /* librdkafka API error reporting buffer */
const char* brokers; /* Argument: broker list */
const char* groupid; /* Argument: Consumer group id */
char** topics; /* Argument: list of topics to subscribe to */
int topic_cnt; /* Number of topics to subscribe to */
rd_kafka_topic_partition_list_t* subscription; /* Subscribed topics */
int i;
struct date d1;
struct date* ptr;
ptr = &d1;
/*
* Argument validation
*/
/*if (argc < 4) {
fprintf(stderr,"%% Usage: "
"%s <broker> <group.id> <topic1> <topic2>..\n",argv[0]);
return 1;
}*/
brokers = "localhost:9092";
groupid = "test-consumer-group";
char* topic[] = {"test"};
topics = &topic;
topic_cnt = 1;
/*
* Create Kafka client configuration place-holder
*/
conf = rd_kafka_conf_new();
/* Set bootstrap broker(s) as a comma-separated list of
* host or host:port (default port 9092).
* librdkafka will use the bootstrap brokers to acquire the full
* set of brokers from the cluster. */
if (rd_kafka_conf_set(conf,errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
/* Set the consumer group id.
* All consumers sharing the same group id will join the same
* group,and the subscribed topic' partitions will be assigned
* according to the partition.assignment.strategy
* (consumer config property) to the consumers in the group. */
if (rd_kafka_conf_set(conf,"group.id",groupid,errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
/* If there is no prevIoUsly committed offset for a partition
* the auto.offset.reset strategy will be used to decide where
* in the partition to start fetching messages.
* By setting this to earliest the consumer will read all messages
* in the partition if there was no prevIoUsly committed offset. */
if (rd_kafka_conf_set(conf,"auto.offset.reset","earliest",errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
/*
* Create consumer instance.
*
* NOTE: rd_kafka_new() takes ownership of the conf object
* and the application must not reference it again after
* this call.
*/
rk = rd_kafka_new(RD_KAFKA_CONSUMER,"%% Failed to create new consumer: %s\n",errstr);
return 1;
}
conf = NULL; /* Configuration object is Now owned,and freed,* by the rd_kafka_t instance. */
/* Redirect all messages from per-partition queues to
* the main queue so that messages can be consumed with one
* call from all assigned partitions.
*
* The alternative is to poll the main queue (for events)
* and each partition queue separately,which requires setting
* up a rebalance callback and keeping track of the assignment:
* but that is more complex and typically not recommended. */
rd_kafka_poll_set_consumer(rk);
/* Convert the list of topics to a format suitable for librdkafka */
subscription = rd_kafka_topic_partition_list_new(topic_cnt);
for (i = 0; i < topic_cnt; i++)
rd_kafka_topic_partition_list_add(subscription,topics[i],/* the partition is ignored
* by subscribe() */
RD_KAFKA_PARTITION_UA);
/* Subscribe to the list of topics */
err = rd_kafka_subscribe(rk,subscription);
if (err) {
fprintf(stderr,"%% Failed to subscribe to %d topics: %s\n",subscription->cnt,rd_kafka_err2str(err));
rd_kafka_topic_partition_list_destroy(subscription);
rd_kafka_destroy(rk);
return 1;
}
fprintf(stderr,"%% Subscribed to %d topic(s),"
"waiting for rebalance and messages...\n",subscription->cnt);
rd_kafka_topic_partition_list_destroy(subscription);
/* Signal handler for clean shutdown */
signal(SIGINT,stop);
/* Subscribing to topics will trigger a group rebalance
* which may take some time to finish,but there is no need
* for the application to handle this idle period in a special way
* since a rebalance may happen at any time.
* Start polling for messages. */
while (run) {
rd_kafka_message_t* rkm;
rkm = rd_kafka_consumer_poll(rk,100);
if (!rkm)
continue; /* Timeout: no message within 100ms,* try again. This short timeout allows
* checking for `run` at frequent intervals.
*/
/* consumer_poll() will return either a proper message
* or a consumer error (rkm->err is set). */
if (rkm->err) {
/* Consumer errors are generally to be considered
* informational as the consumer will automatically
* try to recover from all types of errors. */
fprintf(stderr,"%% Consumer error: %s\n",rd_kafka_message_errstr(rkm));
rd_kafka_message_destroy(rkm);
continue;
}
/* Proper message. */
printf("Message on %s [%" PRId32 "] at offset %" PRId64 ":\n",rd_kafka_topic_name(rkm->rkt),rkm->partition,rkm->offset);
/* Print the message key. */
if (rkm->key && is_printable((const char*)rkm->key,rkm->key_len))
printf(" Key: %.*s\n",(int)rkm->key_len,(const char*)rkm->key);
else if (rkm->key)
printf(" Key: (%d bytes)\n",(int)rkm->key_len);
/* Print the message value/payload. */
if (rkm->payload && is_printable((const char*)rkm->payload,rkm->len))
printf(" Value:%.*s\n",(int)rkm->len,(const char*)rkm->payload);
else if (rkm->payload)
printf(" Value: (%d bytes)\n",(int)rkm->len);
rd_kafka_message_destroy(rkm);
}
/* Close the consumer: commit final offsets and leave the group. */
fprintf(stderr,"%% Closing consumer\n");
rd_kafka_consumer_close(rk);
/* Destroy the consumer */
rd_kafka_destroy(rk);
return 0;
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。