Rx.net 在 observable 中实现断开连接/错误时的重试功能

如何解决Rx.net 在 observable 中实现断开连接/错误时的重试功能

代码如下:

    public class FooService
    {
        private ITransportService _transportService;
        public FooService(ITransportService transportService)
        {
            _transportService = transportService;
            _transportService.Connect();
        }

        public Idisposable Subscribe(IObserver<FooData> observer)
        {
            return _transportService.Getobservable()
                .Subscribe(observer);
        }
    }

    public interface ITransportService
    {
        ConnectionState State { get; }
        bool Connect();
        IObservable<FooData> Getobservable();
    }

    public class ClientConsumingProgram
    {
        class FooObserver : IObserver<FooData>
        {
            public void OnNext(FooData value)
            {
                //Client consuming without interruption
            }
            //.. on error.. onCompleted
        }
        public static void Main()
        {
            var fooService = new FooService(transportService);
            var fooObserver = new FooObserver();
            var disposable = fooService.Subscribe(fooObserver);
        }
    }

我想实现以下内容

当传输服务断开时(套接字从服务器关闭),我希望应用程序重试几次,但 foo 服务首先需要在 Connect调用 _transportService,然后调用一次 State已连接,调用 Getobservable。

期望的结果是 OnNext 上的 FooObserver 继续在客户端打勾,如果 _transportService 在最大重试前再次连接,并且一旦超过最大错误 OnError 应该被触发。

>

有人能指出我实现这个的正确方向吗?


更新

public class FooService
{
    private ITransportService _transportService;
    public FooService(ITransportService transportService)
    {
        _transportService = transportService;
        _transportService.Connect();
    }

    public Idisposable Subscribe(IObserver<FooData> observer)
    {
        return _transportService.GetConnectionStateObservable()
        .Select(cs => cs == ConnectionState.Open)
        .distinctUntilChanged()
        .Select(isOpen => isOpen
            ? _transportService.Getobservable()   //if open,return observable
            : Observable.Start(() => _transportService.Connect()) //if not open,call connect and wait for connection to open
                .IgnoreElements()
                .Select(_ => default(FooData))
                .Concat(Observable.Never<FooData>())
        )
        .Switch()
        .Subscribe(observer);
    }
}

public interface ITransportService
{
    IObservable<ConnectionState> GetConnectionStateObservable();
    bool Connect();
    IObservable<FooData> Getobservable();
}

public class FooData
{
    public int Id { get; set; }
    public string Msg { get; set; }
}

public enum ConnectionState
{
    Open,Close
}

public class FooMockTransportService : ITransportService
{
    public ConnectionState State { get; set; }
    private BehaviorSubject<ConnectionState> _connectionSubject = new BehaviorSubject<ConnectionState>(ConnectionState.Close);
    private bool _shoulddisconnect;

    public FooMockTransportService()
    {
        _shoulddisconnect = true;
    }

    public bool Connect()
    {
        State = ConnectionState.Open;
        _connectionSubject.OnNext(ConnectionState.Open);
        return true;
    }

    public IObservable<ConnectionState> GetConnectionStateObservable()
    {
        return _connectionSubject.AsObservable();
    }

    public IObservable<FooData> Getobservable()
    {
        return Observable.Create<FooData>(
            o=>
            {
                TaskPoolScheduler.Default.Schedule(() =>
                {
                    o.OnNext(new FooData { Id = 1,Msg = "First" });
                    o.OnNext(new FooData { Id = 2,Msg = "Sec" });

                    //Simulate disconnection,ony once
                    if(_shoulddisconnect)
                    {
                        _shoulddisconnect = false;
                        State = ConnectionState.Close;
                        o.OnError(new Exception("disconnected"));
                        _connectionSubject.OnNext(ConnectionState.Close);
                    }

                    o.OnNext(new FooData { Id = 3,Msg = "Third" });
                    o.OnNext(new FooData { Id = 4,Msg = "Fourth" });
                });
                return () => { };
            });
    }
}

public class Program
{
    class FooObserver : IObserver<FooData>
    {
        public void OnCompleted()
        {
            throw new NotImplementedException();
        }

        public void OnError(Exception error)
        {
            Console.WriteLine(error);
        }

