目录
  • 前言
  • 节点
    • Then
    • Parallel
    • Schedule
    • Delay
  • 试用一下
    • 顺序节点
    • 并行义务
  • 编写事情流
    • 接口构建器
    • 事情流构建器
    • 依赖注入
    • 实现事情流剖析

前言

前面学习了许多多线程和义务的基础知识,这里要来实践一下啦。通过本篇教程,你可以写出一个简朴的事情流引擎。

本篇教程内容完成是基于义务的,只需要看过笔者的三篇关于异步的文章,掌握 C# 基础,即可轻松完成。

  • C#多线程(13):义务基础①
  • C#多线程(14):义务基础②
  • C#多线程(15):义务基础③

由于本篇文章编写的事情流程序,主要使用义务,有些逻辑历程会比较难明白,多测试一下就好。代码主要照样 C# 基础,为什么说简朴?

  • 不包罗 async 、await
  • 险些不含包罗多线程(有个读写锁)
  • 不包罗表达式树
  • 险些不含反射(有个小地方需要反射一下,然则异常简朴)
  • 没有庞大的算法

由于是基于义务(Task)的,以是可以轻松设计组合流程,组成庞大的事情流。

由于只是讲述基础,以是不会包罗许多种流程控制,这里只实现一些简朴的。

先说明,别用到营业上。。。这个事情流异常简朴,就几个功效,这个事情流是基于笔者的多线程系列文章的知识点。写这个器械是为领会说义务操作,让读者加倍深入明白义务。

代码地址:https://github.com/whuanle/CZGL.FLow

这两天忙着搬器械,今天没认真写文章,代码不明白的地方,可以到微信群找我。微信名称:痴者工良,dotnet 的群基本我都在。

节点

在最先前,我们来设计几种流程控制的器械。

将一个 步骤/流程/节点 称为 step。

Then

一个通俗的节点,包罗一个义务。

多个 Then 节点,可以组成一条延续的事情流。

Parallel

并行节点,可以设置多个并行节点放到 Parallel 中,以及在内里为任一个节点确立新的分支。

Schedule

准时节点,确立后会在一准时间后执行节点中的义务。

Delay

让当前义务壅闭一段时间。

试用一下

顺序节点

打开你的 VS ,确立项目,Nuget 引用 CZGL.DoFlow ,版本 1.0.2 。

确立一个类 MyFlow1,继续 IDoFlow

    public class MyFlow1 : IDoFlow
    {
        public int Id => 1;

        public string Name => "随便起个名字";

        public int Version => 1;

        public IDoFlowBuilder Build(IDoFlowBuilder builder)
        {
            throw new NotImplementedException();
        }
    }

你可以确立多个事情流义务,每个事情流的 Id 必须唯一。Name 和 Version 随便填,由于这里笔者没有对这几个字段做逻辑。

IDoFlowBuilder 是构建事情流的一个接口。

我们来写一个事情流测试一下。

/// <summary>
/// 通俗节点 Then 使用方式
/// </summary>
public class MyFlow1 : IDoFlow
{
    public int Id => 1;
    public string Name => "test";
    public int Version => 1;

    public IDoFlowBuilder Build(IDoFlowBuilder builder)
    {
        builder.StartWith(() =>
        {
            Console.WriteLine("事情流最先");
        }).Then(() =>
        {
            Console.WriteLine("下一个节点");
        }).Then(() =>
         {
             Console.WriteLine("最后一个节点");
         });
        return builder;
    }
} 

Main 方式中:

        static void Main(string[] args)
        {
            FlowCore.RegisterWorkflow<MyFlow1>();
            // FlowCore.RegisterWorkflow(new MyFlow1());
            FlowCore.Start(1);
            Console.ReadKey();
        }

.StartWith() 方式最先一个事情流;

FlowCore.RegisterWorkflow<T>() 注册一个事情流;

FlowCore.Start();执行一个事情流;

并行义务

其代码如下:

    /// <summary>
    /// 并行节点 Parallel 使用方式
    /// </summary>
    public class MyFlow2 : IDoFlow
    {
        public int Id => 2;
        public string Name => "test";
        public int Version => 1;

        public IDoFlowBuilder Build(IDoFlowBuilder builder)
        {
            builder.StartWith()
                .Parallel(steps =>
                {
                    // 每个并行义务也可以设计后面继续执行其它义务
                    steps.Do(() =>
                    {
                        Console.WriteLine("并行1");
                    }).Do(() =>
                    {
                        Console.WriteLine("并行2");
                    });
                    steps.Do(() =>
                    {
                        Console.WriteLine("并行3");
                    });

                    // 并行义务设计完成后,必须挪用此方式
                    // 此方式必须放在所有并行义务 .Do() 的最后
                    steps.EndParallel();

                    // 若是 .Do() 在 EndParallel() 后,那么不会守候此义务
                    steps.Do(() => { Console.WriteLine("并行异步"); });

                    // 开启新的分支
                    steps.StartWith()
                    .Then(() =>
                    {
                        Console.WriteLine("新的分支" + Task.CurrentId);
                    }).Then(() => { Console.WriteLine("分支2.0" + Task.CurrentId); });

                }, false)
                .Then(() =>
                {
                    Console.WriteLine("11111111111111111 ");
                });

            return builder;
        }
    }

