當前位置:
首頁 > 最新 > C#ConcurrentBag的實現原理

C#ConcurrentBag的實現原理

來源:InCerry

cnblogs.com/InCerry/p/9497729.html

一、前言

最近在做一個項目,項目中為了提升吞吐量,使用了消息隊列,中間實現了生產消費模式,在生產消費者模式中需要有一個集合,來存儲生產者所生產的物品,筆者使用了最常見的List集合類型。

由於生產者線程有很多個,消費者線程也有很多個,所以不可避免的就產生了線程同步的問題。

開始筆者是使用lock關鍵字,進行線程同步,但是性能並不是特別理想,然後有網友說可以使用SynchronizedList來代替使用List達到線程安全的目的。

於是筆者就替換成了SynchronizedList,但是發現性能依舊糟糕,於是查看了SynchronizedList的源代碼,發現它就是簡單的在List提供的API的基礎上加了lock,所以性能基本與筆者實現方式相差無幾。

最後筆者找到了解決的方案,使用ConcurrentBag類來實現,性能有很大的改觀,於是筆者查看了ConcurrentBag的源代碼,實現非常精妙,特此在這記錄一下。

二、ConcurrentBag類

ConcurrentBag實現了IProducerConsumerCollection介面,該介面主要用於生產者消費者模式下,可見該類基本就是為生產消費者模式定製的。

然後還實現了常規的IReadOnlyCollection類,實現了該類就需要實現IEnumerable、IEnumerable、 ICollection類。

ConcurrentBag對外提供的方法沒有List那麼多,但是同樣有Enumerable實現的擴展方法。類本身提供的方法如下所示。

三、 ConcurrentBag線程安全實現原理

1. ConcurrentBag的私有欄位

ConcurrentBag線程安全實現主要是通過它的數據存儲的結構和細顆粒度的鎖。

public class ConcurrentBag : IProducerConsumerCollection, IReadOnlyCollection

{

// ThreadLocalList對象包含每個線程的數據

ThreadLocal m_locals;

// 這個頭指針和尾指針指向中的第一個和最後一個本地列表,這些本地列表分散在不同線程中

// 允許在線程局部對象上枚舉

volatile ThreadLocalList m_headList, m_tailList;

// 這個標誌是告知操作線程必須同步操作

// 在GlobalListsLock 鎖中 設置

bool m_needSync;

}

首選我們來看它聲明的私有欄位,其中需要注意的是集合的數據是存放在ThreadLocal線程本地存儲中的。

也就是說訪問它的每個線程會維護一個自己的集合數據列表,一個集合中的數據可能會存放在不同線程的本地存儲空間中,所以如果線程訪問自己本地存儲的對象,那麼是沒有問題的,這就是實現線程安全的第一層,使用線程本地存儲數據。

然後可以看到ThreadLocalList m_headList, m_tailList;這個是存放著本地列表對象的頭指針和尾指針,通過這兩個指針,我們就可以通過遍歷的方式來訪問所有本地列表。它使用volatile修飾,所以它是線程安全的。

最後又定義了一個標誌,這個標誌告知操作線程必須進行同步操作,這是實現了一個細顆粒度的鎖,因為只有在幾個條件滿足的情況下才需要進行線程同步。

2. 用於數據存儲的TrehadLocalList類

接下來我們來看一下ThreadLocalList類的構造,該類就是實際存儲了數據的位置。實際上它是使用雙向鏈表這種結構進行數據存儲。

[Serializable]

// 構造了雙向鏈表的節點

internal class Node

{

public Node(T value)

{

m_value = value;

}

public readonly T m_value;

public Node m_next;

public Node m_prev;

}

///

/// 集合操作類型

///

internal enum ListOperation

{

None,

Add,

Take

};

///

/// 線程鎖定的類

///

internal class ThreadLocalList

