如何分隔IObservable和IObserver

如何解决如何分隔IObservable和IObserver

更新:查看底部的示例

我需要在班级之间发消息。发布者将无限期地循环,调用某种方法来获取数据,然后将该调用的结果传递到OnNext中。订阅者可以有很多,但是只能有一个IObservable和一项长期运行的任务。这是一个实现。

using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

namespace UnitTestProject1
{
    [TestClass]
    public class UnitTest1
    {
        private static string GetSomeData() => "Hi";

        [TestMethod]
        public async Task RunMessagingAsync()
        {
            var subject = new Subject<string>();

            //Create a class and inject the subject as IObserver
            new Publisher(subject);

            //Create a class and inject the subject as IObservable
            new Subscriber(subject,1.ToString());
            new Subscriber(subject,2.ToString());
            new Subscriber(subject,3.ToString());

            //Run the loop for 3 seconds
            await Task.Delay(3000);
        }

        class Publisher
        {
            public Publisher(IObserver<string> observer)
            {
                Task.Run(async () =>
                {
                    //Loop forever
                    while (true)
                    {
                        //Get some data,publish it with OnNext and wait 500 milliseconds
                        observer.OnNext(GetSomeData());
                        await Task.Delay(500);
                    }
                });
            }
        }

        class Subscriber
        {
            public string Name;

            //Listen for OnNext and write to the debug window when it happens
            public Subscriber(IObservable<string> observable,string name)
            {
                Name = name;
                var disposable = observable.Subscribe((s) => Debug.WriteLine($"Name: {Name} Message: {s}"));
            }
        }
    }
}

输出:

名称:1条消息:嗨

名称:2条消息:嗨

名称:3则消息:您好

名称:1条消息:嗨

名称:2条消息:嗨

名称:3则消息:您好

这很好。请注意,只有一个IObserver发送消息,但是所有订阅都接收到该消息。 但是,如何分隔IObservableIObserver?它们像Subject一样粘在一起。这是另一种方法。

[TestMethod]
public async Task RunMessagingAsync2()
{
    var observers = new List<IObserver<string>>();

    var observable = Observable.Create(
    (IObserver<string> observer) =>
    {
        observers.Add(observer);

        Task.Run(async () =>
        {
            while (true)
            {
                try
                {
                    observer.OnNext(GetSomeData());
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                }

                await Task.Delay(500);
            }
        });

        return Disposable.Create(() => { });
    });

    //Create a class and inject the subject as IObservable
    new Subscriber(observable);
    new Subscriber(observable);

    //Run the loop for 10 seconds
    await Task.Delay(10000);

    Assert.IsTrue(ReferenceEquals(observers[0],observers[1]));
}

这里的问题是,这将创建两个单独的Task和两个单独的IObserver。每个订阅都会创建一个新的IObserver。您可以确认,因为此处的Assert失败。这对我来说真的没有任何意义。根据我对反应式编程的了解,我不希望这里的Subscribe方法每次都会创建一个新的IObserver。签出this gist。这是对Observable.Create example的略微修改。它显示了Subscribe方法如何导致每次调用时创建IObserver。 如何在不使用Subject的情况下实现第一个示例的功能?

这是另一种根本不使用Reactive UI的方法...您可以根据需要从发布者创建Subject,但这不是必需的。

using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Threading.Tasks;

namespace UnitTestProject1
{
    [TestClass]
    public class UnitTest1
    {
        private static string GetSomeData() => "Hi";
   
        class Publisher
        {
            public Publisher(Action<string> onNext)
            {
                Task.Run(async () =>
                {
                    //Loop forever
                    while (true)
                    {
                        //Get some data,publish it with OnNext and wait 500 milliseconds
                        onNext(GetSomeData());
                        await Task.Delay(500);
                    }
                });
            }
        }

        class Subscriber
        {
            //Listen for OnNext and write to the debug window when it happens
            public void ReceiveMessage(string message) => Debug.WriteLine(message);
        }

        [TestMethod]
        public async Task RunMessagingAsync()
        {
            //Create a class and inject the subject as IObservable
            var subscriber = new Subscriber();

            //Create a class and inject the subject as IObserver
            new Publisher(subscriber.ReceiveMessage);

            //Run the loop for 10 seconds
            await Task.Delay(10000);
        }
    }
}

最后,我应该补充说ReactiveUI曾经有一个MessageBus class。我不确定是否将其删除,但不再建议使用。他们建议我们改用什么?

工作示例

此版本正确。我想我现在唯一要问的是如何用Observable.Create 来做到这一点? Observable.Create的问题在于它为每个订阅运行操作。那不是预期的功能。无论有多少个订阅,这里运行时间很长的任务只会运行一次。

