微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用 librdkafka 在 kafka 生产者中将结构作为输入而不是字符串

如何解决使用 librdkafka 在 kafka 生产者中将结构作为输入而不是字符串

我需要让生产者以结构体(而不是字符串)作为输入,并使用 librdkafka 库在消费者端把它打印出来。我已经修改了生产者代码,生产者端能够读取并发送该结构体,但消费者端没有把它显示出来。如果还需要其他改动,请帮忙指出。

代码如下。生产者代码:

/*
 * Date record sent as the Kafka message value.
 * The producer fills the three fields, in this order, from "d/m/y" input
 * (presumably day, month and year).
 */
struct date {
    int d; /* first input field */
    int m; /* second input field */
    int y; /* third input field */
};


int main(int argc,char** argv) {
rd_kafka_t* rk;         /* Producer instance handle */
rd_kafka_conf_t* conf;  /* Temporary configuration object */
char errstr[512];       /* librdkafka API error reporting buffer */
//char buf[512];          /* Message value temporary buffer */
const char* brokers;    /* Argument: broker list */
const char* topic;      /* Argument: topic to produce to */
struct date  d1;
struct date* ptr ;
ptr = &d1;
int i;



brokers ="localhost:9092";
topic = "test";




/*
 * Create Kafka client configuration place-holder
 */
conf = rd_kafka_conf_new();

/* Set bootstrap broker(s) as a comma-separated list of
 * host or host:port (default port 9092).
 * librdkafka will use the bootstrap brokers to acquire the full
 * set of brokers from the cluster. */
if (rd_kafka_conf_set(conf,"bootstrap.servers",brokers,errstr,sizeof(errstr)) != RD_KAFKA_CONF_OK) {
    fprintf(stderr,"%s\n",errstr);
    return 1;
}

/* Set the delivery report callback.
 * This callback will be called once per message to inform
 * the application if delivery succeeded or Failed.
 * See dr_msg_cb() above.
 * The callback is only triggered from rd_kafka_poll() and
 * rd_kafka_flush(). */
rd_kafka_conf_set_dr_msg_cb(conf,dr_msg_cb);

/*
 * Create producer instance.
 *
 * NOTE: rd_kafka_new() takes ownership of the conf object
 *       and the application must not reference it again after
 *       this call.
 */
rk = rd_kafka_new(RD_KAFKA_PRODUCER,conf,sizeof(errstr));
if (!rk) {
    fprintf(stderr,"%% Failed to create new producer: %s\n",errstr);
    return 1;
}

/* Signal handler for clean shutdown */
signal(SIGINT,stop);

fprintf(stderr,"%% enter \n");
/*fprintf(stderr,"%% Type some text and hit enter to produce message\n"
    "%% Or just hit enter to only serve delivery reports\n"
    "%% Press Ctrl-C or Ctrl-D to exit\n");*/



while (run && fscanf_s(stdin,"%d/%d/%d",&d1.d,&d1.m,&d1.y))
{ 
    
    
    size_t len = sizeof(d1);
    rd_kafka_resp_err_t err;
    fprintf(stdout,"%d/%d/%d \n",d1.d,d1.m,d1.y);
    

    //if ([len - 1] == '\n') /* Remove newline */
       // d1[--len] = '\0';
    
    
    if (len == 0) {
        /* Empty line: only serve delivery reports */
        rd_kafka_poll(rk,0/*non-blocking */);
        continue;
    }

retry:
    err = rd_kafka_producev(
        /* Producer handle */
        rk,/* Topic name */
        RD_KAFKA_V_TOPIC(topic),/* Make a copy of the payload. */
        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_copY),/* Message value and length */
        RD_KAFKA_V_VALUE(ptr,len),/* Per-Message opaque,provided in
         * delivery report callback as
         * msg_opaque. */
        RD_KAFKA_V_OPAQUE(NULL),/* End sentinel */
        RD_KAFKA_V_END);

    if (err) {
        /*
         * Failed to *enqueue* message for producing.
         */
        fprintf(stderr,"%% Failed to produce to topic %s: %s\n",topic,rd_kafka_err2str(err));

        if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
            /* If the internal queue is full,wait for
             * messages to be delivered and then retry.
             * The internal queue represents both
             * messages to be sent and messages that have
             * been sent or Failed,awaiting their
             * delivery report callback to be called.
             *
             * The internal queue is limited by the
             * configuration property
             * queue.buffering.max.messages */
            rd_kafka_poll(rk,1000/*block for max 1000ms*/);
            goto retry;
        }
    }
    else {
        fprintf(stderr,"%% Enqueued message (%zd bytes) "
            "for topic %s\n",len,topic);
    }


    /* A producer application should continually serve
     * the delivery report queue by calling rd_kafka_poll()
     * at frequent intervals.
     * Either put the poll call in your main loop,or in a
     * dedicated thread,or call it after every
     * rd_kafka_produce() call.
     * Just make sure that rd_kafka_poll() is still called
     * during periods where you are not producing any messages
     * to make sure prevIoUsly produced messages have their
     * delivery report callback served (and any other callbacks
     * you register). */
    rd_kafka_poll(rk,0/*non-blocking*/);
}