Main 方式中:

        static void Main(string[] args)
        {
            FlowCore.RegisterWorkflow<MyFlow2>();
            FlowCore.Start(2);
            Console.ReadKey();
        }

通过以上示例,可以也许领会本篇文章中我们要写的程序。

编写事情流

确立一个类库项目,名为 DoFlow

确立 ExtensionsInterfacesServices 三个目录。

接口构建器

新建 IStepBuilder 接口文件到 Interfaces 目录,其内容如下:

using System;

namespace DoFlow.Interfaces
{
    public interface IStepBuilder
    {
        /// <summary>
        /// 通俗节点
        /// </summary>
        /// <param name="stepBuilder"></param>
        /// <returns></returns>
        IStepBuilder Then(Action action);

        /// <summary>
        /// 多个节点
        /// <para>默认下,需要守候所有的义务完成,这个step才算完成</para>
        /// </summary>
        /// <param name="action"></param>
        /// <param name="anyWait">随便一个义务完成即可跳转到下一个step</param>
        /// <returns></returns>
        IStepBuilder Parallel(Action<IStepParallel> action, bool anyWait = false);

        /// <summary>
        /// 节点将在某个时间距离后执行
        /// <para>异步,不会壅闭当前事情流的运行,计划义务将在一段时间后触发</para>
        /// </summary>
        /// <returns></returns>
        IStepBuilder Schedule(Action action, TimeSpan time);

        /// <summary>
        /// 壅闭一段时间
        /// </summary>
        /// <param name="time"></param>
        /// <returns></returns>
        IStepBuilder Delay(TimeSpan time);
    }
}

新建 IStepParallel 文件到 Interfaces 目录。

using System;

namespace DoFlow.Interfaces
{
    /// <summary>
    /// 并行义务
    ///  <para>默认情况下,只有这个节点的所有并行义务都完成后,这个节点才算完成</para>
    /// </summary>
    public interface IStepParallel
    {
        /// <summary>
        /// 一个并行义务
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        IStepParallel Do(Action action);

        /// <summary>
        /// 最先一个分支
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        IStepBuilder StartWith(Action action = null);

        /// <summary>
        /// 必须使用此方式竣事一个并行义务
        /// </summary>
        void EndParallel();
    }

    /// <summary>
    /// 并行义务
    /// <para>随便一个义务完成后,就可以跳转到下一个 step</para>
    /// </summary>
    public interface IStepParallelAny : IStepParallel
    {

    }
}

事情流构建器

新建 IDoFlowBuilder 接口文件到 Interfaces 目录。

using System;
using System.Threading.Tasks;

namespace DoFlow.Interfaces
{
    /// <summary>
    /// 构建事情流义务
    /// </summary>
    public interface IDoFlowBuilder
    {
        /// <summary>
        /// 最先一个 step
        /// </summary>
        IStepBuilder StartWith(Action action = null);
        void EndWith(Action action);

        Task ThatTask { get; }
    }
}

新建 IDoFlow 接口文件到 Interfaces 目录。

namespace DoFlow.Interfaces
{

    /// <summary>
    /// 事情流
    /// <para>无参数通报</para>
    /// </summary>
    public interface IDoFlow
    {
        /// <summary>
        /// 全局唯一标识
        /// </summary>
        int Id { get; }

        /// <summary>
        /// 标识此事情流的名称
        /// </summary>
        string Name { get; }

        /// <summary>
        /// 标识此事情流的版本
        /// </summary>
        int Version { get; }

        IDoFlowBuilder Build(IDoFlowBuilder builder);
    }
}

依赖注入

新建 DependencyInjectionService 文件到 Services 目录。

用于实现依赖注入息争耦。

using DoFlow.Extensions;
using Microsoft.Extensions.DependencyInjection;
using System;

namespace DoFlow.Services
{
    /// <summary>
    /// 依赖注入服务
    /// </summary>
    public static class DependencyInjectionService
    {
        private static IServiceCollection _servicesList;
        private static IServiceProvider _services;
        static DependencyInjectionService()
        {
            IServiceCollection services = new ServiceCollection();
            _servicesList = services;
            // 注入引擎需要的服务
            InitExtension.StartInitExtension();
            var serviceProvider = services.BuildServiceProvider();
            _services = serviceProvider;
        }

