从定期异步请求创建可观察的

如何解决从定期异步请求创建可观察的

我想要一种将异步方法转换为可观察方法的通用方法。就我而言,我正在处理使用HttpClient从API提取数据的方法

假设我们有一种方法Task<string> GetSomeData(),它需要变成一个单独的Observable<string>,其中值是由以下各项的组合生成的:

  • 重复调用GetSomeData()(例如,每x秒)
  • 在任何给定时间(例如,当用户点击刷新时)手动触发对GetSomeData()的呼叫。

由于有两种触发GetSomeData()并发执行的方式可能是个问题。为了避免要求GetSomeData()是线程安全的,我想限制并发性,以便只有一个线程同时执行该方法。结果,我需要使用某种策略来处理重叠的请求。我制作了一种大理石图,试图描述问题和想要的结果

https://docs.wildfly.org/21/wildscribe/subsystem/jgroups/stack/transport/TCP/index.html

我的直觉告诉我有一种简单的方法可以实现这一目标,所以请给我一些见解:)

这是我到目前为止的解决方案。不幸的是,它不能解决并发问题。

    public class ObservableCreationWrapper<T>
    {
        private Subject<Unit> _manualCallsSubject = new Subject<Unit>();
        private Func<Task<T>> _methodToCall;
        private IObservable<T> _manualCalls;

        public IObservable<T> Stream { get; private set; }

        public ObservableCreationWrapper(Func<Task<T>> methodToCall,TimeSpan period)
        {
            _methodToCall = methodToCall;
            _manualCalls = _manualCallsSubject.AsObservable()
                .Select(x => Observable.FromAsync(x => methodToCall()))
                .Merge(1);

            Stream = Observable.FromAsync(() => _methodToCall())
                .DelayRepeat(period)
                .Merge(_manualCalls);
        }

        public void TriggerAdditionalCall()
        {
            _manualCallsSubject.OnNext(Unit.Default);
        }
    }

延后重复的扩展方法

static class Extensions
{
    public static IObservable<T> DelayRepeat<T>(this IObservable<T> source,TimeSpan delay) => source
        .Concat(
            Observable.Create<T>(async observer =>
            {
                await Task.Delay(delay);
                observer.OnCompleted();
            }))
        .Repeat();
}

包含用于生成可观察对象的方法的服务示例

class SomeService
{
    private int _ticks = 0;

    public async Task<string> GetSomeValueAsync()
    {
        //Just a hack to dermine if request was triggered manuall or by timer
        var initiatationWay = (new StackTrace()).GetFrame(4).getmethod().ToString().Contains("System.Threading.CancellationToken") ? "manually" : "by timer";

        //Here we have a data race! We would like to limit access to this method 
        var valuetoReturn = $"{_ticks} ({initiatationWay})";

        await Task.Delay(500);
        _ticks += 1; 
        return valuetoReturn;
    }
}

这样使用(会发生数据争用):

static async Task Main(string[] args)
{
    //Running this program will yield non deterministic results due to data-race in GetSomeValueAsync
    var someService = new SomeService();
    var stopwatch = Stopwatch.StartNew();
    var observableWrapper = new ObservableCreationWrapper<string>(someService.GetSomeValueAsync,TimeSpan.FromMilliseconds(2000));
    observableWrapper.Stream
        .Take(6)
        .Subscribe(x => 
            {
                Console.WriteLine($"{stopwatch.ElapsedMilliseconds} | Request: {x} fininshed");
            });

    await Task.Delay(4000);
    observableWrapper.TriggerAdditionalCall();
    observableWrapper.TriggerAdditionalCall();
    Console.ReadLine();
}

解决方法

这是我对这个问题的看法:


更新:通过借鉴Enigmativity的answer的想法,我可以大大简化建议的解决方案。 Observable.StartAsync方法可自动处理混乱的取消业务,并且只需使用SemaphoreSlim就可以强制执行不重叠执行的要求。

/// <summary>
/// Creates an observable sequence containing the results of an asynchronous
/// function that is invoked periodically and manually. Overlapping invocations
/// are prevented. Timer ticks that would cause overlapping are ignored.
/// Manual invocations cancel previous invocations,and restart the timer.
/// </summary>
public static IObservable<T> PeriodicAndManual<T>(
    Func<bool,CancellationToken,Task<T>> functionAsync,TimeSpan period,out Action manualInvocation)
{
    // Arguments validation omitted
    var manualSubject = new Subject<bool>();
    manualInvocation = () => manualSubject.OnNext(true);
    var semaphore = new SemaphoreSlim(1);
    return Observable
        .Interval(period)
        .Select(_ => false) // Not manual
        .Merge(manualSubject)
        .TakeUntil(isManual => isManual) // Stop on first manual
        .Repeat() // ... and restart the timer
        .Prepend(false) // Skip the initial interval delay
        .Scan(seed: (
            // Both representations of an operation are needed
            // The Observable provides automatic cancellation on unsubscription
            // The Task maintains the IsCompleted state
            Operation: (IObservable<T>)null,AsTask: Task.FromResult(default(T))
        ),accumulator: (previous,isManual) =>
        {
            // Start a new operation only if the previous operation is completed,// or if the call is manual. Otherwise return the previous operation.
            if (!previous.AsTask.IsCompleted && !isManual) return previous;
            // Start a new operation as hot observable
            var operation = Observable.StartAsync(async ct =>
            {
                await semaphore.WaitAsync(ct); // Ensure no overlapping
                try { return await functionAsync(isManual,ct); }
                finally { semaphore.Release(); }
            },Scheduler.Immediate); // Propagate the task status synchronously
            return (operation,operation.ToTask());
        })
        .Select(entry => entry.Operation) // Discard the AsTask representation
        .DistinctUntilChanged() // Ignore duplicate operations
        .Switch(); // Cancel pending operations and ignore them
}

