• c# 异步进阶———— paralel [二]


    前言

    简单整理一下paralel,以上是并行的意思。

    正文

    我们在工作中常常使用task await 和 async,也就是将线程池进行了封装,那么还有一些更高级的应用。

    是对task的封装,那么来看下paralel。

    static void Main(string[] args)
    {
    	var ints= Enumerable.Range(1, 100);
    	var result = Parallel.ForEach(ints, arg =>
    	{
    		Console.WriteLine(arg);
    	});
    	
    	Console.Read();
    }
    

    可以看到结果是并行的。

    那么来看下实现机制。

    public static ParallelLoopResult ForEach<TSource>(IEnumerable source, Action body)
    {
    	if (source == null)
    	{
    		throw new ArgumentNullException(nameof(source));
    	}
    	if (body == null)
    	{
    		throw new ArgumentNullException(nameof(body));
    	}
    
    	return ForEachWorkerobject>(
    		source, s_defaultParallelOptions, body, null, null, null, null, null, null);
    }
    

    进行参数检验,然后交给了ForEachWorker。

    这是一个基本的代码思路,就是复杂的方法中可以先校验参数,然后具体实现交给另外一个方法。

    然后通过不同的类型,进行分类:

    然后看下具体实现是什么?

    进去看就是一个taskreplicator:

    看下run在做什么。

    public static void Run<TState>(ReplicatableUserAction action, ParallelOptions options, bool stopOnFirstFailure)
    {
    	int maxConcurrencyLevel = (options.EffectiveMaxConcurrencyLevel > 0) ? options.EffectiveMaxConcurrencyLevel : int.MaxValue;
    
    	TaskReplicator replicator = new TaskReplicator(options, stopOnFirstFailure);
    	new Replica(replicator, maxConcurrencyLevel, CooperativeMultitaskingTaskTimeout_RootTask, action).Start();
    
    	Replica nextReplica;
    	while (replicator._pendingReplicas.TryDequeue(out nextReplica))
    		nextReplica.Wait();
    
    	if (replicator._exceptions != null)
    		throw new AggregateException(replicator._exceptions);
    }
    
    1. 创建了一个taskreplictor,起到管理作用

    2. 然后创建了一个Replica,然后这个start 是关键

    3. 然后通过while,让每一个Replica 都运行完毕才推出,达到同步的效果

    if (replicator._exceptions != null)
    	throw new AggregateException(replicator._exceptions);
    

    可以看一下这个,这个是一个比较好的技巧。如果一个运行管理,不用抛出异常,之间在管理中进行运行处理总结。

    比如结果,异常等。

    那么就看下这个start。

    protected Replica(TaskReplicator replicator, int maxConcurrency, int timeout)
    {
    	_replicator = replicator;
    	_timeout = timeout;
    	_remainingConcurrency = maxConcurrency - 1;
    	_pendingTask = new Task(s => ((Replica)s).Execute(), this);
    	_replicator._pendingReplicas.Enqueue(this);
    }
    
    public void Start()
    {
    	_pendingTask.RunSynchronously(_replicator._scheduler);
    }
    

    将会运行Execute,是同步的,而不是异步的,也就是说第一个task将会运行在当前线程。

    那么看Execute在做什么?

    public void Execute()
    {
    	try
    	{
    		if (!_replicator._stopReplicating && _remainingConcurrency > 0)
    		{
    			CreateNewReplica();
    			_remainingConcurrency = 0; // new replica is responsible for adding concurrency from now on.
    		}
    
    		bool userActionYieldedBeforeCompletion;
    
    		ExecuteAction(out userActionYieldedBeforeCompletion);
    
    		if (userActionYieldedBeforeCompletion)
    		{
    			_pendingTask = new Task(s => ((Replica)s).Execute(), this, CancellationToken.None, TaskCreationOptions.None);
    			_pendingTask.Start(_replicator._scheduler);
    		}
    		else
    		{
    			_replicator._stopReplicating = true;
    			_pendingTask = null;
    		}
    	}
    	catch (Exception ex)
    	{
    		LazyInitializer.EnsureInitialized(ref _replicator._exceptions).Enqueue(ex);
    		if (_replicator._stopOnFirstFailure)
    			_replicator._stopReplicating = true;
    		_pendingTask = null;
    	}
    }
    

    一段一段分析:

    if (!_replicator._stopReplicating && _remainingConcurrency > 0)
    {
    	CreateNewReplica();
    	_remainingConcurrency = 0; // new replica is responsible for adding concurrency from now on.
    }
    

    这里当_replicator 也就是任务复制器没有停止的时候。这里有两种情况会停止,一种是任务完成,一种是任务异常且设置参数异常时候停止。

    _remainingConcurrency 指的是副本数,默认是int.max。

    那么就复制一个副本。

    protected override void CreateNewReplica()
    {
    	Replica newReplica = new Replica(_replicator, _remainingConcurrency, GenerateCooperativeMultitaskingTaskTimeout(), _action);
    	newReplica._pendingTask.Start(_replicator._scheduler);
    }
    

    复制完副本后,那么就开始运行我们的action了。

    protected override void ExecuteAction(out bool yieldedBeforeCompletion)
    {
    	_action(ref _state, _timeout, out yieldedBeforeCompletion);
    }
    

    这里传入了timeout,这个timeout并不是我们限制我们单个task的运行时间,而是当运行到一定时候后,这个task就停止运行,然后另外启动一个副本。

    if (CheckTimeoutReached(loopTimeout))
    {
    	replicationDelegateYieldedBeforeCompletion = true;
    	break;
    }
    
    if (userActionYieldedBeforeCompletion)
    {
    	_pendingTask = new Task(s => ((Replica)s).Execute(), this, CancellationToken.None, TaskCreationOptions.None);
    	_pendingTask.Start(_replicator._scheduler);
    }
    else
    {
    	_replicator._stopReplicating = true;
    	_pendingTask = null;
    }
    

    这个是为了符合操作系统的调度思想,跑的越久的,基本上优先级会低些。

    那么看下这个_action主要在做什么吧。

    while (myPartition.MoveNext())
    {
    	KeyValuePair<long, TSource> kvp = myPartition.Current;
    	long index = kvp.Key;
    	TSource value = kvp.Value;
    
    	// Update our iteration index
    	if (state != null) state.CurrentIteration = index;
    
    	if (simpleBody != null)
    		simpleBody(value);
    	else if (bodyWithState != null)
    		bodyWithState(value, state);
    	else if (bodyWithStateAndIndex != null)
    		bodyWithStateAndIndex(value, state, index);
    	else if (bodyWithStateAndLocal != null)
    		localValue = bodyWithStateAndLocal(value, state, localValue);
    	else
    		localValue = bodyWithEverything(value, state, index, localValue);
    
    	if (sharedPStateFlags.ShouldExitLoop(index)) break;
    
    	// Cooperative multitasking:
    	// Check if allowed loop time is exceeded, if so save current state and return.
    	// The task replicator will queue up a replacement task. Note that we don't do this on the root task.
    	if (CheckTimeoutReached(loopTimeout))
    	{
    		replicationDelegateYieldedBeforeCompletion = true;
    		break;
    	}
    }
    

    就是拉取我们的enumerator的数据,然后simpleBody(value),进行运行我们写的action。

    总结一下,其实Parallel 核心就是一个任务复制器,然后创建多个副本,拉取我们的数据,进行执行我们设置的action。

    里面的主要功能,Parallel做到了限制副本数,因为我们知道task并不是越多越好。

    第二个,如果长时间运行,那么Parallel是做了优化的,当达到timeout的时候,那么会重新启动一个副本(可以理解为一个线程)

    第三点,Parallel 有一个foreach 进行迭代器的处理,这里不仅仅是让任务可以并行。

    而且具备c# foreach的基本功能。

    static void Main(string[] args)
    {
    	var ints= Enumerable.Range(1, 100);
    	var result = Parallel.ForEach(ints,    (arg, state)
    		=>
    	{
    		if (state.IsStopped)
    		{
    			return;   
    		}
    		
    		if (arg > 18)
    		{
    			state.Break();
    		}
    	});
    	if (result.IsCompleted)
    	{
    		Console.WriteLine("完成");
    	}
    	Console.Read();
    }
    

    可以进行中断。

    还有一个函数,那就是stop,这个stop 比break 停止的快,break 要记录出,最小中断位置。

    而stop 就是立马停止下来。

    在上述中,我们知道可以传递一个taskschedule进行,那么这个taskschedule 是干什么的,对我们的任务调度有什么影响呢? 下一节,自我实现taskschedule。

  • 相关阅读:
    快手一面:讲一讲 Hadoop、Hive、Spark 之间的关系?
    SpringCloud【创建服务消费者、服务自保和服务剔除机制 、服务发现Discovery、高可用Eureka注册中心、高可用Eureka注册中心搭建】(二)
    61 不同路径
    Redis集群搭建
    深入实现 MyBatis 底层机制的任务阶段4 - 开发 Mapper 接口和 Mapper.xml
    基于JAVA-旅游产品销售管理-演示录像2020计算机毕业设计源码+系统+mysql数据库+lw文档+部署
    [Linux] Screen的简单使用与中途退出保持运行
    MYSQL 使用基础 - 这么用就对了
    Django后台项目实战之后台菜品类别信息管理
    C语言基础---函数、数组
  • 原文地址:https://www.cnblogs.com/aoximin/p/17324467.html