using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace UnitTestProject1
{
    class Subscriber
    {
        public string Name;

        //Listen for OnNext and write to the debug window when it happens
        public Subscriber(IObservable<string> observable,string name)
        {
            Name = name;
            var disposable = observable.Subscribe((s) => Debug.WriteLine($"Name: {Name} Message: {s}"));
        }
    }

    internal class BasicObservable<T> : IObservable<T>
    {
        List<IObserver<T>> _observers = new List<IObserver<T>>();

        public BasicObservable(
            Func<T> getData,TimeSpan? interval = null,CancellationToken cancellationToken = default
            ) =>

            Task.Run(async () =>
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    try
                    {
                        await Task.Delay(interval ?? new TimeSpan(0,1));
                        var data = getData();
                        _observers.ForEach(o => o.OnNext(data));
                    }
                    catch (Exception ex)
                    {
                        _observers.ForEach(o => o.OnError(ex));
                    }
                }

                _observers.ForEach(o => o.OnCompleted());

            },cancellationToken);

        public IDisposable Subscribe(IObserver<T> observer)
        {
            _observers.Add(observer);
            return Disposable.Create(observer,(o) => _observers.Remove(o));
        }
    }

    public static class ObservableExtensions
    {
        public static IObservable<T> CreateObservable<T>(
            this Func<T> getData,CancellationToken cancellationToken = default)
        => new BasicObservable<T>(getData,default,cancellationToken);

        public static IObservable<T> CreateObservable<T>(
            this Func<T> getData,interval,cancellationToken);
    }

    [TestClass]
    public class UnitTest1
    {
        string GetData() => "Hi";

        [TestMethod]
        public async Task Messaging()
        {
            var cancellationSource = new CancellationTokenSource();
            var cancellationToken = cancellationSource.Token;

            Func<string> getData = GetData;

            var publisher = getData.CreateObservable(cancellationToken);

            new Subscriber(publisher,"One");
            new Subscriber(publisher,"Two");

            for (var i = 0; true; i++)
            {
                if (i >= 5)
                {
                    cancellationSource.Cancel();
                }

                await Task.Delay(1000);
            }
        }
    }

}

解决方法

首先,您必须熟悉"cold" and "hot" observables的理论。这是Introduction to RX的定义。

  1. 是被动序列,可根据请求(订阅时)开始生成通知。
  2. 热门是活动的序列,无论订阅如何,都会产生通知。

您想要的是一个热观测值,而问题是Observable.Create方法创建了冷观测值。但是,您可以使用Publish运算符使任何可观察到的热点。该运算符提供了一种方法,可以让多个独立的观察者共享单个基础订阅。示例:

int index = 0;
var coldObservable = Observable.Create<int>(observer =>
{
    _ = Task.Run(async () =>
    {
        while (true)
        {
            observer.OnNext(++index);
            await Task.Delay(1000);
        }
    });
    return Disposable.Empty;
});

IConnectableObservable<int> hotObservable = coldObservable.Publish();
hotObservable.Connect(); // Causes the start of the loop

hotObservable.Subscribe(s => Console.WriteLine($"Observer A received #{s}"));
hotObservable.Subscribe(s => Console.WriteLine($"Observer B received #{s}"));

在调用coldObservable方法时,将订阅由Observable.Create创建的hotObservable.Connect,然后,由该单个订阅生成的所有通知都将传播到{{1}的所有订阅者}。

输出:

hotObservable

重要提示:以上示例的目的是演示Observer A received #1 Observer B received #1 Observer A received #2 Observer B received #2 Observer A received #3 Observer B received #3 Observer A received #4 Observer B received #4 Observer A received #5 Observer B received #5 Observer A received #6 Observer B received #6 ... 运算符,而不是作为高质量RX代码的示例。它的问题之一是,从理论上讲,在连接到源之后订阅观察者成为可能,因此第一通知不会发送给部分或全部观察者,因为它可能是在其订阅之前创建的。换句话说,这是比赛条件。

还有另一种管理Publish(运营商RefCount)的生存期的方法:

返回一个可观察序列,只要该观察序列至少有一个订阅,该序列就一直与源保持连接。

IConnectableObservable

通过这种方式,您无需手动var hotObservable = coldObservable.Publish().RefCount(); 。该连接在第一次订阅时自动发生,而在最后一次取消订阅时自动解除。

,

我将其添加为答案是因为我觉得Christian在他的答案中发布的代码很危险,因为它混合了Tasks和Rx并存在竞争条件。

这是解决大多数这些问题的替代方法:

public class UnitTest1
{
    private string GetData() => "Hi";
    
    private IDisposable Subscriber(IObservable<string> observable,string name) =>
        observable.Subscribe(s => Debug.WriteLine($"Name: {name} Message: {s}"));
    
