如何解决Rx.Net GroupBy,如何使多个观察者订阅特定的组
我正迈入Rx的世界,发现很难获得理想的结果,尤其是使用GroupBy运算符时,因此将不胜感激。
我的要求是:
我有一个DataProvider类,该类定期发出HTTP Api请求。
http响应是List<Item>
。每个项目都有一个唯一的ID属性。
我需要根据其ID将每个Item作为单独的流进行处理,这看起来像GroupBy的情况。
每个组都需要自己的管道,其中:
- 以特定值开头(StartWith运算符)
- 它缓冲前一个项目以与当前项目进行比较(Buffer(2,1)运算符)
- 如果当前项目与先前项目不同(在此处),则发出当前项目
结果是IObservable<Item>
的更改(ChangeStream)。我不再与特定的群体打交道。
- 观察者可以提前订阅(在Item.Id出现在响应流上之前以及在创建组之前)
- 观察者可以延迟订阅(在响应流中出现Item.Id并创建组之后)
- 最新的订户应该收到Item.Id(Replay(1))的最后更改,但我似乎也无法弄清楚这部分。
用于组播特定组的Rx方法是什么?任何帮助/建议将不胜感激。我在下面提供了示例代码。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
namespace RxTest
{
class Program
{
static void Main(string[] args)
{
var dataService = new MockDataService();
// How do I subscribe to a specific group?
// Eg. I am only interested in changes to Items where Id == 1
// Subscribers can be early (before the stream is hot)
var item1Stream = dataService.SubscribetoItem(1);
// There can be multiple subscribers to a group
var item1Stream2 = dataService.SubscribetoItem(1);
Console.WriteLine("Press Any Key to Start");
Console.ReadLine();
dataService.Start();
// Subscribers can be late (Eg. Subscriber to Item Id == 2 after it has emitted items)
Thread.Sleep(2000);
var item2Stream = dataService.SubscribetoItem(2);
// Subscribers can be early (After connect but before the Item Id appears on the Stream (before group creation))
// Eg. Subscribe to group 4 (Group 4 doesn't get created until 20s after connect in this example)
var item4Stream = dataService.SubscribetoItem(4);
// What is the Rx way to Multicast a Group?
Console.WriteLine("Press Any Key to Exit");
Console.ReadLine();
dataService.Stop();
}
}
public class MockDataService
{
private readonly IConnectableObservable<Item> _itemsstream;
private Idisposable _itemsSubscription;
private readonly IObservable<Item> _changeStream;
private Idisposable _changeSubscription;
public MockDataService()
{
// Simulate Http response pipeline.
//// Time: 1s...............10s..............20s.....etc
//// stream: [[1][2]]repeat...[[2][3]]repeat...[[3][4]]repeat...
IObservable<List<Item>> responseStream = Observable.Interval(TimeSpan.FromSeconds(1))
.Take(50)
.Select(tick =>
{
// Every 10 ticks an item drops off the stream and a new one starts
// Every 2 ticks the Item value increases to generate a change.
int rangeStart = Math.Min(((int)tick / 10) + 1,5);
return Enumerable.Range(rangeStart,2).Select(id => new Item(id,(int)tick / 2)).ToList();
});
// Flatten the list into IObservable<Item>
//// Time: 1s.............10s............20s.....etc
//// stream: [1][2]repeat...[2][3]repeat...[3][4]repeat...
_itemsstream = responseStream
.SelectMany(list => list)
.Publish();
// Split into groups by Item.Id and process each group for changes
// ChangeStream is an IObservable<Item> that have changes.
_changeStream = _itemsstream
.GroupBy(item => item.Id)
.SelectMany(grp =>
grp
// Pipeline for each group.
.StartWith(new Item(grp.Key,-1)) // Initial item from Db
//.takeuntil(Item => Item.IsComplete()) // Logic to complete the group
.LogConsoleWithThread($"Group: {grp.Key}")
.Buffer(2,1)
.Where(buffer => buffer.Count == 2 && buffer[0].HasChanges(buffer[1]))
.Select(buffer => buffer[1])
.LogConsoleWithThread($"Group.Change : {grp.Key}")
// How do I push changes in this group to Zero..Many subscribed Observers?
// I would also like to Replay(1) to all late subscribers to a group.
);
}
/// <summary>
/// How to get the IObservable for a specific group?
/// </summary>
/// <param name="itemId"></param>
/// <returns></returns>
public IObservable<Item> SubscribetoItem(int itemId)
{
// ????
return null;
}
public void Start()
{
_changeSubscription = _changeStream.SubscribeConsole("ChangeStream");
_itemsSubscription = _itemsstream.Connect();
}
public void Stop()
{
_changeSubscription.dispose();
_itemsSubscription.dispose();
}
}
public class Item
{
public int Id { get; private set; }
public int Value { get; private set; }
public Item(int id,int value)
{
Id = id;
Value = value;
}
public bool HasChanges(Item compareItem)
{
return this.Value != compareItem.Value;
}
public override string ToString()
{
return $"Item: Id={Id} Value={Value}";
}
}
public static class RxExtensions
{
public static Idisposable SubscribeConsole<T>(this IObservable<T> observable,string name = "")
{
return observable.Subscribe(new ConsoleObserver<T>(name));
}
/// <summary>
/// Logs to the Console the subscriptions and emissions done on/by the observable
/// each log message also includes the thread it happens on
/// </summary>
/// <typeparam name="T">The Observable Type</typeparam>
/// <param name="observable">The Observable to log.</param>
/// <param name="name">An optional name prefix that will be added before each notification</param>
/// <returns></returns>
public static IObservable<T> LogConsoleWithThread<T>(this IObservable<T> observable,string name = "")
{
return Observable.Defer(() =>
{
Console.WriteLine("{0} Subscription happened on Thread: {1}",name,Thread.CurrentThread.ManagedThreadId);
return observable.Do(
x => Console.WriteLine("{0} - OnNext({1}) Thread: {2}",x,Thread.CurrentThread.ManagedThreadId),ex =>
{
Console.WriteLine("{0} - OnError Thread:{1}",Thread.CurrentThread.ManagedThreadId);
Console.WriteLine("\t {0}",ex);
},() => Console.WriteLine("{0} - OnCompleted() Thread {1}",Thread.CurrentThread.ManagedThreadId));
});
}
}
/// <summary>
/// An observer that outputs to the console each time the OnNext,OnError or OnComplete occurs
/// </summary>
/// <typeparam name="T"></typeparam>
public class ConsoleObserver<T> : IObserver<T>
{
private readonly string _name;
public ConsoleObserver(string name = "")
{
_name = name;
}
public void OnNext(T value)
{
Console.WriteLine("{0} - OnNext({1})",_name,value);
}
public void OnError(Exception error)
{
Console.WriteLine("{0} - OnError:",_name);
Console.WriteLine("\t {0}",error);
}
public void OnCompleted()
{
Console.WriteLine("{0} - OnCompleted()",_name);
}
}
}
解决方法
您可能需要专门的发布运营商,因为现有的运营商(Publish
,PublishLast
和Replay
)太窄或太宽,无法满足您的需求。因此,您需要使用Multicast
运算符,该运算符随附有一个自定义重播主题,该主题仅缓冲每个键的最后一个元素。这是此类主题的基本实现:
public class ReplayLastPerKeySubject<T,TKey> : ISubject<T>
{
private readonly Func<T,TKey> _keySelector;
private readonly ReplaySubject<ReplaySubject<T>> _subjects;
private readonly IObservable<T> _mergedSubjects;
private readonly Dictionary<TKey,ReplaySubject<T>> _perKey;
public ReplayLastPerKeySubject(Func<T,TKey> keySelector)
{
_keySelector = keySelector;
_subjects = new ReplaySubject<ReplaySubject<T>>();
_mergedSubjects = _subjects.Merge();
_perKey = new Dictionary<TKey,ReplaySubject<T>>();
}
public void OnNext(T value)
{
var key = _keySelector(value);
ReplaySubject<T> subject;
if (!_perKey.TryGetValue(key,out subject))
{
subject = new ReplaySubject<T>(1);
_perKey.Add(key,subject);
_subjects.OnNext(subject);
}
subject.OnNext(value);
}
public void OnCompleted()
{
// All subjects,inner and outer,must be completed
_subjects.OnCompleted();
_subjects.Subscribe(subject => subject.OnCompleted());
}
public void OnError(Exception error)
{
// Faulting the master (outer) subject is enough
_subjects.OnError(error);
}
public IDisposable Subscribe(IObserver<T> observer)
{
return _mergedSubjects.Subscribe(observer);
}
}
此实现基于RX专家写的similar question的答案。原始答案使用Concat
观察值来订阅观察者,而这个答案使用Merge
观察值,所以我不确定它的正确性和效率100%。
有了这样的实现,其余的就很容易了。您首先创建原始可观察物的发布版本:
var published = YourObservable
.Multicast(new ReplayLastPerKeySubject<Item,int>(x => x.Id)))
.RefCount();
最后,您可以使用Where
运算符为特定键创建更改流:
var changeStream13 = published.Where(x => x.Id == 13);
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。