當前位置:
首頁 > 知識 > 基於Redis實現分散式鎖-Redisson使用及源碼分析

基於Redis實現分散式鎖-Redisson使用及源碼分析

在分散式場景下,有很多種情況都需要實現最終一致性。在設計遠程上下文的領域事件的時候,為了保證最終一致性,在通過領域事件進行通訊的方式中,可以共享存儲(領域模型和消息的持久化數據源),或者做全局XA事務(兩階段提交,數據源可分開),也可以藉助消息中間件(消費者處理需要能冪等)。通過Observer模式來發布領域事件可以提供很好的高並發性能,並且事件存儲也能追溯更小粒度的事件數據,使各個應用系統擁有更好的自治性。

本文主要探討另外一種實現分散式最終一致性的解決方案——採用分散式鎖。基於分散式鎖的解決方案,比如zookeeper,redis都是相較於持久化(如利用InnoDB行鎖,或事務,或version樂觀鎖)方案提供了高可用性,並且支持豐富化的使用場景。 本文通過Java版本的redis分散式鎖開源框架——Redisson來解析一下實現分散式鎖的思路。

分散式鎖的使用場景

如果是不跨限界上下文的情況,跟本地領域服務相關的數據一致性,盡量還是用事務來保證。但也有些無法用事務或者樂觀鎖來處理的情況,這些情況大多是對於一個共享型的數據源,有並發寫操作的場景,但又不是對於單一領域的操作。

舉個例子,還是用租書來比喻,A和B兩個人都來租書,在查看圖書的時候,發現自己想要看的書《大設計》庫存僅剩一本。書店系統中,書作為一種商品,是在商品系統中,以Item表示出租商品的領域模型,同時每一筆交易都會產生一個訂單,Order是在訂單系統(交易限界上下文)中的領域模型。這裡假設先不考慮跨系統通信的問題(感興趣的可以參考下領域服務、領域事件),也暫時不考慮支付環節,但是我們需要保證A,B兩個人不會都對於《大設計》產生訂單就可以,也就是其中一個人是可以成功下單,另外一個人只要提示庫存已沒即可。此時,書的庫存就是一種共享的分散式資源,下訂單,減庫存就是一個需要保證一致性的寫操作。但又因為兩個操作不能在同一個本地事務,或者說,不共享持久化的數據源的情況,這時候就可以考慮用分散式鎖來實現。本例子中,就需要對於共享資源——書的庫存進行加鎖,至於鎖的key可以結合領域模型的唯一標識,如itemId,以及操作類型(如操作類型是RENT的)設計一個待加鎖的資源標識。當然,這裡還有一個並發性能的問題,如果是個庫存很多的秒殺類型的業務,那麼就不能單純在itemId 加類型加鎖,還需要設計排隊隊列以及合理的調度演算法,防止超賣等等,那些就是題外話了。本文只是將這個場景作為一個切入點,具體怎麼設計鎖,什麼場景用還要結合業務。

需要解決的問題

分散式的思路和線程同步鎖ReentrantLock的思路是一樣的。我們也要考慮如以下幾個問題:

死鎖的情況。複雜的網路環境下,當加鎖成功,後續操作正在處理時,獲得鎖的節點忽然宕機,無法釋放鎖的情況。如A在Node1 節點申請到了鎖資源,但是Node1宕機,鎖一直無法釋放,訂單沒有生成,但是其他用戶將無法申請到鎖資源。

鎖的性能效率。分散式鎖不能成為性能瓶頸或者單點故障不能導致業務異常。

如果關鍵業務,可能需要重入場景,是否設計成可重入鎖。這個可以參考下在多線程的情況下,比如ReentrantLock就是一種可重入鎖,其內部又提供了公平鎖和非公平鎖兩種實現和應用,本文不繼續探討。帶著以上問題,和場景,沿著下文,來一一找到解決方案。

基於Redis實現

Redis 命令

