如何解决如何从方法内部和外部中断同步订阅?
问题:我订阅了一个永无止境的消息服务,我的代码需要检查是否有任何消息满足条件,如果满足,则在处理所有消息之前关闭订阅并返回true。如果我已经处理了所有消息并且条件不满足,那么我需要关闭订阅并返回false。
例如条件是foo = 5
:
message dataset early success :
msg1: foo=1
msg2: foo=2
msg3: foo=5 <= condition satisfied,return true and stop processing
msg4: foo=6
message dataset failure :
msg1: foo=1
msg2: foo=2
msg3: foo=3
msg4: foo=4 <= no more messages,return false and stop processing
我使用的订阅有一个同步方法,我必须传递一个 async EventHandler
。
这是适用于这两种情况的运行代码,lastMessageReceivedDateTime
跟踪上次收到消息的时间(以识别消息的结尾),_conditionStatisfied
告诉我是否有数据:>
private DateTime lastMessageReceivedDateTime;
private bool _conditionSatisfied;
public Task<bool> CheckSubscription(IThirdParyCode connection)
{
var subscription = connection.Subscribe(async (obj,args) =>
{
lastMessageReceivedDateTime = DateTime.Now;
if(args.Message.foo == 5)
{
_conditionSatisfied = true;
}
});
while (lastMessageReceivedDateTime.AddSeconds(1) > DateTime.Now && !_conditionSatisfied)
{
Thread.Sleep(500);
}
subscription?.Unsubscribe();
return _activityCheckSatisfied;
}
这可行,但我想知道是否有更好的解决方案。
注意:我不能简单地等待异步方法,因为在我取消订阅之前它永远不会返回/完成。
更多信息:connection
的类型是 IStanConnection
(来自 NATS),Subscribe
的签名是:
IStanSubscription Subscribe(string subject,StanSubscriptionoptions options,EventHandler<StanMsgHandlerArgs> handler);
我已经简化了签名以专注于我遇到问题的代码。
解决方法
根据您的代码示例,我可以假设如果在最后一条消息的一秒钟内没有新消息,则消息流结束。
可以修改您的解决方案以消除活动等待循环并将其替换为单个 await
调用。它将基于两个任务:
- 第一个任务将跟踪成功完成情况(在您的示例中为
_conditionSatisfied
)并且将由 TaskCompletionSource 设置。SetResult - 第二个任务将尝试使用 CancellationToken 任务包装器 (example implementation of such wrapper) 和 CancellationTokenSource.CancelAfter 的组合来通知流结束,这将尝试在每次迭代后取消任务并延迟。这应该替换
lastMessageReceivedDateTime.AddSeconds(1) > DateTime.Now
条件。
修改后的代码应该是这样的:
private CancellationTokenSource streamEndCancellation = new CancellationTokenSource();
private TaskCompletionSource<bool> satisfiedCompletionSource = new TaskCompletionSource<bool>();
public async Task<bool> CheckSubscription(IThirdParyCode connection)
{
// CancellationTokenTaskSource is in third-party library and not part of .NET
var streamEndSource = new CancellationTokenTaskSource<bool>(streamEndCancellation.Token);
var subscription = connection.Subscribe(async (obj,args) =>
{
lastMessageReceivedDateTime = DateTime.Now;
if(args.Message.foo == 5)
{
satisfiedCompletionSource.SetResult(true);
}
streamEndCancellation.CancelAfter(1000);
});
Task<bool> actualTask = await Task.WhenAny<bool>(satisfiedCompletionSource.Task,streamEndSource.Task);
subscription?.Unsubscribe();
return !actualTask.IsCanceled;
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。