微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

真正的并行处理

如何解决真正的并行处理

我想在 C# 中并行运行 7 个线程\任务。在 7 个线程中,我希望在特定超时内列表中的第一个线程的结果可以说 200 毫秒 (第一个线程没有必要先完成,只是如果它失败或花费的时间超过所需的时间,我不需要等待任何其他任务完成)。如果我在 200 毫秒内没有得到列表中第一个线程的结果,或者如果结果为空\无效,我想中止或取消所有任务。如果第一个线程的结果在超时范围内并且有效,我想查看是否有任何其他任务已完成并给出有效结果。如果是,那么我想获取结果并取消\终止所有剩余的任务。我想出的使用 Task 类的代码如下:

public partial class Form1 : Form
{
    public Form1()
    {
        InitializeComponent();
    }

    private async void btnStart_Click(object sender,EventArgs e)
    {
        TaskWrapper _taskWrapper = new TaskWrapper();
        Stopwatch _st = new Stopwatch();
        _st.Start();
        Threadobject t = await _taskWrapper.DoTasks();
        _st.Stop();
        if (t != null)
        {
            txtResult.Text += t._id.ToString() + " and " + t._idSec
                + " is completed in " + _st.ElapsedMilliseconds + " ms."
                + Environment.NewLine;
        }
        else
        {
            txtResult.Text += "All Failed in " + _st.ElapsedMilliseconds + " ms."
                + Environment.NewLine;
        }
        //_taskWrapper = null;
        //_taskWrapper.dispose();
    }
}

public class TaskWrapper
{
    Task<Threadobject> _t1;
    Task<Threadobject> _t2;
    Task<Threadobject> _t3;
    Task<Threadobject> _t4;
    Task<Threadobject> _t5;
    Task<Threadobject> _t6;
    Task<Threadobject> _t7;

    Threadobject _tO1;
    Threadobject _tO2;
    Threadobject _tO3;
    Threadobject _tO4;
    Threadobject _tO5;
    Threadobject _tO6;
    Threadobject _tO7;

    public async Task<Threadobject> DoTasks()
    {
        var _cancellationTokenSource = new CancellationTokenSource();
        CancellationToken _cancellationToken = _cancellationTokenSource.Token;

        //_cancellationToken.Register(() => { this.dispose(); });

        _tO1 = new Threadobject(1);
        _t1 = new Task<Threadobject>(() => _tO1.DoAction(50),_cancellationToken);

        _tO2 = new Threadobject(2);
        _t2 = new Task<Threadobject>(() => _tO2.DoAction(10),_cancellationToken);

        _tO3 = new Threadobject(3);
        _t3 = new Task<Threadobject>(() => _tO3.DoAction(30),_cancellationToken);

        _tO4 = new Threadobject(4);
        _t4 = new Task<Threadobject>(() => _tO4.DoAction(40),_cancellationToken);

        _tO5 = new Threadobject(5);
        _t5 = new Task<Threadobject>(() => _tO5.DoAction(60),_cancellationToken);

        _tO6 = new Threadobject(6);
        _t6 = new Task<Threadobject>(() => _tO6.DoAction(70),_cancellationToken);

        _tO7 = new Threadobject(7);
        _t7 = new Task<Threadobject>(() => _tO7.DoAction(200),_cancellationToken);

        var _tasks = new List<Task<Threadobject>> { _t1,_t2,_t3,_t4,_t5,_t6,_t7,new Task<Threadobject>(() => { Thread.Sleep(1); return null; }) };

        var _completedTasks = new List<Task<Threadobject>>();

        var _mainTask = new List<Task<Threadobject>>();

        _tasks.AsParallel().WithCancellation(_cancellationToken).ForAll(o => o.Start());
        while (_tasks.Count > 0)
        {
            var completed = await Task.WhenAny(_tasks);
            if (completed.Result._isComplete)
            {
                if (completed.Result._id == 1)
                {
                    _mainTask.Add(completed);
                }
                else
                {
                    _completedTasks.Add(completed);
                }
                if (_completedTasks.Count > 0 && _mainTask.Count > 0)
                {
                    _mainTask[0].Result._idSec = _completedTasks[0].Result._id;
                    _cancellationTokenSource.Cancel();
                    return await _mainTask[0];
                }
            }
            else
            {
                _tasks.Remove(completed);
            }
        }
        return null;
    }
}

public class Threadobject
{
    public bool _isComplete = false;
    public int _id;
    public int _idSec;

    private Thread thread;
    //private static readonly object lockObj;
    public Threadobject(int id)
    {
        _id = id;
    }
    public Threadobject DoAction(int _milliseconds)
    {
        Thread.Sleep(_milliseconds);
        _isComplete = true;
        File.AppendAllText("E:\\ThreadingDemo2_" + _id + ".txt",DateTime.Now.ToString("dd-MM-yyyy HH:mm:ss.fff") + " " + _id
                + " Processing finished." + Environment.NewLine);
        return this;
    }
}

我不确定如何在上述方法中实现超时逻辑。因此,我使用 Thread 类实现了另一个逻辑来实现如下相同:

public partial class Form1 : Form
{
    public Form1()
    {
        InitializeComponent();
    }

    private void btnStart_Click(object sender,EventArgs e)
    {
        ThreadWrapper threadWrapper = new ThreadWrapper();
        Stopwatch _st = new Stopwatch();
        _st.Start();
        Threadobject TO = threadWrapper.DoTasks();
        _st.Stop();
        if (TO != null)
        {
            if (TO._idSec == 0)
                txtResult.Text += TO._id.ToString() + " is completed in "
                    + _st.ElapsedMilliseconds + " ms." + Environment.NewLine;
            else
                txtResult.Text += TO._id.ToString() + " and " + TO._idSec
                    + " is completed in " + _st.ElapsedMilliseconds + " ms."
                    + Environment.NewLine;
        }
        else
        {
            txtResult.Text += "All Failed in " + _st.ElapsedMilliseconds + " ms."
                + Environment.NewLine;
        }
    }
}

