閉鎖、柵欄與異步編排
無論是項目開發還是開源代碼閱讀,多線程都是不可或缺的一個重要知識點,基于這個考量,于是總結出本篇文章,討論閉鎖(CountDownLatch)、柵欄(CyclicBarrier)與異步編排(CompletableFuture)
@Author:Akai-yuan
@更新時間:2023/2/4
1.CountDownLatch
1.適用場景
- 協調子線程結束動作:等待所有子線程運行結束
主線程創建了5個子線程,各子任務執行確認動作,期間主線程進入等待狀態,直到各子線程的任務均已經完成,主線程恢復繼續執行。
- 協調子線程開始動作:統一各線程動作開始的時機
從多線程的角度看,這恰似你創建了一些多線程,但是你需要統一管理它們的任務開始時間。
2.設計思想
CountDownLatch基于一個同步器實現,并且只有CountDownLatch(int count)一個構造器,指定數量count不得在中途修改它。
核心函數
- await():等待latch降為0;
- boolean await(long timeout, TimeUnit unit):等待latch降為0,但是可以設置超時時間。
- countDown():latch數量減1;
- getCount():獲取當前的latch數量。
3.場景實例
場景1. 對各子線程的等待
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(4);
Thread t1 = new Thread(countDownLatch::countDown);
Thread t2 = new Thread(countDownLatch::countDown);
Thread t3 = new Thread(countDownLatch::countDown);
Thread t4 = new Thread(() -> {
try {
// 稍等...
Thread.sleep(1500);
countDownLatch.countDown();
} catch (InterruptedException ignored) {}
});
t1.start();
t2.start();
t3.start();
t4.start();
//直到所有線程都對計數器進行減一后,這里才放行
countDownLatch.await();
System.out.println("所有子線程就位,可以繼續執行其他任務");
}
場景2. 對多線程的統一管理
我們仍然用4個線程調用了start(),但是它們在運行時都在等待countDownLatch的信號,在信號未收到前,它們不會往下執行。
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Thread t1 = new Thread(() -> waitForCountDown(countDownLatch));
Thread t2 = new Thread(() -> waitForCountDown(countDownLatch));
Thread t3 = new Thread(() -> waitForCountDown(countDownLatch));
Thread t4 = new Thread(() -> waitForCountDown(countDownLatch));
t1.start();
t2.start();
t3.start();
t4.start();
Thread.sleep(1000);
countDownLatch.countDown();
System.out.println("所有線程準備完成");
}
private static void waitForCountDown(CountDownLatch countDownLatch) {
try {
// 等待信號
countDownLatch.await();
System.out.println("本線程等待完畢");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
輸出:
所有線程準備完成
本線程等待完畢
本線程等待完畢
本線程等待完畢
本線程等待完畢
Process finished with exit code 0
場景3. SOFAJRaft的實踐
// 定義一個CountDownLatch計數器
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
public void start() {
switch (workerStateUpdater.get(this)) {
case WORKER_STATE_INIT:
if (workerStateUpdater.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
//此處調用工作線程執行CountDownLatch的countDown()方法
//即startTimeInitialized.countDown();
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
// 等待startTime被工作線程初始化完成
while (startTime == 0) {
try {
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
2.CyclicBarrier
1.適用場景
柵欄類似于閉鎖,它能阻塞一組線程直到某個事件的發生。柵欄與閉鎖的關鍵區別在于,所有的線程必須同時到達柵欄位置,才能繼續執行。閉鎖用于等待事件,而柵欄用于等待其他線程。
CyclicBarrier與CountDownLatch的區別
CyclicBarrier | CountDownLatch |
---|---|
CyclicBarrier是可重用的,其中的線程會等待所有的線程完成任務。屆時,屏障將被拆除,并可以選擇性地做一些特定的動作。 | CountDownLatch是一次性的,不同的線程在同一個計數器上工作,直到計數器為0 |
CyclicBarrier面向的是線程數 | CountDownLatch面向的是任務數 |
在使用CyclicBarrier時,你必須在構造中指定參與協作的線程數,這些線程必須調用await()方法 | 使用CountDownLatch時,則必須要指定任務數,至于這些任務由哪些線程完成無關緊要 |
CyclicBarrier可以在所有的線程釋放后重新使用 | CountDownLatch在計數器為0時不能再使用 |
在CyclicBarrier中,如果某個線程遇到了中斷、超時等問題時,則處于await的線程都會出現問題 | 在CountDownLatch中,如果某個線程出現問題,其他線程不受影響 |
2.設計思想
1.構造器
// 指定參與方的數量;
public CyclicBarrier(int parties);
// 指定參與方的數量,并指定在本代次結束時運行的代碼
public CyclicBarrier(int parties, Runnable barrierAction):
2.核心方法
//如果當前線程不是第一個到達屏障的話,它將會進入等待,直到其他線程都到達
//除非發生被中斷、屏障被拆除、屏障被重設等情況
public int await();
//和await()類似,但是加上了時間限制;
public int await(long timeout, TimeUnit unit);
//當前屏障是否被拆除;
public boolean isBroken();
//重設當前屏障。會先拆除屏障再設置新的屏障
public void reset();
//正在等待的線程數量
public int getNumberWaiting();
3.場景實例
下面以一個簡單的日常對話來講解CyclicBarrier的使用實例
private static String appointmentPlace = "書房";
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println("yuan所在的地點:" + appointmentPlace));
// 線程Akai
Thread Akai = newThread("Akai", () -> {
System.out.println("yuan,飯好了快來吃飯...");
try {
// 此時Akai在屏障前等待
cyclicBarrier.await();
System.out.println("yuan,你來了...");
// 開始吃飯...
Thread.sleep(2600);
System.out.println("好的,你去洗你的碗吧!");
// 第二次調用await
cyclicBarrier.await();
Thread.sleep(100);
System.out.println("好吧,你這個懶豬!");
} catch (Exception e) {
e.printStackTrace();
}
});
// 線程yuan
Thread yuan = newThread("yuan", () -> {
try {
// yuan在敲代碼
Thread.sleep(500);
System.out.println("我在敲代碼,我馬上就來!");
// yuan到達飯桌前
cyclicBarrier.await();
Thread.sleep(500);
System.out.println("Akai,不好意思,剛剛沉迷于敲代碼了!");
// 開始吃飯...
Thread.sleep(1500);
// yuan想先吃完趕快洗碗然后溜出去敲代碼
System.out.println("我吃完了,我要去洗碗了");
// yuan把地點改成了廚房
appointmentPlace = "廚房";
// 洗碗中...
Thread.sleep(1500);
System.out.println("?yuan終于洗完自己的碗了");
// 第二次調用await
cyclicBarrier.await();
System.out.println("Akai你吃完了,你的碗自己去洗吧,我已經在敲代碼了");
} catch (Exception ignored) {}
});
Akai.start();
yuan.start();
}
輸出結果:
yuan,飯好了快來吃飯...
我在敲代碼,我馬上就來!
yuan所在的地點:書房
yuan,你來了...
Akai,不好意思,剛剛沉迷于敲代碼了!
我吃完了,我要去洗碗了
好的,你去洗你的碗吧!
yuan終于洗完自己的碗了
yuan所在的地點:廚房
Akai你吃完了,你的碗自己去洗吧,我已經在敲代碼了
好吧,你這個懶豬!
3.CompletableFuture
1.設計思想
1.Future的局限性
- 并發執行多任務:Future只提供了get()方法來獲取結果,并且是阻塞的。所以,除了等待你別無他法;
- 無法對多個任務進行鏈式調用:如果你希望在計算任務完成后執行特定動作,比如發郵件,但Future卻沒有提供這樣的能力;
- 無法組合多個任務:如果你運行了10個任務,并期望在它們全部執行結束后執行特定動作,那么在Future中這是無能為力的;
- 沒有異常處理:Future接口中沒有關于異常處理的方法;
2.Completable有哪些優勢
CompletableFuture是Future接口的擴展和增強。
CompletableFuture完整地繼承了Future接口,并在此基礎上進行了豐富地擴展,完美地彌補了Future上述的種種問題。更為重要的是,CompletableFuture實現了對任務的編排能力。借助這項能力,我們可以輕松地組織不同任務的運行順序、規則以及方式。
從某種程度上說,這項能力是它的核心能力。而在以往,雖然通過CountDownLatch等工具類也可以實現任務的編排,但需要復雜的邏輯處理,不僅耗費精力且難以維護。
2.核心設計
我們首先來討論CompletableFuture的核心:CompletionStage
顧名思義,根據CompletionStage名字中的"Stage",你可以把它理解為任務編排中的步驟。步驟,即任務編排的基本單元,它可以是一次純粹的計算或者是一個特定的動作。在一次編排中,會包含多個步驟,這些步驟之間會存在依賴、鏈式和組合等不同的關系,也存在并行和串行的關系。這種關系,類似于Pipeline或者流式計算。
既然是編排,就需要維護任務的創建、建立計算關系。為此,CompletableFuture提供了多達50多個方法,但沒有必要全部完全理解,但我們可以通過分類的方式簡化對方法的理解,理解了類型和變種,基本上我們也就掌握了CompletableFuture的核心能力。
這些方法可以總結為以下四類,其他大部分方法都是基于這四種類型的變種:
3.核心用法
1.runAsync
- runAsync()是CompletableFuture最常用的方法之一,它可以接收一個待運行的任務并返回一個CompletableFuture
- 當我們想異步運行某個任務時,在以往需要手動實現Thread或者借助Executor實現。而通過runAsync()`就簡單多了。比如,我們可以直接傳入Runnable類型的任務:
CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
System.out.println("something");
}
});
2.supply與supplyAsync
- 所謂supply表示提供結果,換句話說當我們使用supply()時,就表明我們會返回一個結果,并且這個結果可以被后續的任務所使用。
// 創建nameFuture,返回姓名
CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {
return "Akai-yuan";
});
// 使用thenApply()接收nameFuture的結果,并執行回調動作
CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {
return "love you," + name;
});
//阻塞獲得表白的結果
System.out.println(sayLoveFuture.get()); // love you,Akai-yuan
一旦理解了supply()的含義,它也就如此簡單。如果你希望用新的線程運行任務,可以使用supplyAsync().
3.thenApply與thenApplyAsync
- 我們已經知道supply()是用于提供結果的,并且順帶提了thenApply()。很明顯,thenApply()是supply()的搭檔,用于接收supply()的執行結果,并執行特定的代碼邏輯,最后返回CompletableFuture結果。
// 使用thenApply()接收nameFuture的結果,并執行回調動作
CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {
return "愛你," + name;
});
public <U> CompletableFuture <U> thenApplyAsync(
Function <? super T, ? extends U> fn) {
return uniApplyStage(null, fn);
}
4.thenAccept與thenAcceptAsync
作為supply()的檔案,thenApply()并不是唯一的存在,thenAccept()也是。但與thenApply()不同,thenAccept()只接收數據,但不會返回,它的返回類型是Void.
CompletableFuture<Void> sayLoveFuture = nameFuture.thenAccept(name -> {
System.out.println("愛你," + name);
});
public CompletableFuture < Void > thenAccept(Consumer < ? super T > action) {
return uniAcceptStage(null, action);
}
5.thenRun
thenRun()就比較簡單了,不接收任務的結果,只運行特定的任務,并且也不返回結果。
public CompletableFuture < Void > thenRun(Runnable action) {
return uniRunStage(null, action);
}
所以,如果你在回調中不想返回任何的結果,只運行特定的邏輯,那么你可以考慮使用thenAccept和thenRun一般來說,這兩個方法會在調用鏈的最后面使用。
6.thenCompose與 thenCombine
以上幾種方法都是各玩各的,但thenCompose()與thenCombine()就不同了,它們可以實現對依賴和非依賴兩種類型的任務的編排。
編排兩個存在依賴關系的任務
在前面的例子中,在接收前面任務的結果時,我們使用的是thenApply(). 也就是說,sayLoveFuture在執行時必須依賴nameFuture的完成,否則執行個錘子。
// 創建Future
CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {
return "Akai-yuan";
});
// 使用thenApply()接收nameFuture的結果,并執行回調動作
CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {
return "愛你," + name;
});
但其實,除了thenApply()之外,我們還可以使用thenCompose()來編排兩個存在依賴關系的任務。比如,上面的示例代碼可以寫成:
// 創建Future
CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {
return "Akai-yuan";
});
CompletableFuture<String> sayLoveFuture2 = nameFuture.thenCompose(name -> {
return CompletableFuture.supplyAsync(() -> "愛你," + name);
});
可以看到,thenCompose()和thenApply()的核心不同之處在于它們的返回值類型:
- thenApply():返回計算結果的原始類型,比如返回String;
- thenCompose():返回CompletableFuture類型,比如返回CompletableFuture.
組合兩個相互獨立的任務
考慮一個場景,當我們在執行某個任務時,需要其他任務就緒才可以,應該怎么做?這樣的場景并不少見,我們可以使用前面學過的并發工具類實現,也可以使用thenCombine()實現。
舉個例子,當我們計算某個勝率時,我們需要獲取她參與的總場次(rounds),以及獲勝的場次(winRounds),然后再通過winRounds / rounds來計算。對于這個計算,我們可以這么做:
CompletableFuture < Integer > roundsFuture = CompletableFuture.supplyAsync(() -> 500);
CompletableFuture < Integer > winRoundsFuture = CompletableFuture.supplyAsync(() -> 365);
CompletableFuture < Object > winRateFuture = roundsFuture
.thenCombine(winRoundsFuture, (rounds, winRounds) -> {
if (rounds == 0) {
return 0.0;
}
DecimalFormat df = new DecimalFormat("0.00");
return df.format((float) winRounds / rounds);
});
System.out.println(winRateFuture.get());
thenCombine()將另外兩個任務的結果同時作為參數,參與到自己的計算邏輯中。在另外兩個參數未就緒時,它將會處于等待狀態。
7.allOf與anyOf
allOf()與anyOf()也是一對孿生兄弟,當我們需要對多個Future的運行進行組織時,就可以考慮使用它們:
- allOf():給定一組任務,等待所有任務執行結束;
- anyOf():給定一組任務,等待其中任一任務執行結束。
allOf()與anyOf()的方法簽名如下:
static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
需要注意的是,anyOf()將返回完任務的執行結果,但是allOf()不會返回任何結果,它的返回值是Void.
allOf()與anyOf()的示例代碼如下所示。我們創建了roundsFuture和winRoundsFuture,并通過sleep模擬它們的執行時間。在執行時,winRoundsFuture將會先返回結果,所以當我們調用 CompletableFuture.anyOf時也會發現輸出的是365.
CompletableFuture < Integer > roundsFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(200);
return 500;
} catch (InterruptedException e) {
return null;
}
});
CompletableFuture < Integer > winRoundsFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
return 365;
} catch (InterruptedException e) {
return null;
}
});
CompletableFuture < Object > completedFuture = CompletableFuture.anyOf(winRoundsFuture, roundsFuture);
System.out.println(completedFuture.get()); // 返回365
CompletableFuture < Void > completedFutures = CompletableFuture.allOf(winRoundsFuture, roundsFuture);
在CompletableFuture之前,如果要實現所有任務結束后執行特定的動作,我們可以考慮CountDownLatch等工具類?,F在,則多了一選項,我們也可以考慮使用CompletableFuture.allOf.
8.異常處理
在CompletableFuture鏈式調用中,如果某個任務發生了異常,那么后續的任務將都不會再執行。對于異常,我們有兩種處理方式:exceptionally()和handle().
1.使用exceptionally()回調處理異常
在鏈式調用的尾部使用exceptionally(),捕獲異常并返回錯誤情況下的默認值。需要注意的是,exceptionally()僅在發生異常時才會調用。
CompletableFuture < ? extends Serializable > winRateFuture = roundsFuture
.thenCombine(winRoundsFuture, (rounds, winRounds) -> {
if (rounds == 0) {
throw new RuntimeException("總場次錯誤");
}
DecimalFormat df = new DecimalFormat("0.00");
return df.format((float) winRounds / rounds);
}).exceptionally(ex -> {
System.out.println("出錯:" + ex.getMessage());
return "";
});
System.out.println(winRateFuture.get());
2. 使用handle()處理異常
除了exceptionally(),CompletableFuture也提供了handle()來處理異常。不過,與exceptionally()不同的是,當我們在調用鏈中使用了handle(),那么無論是否發生異常,都會調用它。所以,在handle()方法的內部,我們需要通過 if (ex != null) 來判斷是否發生了異常。
CompletableFuture < ? extends Serializable > winRateFuture = roundsFuture
.thenCombine(winRoundsFuture, (rounds, winRounds) -> {
if (rounds == 0) {
throw new RuntimeException("總場次錯誤");
}
DecimalFormat df = new DecimalFormat("0.00");
return df.format((float) winRounds / rounds);
}).handle((res, ex) -> {
if (ex != null) {
System.out.println("出錯:" + ex.getMessage());
return "";
}
return res;
});
System.out.println(winRateFuture.get());
當然,如果我們允許某個任務發生異常而不中斷整個調用鏈路,那么可以在其內部通過try-catch消化掉。