out Action manualInvocation参数是触发手动调用的机制。

用法示例:

int ticks = 0;
var subscription = PeriodicAndManual(async (isManual,token) =>
{
    var id = $"{++ticks} " + (isManual ? "manual" : "periodic");
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Begin {id}");
    await Task.Delay(500,token);
    return id;
},TimeSpan.FromMilliseconds(1000),out var manualInvocation)
.Do(x => Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Received #{x}"))
.Subscribe();

await Task.Delay(3200);
manualInvocation();
await Task.Delay(200);
manualInvocation();
await Task.Delay(3200);

subscription.Dispose();

输出:

19:52:43.684 Begin 1 periodic
19:52:44.208 Received #1 periodic
19:52:44.731 Begin 2 periodic
19:52:45.235 Received #2 periodic
19:52:45.729 Begin 3 periodic
19:52:46.232 Received #3 periodic
19:52:46.720 Begin 4 periodic
19:52:46.993 Begin 5 manual
19:52:47.220 Begin 6 manual
19:52:47.723 Received #6 manual
19:52:48.223 Begin 7 periodic
19:52:48.728 Received #7 periodic
19:52:49.227 Begin 8 periodic
19:52:49.730 Received #8 periodic
19:52:50.226 Begin 9 periodic

this问题中借用了使用ScanDistinctUntilChanged运算符以在上一个异步操作运行时删除元素的技术。

,

这是您需要的查询:

var subject = new Subject<Unit>();
var delay = TimeSpan.FromSeconds(1.0);

IObservable<string> query =
    subject
        .StartWith(Unit.Default)
        .Select(x => Observable.Timer(TimeSpan.Zero,delay))
        .Switch()
        .SelectMany(x => Observable.FromAsync(() => GetSomeData()));

如果您每次致电subject.OnNext(Unit.Default),都会立即触发对GetSomeData的呼叫,然后根据TimeSpan中设置的delay重复呼叫。

使用.StartWith(Unit.Default)会将查询设置为立即有订阅者。

使用.Switch()会基于新的subject.OnNext(Unit.Default)取消所有待处理的操作。

这应该与您的大理石图匹配。


以上版本未引入值之间的延迟。

版本2应该。

var subject = new Subject<Unit>();
var delay = TimeSpan.FromSeconds(5.0);

var source = Observable.FromAsync(() => GetSomeData());

IObservable<string> query =
    subject
        .StartWith(Unit.Default)
        .Select(x => source.Expand(n => Observable.Timer(delay).SelectMany(y => source)))
        .Switch();

我已经使用Expand运算符在两个值之间引入了延迟。只要source仅产生一个值(FromAsync会产生一个值),就可以正常工作。

,

我建议不要尝试取消已经开始的呼​​叫。事情太混乱了。 如果GetSomeValueAsync中的逻辑涉及数据库调用和/或Web API调用,则您根本无法真正取消该调用。

我认为这里的关键是确保对GetSomeValueAsync的所有调用都已序列化。

我基于Enigmativity的版本1创建了以下解决方案。 在asp.net core 3.1上的webassembly blazor页面上进行了测试,效果很好。

private int _ticks = 0; //simulate a resource you want serialized access

//for manual event,trigger will be 0; for Timer event,trigger will be 1,2,3...
protected async Task<string> GetSomeValueAsync(string trigger)
{
    var valueToReturn = $"{DateTime.Now.Ticks.ToString()}: {_ticks.ToString()} | ({trigger})";

    await Task.Delay(1000);
    _ticks += 1;
    return valueToReturn;
}

//define two subjects
private Subject<string> _testSubject = new Subject<string>();
private Subject<string> _getDataSubject = new Subject<string>();

//driving observable,based on Enigmativity's Version 1
var delay = TimeSpan.FromSeconds(3.0);
IObservable<string> getDataObservable =
    _testSubject
   .StartWith("Init")
   .Select(x => Observable.Timer(TimeSpan.Zero,delay).Select(i => i.ToString()))
   .Switch()
   .WithLatestFrom(_getDataSubject.AsObservable().StartWith("IDLE"))
   .Where(a => a.Second == "IDLE")
   .Select(a => a.First);

//_disposables is CompositeDisposable defined in the page
_disposables.Add(getDataObservable.Subscribe(async t =>
{
     _getDataSubject.OnNext("WORKING");
     //_service.LogToConsole is my helper function to log data to console
     await _service.LogToConsole(await GetSomeValueAsync(t)); 
     _getDataSubject.OnNext("IDLE");
}));

就是这样。我使用了一个按钮来触发手动事件。 输出中的_ticks总是顺序的,即没有重叠发生。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?