public class ThreadWrapper
{
    Thread _t1;
    Thread _t2;
    Thread _t3;
    Thread _t4;
    Thread _t5;
    Thread _t6;
    Thread _t7;

    Threadobject _tO1;
    Threadobject _tO2;
    Threadobject _tO3;
    Threadobject _tO4;
    Threadobject _tO5;
    Threadobject _tO6;
    Threadobject _tO7;

    Threadobject mainThread;
    int SecId = 0;

    public Threadobject DoTasks()
    {
        _tO1 = new Threadobject(1);
        //_tO1.TaskCompleted += TaskCompleted;
        _t1 = new Thread(() => mainThread = _tO1.DoAction(50));

        _tO2 = new Threadobject(2);
        _tO2.TaskCompleted += TaskCompleted;
        _t2 = new Thread(() => _tO2.DoAction(10));

        _tO3 = new Threadobject(3);
        _tO3.TaskCompleted += TaskCompleted;
        _t3 = new Thread(() => _tO3.DoAction(30));

        _tO4 = new Threadobject(4);
        _tO4.TaskCompleted += TaskCompleted;
        _t4 = new Thread(() => _tO4.DoAction(40));

        _tO5 = new Threadobject(5);
        _tO5.TaskCompleted += TaskCompleted;
        _t5 = new Thread(() => _tO5.DoAction(60));

        _tO6 = new Threadobject(6);
        _tO6.TaskCompleted += TaskCompleted;
        _t6 = new Thread(() => _tO6.DoAction(70));

        _tO7 = new Threadobject(7);
        _tO7.TaskCompleted += TaskCompleted;
        _t7 = new Thread(() => _tO7.DoAction(20));

        List<Thread> threads = new List<Thread> { _t1,_t7 };

        threads.AsParallel<Thread>().ForAll<Thread>(o => o.Start());

        _t1.Join(200);

        Task.Run(() =>
        threads.AsParallel<Thread>().ForAll<Thread>(o => o.Abort()));
        if (mainThread != null)
        {
            mainThread._idSec = SecId;
        }

        return mainThread;
    }

    private void TaskCompleted(int id,int idSec,bool isCompleted)
    {
        if (SecId == 0)
            SecId = id;
    }
}

public class Threadobject
{
    public bool _isComplete = false;
    public int _id;
    public int _idSec;
    public delegate void NotifyTaskCompletion(int id,bool isCompleted);
    public event NotifyTaskCompletion TaskCompleted;

    private static readonly object lockObj = 1;
    //private static readonly object lockObj;
    public Threadobject(int id)
    {
        _id = id;
    }
    public Threadobject DoAction(int _milliseconds)
    {
        Thread.Sleep(_milliseconds);
        _isComplete = true;
        //lock (lockObj)
        //{
        //    File.AppendAllText("E:\\ThreadingDemo3_" + _id + ".txt"
        //,DateTime.Now.ToString("dd-MM-yyyy HH:mm:ss.fff") + " " + _id
        //            + " Processing finished." + Environment.NewLine);
        //}
        TaskCompleted?.Invoke(_id,_idSec,_isComplete);
        return this;
    }
}

我之前使用 Parallel.Invoke 方法并行执行任务,但最近我注意到 Parallel.Invoke 实际上并没有并行处理任务,因此最终占用了不必要的时间。 现在到我的实际问题,是的,有多个,如下:

  1. 有没有办法在我的并行任务中添加超时?我已经知道一种方法,我可以在并行运行的任务集合中添加一个任务,该任务将暂停 200 毫秒或我想要的任何超时,然后返回 null。 (因为我使用 WhenAny 来检查完成,控制将返回给调用 UI 线程)。但这对我来说就像是在作弊,所以我想知道是否还有其他方法
  2. 在这种情况下,当我从类创建 Task 时,TaskWrapper 会将 TaskWrapper 对象处理或设置为 null 中止或销毁使用该对象创建/运行的任务吗?
  3. 同样,对于线程,如果我在这种情况下从类内部创建了一个线程\线程ThreadWrapper,会将 ThreadWrapper 对象处理或设置为空中止或销毁创建的线程\运行使用那个对象?

编辑:清除评论中提到的混淆。

解决方法

简单的方法是使用 CancelAfter 方法在 200 毫秒后取消取消令牌。另一种选择是运行 await Task.Delay(200)。我还建议使用 Parallel.For,因为这将是更紧凑的代码:

    public static void CancelParallel()
    {
        var cts = new CancellationTokenSource();
        cts.CancelAfter(TimeSpan.FromSeconds(200));
        var po = new ParallelOptions()
        {
            CancellationToken = cts.Token
        };
        void ParallelMethod(int iterationIndex,ParallelLoopState state)
        {
            // Do whatever
            // if the iteration is long running you can check if it should exit
            if (state.ShouldExitCurrentIteration)
            {
                return;
            }
        }

        var loopResult = Parallel.For(0,7,po,ParallelMethod);
        if (loopResult.IsCompleted)
        {
            // Do something if all threads completed
        }
        else
        {
            // Do something else
        }
    }

如果你只关心第一次迭代的成功,如果成功,设置一些标志并检查它而不是 loopResult。

请注意,Parallel.For 将在令牌取消时停止运行新的迭代,但会让现有迭代运行完成,因此您可能希望也可能不想检查迭代中的令牌

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。