    public async Task Messaging()
    {
        var coldObservable =
            Observable
                .Timer(TimeSpan.Zero,TimeSpan.FromSeconds(1.0))
                .Select(_ => GetData());
                
        var publisher = coldObservable.Publish();

        var subscriptions =
            new CompositeDisposable(
                Subscriber(publisher,"One"),Subscriber(publisher,"Two"),publisher.Connect());

        await Task.Delay(TimeSpan.FromSeconds(5.0));

        subscriptions.Dispose();
    }
}

不过,更好的是,我将考虑以这种方式进行操作:

public class UnitTest1
{
    private string GetData() => "Hi";
    
    private IObservable<string> Subscriber(IObservable<string> observable,string name) =>
        observable.Select(s => $"Name: {name} Message: {s}");
    
    public async Task Messaging()
    {
        var coldObservable =
            Observable
                .Timer(TimeSpan.Zero,TimeSpan.FromSeconds(1.0))
                .Select(_ => GetData())
                .Do(_ => Debug.WriteLine("Called GetData()"))
                .Publish(published =>
                    Observable
                        .Merge(
                            Subscriber(published,Subscriber(published,"Two")))
                .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(5.0)))
                .Do(x => Debug.WriteLine(x));
    
        await coldObservable;
    }
}

始终最好将内置运算符用于Rx,而不是将混合方法用于任务。

,

由于上面的回答,我最终无需实现IObservable就获得了期望的结果。西奥多是正确的。答案是使用IObservable方法将Publish()转换为hot。

我写了一篇有关here

的文章

虽然这可行,但上面谜题的答案要好得多。

using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Observables
{
    class Subscriber
    {
        public string Name;

        //Listen for OnNext and write to the debug window when it happens
        public Subscriber(IObservable<string> observable,string name)
        {
            Name = name;
            observable.Subscribe(s => Debug.WriteLine($"Name: {Name} Message: {s}"));
        }
    }

    [TestClass]
    public class UnitTest1
    {
        static string GetData() => "Hi";

        [TestMethod]
        public async Task Messaging()
        {
            var cancellationSource = new CancellationTokenSource();
            var cancellationToken = cancellationSource.Token;

            var coldObservable = Observable.Create<string>(observer =>
            {
                _ = Task.Run(async () =>
                {
                    while (!cancellationToken.IsCancellationRequested)
                    {
                        var data = GetData();
                        observer.OnNext(data);
                        await Task.Delay(1000);
                    }
                },cancellationToken);

                return Disposable.Empty;
            });

            var publisher = coldObservable.Publish();
            var connection = publisher.Connect();

            new Subscriber(publisher,"One");
            new Subscriber(publisher,"Two");

            for (var i = 0; i < 5; i++)
            {
                if (i == 4)
                {
                    cancellationSource.Cancel();
                }

                await Task.Delay(1000);
            }

            connection.Dispose();
        }
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-
参考1 参考2 解决方案 # 点击安装源 协议选择 http:// 路径填写 mirrors.aliyun.com/centos/8.3.2011/BaseOS/x86_64/os URL类型 软件库URL 其他路径 # 版本 7 mirrors.aliyun.com/centos/7/os/x86
报错1 [root@slave1 data_mocker]# kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic topic_db [2023-12-19 18:31:12,770] WARN [Consumer clie
错误1 # 重写数据 hive (edu)&gt; insert overwrite table dwd_trade_cart_add_inc &gt; select data.id, &gt; data.user_id, &gt; data.course_id, &gt; date_format(
错误1 hive (edu)&gt; insert into huanhuan values(1,&#39;haoge&#39;); Query ID = root_20240110071417_fe1517ad-3607-41f4-bdcf-d00b98ac443e Total jobs = 1
报错1:执行到如下就不执行了,没有显示Successfully registered new MBean. [root@slave1 bin]# /usr/local/software/flume-1.9.0/bin/flume-ng agent -n a1 -c /usr/local/softwa
虚拟及没有启动任何服务器查看jps会显示jps,如果没有显示任何东西 [root@slave2 ~]# jps 9647 Jps 解决方案 # 进入/tmp查看 [root@slave1 dfs]# cd /tmp [root@slave1 tmp]# ll 总用量 48 drwxr-xr-x. 2
报错1 hive&gt; show databases; OK Failed with exception java.io.IOException:java.lang.RuntimeException: Error in configuring object Time taken: 0.474 se
报错1 [root@localhost ~]# vim -bash: vim: 未找到命令 安装vim yum -y install vim* # 查看是否安装成功 [root@hadoop01 hadoop]# rpm -qa |grep vim vim-X11-7.4.629-8.el7_9.x
修改hadoop配置 vi /usr/local/software/hadoop-2.9.2/etc/hadoop/yarn-site.xml # 添加如下 &lt;configuration&gt; &lt;property&gt; &lt;name&gt;yarn.nodemanager.res