在Redisson介紹前,回顧下Redis的命令,以及不通過任何開源框架,可以基於redis怎麼設計一個分散式鎖。基於不同應用系統實現的語言,也可以通過其他一些如Jedis,或者Spring的RedisOperations 等,來執行Reids命令Redis command list。

分散式鎖主要需要以下redis命令,這裡列舉一下。在實現部分可以繼續參照命令的操作含義。

SETNX key value (SET if Not eXists):當且僅當 key 不存在,將 key 的值設為 value ,並返回1;若給定的 key 已經存在,則 SETNX 不做任何動作,並返回0。詳見:SETNX commond

GETSET key value:將給定 key 的值設為 value ,並返回 key 的舊值 (old value),當 key 存在但不是字元串類型時,返回一個錯誤,當key不存在時,返回nil。詳見:GETSET commond

GET key:返回 key 所關聯的字元串值,如果 key 不存在那麼返回 nil 。詳見:GET Commond

DEL key [KEY …]:刪除給定的一個或多個 key ,不存在的 key 會被忽略,返回實際刪除的key的個數(integer)。詳見:DEL Commond

HSET key field value:給一個key 設置一個{field=value}的組合值,如果key沒有就直接賦值並返回1,如果field已有,那麼就更新value的值,並返回0.詳見:HSET Commond

HEXISTS key field:當key 中存儲著field的時候返回1,如果key或者field至少有一個不存在返回0。詳見HEXISTS Commond

HINCRBY key field increment:將存儲在 key 中的哈希(Hash)對象中的指定欄位 field 的值加上增量 increment。如果鍵 key 不存在,一個保存了哈希對象的新建將被創建。如果欄位 field 不存在,在進行當前操作前,其將被創建,且對應的值被置為 0。返回值是增量之後的值。詳見:HINCRBY Commond

PEXPIRE key milliseconds:設置存活時間,單位是毫秒。expire操作單位是秒。詳見:PEXPIRE Commond

PUBLISH channel message:向channel post一個message內容的消息,返回接收消息的客戶端數。詳見PUBLISH Commond

Redis 實現分散式鎖

假設我們現在要給itemId 1234 和下單操作 OP_ORDER 加鎖,key是OP_ORDER_1234,結合上面的redis命令,似乎加鎖的時候只要一個SETNX OP_ORDER_1234 currentTimestamp ,如果返回1代表加鎖成功,返回0 表示鎖被佔用著。然後再用DEL OP_ORDER_1234解鎖,返回1表示解鎖成功,0表示已經被解鎖過。然而卻還存在著很多問題:SETNX會存在鎖競爭,如果在執行過程中客戶端宕機,也會引起死鎖問題,即鎖資源無法釋放。並且當一個資源解鎖的時候,釋放鎖之後,其他之前等待的鎖沒有辦法再次自動重試申請鎖(除非重新申請鎖)。解決死鎖的問題其實可以可以向Mysql的死鎖檢測學習,設置一個失效時間,通過key的時間戳來判斷是否需要強制解鎖。但是強制解鎖也存在問題,一個就是時間差問題,不同的機器的本地時間可能也存在時間差,在很小事務粒度的高並發場景下還是會存在問題,比如刪除鎖的時候,在判斷時間戳已經超過時效,有可能刪除了其他已經獲取鎖的客戶端的鎖。另外,如果設置了一個超時時間,但是確實執行時間超過了超時時間,那麼鎖會被自動釋放,原來持鎖的客戶端再次解鎖的時候會出現問題,而且最為嚴重的還是一致性沒有得到保障。

所以設計的時候需要考慮以下幾點:

鎖的時效設置。避免單點故障造成死鎖,影響其他客戶端獲取鎖。但是也要保證一旦一個客戶端持鎖,在客戶端可用時不會被其他客戶端解鎖。(網上很多解決方案都是其他客戶端等待隊列長度判斷是否強制解鎖,但其實在偶發情況下就不能保證一致性,也就失去了分散式鎖的意義)。

持鎖期間的check,盡量在關鍵節點檢查鎖的狀態,所以要設計成可重入鎖,但在客戶端使用時要做好吞吐量的權衡。

