微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何检查 gcp pubsub 空/非活动订阅

如何解决如何检查 gcp pubsub 空/非活动订阅

我有一个应用程序订阅 GCP 中的主题,当那里有一些消息时,它会下载它们并将它们发送到 ActiveMQ 上的队列。

为了加快这个过程,我使用了 executorService 并启动了多个线程来向 activeMQ 发送消息。由于订阅应该是一项正在进行的任务,我将代码放入 while(true) 循环中,因此我无法以正常方式关闭 executorService,因为我将在每个循环。

订阅为空(主题中没有数据)2 或 3 分钟或一些不活动窗口时,我正在寻找一种优雅的方法关闭 executorService。然后当有一些新数据时它当然会重新开始。

以下是我不喜欢的想法,它只是一个计数器,当订阅未检索到任何数据时,我会递增该计数器。

我正在寻找一种更优雅的方式来做到这一点。

@Service
@Slf4j
public class PubSubSubscriberService {

private static final int EMPTY_SUBSCRIPTION_COUNTER = 4;
private static final Logger businessLogger = LoggerFactory.getLogger("BusinessLogger");
private Queue<PubsubMessage> messages = new ConcurrentLinkedQueue<>();



public void pullMessagesAndSendTobroker(CompositeConfigurationElement cce) {

    var patchSize = cce.getSubscriber().getPatchSize();
    var nThreads = cce.getSubscriber().getSendingParallelThreads();
    var scheduledTasks = 0;
    var subscribeCounter = 0;
    ThreadPoolExecutor threadPoolExecutor = null;


    while (true) {
       try {
           if (subscribeCounter < EMPTY_SUBSCRIPTION_COUNTER) {
                log.info("Creating Executor Service for uploading to broker with a thread pool of Size: " + nThreads);
            threadPoolExecutor = getThreadPoolExecutor(nThreads);
        }

        var subscriber = this.getSubscriber(cce);
        this.startSubscriber(subscriber,cce);
        this.checkActivity(threadPoolExecutor,subscribeCounter++);


        // send patches of {{ messagesPerIteration }}
        while (this.messages.size() > patchSize) {
            if (poolIsReady(threadPoolExecutor,nThreads)) {
                UploadTask task = new UploadTask(this.messages,cce,cf,patchSize);
                threadPoolExecutor.submit(task);
                scheduledTasks ++;
            }
            subscribeCounter = 0;
        }

        // send the rest
        if (this.messages.size() > 0) {
            UploadTask task = new UploadTask(this.messages,patchSize);
            threadPoolExecutor.submit(task);
            scheduledTasks ++;
            subscribeCounter = 0;
        }

        if (scheduledTasks > 0) {
            businessLogger.info("Scheduled " + scheduledTasks + " upload tasks of size upto: " + patchSize + ",preparing to start subscribing for 30 more sec") ;
            scheduledTasks = 0;
        }
    } catch ( Exception e) {
        e.printstacktrace();
        businessLogger.error(e.getMessage());
    }
}

解决方法

您的池占用很少的空间和内存,并且在不使用时几乎不消耗 CPU。为您的池容量设置最大限制,并尝试缩小它的规模。如果要处理的消息过多,则任务会排队等待空闲的执行程序池来完成任务。

如果您有上下可扩展性,您的设计可能会被审查。您可以在集群中触发一个事件,并在其他 Pod 上并行处理它们,而不是 pod 内部的 executorPool。这些 Pod 将能够根据流量扩大和缩小(查看 Knative

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。