/* Wait for final messages to be delivered or fail.
 * rd_kafka_flush() is an abstraction over rd_kafka_poll() which
 * waits for all messages to be delivered. */
fprintf(stderr,"%% Flushing final messages..\n");
rd_kafka_flush(rk,10 * 1000 /* wait for max 10 seconds */);

/* If the output queue is still not empty there is an issue
 * with producing messages to the clusters. */
if (rd_kafka_outq_len(rk) > 0)
    fprintf(stderr,"%% %d message(s) were not delivered\n",rd_kafka_outq_len(rk));

/* Destroy the producer instance */
rd_kafka_destroy(rk);

return 0;
}

消费者代码

#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <ctype.h>


/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is built from within the librdkafka source tree and thus differs. */
//#include <librdkafka/rdkafka.h>
#include "rdkafka.h"


/* Loop-control flag: stays 1 until stop() clears it. */
static volatile sig_atomic_t run = 1;

/**
 * @brief Signal handler that requests program termination.
 *
 * Only clears the run flag.
 *
 * @param sig signal number (not used)
 */
static void stop(int sig) {
    (void)sig;
    run = 0;
}

/*
 * Date record carried as the Kafka message value.
 * The consumer receives the raw bytes of this struct, so the member list
 * must be identical (names, types and order) to the one the producer uses:
 * d, m, y. The previous two-member version had a different size and layout
 * and could not represent the produced messages.
 */
struct date
{
    int d;
    int m;
    int y;
};



/**
 * @brief Check whether a buffer consists solely of printable characters.
 *
 * The buffer is inspected byte by byte, so any object pointer may be passed
 * (a text buffer or a binary record).
 *
 * @param ptr start of the buffer; not dereferenced when len is 0
 * @param len number of bytes to inspect
 * @returns 1 if all bytes are printable (also for an empty buffer), else 0.
 */
static int is_printable(const void *ptr,size_t len)
{
    /* unsigned char keeps the argument to isprint() in its valid range */
    const unsigned char *bytes = ptr;
    size_t i;

    for (i = 0; i < len; i++)
        if (!isprint(bytes[i]))
            return 0;

    return 1;
}




int main(int argc,char **argv) {
rd_kafka_t* rk;          /* Consumer instance handle */
rd_kafka_conf_t* conf;   /* Temporary configuration object */
rd_kafka_resp_err_t err; /* librdkafka API error code */
char errstr[512];        /* librdkafka API error reporting buffer */
const char* brokers;     /* Argument: broker list */
const char* groupid;     /* Argument: Consumer group id */
char** topics;           /* Argument: list of topics to subscribe to */
int topic_cnt;           /* Number of topics to subscribe to */
rd_kafka_topic_partition_list_t* subscription; /* Subscribed topics */
int i;
struct date  d1;
struct date* ptr;
ptr = &d1;


/*
 * Argument validation
 */
 /*if (argc < 4) {
     fprintf(stderr,"%% Usage: "
         "%s <broker> <group.id> <topic1> <topic2>..\n",argv[0]);
     return 1;
 }*/

brokers = "localhost:9092";
groupid = "test-consumer-group"; 
char* topic[] = {"test"};
topics = &topic;    
topic_cnt = 1;


/*
 * Create Kafka client configuration place-holder
 */
conf = rd_kafka_conf_new();

/* Set bootstrap broker(s) as a comma-separated list of
 * host or host:port (default port 9092).
 * librdkafka will use the bootstrap brokers to acquire the full
 * set of brokers from the cluster. */
if (rd_kafka_conf_set(conf,errstr);
    rd_kafka_conf_destroy(conf);
    return 1;
}

/* Set the consumer group id.
 * All consumers sharing the same group id will join the same
 * group,and the subscribed topic' partitions will be assigned
 * according to the partition.assignment.strategy
 * (consumer config property) to the consumers in the group. */
if (rd_kafka_conf_set(conf,"group.id",groupid,errstr);
    rd_kafka_conf_destroy(conf);
    return 1;
}

/* If there is no prevIoUsly committed offset for a partition
 * the auto.offset.reset strategy will be used to decide where
 * in the partition to start fetching messages.
 * By setting this to earliest the consumer will read all messages
 * in the partition if there was no prevIoUsly committed offset. */
if (rd_kafka_conf_set(conf,"auto.offset.reset","earliest",errstr);
    rd_kafka_conf_destroy(conf);
    return 1;
}

/*
 * Create consumer instance.
 *
 * NOTE: rd_kafka_new() takes ownership of the conf object
 *       and the application must not reference it again after
 *       this call.
 */
rk = rd_kafka_new(RD_KAFKA_CONSUMER,"%% Failed to create new consumer: %s\n",errstr);
    return 1;
}

