如何解决可观察的计时器处理 第一种情况第二种情况编辑第一种情况第二种情况结论
我正在使用 Reactive .NET 扩展,我想知道它的处理方式。我知道在某些情况下最好这样处理它:.takeuntil(Observable.Timer(TimeSpan.FromMinutes(x)))
。我
第一种情况
在这种情况下,我有一个计时器,它在 x 秒后触发,然后它完成并应该被处理。
public void ScheduleOrderCancellationIfNotFilled(string pair,long orderId,int waitSecondsBeforeCancel)
{
Observable.Timer(TimeSpan.FromSeconds(waitSecondsBeforeCancel))
.Do(e =>
{
var result = _client.Spot.Order.Getorder(pair,orderId);
if (result.Success)
{
if (result.Data?.Status != OrderStatus.Filled)
{
_client.Spot.Order.CancelOrder(pair,orderId);
}
}
})
.Subscribe();
}
第二种情况
在这种情况下,计时器在第一秒运行,然后每 29 分钟重复一次。这应该一直存在,直到它的定义类被处理。我相信这个应该与 Idisposable
实现一起处理。怎么样?
var keepAliveListenKey = Observable.Timer(TimeSpan.FromSeconds(1),TimeSpan.FromMinutes(29))
.Do(async e =>
{
await KeepAliveListenKeyAsync().ConfigureAwait(false);
})
.Subscribe();
编辑
我还希望它使用 Subject<T>
,这样可以更轻松地处理和重置订阅。
例如。 Reset and Dispose observable subscriber,Reactive Extensions(@Enigmativity)
public class UploadDicomSet : ImportBaseSet
{
Idisposable subscription;
Subject<IObservable<long>> subject = new Subject<IObservable<long>>();
public UploadDicomSet()
{
subscription = subject.Switch().Subscribe(s => CheckUploadSetList(s));
subject.OnNext(Observable.Interval(TimeSpan.FromMinutes(2)));
}
void CheckUploadSetList(long interval)
{
subject.OnNext(Observable.Never<long>());
// Do other things
}
public void AddDicomFile(SharedLib.DicomFile dicomFile)
{
subject.OnNext(Observable.Interval(TimeSpan.FromMinutes(2)));
// Reset the subscription to go off in 2 minutes from Now
// Do other things
}
}
解决方法
在第一种情况下,它会被自动处理。实际上,这是实现自动订阅管理的常用方法,而且绝对是处理 rx
的好方法。
在第二种情况下,您过度设计了。 Observable.Timer(TimeSpan.FromSeconds(1),TimeSpan.FromSeconds(1))
本身足以随时间生成一系列递增的 long
。由于此流本质上是无穷无尽的,您是对的 - 需要显式订阅管理。所以它就足够了:
var sub = Observable.Timer(TimeSpan.FromSeconds(1),TimeSpan.FromSeconds(1)).Subscribe()
...然后sub.Dispose()
它。
附言请注意,在您的代码中,您 .Do
async/await
。很可能那不是你想要的。您希望 SelectMany
确保正确等待 async
操作并处理异常。
在评论部分回答您的问题:
如果使用 Subject 来处理呢?
嗯,没什么特别的。 IObserver<>
、IObservable<>
都是由此类实现的,因此它类似于经典的 .NET 事件(要对某个事件调用的回调列表)。就您的问题和用例而言,它在任何意义上都没有区别。
你能举一个关于 .Do 与异常处理的例子吗?
当然。这个想法是你想将你的 async/await
封装成一些 Task<T>
到 IObservable<T>
这样既保留取消信号又保留错误信号。为此,必须使用 .SelectMany
方法(就像 LINQ 中的 SelectMany
,同样的想法)。所以只需将您的 .Do
更改为 .SelectMany
。
Observable
.Timer(TimeSpan.FromSeconds(1),TimeSpan.FromSeconds(1))
.SelectMany(_ => Observable.FromAsync(() => /* that's the point where your Task<> becomes Observable */ myTask))
我又糊涂了。我需要 IObservable
很可能,您不需要开关。为什么?因为它的创建主要是为了避免 IO 竞争条件,这样每当发出新事件时,当前事件(可能由于自然并行或异步工作流而正在进行)保证被取消(即取消订阅)。否则竞争条件会(并且将会)损害您的状态。
相反,SelectMany 将确保所有这些都按顺序发生,它们确实以某种总顺序发生。什么都不会被取消。您将完成(等待,如果您愿意)当前回调,然后触发下一个。当然,这种行为可以通过适当的 IScheduler
来改变,但那是另外一回事了。
Reactive Observable Subscription Disposal (@Enigmativity)
Subscribe 扩展方法返回的 Disposable 仅返回以允许您在 observable 自然结束之前手动取消订阅 observable。
如果 observable 完成 - 带有 OnCompleted 或 OnError - 那么订阅已经为你处理了。
需要注意的一件重要事情:垃圾收集器永远不会对 observable 订阅调用 .Dispose(),因此,如果您的订阅在您的订阅超出范围之前没有(或可能没有)自然结束,您必须处理它们。
第一种情况
在第一种情况下,看起来我不需要手动 .Dispose() 订阅,因为它自然结束。
Dispose 在最后被触发。
var xs = Observable.Create<long>(o =>
{
var d = Observable.Timer(TimeSpan.FromSeconds(5))
.Do(e =>
{
Console.WriteLine("5 seconds elapsed.");
})
.Subscribe(o);
return Disposable.Create(() =>
{
Console.WriteLine("Disposed!");
d.Dispose();
});
});
var subscription = xs.Subscribe(x => Console.WriteLine(x));
第二种情况
但在第二种情况下,如果它没有“自然地”结束,我应该处理它。
除非手动处理,否则不会触发处理。
var xs = Observable.Create<long>(o =>
{
var d = Observable.Timer(TimeSpan.FromSeconds(1),TimeSpan.FromSeconds(1))
.Do(e =>
{
Console.WriteLine("Test.");
})
.Subscribe(o);
return Disposable.Create(() =>
{
Console.WriteLine("Disposed!");
d.Dispose();
});
});
var subscription = xs.Subscribe(x => Console.WriteLine(x));
结论
他举了这么好的例子,如果你问自己同样的问题,值得一看。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。