微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何从方法内部和外部中断同步订阅?

如何解决如何从方法内部和外部中断同步订阅?

问题:我订阅一个永无止境的消息服务,我的代码需要检查是否有任何消息满足条件,如果满足,则在处理所有消息之前关闭订阅并返回true。如果我已经处理了所有消息并且条件不满足,那么我需要关闭订阅并返回false。

例如条件是foo = 5

message dataset early success :
msg1: foo=1
msg2: foo=2
msg3: foo=5 <= condition satisfied,return true and stop processing
msg4: foo=6

message dataset failure :
msg1: foo=1
msg2: foo=2
msg3: foo=3 
msg4: foo=4 <= no more messages,return false and stop processing

我使用的订阅一个同步方法,我必须传递一个 async EventHandler。 这是适用于这两种情况的运行代码lastMessageReceivedDateTime 跟踪上次收到消息的时间(以识别消息的结尾),_conditionStatisfied 告诉我是否有数据:>

private DateTime lastMessageReceivedDateTime;
private bool _conditionSatisfied;

public Task<bool> CheckSubscription(IThirdParyCode connection)
{
     var subscription = connection.Subscribe(async (obj,args) =>
     {
         lastMessageReceivedDateTime = DateTime.Now;
         if(args.Message.foo == 5)
         {
              _conditionSatisfied = true;
         }
     });

     while (lastMessageReceivedDateTime.AddSeconds(1) > DateTime.Now  && !_conditionSatisfied)
     {
         Thread.Sleep(500);
     }

     subscription?.Unsubscribe();
     return _activityCheckSatisfied;
}

这可行,但我想知道是否有更好的解决方案。

注意:我不能简单地等待异步方法,因为在我取消订阅之前它永远不会返回/完成。

更多信息:connection 的类型是 IStanConnection(来自 NATS),Subscribe 的签名是:

IStanSubscription Subscribe(string subject,StanSubscriptionoptions options,EventHandler<StanMsgHandlerArgs> handler);

我已经简化了签名以专注于我遇到问题的代码

解决方法

根据您的代码示例,我可以假设如果在最后一条消息的一秒钟内没有新消息,则消息流结束。 可以修改您的解决方案以消除活动等待循环并将其替换为单个 await 调用。它将基于两个任务:

  1. 第一个任务将跟踪成功完成情况(在您的示例中为 _conditionSatisfied)并且将由 TaskCompletionSource 设置。SetResult
  2. 第二个任务将尝试使用 CancellationToken 任务包装器 (example implementation of such wrapper) 和 CancellationTokenSource.CancelAfter 的组合来通知流结束,这将尝试在每次迭代后取消任务并延迟。这应该替换 lastMessageReceivedDateTime.AddSeconds(1) > DateTime.Now 条件。

修改后的代码应该是这样的:

private CancellationTokenSource streamEndCancellation = new CancellationTokenSource();
private TaskCompletionSource<bool> satisfiedCompletionSource = new TaskCompletionSource<bool>();

public async Task<bool> CheckSubscription(IThirdParyCode connection)
{
     // CancellationTokenTaskSource is in third-party library and not part of .NET
     var streamEndSource = new CancellationTokenTaskSource<bool>(streamEndCancellation.Token);

     var subscription = connection.Subscribe(async (obj,args) =>
     {
         lastMessageReceivedDateTime = DateTime.Now;
         if(args.Message.foo == 5)
         {
             satisfiedCompletionSource.SetResult(true);
         }
         streamEndCancellation.CancelAfter(1000);
     });

     Task<bool> actualTask = await Task.WhenAny<bool>(satisfiedCompletionSource.Task,streamEndSource.Task);
          
     subscription?.Unsubscribe();
     return !actualTask.IsCanceled;
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。