聊聊.net 并发控制,lock,Monitor,Semaphore,BlockingQueue串讲

  • A+
所属分类:.NET技术
摘要

面试(对,最近在找工作面试…)被问到,.net 并发控制怎么做,BlockingQueue和CuncurrenceQueue有什么区别?

面试(对,最近在找工作面试...)被问到,.net 并发控制怎么做,BlockingQueue和CuncurrenceQueue有什么区别?

那么我们就来聊一下.net并发相关的知识,接下来会说到下面几个并发控制相关功能:

lock

Monitor

Semaphore

CuncurrenceQueue

BlockingQueue

BlockingCollection

 

一、lock

说到并发控制,我们首先想到的肯定是 lock关键字。

这里要说一下,lock锁的究竟是什么?是lock下面的代码块吗,不,是locker对象。

我们想象一下,locker对象相当于一把门锁(或者钥匙),后面代码块相当于屋里的资源。

哪个线程先控制这把锁,就有权访问代码块,访问完成后再释放权限,下一个线程再进行访问。

注意:如果代码块中的逻辑执行时间很长,那么其他线程也会一直等下去,直到上一个线程执行完毕,释放锁。

 1         object locker = new object();  2   3         private void Add()  4         {  5             lock (locker)  6             {  7                 Thread.Sleep(1000);  8                 counter++;  9                 this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Add counter={counter}."); 10             } 11         }

 

二、Moniter

Monitor是一个静态类(System.Threading.Monitor),功能与lock关键字基本一样,也是加锁,控制并发。

有两个重要的方法:

Monitor.Enter()  //获取一个锁

Monitor.Exit()   //释放一个锁 

另外几个方法:

public static bool TryEnter(object obj, int millisecondsTimeout)  //相比于 public static void Enter(object obj) 方法,多了超时时间设置,如果等待超过一定时间,就不再等待了,另外,只有TryEnter()返回值为true时,才能进入代码块。

public static bool Wait(object obj, int millisecondsTimeout)    //这个方法在已经获得锁权限的代码块中调用时,或暂时释放锁,等待一定时间后,重新获取锁权限,继续执行Wait后面的代码。(真想不明怎么会有这种相互礼让的操作)

public static void Pulse(object obj)      //这个方法的解释是,通知在等待队列中的线程,锁对象状态改变。(测试发现,此方法并不会真正改变锁定状态,只是通知的作用)

 TryEnter代码示例:

 1         int counter = 0;  2         object locker = new object();  3   4         private void Minus()  5         {  6             //加上try -catch-finally,防止由于异常,锁无法释放,这也是为什么我们更多使用lock而不是Moniter的原因。  7             try  8             {  9                 //只有TryEnter()返回值为true时,才能进入代码块,与Enter()方法不一样 10                 if (Monitor.TryEnter(locker, 5000)) 11                 { 12                     this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Minus in"); 13                     Thread.Sleep(1000); 14                     counter--; 15                     this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Minus counter={counter}."); 16                 } 17             } 18             catch (Exception ex) 19             { 20                 this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Minus Exception {ex.Message}"); 21             } 22             finally 23             { 24                 Monitor.Exit(locker); 25             } 26         }

 

 

通过上面的代码,我们可以看出Monitor和lock实现的功能基本一致,但Monitor的使用要明显比lock更复杂,也行这就是我们平时更多的使用lock,而不是Monitor的原因。

 

三、Semaphore 信号量

System.Threading.Semaphore 

lock和Monitor加锁之后,每次只能有一个线程访问临界代码,信号量类似于一个线程池,线程访问之前获取一个信号,访问完成释放信号,只要信号量内有可用信号便可以访问,否则等待。

构造函数:

public Semaphore(int initialCount, int maximumCount)  //创建一个信号量,指定初始信号数量和最大信号数量。

几个重要方法:

public int Release()        //代码注释的意思是:退出信号量,并返回之前的(可用信号)数量。实际上,除了退出,这个方法每调用一次会增加一个可用信号,但数量达到最大数量时会抛异常。

public int Release(int releaseCount)   //和上面的方法类似,上面的方法每次只释放一个信号,这个方法可以指定信号数量。

public virtual bool WaitOne()    //等待一个可用信号

看下面的示例代码,如果只初始一个信号量,new Semaphore(1, 100),运行结果与lock和Monitor是一样的,两个方法交替执行,如果初始信号量为多个时,new Semaphore(3, 100),执行效率高的方法要占用更多的信号,从而执行更多次。

 1         int counter = 0;  2         int semaphoreCount = 0;  3         Semaphore semaphore = new Semaphore(3, 100);  4   5         private void Add()  6         {  7             semaphore.WaitOne();  8             Thread.Sleep(1000);  9             counter++; 10             semaphoreCount = semaphore.Release(); 11             this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Add counter={counter}.SemaphoreCount:{semaphoreCount}"); 12         } 13  14         private void Minus() 15         { 16             semaphore.WaitOne(); 17             Thread.Sleep(2000); 18             counter--; 19             semaphore.Release(); 20             this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Minus counter={counter}.SemaphoreCount:{semaphoreCount}"); 21         }

 

Semaphore在生产者/消费者模式下的应用

生产者每次添加一个信号,消费者每次消耗一个信号,如果信号量为0,则消费者进入等待状态。

 1         int counter = 0;  2         int semaphoreCount = 0;  3         Semaphore semaphore = new Semaphore(0, int.MaxValue);  4   5         private void Product()  6         {  7             semaphoreCount = semaphore.Release();  8             Thread.Sleep(1000);  9             counter++; 10             this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Product counter={counter}.SemaphoreCount:{semaphoreCount}"); 11         } 12  13         private void Consume() 14         { 15             semaphore.WaitOne(); 16             Thread.Sleep(2000); 17             counter--; 18             this.logger.LogDebug($"{DateTime.Now.ToLongTimeString()} Consume counter={counter}.SemaphoreCount:{semaphoreCount}"); 19         }

 

 

四、ConcurrentQueue 和 Queue

.net 集合中有一类线程安全的集合 System.Collections.Concurrent,ConcurrentQueue 就是其中的一个,线程安全的队列,有普通队列Queue先进先出的特点,同时又具备多线程安全。

测试过程中发现:

Queue 类的两个出队列方法 Dequeue()TryDequeue(out result),在多线程环境下,Dequeue() 会出现并发访问错误,但TryDequeue(out result)不会,即TryDequeue(out result)即使不加锁,在多线程环境下也运行正常。

ConcurrentQueue 类只有一个出队列方法 TryDequeue(out result),当然,是线程安全的。

五、BlockingQueue

BlockingQueue并发.net内置的类,有人说到这个类,那他多半是在说BlockingCollection

BlockingQueue有一篇很不错的文章,可以参考一下:

https://docs.microsoft.com/zh-cn/archive/blogs/toub/blocking-queues

 

六、BlockingCollection

BlockingCollection是.net内置的类,相当于带有阻塞功能的ConcurrentQueue ,相比较ConcurrentQueue ,BlockingCollection在从队列中读取数据时,如果队列为空,那么它会等待(block),直到有数据可读取。

而ConcurrentQueue ,需要我们自行判断是否读取了数据,并且控制循环读取的频率。

.net 文档对这个类解释的非常详细,可以仔细阅读:

https://docs.microsoft.com/zh-cn/dotnet/api/system.collections.concurrent.blockingcollection-1?view=netcore-3.1