.net – Rx框架中的内容允许我在创建过程中等待其他方法时返回IObservable?

我一直在努力使用 Reactive Extensions for Twitter’s streaming APIs创建 IObservable<T>实现.

从高级别发送HTTP请求并保持连接打开.以长度为前缀的项目被发送到消费.

基本上,它是使用await keyword循环调用Stream.ReadAsync. IObserver<T> interface实现(从Observable.CreateDataflow library的块,无关紧要,它是一个实现细节)传递给此循环,然后调用上的metods IObserver< T>实现,产生可观察的.

但是,在此循环开始处理之前必须完成许多事情,这需要调用Task<T>返回方法,所有这些方法都可以使用await关键字在C#5.0中更容易调用.像这样的东西:

public async Task<IObservable<string>> Create(string parameter,CancellationToken cancellationToken)
{
     // Make some call that requires await.
     var response = await _httpClient.SendAsync(parameter,cancellationToken).
         ConfigureAwait(false);

     // Create a BufferBlock which will be the observable.
     var block = new BufferBlock<T>();

     // Start some background task which will use the block and publish to it
     // on a separate task.  This is not something that is awaited for.
     ConsumeResponse(response,block,cancellationToken);

     // Create the observable.
     return block.AsObservable();
}

也就是说,我正在返回一个任务< IObservable< T>>从我的方法,但我觉得我在Reactive Extensions中缺少一些东西,这将允许我使用await来促进我需要进行的调用,但也返回一个IObservable< T>而不是任务< IObservable< T>>.

Reactive Extensions中的哪个方法允许我创建一个observable,它需要在从创建方法返回之前等待方法

我发现最接近的是Observable.DeferAsync.假设对我的方法调用和observable的使用是这样的:

public async Task Observe()
{
    // NOT the real name of the interface,but explains it's role here.
    IObservableFactory factory;

    // Create is really named something else.
    IObservable<string> observable = factory.Create("parameter");

    // Subscribe.
    observable.Subscribe(o => Console.WriteLine("Observed: {0}",o));

    // Wait.
    await observable;
}

使用DeferAsync在这里不起作用,因为对Subscribe的调用将发送第一个请求,然后读取它,然后对observable的await调用将创建第二个订阅,但是创建一个不同的observable.

或者,最终返回任务< IObservable< T>>在Reactive Framework中执行此操作的适当方法

随后,由于该方法返回任务< T>,因此最好通过CancellationToken取消操作.也就是说,我可以理解CancellationToken用于取消observable的创建,但它是否也应该用于取消实际的observable(因为它可以被传递下去以读取流等).

我的直觉说不,因为这里存在违反问题分离以及取消的DRY原则:

>取消创建和取消观察是两个不同的事情.
>调用Subscribe将返回IDisposable实施,这将取消订阅.

解决方法

我不会返回任务< IObservable< T>>.在公共API中混合任务和Observable最终会让人感到困惑.请记住,任务可以被视为产生单个值的可观察对象.这也意味着不要将CancellationTokens与公共API中的observable混合使用.您可以通过订阅和取消订阅来控制可观察对象.

这并不意味着你不能混合幕后的概念.以下是使用Observable.Using,Task.ToObservable和Cancellationdisposable执行所需操作的方法

首先,修改您的方法以返回任务< ISourceBlock< string>>:

public async Task<ISourceBlock<string>> CreateBlock(string parameter,cancellationToken).ConfigureAwait(false);

     // Create a BufferBlock which will be the observable.
     var block = new BufferBlock<T>();

     // Start some background task which will use the block and publish to it
     // on a separate task.  This is not something that is awaited for.
     ConsumeResponse(response,cancellationToken);

     return block;
}

现在,这是使用上述方法的新Create方法

public IObservable<string> Create(string parameter)
{
    // Create a cancellation token that will be canceled when the observable is unsubscribed,use this token in your call to CreateBlock.
    // Use ToObservable() to convert the Task to an observable so we can then
    // use SelectMany to subscribe to the block itself once it is available
    return Observable.Using(() => new Cancellationdisposable(),cd => CreateBlock(parameter,cd.Token)
               .ToObservable()
               .SelectMany(block => block.AsObservable()));
}

编辑:我发现Rx已经使用FromAsync实现了这种模式:

public IObservable<string> Create(string parameter)
{
    return Observable.FromAsync(token => CreateBlock(parameter,token))
                     .SelectMany(block => block.AsObservable());
}

还有,DeferAsync,这更合适,因为你的Task实际上是在创建你真正想要观察的Observable(例如你的块):

public IObservable<string> Create(string parameter)
{
    return Observable.DeferAsync(async token => (await CreateBlock(parameter,token)).AsObservable());
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


原文地址:http://msdn.microsoft.com/en-us/magazine/cc163791.aspx 原文发布日期: 9/19/2005 原文已经被 Microsoft 删除了,收集过程中发现很多文章图都不全,那是因为原文的图都不全,所以特收集完整全文。 目录 前言 CLR启动程序
前言 随着近些年微服务的流行,有越来越多的开发者和团队所采纳和使用,它的确提供了很多的优势也解决了很多的问题,但是我们也知道也并不是银弹,提供优势的同时它也给我们的开发人员和团队也带来了很多的挑战。 为了迎接或者采用这些新技术,开发团队需要更加注重一些流程或工具的使用,这样才能更好的适应这些新技术所
最近因为比较忙,好久没有写博客了,这篇主要给大家分享一下PLINQ中的分区。上一篇介绍了并行编程,这边详细介绍一下并行编程中的分区和自定义分区。 先做个假设,假设我们有一个200Mb的文本文件需要读取,怎么样才能做到最优的速度呢?对,很显然就是拆分,把文本文件拆分成很多个小文件,充分利用我们计算机中
在多核CPU在今天和不久的将来,计算机将拥有更多的内核,Microsoft为了利用这个硬件特性,于是在Visual Studio 2010 和 .NET Framework 4的发布及以上版本中,添加了并行编程这个新特性,我想它以后势必会改变我们的开发方式。 在以前或者说现在,我们在并行开发的时候可
c语言输入成绩怎么判断等级
字符型数据在内存中的存储形式是什么
c语言怎么求字符串的长度并输出
c语言函数的三种调用方式是什么
c语言中保留两位小数怎么表示
double的输入格式符是什么
长整型输出格式是什么
C语言中文件包含的命令关键字是什么
c程序如何编写x的y次方
c语言开根号代码是什么
c语言怎么进行字符串比较
c语言怎么进行强制类型转换
c语言运算符的优先级顺序是什么
c++用什么软件编程
中序遍历是怎么遍历的
h文件和c文件的关系是什么