減少獲取鎖的操作,盡量減少redis壓力。所以需要讓客戶端的申請鎖有一個等待時間,而不是所有申請鎖的請求要循環申請鎖。

加鎖的事務或者操作盡量粒度小,減少其他客戶端申請鎖的等待時間,提高處理效率和並發性。

持鎖的客戶端解鎖後,要能通知到其他等待鎖的節點,否則其他節點只能一直等待一個預計的時間再觸發申請鎖。類似線程的notifyAll,要能同步鎖狀態給其他客戶端,並且是分散式消息。

考慮任何執行句柄中可能出現的異常,狀態的正確流轉和處理。比如,不能因為一個節點解鎖失敗,或者鎖查詢失敗(redis 超時或者其他運行時異常),影響整個等待的任務隊列,或者任務池。

鎖設計

由於時間戳的設計有很多問題,以及上述幾個問題,所以再換一種思路。先回顧幾個關於鎖的概念和經典java API。通過一些java.util.concurrent的API來處理一些本地隊列的同步以及等待信號量的處理。

Semaphore :Semaphore可以控制某個資源可被同時訪問的個數,通過 acquire() 獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。其內部維護了一個int 類型的permits。有一個關於廁所的比喻很貼切,10個人在廁所外面排隊,廁所有5個坑,只能最多進去五個人,那麼就是初始化一個 permits=5的Semaphore。當一個人出來,會release一個坑位,其他等坑的人會被喚醒然後開始要有人進坑。Semaphore同ReentrantLock一樣都是基於AbstractQueuedSynchronizer提供了公平鎖和非公平鎖兩種實現。如果等待的人有秩序的排隊等著,就說明選擇了Semaphore的公平鎖實現,如果外面的人沒有秩序,誰搶到是誰的(活躍線程就會一直有機會,存在線程飢餓可能),那就是Semaphore的非公平鎖實現。無論外面人怎麼個等法Semaphore對於出坑的控制是一致的,每次只能是從一個坑裡出來一個人。理解起來,其實就是廁所的5個坑位是一個共享資源,也就是permits的值=5,每次acquire一下就是外面來了個人排隊,每次release一下就是裡面出來個人。廁所聊多有點不雅觀,再回歸到分散式鎖的話題。在剛才講述的redis實現分散式鎖的「第三點」,減少redis申請鎖調用頻率上就可以通過Semaphore來控制請求。雖然Semaphore只是虛擬機內部的鎖粒度的實現(不能跨進程),但是也可以一定程度減輕最後請求redis節點的壓力。當然,也有種方法是,隨機sleep一段時間再去tryLock之類的,也可以達到減輕最後redis節點壓力,但是畢竟使用信號量能更好得控制。而且我們可以再簡單點,對於同一個鎖對象的申請鎖操作,可以設計一個初始化permits = 0的LockEntry,permits = 0也就顧名思義,誰都進不來,廁所維修中。當有一個持鎖對象unlock的時候,通過分散式消息機制通知所有等待節點,這時候,再release,這時候permits=1,也就是本虛擬機中只能有一個線程能在acquire()的阻塞中脫穎而出(當然只是進了坑,但不一定能獲取得到分散式鎖)。

ConcurrentHashMap:這個應該不必多說,之談談在設計分散式鎖中的用途。在上述的「第一點」,對於鎖的時效性的設置里提到了,要在持鎖線程正常運行(持鎖節點沒有宕機或內部異常)的時候,保證其一直佔用鎖。只要佔著茅坑的人還在用著,只要他還沒有暴斃或者無聊占著茅坑不XX,那就應該讓外面的人都等著,不能強行開門託人。再收回來。。。這裡ConcurrentHashMap的key無疑是鎖對象的標識(我們需要設計的redis的key),value就是一個時間任務對象,比如可以netty的TimerTask或其他定時API,定時得觸發給我的鎖重新設置延時。這就是好比(好吧,再次用廁所比喻),蹲在裡面的人的一種主動行為,隔1分鐘敲兩下廁所門,讓外面的等的人知道,裡面的人正在使用中,如果裡面的人1分鐘超過還沒有敲門,可能是裡面人掛掉了,那麼再採取強制措施,直接開門拽人,釋放坑位。

