- A+
所属分类:.NET技术
NET 6 环境开发 实现 线程数量,任务队列,非核心线程,及核心线程活跃时间的管理。
namespace CustomThreadPool; /// <summary> /// 线程池类 /// </summary> public class ThreadPoolExecutor { /// <summary> /// 核心线程的任务队列 /// </summary> private readonly Queue<WorkTask> tasks = new Queue<WorkTask>(); /// <summary> /// 最大核心线程数 /// </summary> private int coreThreadCount; /// <summary> /// 最大非核心线程数 /// </summary> private int noneCoreThreadCount; /// <summary> /// 当前运行的核心线程的数量 /// </summary> private int runCoreThreadCount; /// <summary> /// 当前运行的非核心线程的数量 /// </summary> private int runNoneCoreThreadCount; /// <summary> /// 核心线程队列的最大数 /// </summary> private int maxQueueCount; /// <summary> /// 当核心线程空闲时最大活跃时间 /// </summary> private int keepAliveTimeout; /// <summary> /// 设置是否为后台线程 /// </summary> private bool isBackground; private ThreadPoolExecutor() { } /// <summary> /// /// </summary> /// <param name="CoreThreadCount">核心线程数</param> /// <param name="TotalThreadCount">总线程数</param> /// <param name="IsBackground">是否为后台线程</param> /// <param name="QueueCount">核心队列的最大数</param> /// <param name="KeepAliveTimeout">当核心线程空闲时最大活跃时间</param> /// <exception cref="ArgumentOutOfRangeException"></exception> /// <exception cref="ArgumentException"></exception> public ThreadPoolExecutor(int CoreThreadCount = 5, int TotalThreadCount = 10, bool IsBackground = true, int QueueCount = 200, int KeepAliveTimeout = 0) { if (CoreThreadCount < 1) throw new ArgumentOutOfRangeException(nameof(CoreThreadCount), CoreThreadCount, null); if (TotalThreadCount < CoreThreadCount) throw new ArgumentException($"{nameof(TotalThreadCount)}:{TotalThreadCount} must be greater than {nameof(CoreThreadCount)}:{CoreThreadCount}"); if (QueueCount < 0) throw new ArgumentOutOfRangeException(nameof(QueueCount), QueueCount, null); if (KeepAliveTimeout < 0) throw new ArgumentOutOfRangeException(nameof(KeepAliveTimeout), KeepAliveTimeout, null); coreThreadCount = CoreThreadCount; noneCoreThreadCount = TotalThreadCount - CoreThreadCount; keepAliveTimeout = KeepAliveTimeout; maxQueueCount = QueueCount; isBackground = IsBackground; } /// <summary> /// 执行任务 /// </summary> /// <param name="task">一个自定义任务</param> /// <exception cref="ArgumentNullException">任务为null时,抛出该错误</exception> /// <exception cref="NotSupportedException">当核心任务队列已满且非核心线程最大数为0时抛出该错误</exception> public void QueueTask(WorkTask task) { if (task == null) throw new ArgumentNullException(nameof(task)); lock (tasks) { tasks.Enqueue(task); if (tasks.Count <= maxQueueCount) { if (runCoreThreadCount < coreThreadCount) { ++runCoreThreadCount; Run(true); } } else { if (noneCoreThreadCount > 0 && runNoneCoreThreadCount < noneCoreThreadCount) { ++runNoneCoreThreadCount; Run(false); } } } } private void Run(bool isCore) { Tuple<int, bool> state = new(keepAliveTimeout, isCore); Thread thread = new(t => Excute(t)) { Name = Guid.NewGuid().ToString("D"), IsBackground = isBackground }; thread.Start(state); } private void Excute(object? state) { if (state == null) return; var parameter = (Tuple<int, bool>)state; bool first = true; DateTime firstTime = DateTime.Now; while (true) { WorkTask? item = null; lock (tasks) { if (tasks.Count > 0) { first = true; item = tasks.Dequeue(); } else { if (parameter.Item2) { if (first) { firstTime = DateTime.Now; first = false; } if ((DateTime.Now - firstTime).TotalMilliseconds > parameter.Item1) { --runCoreThreadCount; break; } } else { --runNoneCoreThreadCount; break; } } } item?.Runsynchronous(); } } }
namespace CustomThreadPool; /// <summary> /// 包装的任务类 /// </summary> public class WorkTask { public static WorkTaskFactory Factory { get; private set; } = new WorkTaskFactory(); /// <summary> /// 任务运行结束时触发该事件 /// </summary> public event Action<WorkTask>? TaskCompleted; /// <summary> /// 任务ID /// </summary> private static int _id = 0; /// <summary> /// 委托给任务不带执行参数的代码 /// </summary> private readonly Action? action; /// <summary> /// 委托给任务执行的带输入参数代码 /// </summary> private readonly Action<object?>? actionWithParamter; /// <summary> /// 线程间的同步事件 /// </summary> public AutoResetEvent WaitHandle { get; protected set; } = new AutoResetEvent(false); /// <summary> /// 执行代码的参数 /// </summary> public object? State { get; protected set; } /// <summary> /// 接收任务抛出的异常 /// </summary> public WorkTaskException? Exception { get; protected set; } /// <summary> /// 任务是否完成标志 /// </summary> public bool IsCompleted { get; protected set; } = false; /// <summary> /// 任务知否有异常 /// </summary> public bool IsFaulted { get; protected set; } = false; /// <summary> /// 任务状态 /// </summary> public WorkTaskStatus Status { get; protected set; } = WorkTaskStatus.Created; public int Id { get { return Interlocked.Increment(ref _id); } } protected WorkTask() { } protected void OnTaskCompleted(WorkTask sender) { TaskCompleted?.Invoke(sender); } public WorkTask(Action action) { this.action = action ?? throw new ArgumentNullException(nameof(action)); } public WorkTask(Action<object?> action, object state) { actionWithParamter = action ?? throw new ArgumentNullException(nameof(action)); this.State = state; } /// <summary> /// 任务的同步方法 /// </summary> public virtual void Runsynchronous() { if (Status != WorkTaskStatus.Created) return; Status = WorkTaskStatus.Running; try { action?.Invoke(); actionWithParamter?.Invoke(State); } catch (Exception ex) { Exception = new WorkTaskException(ex.Message, ex); IsFaulted = true; } finally { OnTaskCompleted(this); WaitHandle.Set(); IsCompleted = true; Status = WorkTaskStatus.RanToCompleted; } } /// <summary> /// 通过调用线程执行的方法 /// </summary> public void Start() { Factory.ThreadPoolExcutor?.QueueTask(this); } /// <summary> /// 通过调用线程执行的方法 /// </summary> /// <param name="executor">线程池管理类</param> public void Start(ThreadPoolExecutor executor) { executor.QueueTask(this); } /// <summary> /// 执行一组任务并等待所有任务完成。 /// </summary> /// <param name="tasks">一组任务</param> /// <returns>所有任务是否都接收到完成的信号。</returns> public static bool WaitAll(WorkTask[] tasks) { var result = true; foreach (var task in tasks) { result = result && task.WaitHandle.WaitOne(); } return result; } /// <summary> /// 执行一组任务并等待任意一个任务完成。 /// </summary> /// <param name="tasks">一组任务</param> /// <returns>返回已完成任务的索引</returns> public static int WaitAny(WorkTask[] tasks) { var index = new Random().Next(0, tasks.Length - 1); tasks[index].WaitHandle.WaitOne(); return index; } } /// <summary> /// 具有返回类型的任务 /// </summary> /// <typeparam name="TResult"></typeparam> public class WorkTask<TResult> : WorkTask { private readonly Func<TResult>? func; private readonly Func<object?, TResult>? funcWithParameter; protected TResult? _result = default(TResult); public TResult? Result { get { if (!isSetSignal) WaitHandle.WaitOne(); return _result; } } public WorkTask(Func<TResult> func) { this.func = func ?? throw new ArgumentNullException(nameof(func)); } public WorkTask(Func<object?, TResult> func, object? state) { this.funcWithParameter = func ?? throw new ArgumentNullException(nameof(func)); this.State = state; } private bool isSetSignal = false; public override void Runsynchronous() { if (Status != WorkTaskStatus.Created) return; Status = WorkTaskStatus.Running; try { if (func != null) _result = func(); if (funcWithParameter != null) _result = funcWithParameter(State); } catch (Exception ex) { Exception = new WorkTaskException(ex.Message, ex); IsFaulted = true; } finally { OnTaskCompleted(this); isSetSignal = WaitHandle.Set(); Status = WorkTaskStatus.RanToCompleted; IsCompleted = true; } } } public class WorkTaskException : Exception { public WorkTaskException() { } public WorkTaskException(string Message) : base(Message) { } public WorkTaskException(string Message, Exception InnerException) : base(Message, InnerException) { } } public enum WorkTaskStatus { /// <summary> /// 已创建 /// </summary> Created = 0, /// <summary> /// 正在运行 /// </summary> Running = 1, /// <summary> /// 已完成 /// </summary> RanToCompleted = 2, }
namespace CustomThreadPool; public class WorkTaskFactory { public ThreadPoolExecutor? ThreadPoolExcutor { get; private set; } public WorkTaskFactory(ThreadPoolExecutor excutor) { ThreadPoolExcutor = excutor; } public WorkTaskFactory() : this(new ThreadPoolExecutor(5, 10)) { } public WorkTask StartNew(Action action, ThreadPoolExecutor? executor = null) { WorkTask task = new WorkTask(action); ThreadPoolExcutor = executor ?? ThreadPoolExcutor; ThreadPoolExcutor?.QueueTask(task); return task; } public WorkTask<TResult> StartNew<TResult>(Func<object?, TResult> func, object? state, ThreadPoolExecutor? executor = null) { WorkTask<TResult> task = new WorkTask<TResult>(func, state); ThreadPoolExcutor = executor ?? ThreadPoolExcutor; ThreadPoolExcutor?.QueueTask(task); return task; } }
namespace CustomThreadPool; using System.Threading; using System.Text; using System; using System.Diagnostics; using System.Reflection.Emit; class Program { static void Main(string[] args) { int count = 5; ThreadPoolExecutor poolExcutor = new(5, 6, QueueCount: 5, KeepAliveTimeout: 2000); WorkTask<int?>[] workTasks = new WorkTask<int?>[count]; for (int i = 0; i < count; i++) workTasks[i] = WorkTask.Factory.StartNew(t => Action(t), state: i, executor: poolExcutor); WorkTask<int> task = WorkTask.Factory.StartNew(t => { Thread.Sleep(100); Console.WriteLine("start thread"); return 100; }, state: null, executor: poolExcutor); Console.WriteLine("start main"); WorkTask.WaitAll(workTasks); Console.WriteLine(task.Result); Console.WriteLine(workTasks.Sum(t => t.Result)); } private static int? Action(object? t) { Thread.Sleep(2000); Console.WriteLine($"Task Id:{Environment.CurrentManagedThreadId},Parameter:{t}"); return t == null ? default(int?) : (int)t + 1; } }
调用结果