微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

何时使用 selector.AddReceive, selector.Select

如何解决何时使用 selector.AddReceive, selector.Select

希望能澄清一下我何时应该使用 selector.AddReceiveselector.Select。这可能不是 Cadence 的问题,但也许我缺少一些关于 Golang 的知识。

对于 selector.Select,我认为基本思想是我们等待来自通道的下一个输出。不完全确定 selector.AddRecieve 的作用。

例如,在节奏示例中,local_activity link 并粘贴如下:

func signalHandlingWorkflow(ctx workflow.Context) error {
    logger := workflow.GetLogger(ctx)
    ch := workflow.GetSignalChannel(ctx,SignalName)
    for {
        var signal string
        if more := ch.Receive(ctx,&signal); !more {
            logger.Info("Signal channel closed")
            return cadence.NewCustomError("signal_channel_closed")
        }

        logger.Info("Signal received.",zap.String("signal",signal))

        if signal == "exit" {
            break
        }

        cwo := workflow.ChildWorkflowOptions{
            ExecutionStartToCloseTimeout: time.Minute,// TaskStartToCloseTimeout must be larger than all local activity execution time,because DecisionTask won't
            // return until all local activities completed.
            TaskStartToCloseTimeout: time.Second * 30,}
        childCtx := workflow.WithChildOptions(ctx,cwo)

        var processResult string
        err := workflow.ExecuteChildWorkflow(childCtx,processingWorkflow,signal).Get(childCtx,&processResult)
        if err != nil {
            return err
        }
        logger.Sugar().Infof("Processed signal: %v,result: %v",signal,processResult)
    }

    return nil
}

我们不使用任何 selector.AddReceive

但是,在此处的示例中,它也使用信号通道:Changing the uber cadence sleeptime based on external input

我也会在这里粘贴代码

func SampleTimerWorkflow(ctx workflow.Context,timerDelay time.Duration) error 
{
    logger := workflow.GetLogger(ctx)
    resetCh := workflow.GetSignalChannel(ctx,"reset")

    timerFired := false
    delay := timerDelay
    for ;!timerFired; {
        selector := workflow.NewSelector(ctx)

        logger.Sugar().Infof("Setting up a timer to fire after: %v",delay)
        timerCancelCtx,cancelTimerHandler := workflow.WithCancel(ctx)
        timerFuture := workflow.NewTimer(timerCancelCtx,delay)
        selector.AddFuture(timerFuture,func(f workflow.Future) {
            logger.Info("Timer Fired.")
            timerFired = true
        })

        selector.AddReceive(resetCh,func(c workflow.Channel,more bool) {
            logger.Info("Reset signal received.")
            logger.Info("Cancel outstanding timer.")
            cancelTimerHandler()

            var t int
            c.Receive(ctx,&t)
            logger.Sugar().Infof("Reset delay: %v seconds",t)
            delay = time.Second * time.Duration(t)
        })

        logger.Info("Waiting for timer to fire.")
        selector.Select(ctx)
    }

    workflow.GetLogger(ctx).Info("Workflow completed.")
    return nil
}

你可以看到有 selector.AddReceive,我不完全确定它的用途是什么,或者我什么时候应该使用它。

我正在尝试向我的工作流程发送一个信号,允许我延长到期时间。意思是,它会延迟 ExpirationActivity

调用

当按照这个例子(结合我当前的代码)时,一旦我发送信号重置,似乎 timerFired 立即被设置为 true。

我当前的代码如下(我删除了一些不相关的 if 语句),之前我只使用了一个 selector.Select 实例,但我的代码在某处运行不正常。