        /// <summary>
        /// 添加一个注入到容器服务
        /// </summary>
        /// <typeparam name="TService"></typeparam>
        /// <typeparam name="TImplementation"></typeparam>
        public static void AddService<TService, TImplementation>()
            where TService : class
            where TImplementation : class, TService
        {
            _servicesList.AddTransient<TService, TImplementation>();
        }

        /// <summary>
        /// 获取需要的服务
        /// </summary>
        /// <typeparam name="TIResult"></typeparam>
        /// <returns></returns>
        public static TIResult GetService<TIResult>()
        {
            TIResult Tservice = _services.GetService<TIResult>();
            return Tservice;
        }
    }
}

添加一个 InitExtension 文件到 Extensions 目录。

using DoFlow.Interfaces;
using DoFlow.Services;

namespace DoFlow.Extensions
{
    public static class InitExtension
    {
        private static bool IsInit = false;
        public static void StartInitExtension()
        {
            if (IsInit) return;
            IsInit = true;
            DependencyInjectionService.AddService<IStepBuilder, StepBuilder>();
            DependencyInjectionService.AddService<IDoFlowBuilder, DoFlowBuilder>();
            DependencyInjectionService.AddService<IStepParallel, StepParallelWhenAll>();
            DependencyInjectionService.AddService<IStepParallelAny, StepParallelWhenAny>();
        }
    }
}

实现事情流剖析

以下文件均在 Services 目录确立。

新建 StepBuilder 文件,用于剖析节点,构建义务。

using DoFlow.Interfaces;
using System;
using System.Threading.Tasks;

namespace DoFlow.Services
{

    /// <summary>
    /// 节点事情引擎
    /// </summary>
    public class StepBuilder : IStepBuilder
    {
        private Task _task;

        /// <summary>
        /// 延迟执行
        /// </summary>
        /// <param name="time"></param>
        /// <returns></returns>
        public IStepBuilder Delay(TimeSpan time)
        {
            Task.Delay(time).Wait();
            return this;
        }

        /// <summary>
        /// 并行 step
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        public IStepBuilder Parallel(Action<IStepParallel> action, bool anyAwait = false)
        {
            IStepParallel parallel = anyAwait ? DependencyInjectionService.GetService<IStepParallelAny>() : DependencyInjectionService.GetService<IStepParallel>();
            Task task = new Task(() =>
            {
                action.Invoke(parallel);
            });

            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                task.Start();
            });
            _task = task;
            return this;
        }

        /// <summary>
        /// 计划义务
        /// </summary>
        /// <param name="action"></param>
        /// <param name="time"></param>
        /// <returns></returns>
        public IStepBuilder Schedule(Action action, TimeSpan time)
        {
            Task.Factory.StartNew(() =>
            {
                Task.Delay(time).Wait();
                action.Invoke();
            });
            return this;
        }

        /// <summary>
        /// 通俗 step
        /// </summary>
        /// <param name="action"></param>
        /// <returns></returns>
        public IStepBuilder Then(Action action)
        {
            Task task = new Task(action);
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                task.Start();
                task.Wait();
            });
            _task = task;
            return this;
        }

        public void SetTask(Task task)
        {
            _task = task;
        }
    }
}

新建 StepParallel 文件,内里有两个类,用于实现同步义务。

using DoFlow.Interfaces;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace DoFlow.Services
{
    /// <summary>
    /// 第一层所有义务竣事后才气跳转下一个 step
    /// </summary>
    public class StepParallelWhenAll : IStepParallel
    {
        private Task _task;
        private readonly List<Task> _tasks = new List<Task>();
        public StepParallelWhenAll()
        {
            _task = new Task(() => { },TaskCreationOptions.AttachedToParent);
        }
        public IStepParallel Do(Action action)
        {
            _tasks.Add(Task.Run(action));
            return this;
        }

        public void EndParallel()
        {
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                Task.WhenAll(_tasks).Wait();
            });
        }

        public IStepBuilder StartWith(Action action = null)
        {
            Task task =
                action is null ? new Task(() => { })
                : new Task(action);
            var _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() => { task.Start(); });

            return _stepBuilder;
        }
    }

    /// <summary>
    /// 完成随便一个义务即可跳转到下一个 step
    /// </summary>
    public class StepParallelWhenAny : IStepParallelAny
    {
        private Task _task;
        private readonly List<Task> _tasks = new List<Task>();
        public StepParallelWhenAny()
        {
            _task = Task.Run(() => { });
        }
        public IStepParallel Do(Action action)
        {
            _tasks.Add(Task.Run(action));
            return this;
        }

        public void EndParallel()
        {
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() =>
            {
                Task.WhenAny(_tasks).Wait();
            });
        }

        public IStepBuilder StartWith(Action action = null)
        {
            Task task =
                action is null ? new Task(() => { })
                : new Task(action);
            var _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
            _task.ConfigureAwait(false).GetAwaiter().OnCompleted(() => { task.Start(); });

            return _stepBuilder;
        }
    }
}