並發API以及一些框架的使用主要是控制鎖的進入和調度,加鎖的流程以及鎖的邏輯也是非常重要。因為redis支持hash結構,除了key作為鎖的標識,還可以利用value的結構

加鎖

下面參數的含義先說明下 :

KEYS[1] :需要加鎖的key,這裡需要是字元串類型。

ARGV[1] :鎖的超時時間,防止死鎖

ARGV[2] :鎖的唯一標識,也就是剛才介紹的 id(UUID.randomUUID()) + 「:」 + threadId

// 檢查是否key已經被佔用,如果沒有則設置超時時間和唯一標識,初始化value=1

if (redis.call("exists", KEYS[1]) == 0)

then

redis.call("hset", KEYS[1], ARGV[2], 1);

redis.call("pexpire", KEYS[1], ARGV[1]);

return nil;

end;

// 如果鎖重入,需要判斷鎖的key field 都一直情況下 value 加一

if (redis.call("hexists", KEYS[1], ARGV[2]) == 1)

then

redis.call("hincrby", KEYS[1], ARGV[2], 1);

redis.call("pexpire", KEYS[1], ARGV[1]);//鎖重入重新設置超時時間

return nil;

end;

// 返回剩餘的過期時間

return redis.call("pttl", KEYS[1]);

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

以上的方法,當返回空是,說明獲取到鎖,如果返回一個long數值(pttl 命令的返回值),說明鎖已被佔用,通過返回剩餘時間,外部可以做一些等待時間的判斷和調整。

解鎖

也還是先說明一下參數信息:

- KEYS[1] :需要加鎖的key,這裡需要是字元串類型。

- KEYS[2] :redis消息的ChannelName,一個分散式鎖對應唯一的一個channelName:「redisson_lock__channel__{」 + getName() + 「}」

- ARGV[1] :reids消息體,這裡只需要一個位元組的標記就可以,主要標記redis的key已經解鎖,再結合redis的Subscribe,能喚醒其他訂閱解鎖消息的客戶端線程申請鎖。

- ARGV[2] :鎖的超時時間,防止死鎖

- ARGV[3] :鎖的唯一標識,也就是剛才介紹的 id(UUID.randomUUID()) + 「:」 + threadId

// 如果key已經不存在,說明已經被解鎖,直接發布(publihs)redis消息

if (redis.call("exists", KEYS[1]) == 0)

then

redis.call("publish", KEYS[2], ARGV[1]);

return 1;

end;

// key和field不匹配,說明當前客戶端線程沒有持有鎖,不能主動解鎖。

if (redis.call("hexists", KEYS[1], ARGV[3]) == 0)

then

return nil;

end;

// 將value減1

local counter = redis.call("hincrby", KEYS[1], ARGV[3], -1);

// 如果counter>0說明鎖在重入,不能刪除key

if (counter > 0)

then

redis.call("pexpire", KEYS[1], ARGV[2]); return 0;

else

// 刪除key並且publish 解鎖消息

redis.call("del", KEYS[1]); redis.call("publish", KEYS[2], ARGV[1]);

return 1;

end;

return nil;

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

這就是解鎖過程,當然建議提供強制解鎖的介面,直接刪除key,以防一些緊急故障出現的時候,關鍵業務節點受到影響。這裡還有一個關鍵點,就是publish命令,通過在鎖的唯一通道發布解鎖消息,可以減少其他分散式節點的等待或者空轉,整體上能提高加鎖效率。至於redis的消息訂閱可以有多種方式,基於Jedis的訂閱API或者Spring的MessageListener都可以實現訂閱,這裡就可以結合剛才說的Semaphore,在第一次申請鎖失敗後acquire,接收到分散式消息後release就可以控制申請鎖流程的再次進入。下面結合Redisson源碼,相信會有更清晰的認識。