func Workflow(ctx workflow.Context) (string,error) {
    // local state per bonus workflow
    bonusAcceptanceState := pending
    logger := workflow.GetLogger(ctx).Sugar()
    logger.Info("Bonus workflow started")
    timerCreated := false
    timerFired := false
    delay := timerDelay

    // To query state in Cadence GUI
    err := workflow.SetQueryHandler(ctx,"bonusAcceptanceState",func(input []byte) (string,error) {
        return bonusAcceptanceState,nil
    })
    if err != nil {
        logger.Info("SetQueryHandler Failed: " + err.Error())
        return "",err
    }
    info := workflow.GetInfo(ctx)
    executionTimeout := time.Duration(info.ExecutionStartToCloseTimeoutSeconds) * time.Second
    // decisionTimeout := time.Duration(info.TaskStartToCloseTimeoutSeconds) * time.Second
    decisionTimeout := time.Duration(info.ExecutionStartToCloseTimeoutSeconds) * time.Second
    maxRetryTime := executionTimeout // retry for the entire time

    retryPolicy := &cadence.RetryPolicy{
        InitialInterval:          time.Second,BackoffCoefficient:       2,MaximumInterval:          executionTimeout,ExpirationInterval:       maxRetryTime,MaximumAttempts:          0,// unlimited,bound by maxRetryTime
        NonRetriableErrorReasons: []string{},}
    ao := workflow.ActivityOptions{
        TaskList:               taskList,ScheduletoStartTimeout: executionTimeout,// time until a task has to be picked up by a worker
        ScheduletoCloseTimeout: executionTimeout,// total execution timeout
        StartToCloseTimeout:    decisionTimeout,// time that a worker can take to process a task
        RetryPolicy:            retryPolicy,}
    ctx = workflow.WithActivityOptions(ctx,ao)
    selector := workflow.NewSelector(ctx)
    timerCancelCtx,cancelTimerHandler := workflow.WithCancel(ctx)

    var signal *singalType

    for {
        signalChan := workflow.GetSignalChannel(ctx,signalName)
        // resetCh := workflow.GetSignalChannel(ctx,"reset")

        selector.AddReceive(signalChan,more bool) {
            c.Receive(ctx,&signal)
        })

        selector.Select(ctx)

        if signal.Type == "exit" {
            return "",nil
        }

        // We can check the age and return an appropriate response
        if signal.Type == "ACCEPT" {
            if bonusAcceptanceState == pending {
                logger.Info("Bonus Accepted")
                bonusAcceptanceState = accepted

                var status string
                future := workflow.ExecuteActivity(ctx,AcceptActivity)
                if err := future.Get(ctx,&status); err != nil {
                    logger.Errorw("Activity Failed","error",err)
                }
                // Start expiration timer
                if !timerCreated {
                    timerCreated = true
                    timerFuture := workflow.NewTimer(timerCancelCtx,delay)
                    selector.AddFuture(timerFuture,func(f workflow.Future) {
                        logger.Info("Timer Fired.")
                        timerFired = true
                    })
                }

            }
        }
        
        
        if signal.Type == "ROLlovER_1X" && bonusAcceptanceState == accepted {
            var status string
            future := workflow.ExecuteActivity(ctx,Rollover1x)
            if err := future.Get(ctx,&status); err != nil {
                logger.Errorw("Activity Failed",err)
            }
            selector.Select(ctx)
        }
        if signal.Type == "ROLlovER_COMPLETE" && bonusAcceptanceState == accepted {
            var status string
            future := workflow.ExecuteActivity(ctx,RolloverComplete)
            if err := future.Get(ctx,err)
                return "",err
            }
            // Workflow is terminated on return result
            return status,nil
        }
        for; !timerFired && bonusAcceptanceState == accepted && signal.Type == "RESET" {
            cancelTimerHandler()

            i,err := strconv.Atoi(signal.Value)
            if err != nil {
                logger.Infow("error in converting")
            }

            logger.Infof("Reset delay: %v seconds",i)
            delay = time.Minute * time.Duration(i)
            timerFuture := workflow.NewTimer(timerCancelCtx,delay)
            selector.AddFuture(timerFuture,func(f workflow.Future) {
                logger.Info("Timer Fired.")
                timerFired = true
            })
            selector.Select(ctx)
        }
        if timerFired {
            var status string
            future := workflow.ExecuteActivity(ctx,ExpirationActivity)
            if err := future.Get(ctx,err)
            }
            return status,nil
        }
    }

}

解决方法

查看未来的返回结果

selector.AddFuture(timerFuture,func(f workflow.Future) {
    err := f.Get(ctx,nil)
    if err == nil {
        logger.Info("Timer Fired.")
        timerFired = true
    }
})

参考:https://github.com/uber-go/cadence-client/blob/0256258b905b677f2f38fcacfbda43398d236309/workflow/deterministic_wrappers.go#L128-L129

,

TL;DR:

  • 只有在需要让选择器侦听频道时才使用 selector.AddReceive,例如在您的第二个代码段中。如果您只需要直接处理来自通道的信号而无需选择器,则不需要使用它。
  • selector.Select 是让代码等待一些事件发生。因为你不想用忙循环来等待。

有关何时使用它们的更多详细信息

本质上,这与 Golang select statement 的概念完全相同。 Golang select 允许您等待计时器和频道。除了 Golang 没有 selector.Select() 仅仅是因为它已经融入语言本身,但 Cadence 是一个库。

与在 golang 中一样,您不必使用 select 语句来使用计时器或通道。只有在必须编写一些代码来侦听多个事件源时才需要它。

例如,如果您有两个通道,您想编写一些通用逻辑来处理这两个通道,例如增加一个计数器。此计数器不属于任何通道。这是一个普通的计数器。然后使用 selector 看起来不错。

chA := workflow.GetSignalChannel(ctx,SignalNameA)
chB := workflow.GetSignalChannel(ctx,SignalNameB)
counter := 0

selector.AddReceive(chA)
selector.AddReceive(chB)
For {
  selector.Select()
  counter += 1
}

带有选择器的工作流代码与 Golang 中的非常相似:

    counter := 0
    for {
        select {
        case _ := <- chA:
            counter += 1
        case _ := <- chB:
            counter += 1
        }
    }

否则你可能不得不使用两个 goroutines 来监听每个通道,并进行计数。 golang 代码如下所示:

counter := 0
go func(){
   for{
      _ := <- chA
      counter += 1
   }
}()

go func(){
   for{
      _ := <- chB
      counter += 1
   }
}()

这可能是竞争条件的问题。除非计数器被很好地实现为线程安全。

在 Cadence 工作流代码中,它是这样的:

chA := workflow.GetSignalChannel(ctx,SignalNameB)
counter := 0

Workflow.Go(ctx){
   for{
     chA.Receive(ctx,nil)
     counter +=1
   }
}

Workflow.Go(ctx){
   for{
     chB.Receive(ctx,nil)
     counter +=1
   }
}

但是,Cadence 中没有这样的竞争条件,因为 Cadence 的协程(由Workflow.Go() 启动)并不是真正的并发。上面的两个工作流代码都应该可以完美运行。

但 Cadence 仍然提供与 Golang 相同的 selector,主要是因为第一个更自然地编写代码。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。