{

// 雙向鏈表的頭結點 如果為null那麼表示鏈表為空

internal volatile Node m_head;

// 雙向鏈表的尾節點

private volatile Node m_tail;

// 定義當前對List進行操作的種類

// 與前面的 ListOperation 相對應

internal volatile int m_currentOp;

// 這個列表元素的計數

private int m_count;

// The stealing count

// 這個不是特別理解 好像是在本地列表中 刪除某個Node 以後的計數

internal int m_stealCount;

// 下一個列表 可能會在其它線程中

internal volatile ThreadLocalList m_nextList;

// 設定鎖定是否已進行

internal bool m_lockTaken;

// The owner thread for this list

internal Thread m_ownerThread;

// 列表的版本,只有當列表從空變為非空統計是底層

internal volatile int m_version;

///

/// ThreadLocalList 構造器

///

///

擁有這個集合的線程

internal ThreadLocalList(Thread ownerThread)

{

m_ownerThread = ownerThread;

}

///

/// 添加一個新的item到鏈表首部

///

///

The item to add.

///

是否更新計數.

internal void Add(T item, bool updateCount)

{

checked

{

m_count++;

}

Node node = new Node(item);

if (m_head == null)

{

Debug.Assert(m_tail == null);

m_head = node;

m_tail = node;

m_version++; // 因為進行初始化了,所以將空狀態改為非空狀態

}

else

{

// 使用頭插法 將新的元素插入鏈表

node.m_next = m_head;

m_head.m_prev = node;

m_head = node;

}

if (updateCount) // 更新計數以避免此添加同步時溢出

{

m_count = m_count - m_stealCount;

m_stealCount = 0;

}

}

///

/// 從列表的頭部刪除一個item

///

///

The removed item

internal void Remove(out T result)

{

// 雙向鏈表刪除頭結點數據的流程

Debug.Assert(m_head != null);

Node head = m_head;

m_head = m_head.m_next;

if (m_head != null)

{

m_head.m_prev = null;

}

else

{

m_tail = null;

}

m_count--;

result = head.m_value;

}

///

/// 返回列表頭部的元素

///

///

the peeked item

/// True if succeeded, false otherwise

internal bool Peek(out T result)

{

Node head = m_head;

if (head != null)

{

result = head.m_value;

return true;

}

result = default(T);

return false;

}

///

/// 從列表的尾部獲取一個item

///

///

the removed item

///

remove or peek flag

internal void Steal(out T result, bool remove)

{

Node tail = m_tail;

Debug.Assert(tail != null);

if (remove) // Take operation

{

m_tail = m_tail.m_prev;

if (m_tail != null)

{

m_tail.m_next = null;

}

else

{

m_head = null;

}

// Increment the steal count

m_stealCount++;

}

result = tail.m_value;

}

///

/// 獲取總計列表計數, 它不是線程安全的, 如果同時調用它, 則可能提供不正確的計數

///

internal int Count

{

get

{

return m_count - m_stealCount;

}

}

}

從上面的代碼中我們可以更加驗證之前的觀點,就是ConcurentBag在一個線程中存儲數據時,使用的是雙向鏈表,ThreadLocalList實現了一組對鏈表增刪改查的方法。

3. ConcurrentBag實現新增元素

接下來我們看一看ConcurentBag是如何新增元素的。

///

/// 嘗試獲取無主列表,無主列表是指線程已經被暫停或者終止,但是集合中的部分數據還存儲在那裡

/// 這是避免內存泄漏的方法

///

///

private ThreadLocalList GetUnownedList()

{

//此時必須持有全局鎖

Contract.Assert(Monitor.IsEntered(GlobalListsLock));

// 從頭線程列表開始枚舉 找到那些已經被關閉的線程

// 將它所在的列表對象 返回

ThreadLocalList currentList = m_headList;

while (currentList != null)

{

if (currentList.m_ownerThread.ThreadState == System.Threading.ThreadState.Stopped)

{

currentList.m_ownerThread = Thread.CurrentThread; // the caller should acquire a lock to make this line thread safe

return currentList;

}

currentList = currentList.m_nextList;

}

return null;

}

///

/// 本地幫助方法,通過線程對象檢索線程線程本地列表

///

///

如果列表不存在,那麼創建新列表

/// The local list object

private ThreadLocalList GetThreadList(bool forceCreate)

{

ThreadLocalList list = m_locals.Value;

if (list != null)

{

return list;

}

else if (forceCreate)

{

// 獲取用於更新操作的 m_tailList 鎖

lock (GlobalListsLock)

{

// 如果頭列表等於空,那麼說明集合中還沒有元素

// 直接創建一個新的

if (m_headList == null)

{

list = new ThreadLocalList(Thread.CurrentThread);

m_headList = list;

m_tailList = list;

}

else

{

// ConcurrentBag內的數據是以雙向鏈表的形式分散存儲在各個線程的本地區域中

// 通過下面這個方法 可以找到那些存儲有數據 但是已經被停止的線程

// 然後將已停止線程的數據 移交到當前線程管理

list = GetUnownedList();

// 如果沒有 那麼就新建一個列表 然後更新尾指針的位置

if (list == null)

{

list = new ThreadLocalList(Thread.CurrentThread);

m_tailList.m_nextList = list;

m_tailList = list;

}

}

m_locals.Value = list;

}

}

else

{

return null;

}

Debug.Assert(list != null);

return list;

}

///

/// Adds an object to the .

///

///

The object to be added to the

/// . The value can be a null reference

/// (Nothing in Visual Basic) for reference types.

public void Add(T item)

{

// 獲取該線程的本地列表, 如果此線程不存在, 則創建一個新列表 (第一次調用 add)

ThreadLocalList list = GetThreadList(true);

// 實際的數據添加操作 在AddInternal中執行

AddInternal(list, item);

}