使用Redisson示例

Redisson使用起來很方便,但是需要redis環境支持eval命令,否則一切都是悲劇,比如me.結果還是要用RedisCommands去寫一套。例子就如下,獲得一個RLock鎖對象,然後tryLock 和unlock。trylock方法提供了鎖重入的實現,並且客戶端一旦持有鎖,就會在能正常運行期間一直持有鎖,直到主動unlock或者節點故障,主動失效(超過默認的過期時間)釋放鎖。

public boolean doMyBusiness(Object t) {

RLock lock = redissonClient.getLock(getLockKey(t));

try {

if (lock.tryLock()) {

//do need Business

return true;

} else {

// do other Business or return error.

return false;

}

} finally {

lock.unlock();

}

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

Redisson還提供了設置最長等待時間以及設置釋放鎖時間的含參tryLock介面 boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException; 。Redisson的lock 擴展了java.util.concurrent.locks.Lock的實現,也基本按照了Lock介面的實現方案。lock()方法會一直阻塞申請鎖資源,直到有可用的鎖釋放。下面一部分會詳細解析一部分關鍵實現的代碼。

Redisson源碼解析

Redisson 的非同步任務(Future,Promise,FutureListener API),任務計時器(Timeout,TimerTask),以及通過AbstractChannel連接redis以及寫入執行批處理命令等很多都是基於netty框架的。po主因為不能使用eval,所以用Spring提供的redisApi ,RedisOperations來處理redis指令,非同步調度等用了Spring的AsyncResult,MessageListener以及一些concurrent api。這裡還是先看一下Redisson的實現。

trylock

這裡以帶參數的trylock解析一下,無參的trylock是一種默認參數的實現。先源碼走讀一下。

@Override

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {

long time = unit.toMillis(waitTime);

// 申請鎖,返回還剩餘的鎖過期時間

Long ttl = tryAcquire(leaseTime, unit);

// 如果為空,表示申請鎖成功

if (ttl == null) {

return true;

}

// 訂閱監聽redis消息,並且創建RedissonLockEntry,其中RedissonLockEntry中比較關鍵的是一個 Semaphore屬性對象用來控制本地的鎖請求的信號量同步,返回的是netty框架的Future實現。

Future<RedissonLockEntry> future = subscribe();

// 阻塞等待subscribe的future的結果對象,如果subscribe方法調用超過了time,說明已經超過了客戶端設置的最大wait time,則直接返回false,取消訂閱,不再繼續申請鎖了。

if (!future.await(time, TimeUnit.MILLISECONDS)) {

future.addListener(new FutureListener<RedissonLockEntry>() {

@Override

public void operationComplete(Future<RedissonLockEntry> future) throws Exception {

if (future.isSuccess()) {

unsubscribe(future);

}

}

});

return false;

}

try {

while (true) {

// 再次嘗試一次申請鎖

ttl = tryAcquire(leaseTime, unit);

// 獲得鎖,返回

if (ttl == null) {

return true;

}

// 不等待申請鎖,返回

if (time <= 0) {

return false;

}

// 阻塞等待鎖

long current = System.currentTimeMillis();

RedissonLockEntry entry = getEntry();

if (ttl >= 0 && ttl < time) {

// 通過信號量(共享鎖)阻塞,等待解鎖消息.

// 如果剩餘時間(ttl)小於wait time ,就在 ttl 時間內,從Entry的信號量獲取一個許可(除非被中斷或者一直沒有可用的許可)。

// 否則就在wait time 時間範圍內等待可以通過信號量

entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);

} else {

entry.getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);

}

// 更新等待時間(最大等待時間-已經消耗的阻塞時間)

long elapsed = System.currentTimeMillis() - current;

time -= elapsed;

}

} finally {

// 無論是否獲得鎖,都要取消訂閱解鎖消息

unsubscribe(future);

}

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

