CancellationTokenSource(参考使用的就是这个-之前用abord存在问题!) CancellationTokenSource 是 .NET 框架中的一个类,主要用于创建和管理取消令牌(CancellationToken),这些令牌可以用于在异步操作中请求取消操作。以下是关于 CancellationTokenSource 的详细解释:
创建 CancellationTokenSource 对象 总结来说,CancellationTokenSource
是 .NET 中用于创建和管理取消令牌的重要工具,它允许开发者在异步操作中灵活地请求和响应取消操作,从而提高应用程序的健壮性和可靠性。
使用 new
关键字实例化 CancellationTokenSource
类来创建一个新的对象。
1 CancellationTokenSource cts = new CancellationTokenSource();
生成取消标记(CancellationToken)
请求取消操作
当需要取消某个操作时,可以调用 CancellationTokenSource
的 Cancel
方法。
调用此方法后,与 CancellationToken
关联的操作会接收到取消请求,并可以通过检查 CancellationToken.IsCancellationRequested
属性来确定是否应该停止执行。
取消多个操作
检查取消请求
在执行可能需要取消的操作时,应定期检查 CancellationToken.IsCancellationRequested
属性来确定是否已请求取消。
1 2 3 4 5 if (token.IsCancellationRequested) { return ; }
注意事项
CancellationTokenSource
可以在多线程和异步操作中使用,以安全地取消执行中的操作。
当 CancellationTokenSource.Cancel
被调用时,与其关联的 CancellationToken
会被标记为取消,但这并不意味着操作会立即停止。操作应该定期检查 CancellationToken.IsCancellationRequested
并适当地响应。
如果与 CancellationToken
关联的操作在接收到取消请求后仍然继续执行,那么它可能会抛出 OperationCanceledException
异常。这是通知调用者操作已被取消的一种机制。
Task启动,暂停,继续,结束 延时: 1 2 3 4 5 6 async Task DelayMethod ( ) { Console.WriteLine("Before Delay" ); await Task.Delay(1000 ); Console.WriteLine("After Delay" ); }
类构造方法: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 在C#中,可以使用Task类来创建和管理多线程任务。以下是Task的启动、暂停、继续和结束的方法: 启动Task: 1. 使用Task.Run()方法启动一个新的Task。2. 使用Task.Factory.StartNew()方法启动一个新的Task。3. 使用Task构造函数创建一个新的Task,然后调用Task.Start()方法启动它。 暂停Task: 1. 使用ManualResetEvent类创建一个事件对象,然后在Task中使用WaitOne()方法暂停Task。2. 使用CancellationTokenSource类创建一个取消令牌对象,然后在Task中使用该对象的Cancel()方法暂停Task。 继续Task: 1. 使用ManualResetEvent类的Set()方法继续Task。2. 使用CancellationTokenSource类的Cancel()方法取消暂停Task。 结束Task: 1. 使用Task.Wait()方法等待Task完成。2. 使用Task.WaitAll()方法等待多个Task完成。3. 使用Task.WaitAny()方法等待任何一个Task完成。4. 使用Task的CancellationTokenSource对象的Cancel()方法取消Task。
1 2 3 4 5 6 7 8 9 10 11 public Task (Action action ) ;public Task (Action action, CancellationToken 取消令牌 ) ;public Task (Action action, TaskCreationOptions 创建选项 ) ;public Task (Action<object > action, object 状态 ) ;public Task (Action action, CancellationToken 取消令牌, TaskCreationOptions 创建选项 ) ;public Task (Action<object > action, object 状态, CancellationToken 取消令牌 ) ;public Task (Action<object > action, object 状态, TaskCreationOptions 创建选项 ) ;public Task (Action<object > action, object 状态, CancellationToken 取消令牌, TaskCreationOptions 创建选项 ) ; public static Task Run (Func<Task> function, CancellationToken cancellationToken关闭线程 )
第二参数:用于关闭线程。(字段位置) 1 2 3 private CancellationTokenSource cts ;private ManualResetEvent resetEvent = new ManualResetEvent(true );
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public static async Task Main ( ) { var tokenSource = new CancellationTokenSource(); var token = tokenSource.Token; var files = new List<Tuple<string , string , long , DateTime>>(); var t = Task.Run( () => { string dir = "C:\\Windows\\System32\\" ; object obj = new Object(); if (Directory.Exists(dir)) { Parallel.ForEach(Directory.GetFiles(dir), f => { if (token.IsCancellationRequested) token.ThrowIfCancellationRequested(); var fi = new FileInfo(f); lock (obj) { files.Add(Tuple.Create(fi.Name, fi.DirectoryName, fi.Length, fi.LastWriteTimeUtc)); } }); } } , token); await Task.Yield(); tokenSource.Cancel();
【1】新建线程: 1 2 3 4 5 6 7 8 9 10 11 cts = new CancellationTokenSource(); Task task1 = new Task(() => { }, cts.Token); task1.Start(); Task.Factory.StartNew(() => { canopen.set_reg32bit(Convert.ToInt16(textBox1.Text, 16 ), 0 , Convert.ToInt32(textBox6.Text, 16 )); });
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 Task.Run(() => { i= canopen.get_region(Convert.ToInt16(textBox3.Text, 16 ), Convert.ToByte(textBox4.Text, 16 )); this .Invoke(new Action(() => { textBox5.Text = i.ToString("X8" ); })); } ); Task.Factory.StartNew(() => { canopen.set_reg32bit(0x1017 , 0x00 , 7000 ); } );
1 2 3 4 5 6 7 8 9 10 Task<int > task = Task.Run(() => { return 42 ; }); int result = task.Result; Console.WriteLine(result);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private void btnStart_Click (object sender, EventArgs e ) { cts = new CancellationTokenSource(); CancellationToken ct = cts.Token; Task task = new Task(async () => { while (true ) { if (!cts.IsCancellationRequested) { resetEvent.WaitOne(); Console.WriteLine("任务执行中..." ); await Task.Delay(1000 ); } else { return ; } } }, ct); task.Start(); }
令牌结束,没有作用 ,实际还是要方法体内判断状态,如果是结束,退出方法体,线程才真正结束。 cts.Cancel();只是将只读属性True,方法体内return才真正结束线程。
1 2 3 4 Task task3 = Task.Factory.StartNew(() => { });
【2】排队等待: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 Task task1 = new Task(() => { Thread.Sleep(1000 ); Console.WriteLine($"Task1子线程Id={Thread.CurrentThread.ManagedThreadId} {DateTime.Now.ToLongTimeString()} " ); }); task1.Start(); Task task2 = new Task(() => { Thread.Sleep(2000 ); Console.WriteLine($"Task2子线程Id={Thread.CurrentThread.ManagedThreadId} {DateTime.Now.ToLongTimeString()} " ); }); task2.Start(); Task.WaitAny(task1, task2); Console.WriteLine("主线程开始运行!Time=" + DateTime.Now.ToLongTimeString());
1 2 3 4 5 6 7 8 9 Task.WhenAll(task1, task2).ContinueWith(task3 => { Console.WriteLine($"Task3子线程Id={Thread.CurrentThread.ManagedThreadId} {DateTime.Now.ToLongTimeString()} " ); }); Console.WriteLine("主线程开始运行!Time=" + DateTime.Now.ToLongTimeString());
12都完成3才run
1 2 3 4 5 6 7 Task.WhenAny(task1, task2).ContinueWith(task3 => { Console.WriteLine($"Task3子线程Id={Thread.CurrentThread.ManagedThreadId} {DateTime.Now.ToLongTimeString()} " ); });
有1个完成,3就run
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 Task parentTask = new Task(() => { Task task1 = new Task(() => { Thread.Sleep(1000 ); }, TaskCreationOptions.AttachedToParent); Task task2 = new Task(() => { Thread.Sleep(3000 ); }, TaskCreationOptions.AttachedToParent); task1.Start(); task2.Start(); }); parentTask.Start(); parentTask.Wait(); Console.WriteLine("主线程开始执行!Time= " + DateTime.Now.ToLongTimeString());
耗时线程: 1 2 3 4 5 6 7 8 9 10 11 12 13 Task task1 = new Task(() => { Thread.Sleep(2000 ); }, TaskCreationOptions.LongRunning); task1.Start(); task1.Wait(); Console.WriteLine("主线程开始执行!Time= " + DateTime.Now.ToLongTimeString());
【3】线程取消: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 using System;using System.Threading;using System.Threading.Tasks; class Program { static async Task Main ( ) { var tokenSource2 = new CancellationTokenSource(); CancellationToken ct = tokenSource2.Token; var task = Task.Run(() => { ct.ThrowIfCancellationRequested(); bool moreToDo = true ; while (moreToDo) { if (ct.IsCancellationRequested) { ct.ThrowIfCancellationRequested(); } } }, tokenSource2.Token); tokenSource2.Cancel(); try { await task; } catch (OperationCanceledException e) { Console.WriteLine($"{nameof (OperationCanceledException)} thrown with message: {e.Message} " ); } finally { tokenSource2.Dispose(); } Console.ReadKey(); } }
任务取消 | Microsoft Learn
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 CancellationTokenSource cts = new CancellationTokenSource(); Task task = Task.Factory.StartNew(() => { while (!cts.IsCancellationRequested) { Thread.Sleep(200 ); Console.WriteLine($"子线程Id={Thread.CurrentThread.ManagedThreadId} {DateTime.Now.ToLongTimeString()} " ); } }, cts.Token); Thread.Sleep(2000 ); cts.Cancel();
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 CancellationTokenSource cts = new CancellationTokenSource(); Task task = Task.Factory.StartNew(() => { while (!cts.IsCancellationRequested) { Thread.Sleep(500 ); Console.WriteLine($"子线程Id={Thread.CurrentThread.ManagedThreadId} {DateTime.Now.ToLongTimeString()} " ); } }, cts.Token); cts.Token.Register(() => { Console.WriteLine("任务取消,开始清理工作......" ); Thread.Sleep(2000 ); Console.WriteLine("任务取消,清理工作结束......" ); }); Thread.Sleep(3000 ); cts.Cancel();
限时任务: 1 2 3 cts.CancelAfter(3000 ); 重新开的线程,需要用新的cts绑定。
Lock锁:锁对象,必须是引用类型 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private static int nums = 0 ; private static object myLock = new object (); static void Method12 ( ) { for (int i = 0 ; i < 5 ; i++) { Task.Factory.StartNew(() => { TestMethod(); }); } } static void TestMethod ( ) { for (int i = 0 ; i < 100 ; i++) { lock (myLock) { nums++; Console.WriteLine(nums); } } }
lock 内,对象是同一个,才能进入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 using System;using System.Collections.Generic;using System.Threading;using System.Threading.Tasks; public class Example { public static void Main ( ) { CancellationTokenSource source = new CancellationTokenSource(); CancellationToken token = source.Token; Random rnd = new Random(); Object lockObj = new Object(); List<Task<int []>> tasks = new List<Task<int []>>(); TaskFactory factory = new TaskFactory(token); for (int taskCtr = 0 ; taskCtr <= 10 ; taskCtr++) { int iteration = taskCtr + 1 ; tasks.Add(factory.StartNew( () => { int value ; int [] values = new int [10 ]; for (int ctr = 1 ; ctr <= 10 ; ctr++) { lock (lockObj) { value = rnd.Next(0 ,101 ); } if (value == 0 ) { source.Cancel(); Console.WriteLine("Cancelling at task {0}" , iteration); break ; } values[ctr-1 ] = value ; } return values; }, token)); } try { Task<double > fTask = factory.ContinueWhenAll(tasks.ToArray(), (results) => { Console.WriteLine("Calculating overall mean..." ); long sum = 0 ; int n = 0 ; foreach (var t in results) { foreach (var r in t.Result) { sum += r; n++; } } return sum/(double ) n; } , token); Console.WriteLine("The mean is {0}." , fTask.Result); } catch (AggregateException ae) { foreach (Exception e in ae.InnerExceptions) { if (e is TaskCanceledException) Console.WriteLine("Unable to compute mean: {0}" , ((TaskCanceledException) e).Message); else Console.WriteLine("Exception: " + e.GetType().Name); } } finally { source.Dispose(); } } }
浅谈C#取消令牌CancellationTokenSource 前言 相信大家在使用C#进行开发的时候,特别是使用异步的场景,多多少少会接触到CancellationTokenSource。看名字就知道它和取消异步任务相关的,而且一看便知大名鼎鼎的CancellationToken就是它生产出来的。不看不知道,一看吓一跳。它在取消异步任务、异步通知等方面效果还是不错的,不仅好用而且够强大。无论是微软底层类库还是开源项目涉及到Task相关的,基本上都能看到它的身影,而微软近几年也是很重视框架中的异步操作,特别是在.NET Core上基本上能看到Task的地方就能看到CancellationTokenSource的身影。这次我们抱着学习的态度,来揭开它的神秘面纱。
简单示例 相信对于CancellationTokenSource基本的使用,许多同学已经非常熟悉了。不过为了能够让大家带入文章的节奏,我们还是打算先展示几个基础的操作,让大家找找感觉,回到那个熟悉的年代。
基础操作 首先呈现一个最基础的操作。
1 2 3 4 5 6 CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); CancellationToken cancellationToken = cancellationTokenSource.Token; cancellationToken.Register(() => System.Console.WriteLine("取消了???" )); cancellationToken.Register(() => System.Console.WriteLine("取消了!!!" )); cancellationToken.Register(state => System.Console.WriteLine($"取消了。。。{state} " ),"啊啊啊" ); System.Console.WriteLine("做了点别的,然后取消了." );
这个操作是最简单的操作,我们上面提到过CancellationTokenSource就是用来生产CancellationToken的,还可以说CancellationToken是CancellationTokenSource的表现,这个待会看源码的时候我们会知道为啥这么说。这里呢我们给CancellationToken
注册几个操作,然后使用CancellationTokenSource的Cancel方法
取消操作,这时候控制台就会打印结果如下
1 2 3 4 做了点别的,然后取消了. 取消了。。。啊啊啊 取消了!!! 取消了???
通过上面简单的示例,大家应该非常轻松的理解了它的简单使用。
定时取消 有的时候呢我们可能需要超时操作,比如我不想一直等着,到了一个固定的时间我就要取消操作,这时候我们可以利用CancellationTokenSource的构造函数给定一个限定时间,过了这个时间CancellationTokenSource就会被取消了,操作如下
1 2 3 4 5 6 7 8 CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(3000 ); CancellationToken cancellationToken = cancellationTokenSource.Token; cancellationToken.Register(() => System.Console.WriteLine("我被取消了." )); System.Console.WriteLine("先等五秒钟." ); await Task.Delay(5000 );System.Console.WriteLine("手动取消." ) cancellationTokenSource.Cancel();
然后在控制台打印的结果是这个样子的,活脱脱的为我们实现了内建的超时操作。
上面的写法是在构造CancellationTokenSource的时候设置超时等待,还有另一种写法等同于这种写法,使用的是CancelAfter
方法,具体使用如下
1 2 3 4 5 CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); cancellationTokenSource.Token.Register(() => System.Console.WriteLine("我被取消了." )); cancellationTokenSource.CancelAfter(5000 ); System.Console.WriteLine("不会阻塞,我会执行." );
这个操作也是定时取消操作,需要注意的是CancelAfter
方法并不会阻塞执行,所以打印的结果是
关联取消 还有的时候是这样的场景,就是我们设置一组关联的CancellationTokenSource,我们期望的是只要这一组里的任意一个CancellationTokenSource被取消了,那么这个被关联的CancellationTokenSource就会被取消。说得通俗一点就是,我们几个当中只要一个不在了,那么你也可以不在了,具体的实现方式是这样的
1 2 3 4 5 6 7 8 9 10 11 12 CancellationTokenSource tokenSource = new CancellationTokenSource(); CancellationTokenSource tokenSource2 = new CancellationTokenSource(); CancellationTokenSource tokenSource3 = new CancellationTokenSource(); tokenSource2.Token.Register(() => System.Console.WriteLine("tokenSource2被取消了" )); CancellationTokenSource tokenSourceNew = CancellationTokenSource.CreateLinkedTokenSource(tokenSource.Token, tokenSource2.Token, tokenSource3.Token); tokenSourceNew.Token.Register(() => System.Console.WriteLine("tokenSourceNew被取消了" )); tokenSource2.Cancel();
上述示例中因为tokenSourceNew关联了tokenSource、tokenSource2、tokenSource3所以只要他们其中有一个被取消那么tokenSourceNew也会被取消,所以上述示例的打印结果是
1 2 tokenSourceNew被取消了 tokenSource2被取消了
判断取消 上面我们使用的方式,都是通过回调的方式得知CancellationTokenSource被取消了,没办法通过标识去得知CancellationTokenSource是否可用。不过微软贴心的为我们提供了IsCancellationRequested
属性去判断,需要注意的是它是CancellationToken
的属性,具体使用方式如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 CancellationTokenSource tokenSource = new CancellationTokenSource(); CancellationToken cancellationToken = tokenSource.Token; cancellationToken.Register(() => System.Console.WriteLine("被取消了." )); Task.Run(async ()=> { while (!cancellationToken.IsCancellationRequested) { System.Console.WriteLine("一直在执行..." ); await Task.Delay(1000 ); } }); tokenSource.CancelAfter(5000 );
上述代码五秒之后CancellationTokenSource被取消,因此CancellationTokenSource的Token也会被取消。反映到IsCancellationRequested上就是值为true说明被取消,为false说明没被取消,因此控制台输出的结果是
1 2 3 4 5 6 一直在执行... 一直在执行... 一直在执行... 一直在执行... 一直在执行... 被取消了.
还有另一种方式,也可以主动判断任务是否被取消,不过这种方式简单粗暴,直接是抛出了异常。如果是使用异步的方式的话,需要注意的是Task内部异常的捕获方式,否则对外可能还没有感知到具体异常的原因,它的使用方式是这样的,这里为了演示方便我直接换了一种更直接的方式
1 2 3 4 5 6 7 8 9 10 11 CancellationTokenSource tokenSource = new CancellationTokenSource(); CancellationToken cancellationToken = tokenSource.Token; cancellationToken.Register(() => System.Console.WriteLine("被取消了." )); tokenSource.CancelAfter(5000 ); while (true ){ cancellationToken.ThrowIfCancellationRequested(); System.Console.WriteLine("一直在执行..." ); await Task.Delay(1000 ); }
执行五秒之后则直接抛出 System.OperationCanceledException: The operation was canceled.
异常,异步情况下注意异常处理的方式即可。通过上面这些简单的示例,相信大家对CancellationTokenSource有了一定的认识,大概知道了在什么时候可以使用它,主要是异步取消通知,或者限定时间操作通知等等。CancellationTokenSource是个不错的神器,使用简单功能强大。
源码探究 通过上面的示例,相信大家对CancellationTokenSource有了一个基本的认识,真的是非常强大,而且使用起来也非常的简单,这也是c#语言的精妙之处,非常实用,让你用起来的时候非常舒服,有种用着用着就想跪下的冲动。步入正题,接下来让我们来往深处看看CancellationTokenSource的源码,看看它的工作机制是啥。本文贴出的源码是博主精简过的,毕竟源码太多不太可能全部粘贴出来,主要是跟着它的思路了解它的工作方式。
构造入手 因为这一次呢CancellationTokenSource
的初始化函数中有一个比较重要的构造函数,那就是可以设置定时超时的操作,那么我们就从它的构造函数入手[点击查看源码????[1]]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private volatile int _state;private const int NotCanceledState = 1 ; public CancellationTokenSource ( ) => _state = NotCanceledState; public CancellationTokenSource (TimeSpan delay ) { long totalMilliseconds = (long )delay.TotalMilliseconds; if (totalMilliseconds < -1 || totalMilliseconds > int .MaxValue) { throw new ArgumentOutOfRangeException(nameof (delay)); } InitializeWithTimer((int )totalMilliseconds); } public CancellationTokenSource (int millisecondsDelay ) { if (millisecondsDelay < -1 ) { throw new ArgumentOutOfRangeException(nameof (millisecondsDelay)); } InitializeWithTimer(millisecondsDelay); }
无参构造函数没啥好说的,就是给全局state状态初始化NotCanceledState的初始值,也就是初始化状态。我们比较关注的是可以定时取消的构造函数,虽然是两个构造函数,但是殊途同归,本质都是传递的毫秒整形参数,而且调用的核心方法都是InitializeWithTimer
,看来是一个定时器操作,这样不奇怪了,我们看下InitializeWithTimer
方法的实现[点击查看源码????[2]]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private const int NotifyingCompleteState = 2 ;private volatile TimerQueueTimer? _timer;private static readonly TimerCallback s_timerCallback = TimerCallback;private static void TimerCallback (object ? state ) => ((CancellationTokenSource)state!).NotifyCancellation(throwOnFirstException: false ); private void InitializeWithTimer (uint millisecondsDelay ) { if (millisecondsDelay == 0 ) { _state = NotifyingCompleteState; } else { _timer = new TimerQueueTimer(s_timerCallback, this , millisecondsDelay, Timeout.UnsignedInfinite, flowExecutionContext: false ); } }
通过这个方法,我们可以可以非常清晰的看到定时初始化的核心操作其实就是初始化一个定时器,而定时的时间就是我们初始化传递的毫秒数,其中s_timerCallback
是定时的回调函数,即如果等待超时之后则调用这个委托,其本质正是CancellationTokenSource的NotifyCancellation方法
,这个方法正是处理超时之后的操作[点击查看源码????[3]]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private volatile ManualResetEvent? _kernelEvent;private void NotifyCancellation (bool throwOnFirstException ) { if (!IsCancellationRequested && Interlocked.CompareExchange(ref _state, NotifyingState, NotCanceledState) == NotCanceledState) { TimerQueueTimer? timer = _timer; if (timer != null ) { _timer = null ; timer.Close(); } _kernelEvent?.Set(); ExecuteCallbackHandlers(throwOnFirstException); Debug.Assert(IsCancellationCompleted, "Expected cancellation to have finished" ); } }
NotifyCancellation正是处理定时器到时的操作,说白了就是到了指定的时间但是没有手动取消执行的操作,其实也是执行的取消操作,这个方法里涉及到了两个比较重要的点,也是接下来我们会分析的点,这里做一下说明
•首先是ManualResetEvent
这个实例,这个类的功能是通过信号机制控制是否阻塞或执行后续操作,与之相辅的还有另一个类AutoResetEvent
。这两个类实现的效果是一致的,只是ManualResetEvent需要手动重置初始状态,而AutoResetEvent则会自动重置。有关两个类的说明,这里不做过多介绍,有需要了解的同学们可以自行百度。而CancellationTokenSource类的一个重要属性WaitHandle
正是使用的它。•还有一个是ExecuteCallbackHandlers
方法,这个是CancellationTokenSource执行取消操作的核心操作。为了保证阅读的顺序性,咱们在讲取消操作的时候在重点讲这个方法。
上面提到了,为了保证阅读的顺序性方便理解,咱们在本文接下来会讲解这两部分,就不再初始化这里讲解了,这里做一下标记,以防大家觉得没讲清楚就继续了。
小插曲WaitHandle 上面我们提到了CancellationTokenSource的WaitHandle属性,它是基于ManualResetEvent实现的。这个算是一个稍微独立的地方,我们可以先进行讲解一下[点击查看源码????[4]]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private volatile ManualResetEvent? _kernelEvent;internal WaitHandle WaitHandle{ get { ThrowIfDisposed(); if (_kernelEvent != null ) { return _kernelEvent; } var mre = new ManualResetEvent(false ); if (Interlocked.CompareExchange(ref _kernelEvent, mre, null ) != null ) { mre.Dispose(); } if (IsCancellationRequested) { _kernelEvent.Set(); } return _kernelEvent; } }
通过这段代码我们可以看到,如果使用了WaitHandle属性则可以使用它实现简单的阻塞通知操作,也就是收到取消通知操作之后我们可以执行WaitHandle之后的操作,但是WaitHandle是internal修饰的,我们该怎么使用呢?莫慌,我们知道CancellationTokenSource的Token属性获取的是CancellationToken
实例[点击查看源码????[5]]
1 2 3 4 5 6 7 8 public CancellationToken Token{ get { ThrowIfDisposed(); return new CancellationToken(this ); } }
直接实例化了一个CancellationToken实例返回去了,并传递了当前CancellationTokenSource实例,找到CancellationToken
的这个构造函数[点击查看源码????[6]]
1 2 3 private readonly CancellationTokenSource? _source;internal CancellationToken (CancellationTokenSource? source ) => _source = source;public WaitHandle WaitHandle => (_source ?? CancellationTokenSource.s_neverCanceledSource).WaitHandle;
通过上面的代码我们可以看到通过CancellationToken实例便可以使用WaitHandle属性,实现我们访问到它的效果,光是说的话可能有点迷糊,通过一个简单的示例我们来了解WaitHandle的使用方式,简单来看下
1 2 3 4 5 6 7 8 9 10 CancellationTokenSource tokenSource = new CancellationTokenSource(); CancellationToken cancellationToken = tokenSource.Token; cancellationToken.Register(() => System.Console.WriteLine("被取消了." )); tokenSource.CancelAfter(5000 ); Task.Run(()=> { System.Console.WriteLine("阻塞之前" ); cancellationToken.WaitHandle.WaitOne(); System.Console.WriteLine("阻塞取消,执行到了." ); }); System.Console.WriteLine("执行到了这里" );
在CancellationTokenSource为被取消之前WaitHandle.WaitOne()
方法会阻塞后续执行,也就是下面的输出暂时不会输出。等到CancellationTokenSource执行了Cancel操作里调用了ManualResetEvent的Set方法停止阻塞,后续的输出才会被执行到这是一个同步操作,如果了解ManualResetEvent
的同学相信对这个不难理解。为了演示效果我用Task演示异步的情况,所以执行的结果如下所示
1 2 3 4 执行到了这里 阻塞之前 阻塞取消,执行到了. 被取消了.
注册操作 上面我们大概讲解了一些初始化相关的和一些辅助的操作,接下来我们看一下核心的注册操作,注册操作的用途就是注册CancellationTokenSource取消或超时后需要执行的动作,而注册Register
的操作并未由CancellationTokenSource直接进行,而是通过它的Token
属性即CancellationToken实例操作的,话不多说直接找到CancellationToken的Register方法[点击查看源码????[7]]
1 2 3 4 5 6 public CancellationTokenRegistration Register (Action callback ) =>Register( s_actionToActionObjShunt, callback ?? throw new ArgumentNullException(nameof (callback)), useSynchronizationContext: false , useExecutionContext: true );
它是直接调用自己的重载方法,注意几个参数,如果看细节的话还是要关注方法参数的。过程就省略了,直接找到最底层的方法[点击查看源码????[8]]
1 2 3 4 5 6 7 8 9 10 11 private CancellationTokenRegistration Register (Action<object ?> callback, object ? state, bool useSynchronizationContext, bool useExecutionContext ) { if (callback == null ) throw new ArgumentNullException(nameof (callback)); CancellationTokenSource? source = _source; return source != null ? source.InternalRegister(callback, state, useSynchronizationContext ? SynchronizationContext.Current : null , useExecutionContext ? ExecutionContext.Capture() : null ) : default ;
从这个最底层的方法我们可以得知,其本质还是调用CancellationTokenSource的InternalRegister方法,核心操作都不在CancellationToken还是在CancellationTokenSource类,CancellationToken更像是依赖CancellationTokenSource的表现类,看一下InternalRegister方法[点击查看源码????[9]]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 private volatile CallbackPartition?[]? _callbackPartitions;private static readonly int s_numPartitions = GetPartitionCount(); internal CancellationTokenRegistration InternalRegister ( Action<object ?> callback, object ? stateForCallback, SynchronizationContext? syncContext, ExecutionContext? executionContext ) { if (!IsCancellationRequested) { if (_disposed) { return default ; } CallbackPartition?[]? partitions = _callbackPartitions; if (partitions == null ) { partitions = new CallbackPartition[s_numPartitions]; partitions = Interlocked.CompareExchange(ref _callbackPartitions, partitions, null ) ?? partitions; } int partitionIndex = Environment.CurrentManagedThreadId & s_numPartitionsMask; CallbackPartition? partition = partitions[partitionIndex]; if (partition == null ) { partition = new CallbackPartition(this ); partition = Interlocked.CompareExchange(ref partitions[partitionIndex], partition, null ) ?? partition; } long id; CallbackNode? node; bool lockTaken = false ; partition.Lock.Enter(ref lockTaken); try { id = partition.NextAvailableId++; node = partition.FreeNodeList; if (node != null ) { partition.FreeNodeList = node.Next; } else { node = new CallbackNode(partition); } node.Id = id; node.Callback = callback; node.CallbackState = stateForCallback; node.ExecutionContext = executionContext; node.SynchronizationContext = syncContext; node.Next = partition.Callbacks; if (node.Next != null ) { node.Next.Prev = node; } partition.Callbacks = node; } finally { partition.Lock.Exit(useMemoryBarrier: false ); } var ctr = new CancellationTokenRegistration(id, node); if (!IsCancellationRequested || !partition.Unregister(id, node)) { return ctr; } } callback(stateForCallback); return default ; }
这里涉及到一个比较核心的类那就是CallbackPartition
,这是一个内部类,它的主要用途就是辅助构建执行回调的链表操作,其大概实现是这个样子的[点击查看源码????[10]]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 internal sealed class CallbackPartition { public readonly CancellationTokenSource Source; public SpinLock Lock = new SpinLock(enableThreadOwnerTracking: false ); public CallbackNode? Callbacks; public CallbackNode? FreeNodeList; public long NextAvailableId = 1 ; public CallbackPartition (CancellationTokenSource source ) { Source = source; } internal bool Unregister (long id, CallbackNode node ) { } }
这里面我暂时没有列出Unregister
的内容,因为它是和取消相关的,说到取消的时候咱们再看,如果返回true则说明取消成功。这个类核心就是辅助构建Register回调链表的,它的核心都是在操作CallbackNode
节点和其构建的回调链表,而CallbackNode则是链表的一个节点定义,其大致结构如下[点击查看源码????[11]]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 internal sealed class CallbackNode { public readonly CallbackPartition Partition; public CallbackNode? Prev; public CallbackNode? Next; public long Id; public Action<object ?>? Callback; public object ? CallbackState; public ExecutionContext? ExecutionContext; public SynchronizationContext? SynchronizationContext; public CallbackNode (CallbackPartition partition ) { Partition = partition; } public void ExecuteCallback ( ) { } }
到了这里关于Register
涉及到的核心操作都罗列出来了,由于贴出来的是源码相关看着是比较蒙圈的,但是如果顺着看的话其实还是大致的实现思路还是可以理解的,这里我大致的总结一下它的实现思路
•首先是构建了CallbackPartition
数组,构建这个数组的长度是根据CPU的核心数来决定,每个CallbackPartition是操作的核心,为了防止过多的线程同时操作一个CallbackPartition实例,它采用了为不同线程分区的思路,CallbackPartition维护了构建链表节点的类CallbackNode。•CallbackNode
是组成链表的核心,CallbackNode每个实例都是链表的一个节点,从它自包含Prev和Next属性便可以看出是一个双向链表。•CallbackPartition的核心功能就是为了构建Register
进来的回调,从上面的InternalRegister
方法里的操作我们可以得知,通过CallbackPartition的辅助将CallbackNode节点构建为一个倒序链表,也就是最新的CallbackNode实例是链表的首节点,而最老的CallbackNode实例则是链表的尾节点。每一次Register进来的回调,都被包装成了CallbackNode添加到这个链表中。
上面InternalRegister方法里我们看到操作CallbackNode的时候,使用了SpinLock自旋锁。短时间锁定的情况下SpinLock更快,因为自旋锁本质上不会让线程休眠,而是一直循环尝试对资源访问,直到可用。所以自旋锁线程被阻塞时,不进行线程上下文切换,而是空转等待。对于多核CPU而言,减少了切换线程上下文的开销,从而提高了性能。
取消操作 上面我们看到了注册相关的操作,注册还是比较统一的,就一种操作方式。取消却有两种方式,一种是超时取消,另一种是主动取消,接下来我们就分别看一下这两种方式分别是如何操作的。
Cancel操作 首先我们来看主动取消的操作方式这个是最简单最直接的方式,而且这个方法属于CancellationTokenSource类,话不多说直接看实现[点击查看源码????[12]]
1 2 3 4 5 6 7 public void Cancel ( ) => Cancel(false ); public void Cancel (bool throwOnFirstException ) { ThrowIfDisposed(); NotifyCancellation(throwOnFirstException); }
重点来了Cancel
方法居然也是调用的NotifyCancellation
方法,这个方法咱们上面已经看过了。在说定时的方式构造CancellationTokenSource的时候有一个自动取消的操作,提到了NotifyCancellation
方法的核心是ExecuteCallbackHandlers
方法,这个是CancellationTokenSource执行取消操作的核心操作。还说了为了保证阅读的顺序性,咱们在讲取消操作的时候在重点讲这个方法。看来这个时刻终于还是到来了,直接打开ExecuteCallbackHandlers方法[点击查看源码????[13]]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 private volatile int _threadIDExecutingCallbacks = -1 ;private volatile CallbackPartition?[]? _callbackPartitions;private const int NotifyingCompleteState = 3 ;private void ExecuteCallbackHandlers (bool throwOnFirstException ) { ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId; CallbackPartition?[]? partitions = Interlocked.Exchange(ref _callbackPartitions, null ); if (partitions == null ) { Interlocked.Exchange(ref _state, NotifyingCompleteState); return ; } List<Exception>? exceptionList = null ; try { foreach (CallbackPartition? partition in partitions) { if (partition == null ) { continue ; } while (true ) { CallbackNode? node; bool lockTaken = false ; partition.Lock.Enter(ref lockTaken); try { node = partition.Callbacks; if (node == null ) { break ; } else { if (node.Next != null ) node.Next.Prev = null ; partition.Callbacks = node.Next; } _executingCallbackId = node.Id; node.Id = 0 ; } finally { partition.Lock.Exit(useMemoryBarrier: false ); } try { if (node.SynchronizationContext != null ) { node.SynchronizationContext.Send(static s => { var n = (CallbackNode)s!; n.Partition.Source.ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId; n.ExecuteCallback(); }, node); ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId; } else { node.ExecuteCallback(); } } catch (Exception ex) when (!throwOnFirstException) { (exceptionList ??= new List<Exception>()).Add(ex); } } } } finally { _state = NotifyingCompleteState; Volatile.Write(ref _executingCallbackId, 0 ); Interlocked.MemoryBarrier(); } if (exceptionList != null ) { Debug.Assert(exceptionList.Count > 0 , $"Expected {exceptionList.Count} > 0" ); throw new AggregateException(exceptionList); } }
关于ExecuteCallback
方法是CallbackNode
类的方法,也就是咱们上面罗列CallbackNode类结构时被省略的方法,它的主要功能就是调用Register的回调,也就是执行Register里的委托。欠下的我会补上来,注意这里是CallbackNode
类,接下来看下实现[点击查看源码????[14]]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public ExecutionContext? ExecutionContext;public void ExecuteCallback ( ) { ExecutionContext? context = ExecutionContext; if (context != null ) { ExecutionContext.RunInternal(context, static s => { Debug.Assert(s is CallbackNode, $"Expected {typeof (CallbackNode)} , got {s} " ); CallbackNode n = (CallbackNode)s; Debug.Assert(n.Callback != null ); n.Callback(n.CallbackState); }, this ); } else { Debug.Assert(Callback != null ); Callback(CallbackState); } }
关于取消的核心方法ExecuteCallbackHandlers
的重要操作,咱们已经罗列出来了,其实我们看到注册的思路的时候,就已经能猜到执行取消回调的大致思路了,既然Register的时候进行了拉链,那么取消执行注册回调肯定是变量链表执行里面的Callback了,大致总结一下
•执行Cancel之后核心操作还是针对构建的CallbackNode链表进行遍历,咱们之前说过构建的CallbackNode链表是倒序链表,最新的节点放在链表的首部,这也就解释了为啥我们上面的示例Register多个委托的时候,最先输出的是最后注册委托。•Register注册时候有参数判断是否需要传递当前同步上下文SynchronizationContext和执行上下文ExecutionContext,作用就是为了是否在当时的上下文环境执行Callback回调操作。•上面的遍历代码我们看到了会执行CallbackNode.Next.Prev=null
的操作,是为了断开当前链表节点和上下节点的关系,个人感觉是为了切断对象引用方便释放的,防止内存泄漏,同时也说明了默认情况下Register的的回调函数执行是一次性的,当执行完Cancel操作之后当前CancellationToken实例也就失效了。
CancelAfter操作 之前我们演示的时候说过有两种方式可以执行超时取消操作,一种是在构建CancellationTokenSource实例构造的时候传递超时时间,还有另一种是使用CancelAfter
操作,这个方法表示在指定时间之后取消,效果上等同于实例化CancellationTokenSource的时候传递超时时间的操作,废话不多说直接罗列代码[点击查看源码????[15]]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public void CancelAfter (TimeSpan delay ) { long totalMilliseconds = (long )delay.TotalMilliseconds; if (totalMilliseconds < -1 || totalMilliseconds > int .MaxValue) { throw new ArgumentOutOfRangeException(nameof (delay)); } CancelAfter((int )totalMilliseconds); } private static readonly TimerCallback s_timerCallback = obj =>{ ((CancellationTokenSource)obj).NotifyCancellation(throwOnFirstException: false ); }; public void CancelAfter (int millisecondsDelay ) { if (millisecondsDelay < -1 ) { throw new ArgumentOutOfRangeException(nameof (millisecondsDelay)); } if (IsCancellationRequested) { return ; } TimerQueueTimer? timer = _timer; if (timer == null ) { timer = new TimerQueueTimer(s_timerCallback, this , Timeout.UnsignedInfinite, Timeout.UnsignedInfinite, flowExecutionContext: false ); TimerQueueTimer? currentTimer = Interlocked.CompareExchange(ref _timer, timer, null ); if (currentTimer != null ) { timer.Close(); timer = currentTimer; } } try { timer.Change((uint )millisecondsDelay, Timeout.UnsignedInfinite); } catch (ObjectDisposedException) { } }
通过上面的源码我们可以看到CancelAfter的操作代码和传递超时时间构造CancellationTokenSource的代码基本上是一致的,都是通过TimerQueueTimer的方式定时触发调用CancellationTokenSource的NotifyCancellation方法,而NotifyCancellation
方法的核心实现就是ExecuteCallbackHandlers
方法,这些方法咱们上面都有讲解过,就不重复介绍了,这样关于取消相关的操作我们也就全部讲解完成了。
总结 本文我们主要讲解了C#取消令牌CancellationTokenSource,虽然设计到的类并不多,但是这部分源码并不少,而且也只是讲解核心功能的部分源码,有兴趣的同学可以自行阅读这个类相关代码,如果你觉得你的GitHub比较不给力推荐一个可以阅读CoreCLR源码的网站source.dot.net[16]这个网站看到的是目前CoreCLR最新的源码,可以直接连接到GitHub非常方便,但是最新版本的源码和稳定版本的有些差别,这个还需要注意。由于文章比较长,再加上笔者技术能力和文笔能力都有限,这里做一下简单的总结
•CancellationTokenSource的用途就是可以感知到取消操作,其中涉及到的Register回调、WaitHandle、IsCancellationRequested都能实现这个功能,当然它还支持超时取消操作。•CancellationTokenSource的Register和Cancel相关成双成对的,虽然有CancelAfter和构造传递超时时间的方式,其本质和Cancel操作是一样的。•CancellationTokenSource的核心操作原理,是通过CallbackPartition
和CallbackNode
构建倒序链表,Register的时候通过Callback委托构建链表,Cancel的时候遍历构建的链表执行Callback,虽然有一堆额外操作,但是核心工作方式就是链表操作。•需要注意的是,默认情况下CancellationTokenSource产生的CancellationToken是一次性的,取消了之后是没有办法进行重置的,当然微软已经为我们提供了IChangeToken
去解决了CancellationToken重复触发的问题,请放心使用。
由于本篇文章篇幅较长,加上笔者能力有限,文笔更是一般,如果讲解的不清楚还望谅解,或者感兴趣的同学可以自行阅读源码。关于看源码每个人都有自己的关注点,我一般的初衷都是弄明白它的原理,顺便学习下它代码风格或思路。学无止境,结果有时候并不那么重要,过程才重要。就和许多人追求自己能有到达什么样的高度,成功其实只是成长过程中顺便的一种表现,就和你如果不满现状,说明你在很早之前没想过改变自己一样。
References [1]
点击查看源码????: https://github.com/dotnet/runtime/blob/v5.0.9/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs#L168 [2]
点击查看源码????: https://github.com/dotnet/runtime/blob/v5.0.9/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs#L212 [3]
点击查看源码????: https://github.com/dotnet/runtime/blob/v5.0.9/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs#L578 [4]
点击查看源码????: https://github.com/dotnet/runtime/blob/v5.0.9/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs#L110 [5]
点击查看源码????: https://github.com/dotnet/runtime/blob/v5.0.9/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs#L101 [6]
点击查看源码????: https://github.com/dotnet/runtime/blob/v5.0.9/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationToken.cs#L93 [7]
点击查看源码????: https://github.com/dotnet/runtime/blob/v5.0.9/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationToken.cs#L139 [8]
点击查看源码????: https://github.com/dotnet/runtime/blob/v5.0.9/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationToken.cs#L270 [9]
点击查看源码????: https://github.com/dotnet/runtime/blob/v5.0.9/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs#L477 [10]
点击查看源码????: https://github.com/dotnet/runtime/blob/v5.0.9/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs#L921 [11]
点击查看源码????: https://github.com/dotnet/runtime/blob/v5.0.9/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs#1008 [12]
点击查看源码????: https://github.com/dotnet/runtime/blob/v5.0.9/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs#250 [13]
点击查看源码????: https://github.com/dotnet/runtime/blob/v5.0.9/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs#L608 [14]
点击查看源码????: https://github.com/dotnet/runtime/blob/v5.0.9/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs#L1026 [15]
点击查看源码????: https://github.com/dotnet/runtime/blob/v5.0.9/src/libraries/System.Private.CoreLib/src/System/Threading/CancellationTokenSource.cs#L303 [16]
source.dot.net: https://source.dot.net/
相关链接
Task启动,暂停,继续,结束
C#中CancellationTokenSource的理解
浅谈C#取消令牌CancellationTokenSource
=================我是分割线=================
欢迎到公众号来唠嗑: