使用约束和暂停支持对每个 DynamicData SourceCache 项目进行操作

如何解决使用约束和暂停支持对每个 DynamicData SourceCache 项目进行操作

在这里尝试做的是迭代 <section> <article> <header> <h2>History of xyz</h2> </header> <img src="img1.jpg"> <p>Lorem Ipsum is simply dummy text of the printing and typesetting industry. Lorem Ipsum has been the industry's standard dummy text ever since the 1500s,when an unkNown printer took a galley of type and scrambled it to make a type specimen book.It has survived not only five centuries,but also the leap into electronic typesetting,remaining essentially unchanged.</p> </article> </section> <aside> <img src="img2.jpg"> <p>Lorem Ipsum is simply dummy text of the printing and ypesetting industry.Lorem Ipsum has been the industry's standard dummy text ever since the 1500s,remaining essentially unchanged.</p> </aside> 的每个项目,每次运行之间有一点时间,并对每个项目执行一些异步工作。每个项目必须一个一个地处理,工作不能重叠。每次运行后我都会施加一些限制。源缓存数据可以随时更改,它不是示例中的固定数据。此示例运行良好,但我需要一种使用 SourceCache 暂停处理的方法,该 IObservable<bool> 在可以处理更多工作时发送 true,否则发送 false。

我不知道这是否是解决此问题的最佳方法,但也许还有另一种方法可以做到这一点。如果可能,尽量坚持使用响应式代码

顺便说一下,我使用的是 Net Framework 4.6.2 和 DynamicData 7.1.1,最重要的是我对编程(只是为了好玩)和 Rx.Net 真的很陌生。

var data = Observable.Range(0,20).Select(value => new Work(value));

ISourceCache<Work,int> source = new SourceCache<Work,int>(x => x.Id);

using var populate = source.PopulateFrom(data);

var observ = source.Connect().AsObservableCache();

using var _ = observ.Connect()
    .AutoRefresh(x => x.Status)
    .Filter(x => x.Status == Status.Queue)
    .ToCollection()
    .Select(collection => collection.ToObservable())
    .Switch()
    .Take(1)
    // How to pause just before status change
    .Do(work => work.Status = Status.Running)
    .Select(x => Run(x).ToObservable())
    .Concat()
    .Concat(Observable.Empty<Work>().Delay(TimeSpan.FromMilliseconds(250)))
    .Repeat()
    .Subscribe();

using var ignore = toggle.Connect();
private static async Task<Work> Run(Work work)
{
    // This is fixed but the time needed to run each work is not constant
    await Task.Delay(TimeSpan.FromSeconds(1));
    return work;
}

public enum Status
{
    Queue,Running
}

public class Work : AbstractNotifyPropertyChanged
{
    private Status _status;

    public Work(int id)
    {
        Id = id;
        Status = Status.Queue;
    }


    public int Id { get; }

    public Status Status
    {
        get => _status;
        set => SetAndRaise(ref _status,value);
    }

    public override string ToString()
    {
        return $"Id: {Id:000} - Status: {Status}";
    }
}

解决方法

也许您可以使用 BatchIf 暂停更新来稍微简化代码。

<button></button>
,

经过一些脑筋急转弯的会议后,我得出了一些我可以称之为解决问题的方法。

        var toggle = Observable.Timer(TimeSpan.FromSeconds(1),TimeSpan.FromSeconds(5))
            .SwitchState(false)
            .Do(Console.WriteLine)
            .Replay(1);

        var firstWork = observ.Connect()
            .AutoRefresh(x => x.Status)
            .Filter(x => x.Status == Status.Queue)
            .ToCollection()
            .Select(collection => collection.FirstOrOptional(x => true))
            .StartWith(Optional<Work>.None);

        var getWork = Observable.Timer(TimeSpan.FromMilliseconds(250))
            .WithLatestFrom(firstWork,(_,first) => first);

        var selector = toggle.Select(value => value ? 
                getWork :
                Observable.Empty<Optional<Work>>())
            .Switch()
            .Take(1)
            .Where(optional => optional.HasValue)
            .Select(optional => optional.Value)
            .Do(Console.WriteLine)
            .Do(work => work.Status = Status.Running)
            .Select(x => Run(x).ToObservable())
            .Concat()
            .Do(Console.WriteLine)
            .Repeat();

        using var executing = selector.Subscribe();

        using var pauseChange = toggle.Connect();

toggle 模拟每 5 秒暂停和取消暂停,但在我的情况下将是切换按钮的状态。

暂停时,selector 返回一个空的 observable,没有真正的工作要做。

当取消暂停时,流在初始到期时间后从集合中获得一个且仅一个 Work。然后启动后台作业直到完成,一切重新开始。

在我的实际情况中,我从一个空集合开始,这就是为什么我必须使用 DynamicData 的 Optional<T> 实现做一些魔术。

我认为这可以解决我的问题,但我愿意接受另一种解决方案。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?