        public void OnNext(FooData value)
        {
            Console.WriteLine(value.Id);
        }
    }
    public static void Main()
    {
        var transportService = new FooMockTransportService();
        var fooService = new FooService(transportService);
        var fooObserver = new FooObserver();
        var disposable = fooService.Subscribe(fooObserver);
        Console.Read();
    }
}

代码是可编译的,并且还包含对 Shlomo 的建议。 当前输出

1
2
System.Exception: disconnected

所需的输出,在断开连接时,它应该每 1 秒捕获并重试一次,以查看它是否已连接:

1
2
1
2
3
4

解决方法

如果您控制 ITransportService,我建议您添加一个属性:

public interface ITransportService
{
    ConnectionState State { get; }
    bool Connect();
    IObservable<FooData> GetObservable();
    IObservable<ConnectionState> GetConnectionStateObservable();
}

一旦你能以一种可观察的方式获得状态,产生可观察的就变得更容易了:

public class FooService
{
    private ITransportService _transportService;
    public FooService(ITransportService transportService)
    {
        _transportService = transportService;
        _transportService.Connect();
    }

    public IDisposable Subscribe(IObserver<FooData> observer)
    {
        return _transportService.GetConnectionStateObservable()
            .Select(cs => cs == ConnectionState.Open)
            .DistinctUntilChanged()
            .Select(isOpen => isOpen 
                ? _transportService.GetObservable()   //if open,return observable
                : Observable.Start(() => _transportService.Connect()) //if not open,call connect and wait for connection to open
                    .IgnoreElements()
                    .Select(_ => default(FooData))
                    .Concat(Observable.Never<FooData>())
            )
            .Switch()
            .Subscribe(observer);
    }
}

如果您不控制 ITransportService,我建议您创建一个继承自它的接口,您可以在其中添加类似的属性。

顺便说一句,我建议您放弃 FooObserver,您几乎不需要塑造自己的观察者。公开 observable,并在 Observable 上调用 Subscribe 重载通常可以解决问题。

但我无法测试任何这些:重试逻辑应该是什么样的,Connect 的返回值是什么意思,或者 ConnectionState 类是什么,以及代码无法编译。您应该尝试将您的问题设计为 mcve


更新

以下按预期处理测试代码:

public IDisposable Subscribe(IObserver<FooData> observer)
{
    return _transportService.GetConnectionStateObservable()
        .Select(cs => cs == ConnectionState.Open)
        .DistinctUntilChanged()
        .Select(isOpen => isOpen
            ? _transportService.GetObservable()   //if open,return observable
                .Catch(Observable.Never<FooData>())
            : Observable.Start(() => _transportService.Connect()) //if not open,call connect and wait for connection to open
                .IgnoreElements()
                .Select(_ => default(FooData))
                .Concat(Observable.Never<FooData>())
        )
        .Switch()
        .Subscribe(observer);
}

仅对原始发布代码的更改是额外的 .Catch(Observable.Never<FooData>())。正如所写,这段代码将永远运行。我希望你有办法终止所发布内容的外部可观察性。

,

详细说明我的评论:

正如 Shlomo 已经在他的回答中展示了如何利用可观察的连接状态,我猜你想要的是在断开连接时再次订阅它。

为此使用 Observable.Defer

return Observable.Defer(() => your final observable)

现在断开连接,如果你想再次订阅使用重试

return Observable.Defer(() => your final observable).Retry(3)

但是您可能需要延迟重试,无论是线性还是指数回退 策略,为此使用 DelaySubscription

return Observable.Defer(() => your_final_observable.DelaySubscription(strategy)).Retry(3)

这是最终代码,每秒重试一次:

        public IDisposable Subscribe(IObserver<FooData> observer)
        {
            return Observable.Defer(() => {
                return _transportService.GetConnectionStateObservable()
            .Select(cs => cs == ConnectionState.Open)
            .DistinctUntilChanged()
            .Select(isOpen => isOpen
                ? _transportService.GetObservable()   //if open,call connect and wait for connection to open
                    .IgnoreElements()
                    .Select(_ => default(FooData))
                    .Concat(Observable.Never<FooData>())
            )
            .Switch().DelaySubscription(TimeSpan.FromSeconds(1));
            })
            .Retry(2)
            .Subscribe(observer);
        }

要记住的一些警告

这个 DelaySubscription 也会延迟第一次调用,所以如果有问题,创建一个计数变量,只有当计数 > 0 时才使用 DelaySubscription,否则使用正常的 observable。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?