///

///

///

///

private void AddInternal(ThreadLocalList list, T item)

{

bool lockTaken = false;

try

{

#pragma warning disable 0420

Interlocked.Exchange(ref list.m_currentOp, (int)ListOperation.Add);

#pragma warning restore 0420

// 同步案例:

// 如果列表計數小於兩個, 因為是雙向鏈表的關係 為了避免與任何竊取線程發生衝突 必須獲取鎖

// 如果設置了 m_needSync, 這意味著有一個線程需要凍結包 也必須獲取鎖

if (list.Count

{

// 將其重置為None 以避免與竊取線程的死鎖

list.m_currentOp = (int)ListOperation.None;

// 鎖定當前對象

Monitor.Enter(list, ref lockTaken);

}

// 調用 ThreadLocalList.Add方法 將數據添加到雙向鏈表中

// 如果已經鎖定 那麼說明線程安全 可以更新Count 計數

list.Add(item, lockTaken);

}

finally

{

list.m_currentOp = (int)ListOperation.None;

if (lockTaken)

{

Monitor.Exit(list);

}

}

}

從上面代碼中,我們可以很清楚的知道Add()方法是如何運行的,其中的關鍵就是GetThreadList()方法,通過該方法可以獲取當前線程的數據存儲列表對象,假如不存在數據存儲列表,它會自動創建或者通過GetUnownedList()方法來尋找那些被停止但是還存儲有數據列表的線程,然後將數據列表返回給當前線程中,防止了內存泄漏。

在數據添加的過程中,實現了細顆粒度的lock同步鎖,所以性能會很高。刪除和其它操作與新增類似,本文不再贅述。

4. ConcurrentBag 如何實現迭代器模式

看完上面的代碼後,我很好奇ConcurrentBag是如何實現IEnumerator來實現迭代訪問的,因為ConcurrentBag是通過分散在不同線程中的ThreadLocalList來存儲數據的,那麼在實現迭代器模式時,過程會比較複雜。

後面再查看了源碼之後,發現ConcurrentBag為了實現迭代器模式,將分在不同線程中的數據全都存到一個List集合中,然後返回了該副本的迭代器。所以每次訪問迭代器,它都會新建一個List的副本,這樣雖然浪費了一定的存儲空間,但是邏輯上更加簡單了。

///

/// 本地幫助器方法釋放所有本地列表鎖

///

private void ReleaseAllLocks()

{

// 該方法用於在執行線程同步以後 釋放掉所有本地鎖

// 通過遍歷每個線程中存儲的 ThreadLocalList對象 釋放所佔用的鎖

ThreadLocalList currentList = m_headList;

while (currentList != null)

{

if (currentList.m_lockTaken)

{

currentList.m_lockTaken = false;

Monitor.Exit(currentList);

}

currentList = currentList.m_nextList;

}

}

///

/// 從凍結狀態解凍包的本地幫助器方法

///

///

The lock taken result from the Freeze method

private void UnfreezeBag(bool lockTaken)

{

// 首先釋放掉 每個線程中 本地變數的鎖

// 然後釋放全局鎖

ReleaseAllLocks();

m_needSync = false;

if (lockTaken)

{

Monitor.Exit(GlobalListsLock);

}

}

///

/// 本地幫助器函數等待所有未同步的操作

///

private void WaitAllOperations()

{

Contract.Assert(Monitor.IsEntered(GlobalListsLock));

ThreadLocalList currentList = m_headList;

// 自旋等待 等待其它操作完成

while (currentList != null)

{

if (currentList.m_currentOp != (int)ListOperation.None)

{

SpinWait spinner = new SpinWait();

// 有其它線程進行操作時,會將cuurentOp 設置成 正在操作的枚舉

while (currentList.m_currentOp != (int)ListOperation.None)

{

spinner.SpinOnce();

}

}

currentList = currentList.m_nextList;

}

}

///

/// 本地幫助器方法獲取所有本地列表鎖

///

private void AcquireAllLocks()

{

Contract.Assert(Monitor.IsEntered(GlobalListsLock));

bool lockTaken = false;

ThreadLocalList currentList = m_headList;

// 遍歷每個線程的ThreadLocalList 然後獲取對應ThreadLocalList的鎖

while (currentList != null)

{

// 嘗試/最後 bllock 以避免在獲取鎖和設置所採取的標誌之間的線程港口

try

{

Monitor.Enter(currentList, ref lockTaken);

}

finally

{

if (lockTaken)

{

currentList.m_lockTaken = true;

lockTaken = false;

}

}

currentList = currentList.m_nextList;

}

}

///

/// Local helper method to freeze all bag operations, it

/// 1- Acquire the global lock to prevent any other thread to freeze the bag, and also new new thread can be added

/// to the dictionary

/// 2- Then Acquire all local lists locks to prevent steal and synchronized operations

/// 3- Wait for all un-synchronized operations to be done

///

///

Retrieve the lock taken result for the global lock, to be passed to Unfreeze method

private void FreezeBag(ref bool lockTaken)

{

Contract.Assert(!Monitor.IsEntered(GlobalListsLock));

// 全局鎖定可安全地防止多線程調用計數和損壞 m_needSync

Monitor.Enter(GlobalListsLock, ref lockTaken);

// 這將強制同步任何將來的添加/執行操作

m_needSync = true;

// 獲取所有列表的鎖

AcquireAllLocks();

// 等待所有操作完成

WaitAllOperations();

}

///

/// 本地幫助器函數返回列表中的包項, 這主要由 CopyTo 和 ToArray 使用。

/// 這不是線程安全, 應該被稱為凍結/解凍袋塊

/// 本方法是私有的 只有使用 Freeze/UnFreeze之後才是安全的

///

/// List the contains the bag items

private List ToList()

{

Contract.Assert(Monitor.IsEntered(GlobalListsLock));

// 創建一個新的List

List list = new List();

ThreadLocalList currentList = m_headList;

// 遍歷每個線程中的ThreadLocalList 將裡面的Node的數據 添加到list中

while (currentList != null)

{

Node currentNode = currentList.m_head;

while (currentNode != null)

{

list.Add(currentNode.m_value);

currentNode = currentNode.m_next;

}

currentList = currentList.m_nextList;

}

return list;

}

///

/// Returns an enumerator that iterates through the

/// cref="ConcurrentBag"/>.

///

/// An enumerator for the contents of the

/// cref="ConcurrentBag"/>.

///

/// The enumeration represents a moment-in-time snapshot of the contents

/// of the bag. It does not reflect any updates to the collection after

/// was called. The enumerator is safe to use

/// concurrently with reads from and writes to the bag.

///

public IEnumerator GetEnumerator()

{

// Short path if the bag is empty

if (m_headList == null)

return new List().GetEnumerator(); // empty list

bool lockTaken = false;

try

{

// 首先凍結整個 ConcurrentBag集合

FreezeBag(ref lockTaken);

// 然後ToList 再拿到 List的 IEnumerator

return ToList().GetEnumerator();

}

finally

{

UnfreezeBag(lockTaken);

}

}

由上面的代碼可知道,為了獲取迭代器對象,總共進行了三步主要的操作。

使用FreezeBag()方法,凍結整個ConcurrentBag集合。因為需要生成集合的List副本,生成副本期間不能有其它線程更改損壞數據。

將ConcurrrentBag生成List副本。因為ConcurrentBag存儲數據的方式比較特殊,直接實現迭代器模式困難,考慮到線程安全和邏輯,最佳的辦法是生成一個副本。

完成以上操作以後,就可以使用UnfreezeBag()方法解凍整個集合。

那麼FreezeBag()方法是如何來凍結整個集合的呢?也是分為三步走。

首先獲取全局鎖,通過Monitor.Enter(GlobalListsLock, ref lockTaken);這樣一條語句,這樣其它線程就不能凍結集合。

然後獲取所有線程中ThreadLocalList的鎖,通過`AcquireAllLocks()方法來遍歷獲取。這樣其它線程就不能對它進行操作損壞數據。

等待已經進入了操作流程線程結束,通過WaitAllOperations()方法來實現,該方法會遍歷每一個ThreadLocalList對象的m_currentOp屬性,確保全部處於None操作。

完成以上流程後,那麼就是真正的凍結了整個ConcurrentBag集合,要解凍的話也類似。在此不再贅述。

四、總結

下面給出一張圖,描述了ConcurrentBag是如何存儲數據的。通過每個線程中的ThreadLocal來實現線程本地存儲,每個線程中都有這樣的結構,互不干擾。

然後每個線程中的m_headList總是指向ConcurrentBag的第一個列表,m_tailList指向最後一個列表。列表與列表之間通過m_locals 下的 m_nextList相連,構成一個單鏈表。

數據存儲在每個線程的m_locals中,通過Node類構成一個雙向鏈表。

PS: 要注意m_tailList和m_headList並不是存儲在ThreadLocal中,而是所有的線程共享一份。

以上就是有關ConcurrentBag類的實現,筆者的一些記錄和解析。

附上ConcurrentBag源碼地址:https://referencesource.microsoft.com/#System/sys/system/collections/concurrent/ConcurrentBag.cs

看完本文有收穫?請轉發分享給更多人

關注「DotNet」,提升.Net技能


喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 DotNet 的精彩文章:

C#編寫強大的SQL Server資料庫自動備份服務

TAG:DotNet |