微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在单独的线程上重试

如何解决在单独的线程上重试

如果某些任务第一次失败,我会尝试重新处理它们。我不想使用可完成的未来,因为它会打开新的线程和会话,所以当我们遇到很多失败时,它对我来说似乎不可扩展。

我正在使用 dropwizard,在我的服务开始时,我将队列传递给主类和使用者。因为,我认为主类在单独的线程上运行,所以我认为我不需要再次重新创建生产者类 [不确定]。

import random

guess = int(input("Guess the number => "))
rand = random.randrange(1,10)
print("The number was",rand)
def guess_rand(guess,rand):
    if rand == guess:
        print("You have guessed right!")
    elif rand > guess:
        print("You guessed too low!")
    elif guess > rand:
        print("You guessed too high!")

guess_rand(guess,rand)

again = input("Would you like to try again? => ")
while again.lower() == 'yes':
    guess = int(input("Guess the number => "))
    rand = random.randrange(1,10)
    print("The number was",rand)
    guess_rand(guess,rand)
    
    again = input("Would you like to try again? => ")
    if again.lower() == 'exit':
      break
    else:
        continue

Consumer.java

@Override
  public void run(PpsConfiguration configuration,Environment environment) {
    final BlockingQueue<Object[]> queue  = new LinkedBlockingQueue<>();

    new Thread(new Consumer(
         queue)).start(); //consumer at start of service
    
    environment.jersey().register(new MainClass(
            queue));

主类.java 这是之前处理所有内容的主类。我只是展示了 public class Consumer extends MainClass implements Runnable { private final BlockingQueue<Object[]> queue; public Consumer(BlockingQueue<Object[]> queue) { super(queue); this.queue = queue; } @Override public void run() { try { while (true) { Object[] take = queue.take(); someFunction(take); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } 的定义。

让我们假设前端正在触发这个函数并且是入口点。

someFunction

(1) 我的理解是否正确,即我不需要单独创建 private int mainFunction(Object[] take) { if (process(take)) { return 1; } else { // putting into queue queue.add(take); //producer thread adding } } return 0; } public int someFunction(Object[] take) { int val = 0; for (int i=0;i<10; i++){ // retry 10 times until it succeeds LOGGER.info("retry # {}",i+1); val = process(take); if(val == 1){ break; } } return val; } 来将任务添加producer.java。 (2) 如何以及在何处返回 val? (3) 有没有办法像使用时间戳一样在 queue 秒后重试特定类型的任务?

解决方法

不想使用可完成的未来,因为它会打开新的线程和会话,所以当我们遇到很多失败时,它对我来说似乎没有可扩展性。

我不认为您应该修改或必须修改几乎整个组件的结构以应对角落\错误情况。 dropwizard 确实提供了两个额外的入口点,您可以使用它们来处理这种情况:

  1. 健康检查:https://www.dropwizard.io/en/latest/manual/core.html#health-checks
  2. 异常:https://www.dropwizard.io/en/latest/manual/core.html#error-handling

我更喜欢#1。

至于您的问题:

(1) 我的理解是否正确,即我不需要单独创建一个 producer.java 来将任务添加到队列中。

是的,不过见下文。

(2) 如何以及在何处返回 val?

当您延迟\重试任务时,似乎没有办法返回给发件人。您需要在链(注册)中插入类处理重试。但我认为这(很容易)不可能。

(3) 有没有办法像使用时间戳一样在 n 秒后重试特定类型的任务?

我认为您应该通过创建自己的配置来适应这一点 (https://www.dropwizard.io/en/latest/manual/core.html#configuration);也许多个配置类代表多种任务类型。

谢谢, -尼尔。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。