如何解决Rx.net 在 observable 中实现断开连接/错误时的重试功能
代码如下:
public class FooService
{
private ITransportService _transportService;
public FooService(ITransportService transportService)
{
_transportService = transportService;
_transportService.Connect();
}
public Idisposable Subscribe(IObserver<FooData> observer)
{
return _transportService.Getobservable()
.Subscribe(observer);
}
}
public interface ITransportService
{
ConnectionState State { get; }
bool Connect();
IObservable<FooData> Getobservable();
}
public class ClientConsumingProgram
{
class FooObserver : IObserver<FooData>
{
public void OnNext(FooData value)
{
//Client consuming without interruption
}
//.. on error.. onCompleted
}
public static void Main()
{
var fooService = new FooService(transportService);
var fooObserver = new FooObserver();
var disposable = fooService.Subscribe(fooObserver);
}
}
我想实现以下内容:
当传输服务断开时(套接字从服务器关闭),我希望应用程序重试几次,但 foo 服务首先需要在 Connect
上调用 _transportService
,然后调用一次 State
已连接,调用 Getobservable。
期望的结果是 OnNext
上的 FooObserver
继续在客户端打勾,如果 _transportService
在最大重试前再次连接,并且一旦超过最大错误 OnError 应该被触发。
有人能指出我实现这个的正确方向吗?
更新
public class FooService
{
private ITransportService _transportService;
public FooService(ITransportService transportService)
{
_transportService = transportService;
_transportService.Connect();
}
public Idisposable Subscribe(IObserver<FooData> observer)
{
return _transportService.GetConnectionStateObservable()
.Select(cs => cs == ConnectionState.Open)
.distinctUntilChanged()
.Select(isOpen => isOpen
? _transportService.Getobservable() //if open,return observable
: Observable.Start(() => _transportService.Connect()) //if not open,call connect and wait for connection to open
.IgnoreElements()
.Select(_ => default(FooData))
.Concat(Observable.Never<FooData>())
)
.Switch()
.Subscribe(observer);
}
}
public interface ITransportService
{
IObservable<ConnectionState> GetConnectionStateObservable();
bool Connect();
IObservable<FooData> Getobservable();
}
public class FooData
{
public int Id { get; set; }
public string Msg { get; set; }
}
public enum ConnectionState
{
Open,Close
}
public class FooMockTransportService : ITransportService
{
public ConnectionState State { get; set; }
private BehaviorSubject<ConnectionState> _connectionSubject = new BehaviorSubject<ConnectionState>(ConnectionState.Close);
private bool _shoulddisconnect;
public FooMockTransportService()
{
_shoulddisconnect = true;
}
public bool Connect()
{
State = ConnectionState.Open;
_connectionSubject.OnNext(ConnectionState.Open);
return true;
}
public IObservable<ConnectionState> GetConnectionStateObservable()
{
return _connectionSubject.AsObservable();
}
public IObservable<FooData> Getobservable()
{
return Observable.Create<FooData>(
o=>
{
TaskPoolScheduler.Default.Schedule(() =>
{
o.OnNext(new FooData { Id = 1,Msg = "First" });
o.OnNext(new FooData { Id = 2,Msg = "Sec" });
//Simulate disconnection,ony once
if(_shoulddisconnect)
{
_shoulddisconnect = false;
State = ConnectionState.Close;
o.OnError(new Exception("disconnected"));
_connectionSubject.OnNext(ConnectionState.Close);
}
o.OnNext(new FooData { Id = 3,Msg = "Third" });
o.OnNext(new FooData { Id = 4,Msg = "Fourth" });
});
return () => { };
});
}
}
public class Program
{
class FooObserver : IObserver<FooData>
{
public void OnCompleted()
{
throw new NotImplementedException();
}
public void OnError(Exception error)
{
Console.WriteLine(error);
}
public void OnNext(FooData value)
{
Console.WriteLine(value.Id);
}
}
public static void Main()
{
var transportService = new FooMockTransportService();
var fooService = new FooService(transportService);
var fooObserver = new FooObserver();
var disposable = fooService.Subscribe(fooObserver);
Console.Read();
}
}
代码是可编译的,并且还包含对 Shlomo 的建议。 当前输出:
1
2
System.Exception: disconnected
所需的输出,在断开连接时,它应该每 1 秒捕获并重试一次,以查看它是否已连接:
1
2
1
2
3
4
解决方法
如果您控制 ITransportService
,我建议您添加一个属性:
public interface ITransportService
{
ConnectionState State { get; }
bool Connect();
IObservable<FooData> GetObservable();
IObservable<ConnectionState> GetConnectionStateObservable();
}
一旦你能以一种可观察的方式获得状态,产生可观察的就变得更容易了:
public class FooService
{
private ITransportService _transportService;
public FooService(ITransportService transportService)
{
_transportService = transportService;
_transportService.Connect();
}
public IDisposable Subscribe(IObserver<FooData> observer)
{
return _transportService.GetConnectionStateObservable()
.Select(cs => cs == ConnectionState.Open)
.DistinctUntilChanged()
.Select(isOpen => isOpen
? _transportService.GetObservable() //if open,return observable
: Observable.Start(() => _transportService.Connect()) //if not open,call connect and wait for connection to open
.IgnoreElements()
.Select(_ => default(FooData))
.Concat(Observable.Never<FooData>())
)
.Switch()
.Subscribe(observer);
}
}
如果您不控制 ITransportService
,我建议您创建一个继承自它的接口,您可以在其中添加类似的属性。
顺便说一句,我建议您放弃 FooObserver
,您几乎不需要塑造自己的观察者。公开 observable,并在 Observable 上调用 Subscribe
重载通常可以解决问题。
但我无法测试任何这些:重试逻辑应该是什么样的,Connect
的返回值是什么意思,或者 ConnectionState
类是什么,以及代码无法编译。您应该尝试将您的问题设计为 mcve。
更新:
以下按预期处理测试代码:
public IDisposable Subscribe(IObserver<FooData> observer)
{
return _transportService.GetConnectionStateObservable()
.Select(cs => cs == ConnectionState.Open)
.DistinctUntilChanged()
.Select(isOpen => isOpen
? _transportService.GetObservable() //if open,return observable
.Catch(Observable.Never<FooData>())
: Observable.Start(() => _transportService.Connect()) //if not open,call connect and wait for connection to open
.IgnoreElements()
.Select(_ => default(FooData))
.Concat(Observable.Never<FooData>())
)
.Switch()
.Subscribe(observer);
}
仅对原始发布代码的更改是额外的 .Catch(Observable.Never<FooData>())
。正如所写,这段代码将永远运行。我希望你有办法终止所发布内容的外部可观察性。
详细说明我的评论:
正如 Shlomo 已经在他的回答中展示了如何利用可观察的连接状态,我猜你想要的是在断开连接时再次订阅它。
为此使用 Observable.Defer
return Observable.Defer(() => your final observable)
现在断开连接,如果你想再次订阅使用重试
return Observable.Defer(() => your final observable).Retry(3)
但是您可能需要延迟重试,无论是线性还是指数回退 策略,为此使用 DelaySubscription
return Observable.Defer(() => your_final_observable.DelaySubscription(strategy)).Retry(3)
这是最终代码,每秒重试一次:
public IDisposable Subscribe(IObserver<FooData> observer)
{
return Observable.Defer(() => {
return _transportService.GetConnectionStateObservable()
.Select(cs => cs == ConnectionState.Open)
.DistinctUntilChanged()
.Select(isOpen => isOpen
? _transportService.GetObservable() //if open,call connect and wait for connection to open
.IgnoreElements()
.Select(_ => default(FooData))
.Concat(Observable.Never<FooData>())
)
.Switch().DelaySubscription(TimeSpan.FromSeconds(1));
})
.Retry(2)
.Subscribe(observer);
}
要记住的一些警告:
这个 DelaySubscription 也会延迟第一次调用,所以如果有问题,创建一个计数变量,只有当计数 > 0 时才使用 DelaySubscription,否则使用正常的 observable。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。