conf = NULL; /* Configuration object is Now owned,and freed,* by the rd_kafka_t instance. */


              /* Redirect all messages from per-partition queues to
               * the main queue so that messages can be consumed with one
               * call from all assigned partitions.
               *
               * The alternative is to poll the main queue (for events)
               * and each partition queue separately,which requires setting
               * up a rebalance callback and keeping track of the assignment:
               * but that is more complex and typically not recommended. */
rd_kafka_poll_set_consumer(rk);


/* Convert the list of topics to a format suitable for librdkafka */
subscription = rd_kafka_topic_partition_list_new(topic_cnt);
for (i = 0; i < topic_cnt; i++)
    rd_kafka_topic_partition_list_add(subscription,topics[i],/* the partition is ignored
         * by subscribe() */
        RD_KAFKA_PARTITION_UA);

/* Subscribe to the list of topics */
err = rd_kafka_subscribe(rk,subscription);
if (err) {
    fprintf(stderr,"%% Failed to subscribe to %d topics: %s\n",subscription->cnt,rd_kafka_err2str(err));
    rd_kafka_topic_partition_list_destroy(subscription);
    rd_kafka_destroy(rk);
    return 1;
}

fprintf(stderr,"%% Subscribed to %d topic(s),"
    "waiting for rebalance and messages...\n",subscription->cnt);


rd_kafka_topic_partition_list_destroy(subscription);


/* Signal handler for clean shutdown */
signal(SIGINT,stop);

/* Subscribing to topics will trigger a group rebalance
 * which may take some time to finish,but there is no need
 * for the application to handle this idle period in a special way
 * since a rebalance may happen at any time.
 * Start polling for messages. */

while (run) {
    rd_kafka_message_t* rkm;
    rkm = rd_kafka_consumer_poll(rk,100);
    

    if (!rkm)
        continue; /* Timeout: no message within 100ms,*  try again. This short timeout allows
                   *  checking for `run` at frequent intervals.
                   */

                   /* consumer_poll() will return either a proper message
                    * or a consumer error (rkm->err is set). */
    if (rkm->err) {
        /* Consumer errors are generally to be considered
         * informational as the consumer will automatically
         * try to recover from all types of errors. */
        fprintf(stderr,"%% Consumer error: %s\n",rd_kafka_message_errstr(rkm));
        rd_kafka_message_destroy(rkm);
        continue;
    }

    /* Proper message. */
    printf("Message on %s [%" PRId32 "] at offset %" PRId64 ":\n",rd_kafka_topic_name(rkm->rkt),rkm->partition,rkm->offset);
    
   

    /* Print the message key. */
    if (rkm->key && is_printable((const char*)rkm->key,rkm->key_len))
        printf(" Key: %.*s\n",(int)rkm->key_len,(const char*)rkm->key);
    else if (rkm->key)
        printf(" Key: (%d bytes)\n",(int)rkm->key_len);

    /* Print the message value/payload. */
    if (rkm->payload && is_printable((const char*)rkm->payload,rkm->len))
        printf(" Value:%.*s\n",(int)rkm->len,(const char*)rkm->payload); 
    else if (rkm->payload)
        printf(" Value: (%d bytes)\n",(int)rkm->len); 

    rd_kafka_message_destroy(rkm);
}


/* Close the consumer: commit final offsets and leave the group. */
fprintf(stderr,"%% Closing consumer\n");
rd_kafka_consumer_close(rk);


/* Destroy the consumer */
rd_kafka_destroy(rk);

return 0;
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。