微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

php \RdKafka\Consumer 批量消息

如何解决php \RdKafka\Consumer 批量消息

你好,我在 PHP 中使用 Kafka。特别是这个库:\RdKafka。这是我的脚本,它运行良好,但我正在一条一条地接收消息,但我希望收到 10 条消息。需要一些帮助来实现这一目标,因为我是使用 Kafka 的新手。

消费者连接到 Kafka 并正确获取消息。

<?PHP
include_once("config.PHP");
const REQUEST_SLEEP_TIME = 12*1000; //12 seg
$conf = new RdKafka\Conf();
$conf->set("bootstrap.servers",$KAFKA_SOCKET);
$conf->set("group.id","test-consumer-group");
$rk = new RdKafka\Consumer($conf);

$topicConf = new RdKafka\TopicConf();
$topicConf->set("request.required.acks",1);
$topicConf->set("auto.commit.enable",0);
$topicConf->set("auto.commit.interval.ms",100);
 $topicConf->set("offset.store.method","broker");
$topic = $rk->newTopic(KAFKA_TOPIC,$topicConf);
$topic->consumeStart(0,RD_KAFKA_OFFSET_END);

$i = 0;
while (true) {
    echo "start $i \n";
    $message = $topic->consume(0,REQUEST_SLEEP_TIME);

    if (is_null($message)) {
        sleep(1);
        echo "No more messages: ".date("H:i:s")."\n";
        continue;
    }

    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            echo "RD_KAFKA_RESP_ERR_NO_ERROR\n";
            print_r($message->payload."\n");
        break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
        break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
        break;
        default:
            throw new \Exception($message->errstr(),$message->err);
        break;
    }
    $i++;
    echo "end $i\n";
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。