新建 DoFlowBuilder 文件,用于构建事情流。

using DoFlow.Interfaces;
using System;
using System.Threading.Tasks;

namespace DoFlow.Services
{
    public class DoFlowBuilder : IDoFlowBuilder
    {
        private Task _task;
        public Task ThatTask => _task;

        public void EndWith(Action action)
        {
            _task.Start();
        }

        public IStepBuilder StartWith(Action action = null)
        {
            if (action is null)
                _task = new Task(() => { });
            else _task = new Task(action);

            IStepBuilder _stepBuilder = DependencyInjectionService.GetService<IStepBuilder>();
            ((StepBuilder)_stepBuilder).SetTask(_task);
            return _stepBuilder;
        }
    }
}

新建 FlowEngine 文件,用于执行事情流。

using DoFlow.Interfaces;

namespace DoFlow.Services
{
    /// <summary>
    /// 事情流引擎
    /// </summary>
    public class FlowEngine
    {
        private readonly IDoFlow _flow;
        public FlowEngine(IDoFlow flow)
        {
            _flow = flow;
        }

        /// <summary>
        /// 最先一个事情流
        /// </summary>
        public void Start()
        {
            IDoFlowBuilder builder = DependencyInjectionService.GetService<IDoFlowBuilder>();
            _flow.Build(builder).ThatTask.Start();
        }
    }
}

新建 FlowCore 文件,用于存储和索引事情流。使用读写锁解决并发字典问题

using DoFlow.Interfaces;
using System;
using System.Collections.Generic;
using System.Threading;

namespace DoFlow.Services
{
    public static class FlowCore
    {
        private static Dictionary<int, FlowEngine> flowEngines = new Dictionary<int, FlowEngine>();

        // 读写锁
        private static ReaderWriterLockSlim readerWriterLockSlim = new ReaderWriterLockSlim();

        /// <summary>
        /// 注册事情流
        /// </summary>
        /// <param name="flow"></param>
        public static bool RegisterWorkflow(IDoFlow flow)
        {
            try
            {
                readerWriterLockSlim.EnterReadLock();
                if (flowEngines.ContainsKey(flow.Id))
                    return false;
                flowEngines.Add(flow.Id, new FlowEngine(flow));
                return true;
            }
            finally
            {
                readerWriterLockSlim.ExitReadLock();
            }
        }

        /// <summary>
        /// 注册事情流
        /// </summary>
        /// <param name="flow"></param>
        public static bool RegisterWorkflow<TDoFlow>()
        {

            Type type = typeof(TDoFlow);
            IDoFlow flow = (IDoFlow)Activator.CreateInstance(type);
            try
            {
                readerWriterLockSlim.EnterReadLock();
                if (flowEngines.ContainsKey(flow.Id))
                    return false;
                flowEngines.Add(flow.Id, new FlowEngine(flow));
                return true;
            }
            finally
            {
                readerWriterLockSlim.ExitReadLock();
            }
        }

        /// <summary>
        /// 要启动的事情流
        /// </summary>
        /// <param name="id"></param>
        public static bool Start(int id)
        {
            FlowEngine engine;
            // 读写锁
            try
            {
                readerWriterLockSlim.EnterUpgradeableReadLock();

                if (!flowEngines.ContainsKey(id))
                    return default;
                try
                {
                    readerWriterLockSlim.EnterWriteLock();
                    engine = flowEngines[id];
                }
                catch { return default; }
                finally
                {
                    readerWriterLockSlim.ExitWriteLock();
                }
            }
            catch { return default; }
            finally
            {
                readerWriterLockSlim.ExitUpgradeableReadLock();
            }

            engine.Start();
            return true;
        }
    }
}

就这样程序写完了。

忙去了。

,

诚信在线

诚信在线www.goldo.cn用诚信赢得信任、用服务赢得口碑、用技术赢得赞赏。新的一年到来,诚信在线将一如既往地服务好每位客户。

Allbet Gaming声明:该文看法仅代表作者自己,与阳光在线无关。转载请注明:赣州论坛网:C#多线程(16):手把手教你撸一个工作流
发布评论

分享到:

阳江刀:台湾权王-群益金鼎证券 布局指数权证 规避下修风险
你是第一个吃螃蟹的人
发表评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。