上述方法,調用加鎖的邏輯就是在tryAcquire(long leaseTime, TimeUnit unit)中

private Long tryAcquire(long leaseTime, TimeUnit unit) {

if (leaseTime != -1) {

return get(tryLockInnerAsync(leaseTime, unit, Thread.currentThread().getId()));

}

return get(tryLockInnerAsync(Thread.currentThread().getId()));

}

1

2

3

4

5

6

7

8

9

tryAcquire(long leaseTime, TimeUnit unit)只是針對leaseTime的不同參數進行不同的轉發處理,再提一下,trylock的無參方法就是直接調用了get(tryLockInnerAsync(Thread.currentThread().getId()));

所以下面再看核心的tryLockInnerAsync 基本命令已經在之前解析過,相信這裡看起來應該比較輕鬆,返回的是一個future對象,是為了非同步處理IO,提高系統吞吐量。

Future<Long> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId) {

internalLockLeaseTime = unit.toMillis(leaseTime);

return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,

"if (redis.call("exists", KEYS[1]) == 0) then " +

"redis.call("hset", KEYS[1], ARGV[2], 1); " +

"redis.call("pexpire", KEYS[1], ARGV[1]); " +

"return nil; " +

"end; " +

"if (redis.call("hexists", KEYS[1], ARGV[2]) == 1) then " +

"redis.call("hincrby", KEYS[1], ARGV[2], 1); " +

"redis.call("pexpire", KEYS[1], ARGV[1]); " +

"return nil; " +

"end; " +

"return redis.call("pttl", KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

再說明一下,tryLock(long waitTime, long leaseTime, TimeUnit unit)有leaseTime參數的申請鎖方法是會按照leaseTime時間來自動釋放鎖的。但是沒有leaseTime參數的,比如tryLock()或者tryLock(long waitTime, TimeUnit unit)以及lock()是會一直持有鎖的。再來看一下沒有leaseTime參數的tryLockInnerAsync(Thread.currentThread().getId())

private Future<Long> tryLockInnerAsync(long threadId) {

// 設置了默認的30秒的失效時間

Future<Long> ttlRemaining = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId);

ttlRemaining.addListener(new FutureListener<Long>() {

@Override

public void operationComplete(Future<Long> future) throws Exception {

// 如果future方法沒有執行完成(IO被中斷等原因)直接返回,不繼續處理

if (!future.isSuccess()) {

return;

}

Long ttlRemaining = future.getNow();

// 成功申請到鎖,開始一個調度程序

if (ttlRemaining == null) {

scheduleExpirationRenewal();

}

}

});

return ttlRemaining;

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

這裡比有leaseTime參數的trylock就多了非同步scheduleExpirationRenewal調度。可以繼續看一下,這裡的expirationRenewalMap就是之前降到的一個ConcurrentMap結構。下面的這個調度方式很精妙。除非被unlock的cancleTask方法觸發,否則會一直循環重置過期時間。

private static final ConcurrentMap<String, Timeout> expirationRenewalMap = PlatformDependent.newConcurrentHashMap();

private void scheduleExpirationRenewal() {

// 保證任務不會被重複創建

if (expirationRenewalMap.containsKey(getName())) {

return;

}

// 添加一個netty的Timeout回調任務,每(internalLockLeaseTime / 3)毫秒執行一次

Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {

@Override

public void run(Timeout timeout) throws Exception {

// 非同步調用redis的pexpire命令,重置過期時間

expireAsync(internalLockLeaseTime, TimeUnit.MILLISECONDS);

// 移除,確保下一次調用

expirationRenewalMap.remove(getName());

scheduleExpirationRenewal(); // 再次循環調用

}

}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

// expirationRenewalMap如果已經有getName()任務,停止任務,也是為了在極端的並發情況下,保證任務不會被重複創建

if (expirationRenewalMap.putIfAbsent(getName(), task) != null) {

task.cancel();

}

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

這個任務,其實還有一個問題,個人覺得在expirationRenewalMap.containsKey判斷時也加上isLocked判斷會比較好,以防止unlock時出現redis節點異常的時候,任務沒有辦法自動停止,或者設置一個最大執行次數的限制也可以,否則極端情況下也會耗盡本地節點的CPU資源。

unlock

解鎖的邏輯相對簡單,如下,redis 命令相信看起來也會比較輕鬆了。

@Override

public void unlock() {

Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,

"if (redis.call("exists", KEYS[1]) == 0) then " +

"redis.call("publish", KEYS[2], ARGV[1]); " +

"return 1; " +

"end;" +

"if (redis.call("hexists", KEYS[1], ARGV[3]) == 0) then " +

"return nil;" +

"end; " +

"local counter = redis.call("hincrby", KEYS[1], ARGV[3], -1); " +

"if (counter > 0) then " +

"redis.call("pexpire", KEYS[1], ARGV[2]); " +

"return 0; " +

"else " +

"redis.call("del", KEYS[1]); " +

"redis.call("publish", KEYS[2], ARGV[1]); " +

"return 1; "+

"end; " +

"return nil;",

Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId()));

if (opStatus == null) {

throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "

+ id + " thread-id: " + Thread.currentThread().getId());

}

