微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何在同一个盒子上独立运行多个kafka使用者?

我有两个Kafka消费者ConsumerA和ConsumerB.我想在同一台机器上独立运行这两个kafka消费者.它们之间根本没有关系.这两个kafka消费者将在同一台机器上处理不同的主题.

>每个消费者都应该有一个不同的Properties对象.
>每个消费者应该具有不同的线程池配置,因为它们可以以多线程方式(消费者组)运行,如果需要独立于其他消费者.

以下是我的设计:

消费者类(摘要):

public abstract class Consumer implements Runnable {
    private final Properties consumerProps;
    private final String consumerName;

    public Consumer(String consumerName,Properties consumerProps) {
        this.consumerName = consumerName;
        this.consumerProps = consumerProps;
    }

    protected abstract void shutdown();
    protected abstract void run(String consumerName,Properties consumerProps);

    @Override
    public final void run() {
        run(consumerName,consumerProps);
    }
}

消费者类:

public class ConsumerA extends Consumer {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private KafkaConsumer<byte[],byte[]> consumer;

    public ConsumerA(String consumerName,Properties consumerProps) {
        super(consumerName,consumerProps);
    }

    @Override
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }

    @Override
    protected void run(String consumerName,Properties consumerProps) {
        consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(getTopicsBasisOnConsumerName());

        Map<String,Object> config = new HashMap<>();
        config.put(Config.URLS,TEST_URL);
        GenericRecordDomainDataDecoder decoder = new GenericRecordDomainDataDecoder(config);

        try {
            while (!closed.get()) {
                ConsumerRecords<byte[],byte[]> records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<byte[],byte[]> record : records) {
                    GenericRecord payload = decoder.decode(record.value());
                    // extract data from payload
                    System.out.println("topic = %s,partition = %s,offset = %d,customer = %s,country = %s\n",record.topic(),record.partition(),record.offset(),record.key(),record.value());
                }
                consumer.commitAsync();
            }
        } catch (WakeupException ex) {
            // Ignore exception if closing
            System.out.println("error= ",ex);
            if (!closed.get()) throw e;             
        } catch (Exception ex) {
            System.out.println("error= ",ex);      
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
}

ConsumerA B类:

// similar to `ConsumerA` but with specific details of B

ConsumerHandler类:

public final class ConsumerHandler {
  private final ExecutorService executorServiceConsumer;
  private final Consumer consumer;
  private final List<Consumer> consumers = new ArrayList<>();

  public ConsumerHandler(Consumer consumer,int poolSize) {
    this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize);
    this.consumer = consumer;
    for (int i = 0; i < poolSize; i++) {
      this.consumers.add(consumer);
      executorServiceConsumer.submit(consumer);
    }
 }
  public void shutdown() {
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        for (Consumer consumer : consumers) {
          consumer.shutdown();
        }
        executorServiceConsumer.shutdown();
        try {
          executorServiceConsumer.awaitTermination(1000,TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
        }
      }
    });
  }
}

下面是我的一个项目中的主要课程,如果我启动我的服务器,呼叫将首先自动进入,并从这个地方开始我执行我的ConsumerA和ConsumerB的所有kafka消费者.一旦调用shutdown,我就通过在所有Kafka消费者上调用shutdown来释放所有资源.

import javax.annotation.postconstruct;
import javax.annotation.PreDestroy;
import javax.inject.Singleton;

@Singleton
@DependencyInjectionInitializer
public class Initializer {
  private ConsumerHandler consumerHandlerA;
  private ConsumerHandler consumerHandlerB;

  @postconstruct
  public void init() {
    consumerHandlerA = new ConsumerHandler (new ConsumerA("consumerA",getConsumerPropsA()),3);
    consumerHandlerB = new ConsumerHandler (new ConsumerB("consumerB",getConsumerPropsB()),3);
  }

  @PreDestroy
  public void shutdown() {
    consumerHandlerA.shutdown();
    consumerHandlerB.shutdown();
  }
}

对于我想在同一个盒子上运行多个kafka消费者的这类问题,这是正确的设计吗?如果有更好更有效的方法解决这个问题,请告诉我.一般来说,我将在同一个盒子上运行最多三个或四个Kafka消费者,并且每个消费者可以根据需要拥有自己的消费者群体.

这是我在消费者中使用的KafkaConsumer的Javadoc.基于这个article,我已经创建了我的消费者,只是我使用了抽象类来扩展它.在该链接搜索“全部放在一起”.

在文档中提到消费者不是线程安全的,但看起来我的代码正在为池中的每个线程重用相同的消费者实例.

public ConsumerHandler(Consumer consumer,int poolSize) {
    this.executorServiceConsumer = Executors.newFixedThreadPool(poolSize);
    this.consumer = consumer;
    for (int i = 0; i < poolSize; i++) {
      this.consumers.add(consumer);
      executorServiceConsumer.submit(consumer);
    }
 }

解决此线程安全问题并仍然实现相同功能的最佳方法是什么?

解决方法

一个快速的建议,如果你已经知道它,道歉.类级变量永远不是线程安全的.如果需要为每个线程使用不同的Properties对象,最好在方法级别声明它们,并将它们作为参数提供给需要访问Properties对象的其他方法.

原文地址:https://www.jb51.cc/css/217716.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。