// 解鎖成功之後取消更新鎖expire的時間任務

if (opStatus) {

cancelExpirationRenewal();

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

這裡的 cancelExpirationRenewal對應著取消 scheduleExpirationRenewal的重置expire時間任務。

void cancelExpirationRenewal() {

Timeout task = expirationRenewalMap.remove(getName());

if (task != null) {

task.cancel();

}

}

1

2

3

4

5

6

7

再看一下Redisson是如何處理unlock的redis消息的。這裡的消息內容就是unlockMessage = 0L和unlock方法中publish的內容是對應的。

public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {

public static final Long unlockMessage = 0L;

@Override

protected RedissonLockEntry createEntry(Promise<RedissonLockEntry> newPromise) {

return new RedissonLockEntry(newPromise);

}

@Override

protected void onMessage(RedissonLockEntry value, Long message) {

if (message.equals(unlockMessage)) {

// 釋放一個許可,喚醒等待的entry.getLatch().tryAcquire去再次嘗試獲取鎖。

value.getLatch().release();

// 如果entry還有其他Listeners回調,也喚醒執行。

synchronized (value) {

Runnable runnable = value.getListeners().poll();

if (runnable != null) {

if (value.getLatch().tryAcquire()) {

runnable.run();

} else {

value.getListeners().add(runnable);

}

}

}

}

}

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

Redisson還支持Redis的多種集群配置,一主一備,一主多備,單機等等。也是通過netty的EventExecutorGroup,Promise,Future等API實現調度的。

結語

在思考是否採用分散式鎖以及採用哪種實現方案的時候,還是要基於業務,技術方案一定是基於業務基礎,服務於業務,並且衡量過投入產出比的。所以如果有成熟的解決方案,在業務可承受規模肯定是不要重複造輪子,當然還要經過嚴謹的測試。在po主用Spring的redis api實現時,也遇到了一些問題。

比如hIncrBy 的字符集問題,在使用命令的時候,當然可以直接set a 1然後incr a 1,這個問題可以參考ERR value is not an integer or out of range 問題,但在使用RedisConnection的時候,需要通過轉碼,byte[] value =SafeEncoder.encode(String.valueOf(「1」)) 再 connection.hSet(key, field, value)這樣才可以,或者自己通過String轉成正確的編碼也可以。

還有剛才說的調度pexpire任務,在unlock異常的時候,任務池中的任務無法自動結束。另外就是Spring的MessageListener的onMessage(Message message, byte[] pattern)回調方法message.getBody()是byte數組,消息內容轉化的時候要處理一下。

基於Redis實現分散式鎖-Redisson使用及源碼分析

深圳上空的超級月亮

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 程序員小新人學習 的精彩文章:

WEB測試項目實戰——測試用例架構搭建
Linux 下qW3xT.2,解決挖礦病毒

TAG:程序員小新人學習 |