<optgroup id="6y7f6"><small id="6y7f6"></small></optgroup>

<code id="6y7f6"></code>

  • <p id="6y7f6"><tbody id="6y7f6"><ins id="6y7f6"></ins></tbody></p>
    <code id="6y7f6"><form id="6y7f6"></form></code>

      閉鎖、柵欄與異步編排

      無論是項目開發還是開源代碼閱讀,多線程都是不可或缺的一個重要知識點,基于這個考量,于是總結出本篇文章,討論閉鎖(CountDownLatch)、柵欄(CyclicBarrier)與異步編排(CompletableFuture)
      @Author:Akai-yuan
      @更新時間:2023/2/4

      1.CountDownLatch

      1.適用場景

      1. 協調子線程結束動作:等待所有子線程運行結束

      主線程創建了5個子線程,各子任務執行確認動作,期間主線程進入等待狀態,直到各子線程的任務均已經完成,主線程恢復繼續執行。

      1. 協調子線程開始動作:統一各線程動作開始的時機

      從多線程的角度看,這恰似你創建了一些多線程,但是你需要統一管理它們的任務開始時間。

      2.設計思想

      CountDownLatch基于一個同步器實現,并且只有CountDownLatch(int count)一個構造器,指定數量count不得在中途修改它。
      核心函數

      • await():等待latch降為0;
      • boolean await(long timeout, TimeUnit unit):等待latch降為0,但是可以設置超時時間。
      • countDown():latch數量減1;
      • getCount():獲取當前的latch數量。

      3.場景實例

      場景1. 對各子線程的等待

      public static void main(String[] args) throws InterruptedException {
          CountDownLatch countDownLatch = new CountDownLatch(4);
      
          Thread t1 = new Thread(countDownLatch::countDown);
          Thread t2 = new Thread(countDownLatch::countDown);
          Thread t3 = new Thread(countDownLatch::countDown);
          Thread t4 = new Thread(() -> {
              try {
                  // 稍等...
                  Thread.sleep(1500);
                  countDownLatch.countDown();
              } catch (InterruptedException ignored) {}
          });
      
          t1.start();
          t2.start();
          t3.start();
          t4.start();
      		//直到所有線程都對計數器進行減一后,這里才放行
          countDownLatch.await();
          System.out.println("所有子線程就位,可以繼續執行其他任務");
      }
      

      場景2. 對多線程的統一管理

      我們仍然用4個線程調用了start(),但是它們在運行時都在等待countDownLatch的信號,在信號未收到前,它們不會往下執行。

      public static void main(String[] args) throws InterruptedException {
          CountDownLatch countDownLatch = new CountDownLatch(1);
      
          Thread t1 = new Thread(() -> waitForCountDown(countDownLatch));
          Thread t2 = new Thread(() -> waitForCountDown(countDownLatch));
          Thread t3 = new Thread(() -> waitForCountDown(countDownLatch));
          Thread t4 = new Thread(() -> waitForCountDown(countDownLatch));
      
          t1.start();
          t2.start();
          t3.start();
          t4.start();
          Thread.sleep(1000);
          countDownLatch.countDown();
          System.out.println("所有線程準備完成");
      }
      
      private static void waitForCountDown(CountDownLatch countDownLatch) {
          try {
      			// 等待信號
              countDownLatch.await(); 
              System.out.println("本線程等待完畢");
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
      }
      

      輸出:

      所有線程準備完成
      本線程等待完畢
      本線程等待完畢
      本線程等待完畢
      本線程等待完畢
      
      Process finished with exit code 0
      

      場景3. SOFAJRaft的實踐

      // 定義一個CountDownLatch計數器
      private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
      
      public void start() {
          switch (workerStateUpdater.get(this)) {
              case WORKER_STATE_INIT:
                  if (workerStateUpdater.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                      //此處調用工作線程執行CountDownLatch的countDown()方法
                      //即startTimeInitialized.countDown();
                      workerThread.start();
                  }
                  break;
              case WORKER_STATE_STARTED:
                  break;
              case WORKER_STATE_SHUTDOWN:
                  throw new IllegalStateException("cannot be started once stopped");
              default:
                  throw new Error("Invalid WorkerState");
          }
      
          // 等待startTime被工作線程初始化完成
          while (startTime == 0) {
              try {
                  startTimeInitialized.await();
              } catch (InterruptedException ignore) {
                  // Ignore - it will be ready very soon.
              }
          }
      }
      

      2.CyclicBarrier

      1.適用場景

      柵欄類似于閉鎖,它能阻塞一組線程直到某個事件的發生。柵欄與閉鎖的關鍵區別在于,所有的線程必須同時到達柵欄位置,才能繼續執行。閉鎖用于等待事件,而柵欄用于等待其他線程。
      CyclicBarrier與CountDownLatch的區別

      CyclicBarrier CountDownLatch
      CyclicBarrier是可重用的,其中的線程會等待所有的線程完成任務。屆時,屏障將被拆除,并可以選擇性地做一些特定的動作。 CountDownLatch是一次性的,不同的線程在同一個計數器上工作,直到計數器為0
      CyclicBarrier面向的是線程數 CountDownLatch面向的是任務數
      在使用CyclicBarrier時,你必須在構造中指定參與協作的線程數,這些線程必須調用await()方法 使用CountDownLatch時,則必須要指定任務數,至于這些任務由哪些線程完成無關緊要
      CyclicBarrier可以在所有的線程釋放后重新使用 CountDownLatch在計數器為0時不能再使用
      在CyclicBarrier中,如果某個線程遇到了中斷、超時等問題時,則處于await的線程都會出現問題 在CountDownLatch中,如果某個線程出現問題,其他線程不受影響

      2.設計思想

      1.構造器

      // 指定參與方的數量;
      public CyclicBarrier(int parties);
      // 指定參與方的數量,并指定在本代次結束時運行的代碼
      public CyclicBarrier(int parties, Runnable barrierAction):
      

      2.核心方法

      //如果當前線程不是第一個到達屏障的話,它將會進入等待,直到其他線程都到達
      //除非發生被中斷、屏障被拆除、屏障被重設等情況
      public int await();
      //和await()類似,但是加上了時間限制;
      public int await(long timeout, TimeUnit unit);
      //當前屏障是否被拆除;
      public boolean isBroken();
      //重設當前屏障。會先拆除屏障再設置新的屏障
      public void reset();
      //正在等待的線程數量
      public int getNumberWaiting();
      

      3.場景實例

      下面以一個簡單的日常對話來講解CyclicBarrier的使用實例

      private static String appointmentPlace = "書房";
      
      public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println("yuan所在的地點:" + appointmentPlace));
          // 線程Akai
          Thread Akai = newThread("Akai", () -> {
          System.out.println("yuan,飯好了快來吃飯...");
          try {
            // 此時Akai在屏障前等待
            cyclicBarrier.await();
            System.out.println("yuan,你來了...");
            // 開始吃飯...
            Thread.sleep(2600); 
            System.out.println("好的,你去洗你的碗吧!");
            // 第二次調用await
            cyclicBarrier.await(); 
            Thread.sleep(100);
            System.out.println("好吧,你這個懶豬!");
          } catch (Exception e) {
            e.printStackTrace();
          }
        });
        // 線程yuan
        Thread yuan = newThread("yuan", () -> {
          try {
            // yuan在敲代碼
            Thread.sleep(500); 
            System.out.println("我在敲代碼,我馬上就來!");
            // yuan到達飯桌前
            cyclicBarrier.await(); 
            Thread.sleep(500); 
            System.out.println("Akai,不好意思,剛剛沉迷于敲代碼了!");
            // 開始吃飯...
            Thread.sleep(1500); 
            // yuan想先吃完趕快洗碗然后溜出去敲代碼
            System.out.println("我吃完了,我要去洗碗了"); 
            // yuan把地點改成了廚房
            appointmentPlace = "廚房"; 
            // 洗碗中...
            Thread.sleep(1500); 
            System.out.println("?yuan終于洗完自己的碗了");
            // 第二次調用await
            cyclicBarrier.await();
            System.out.println("Akai你吃完了,你的碗自己去洗吧,我已經在敲代碼了");
          } catch (Exception ignored) {}
        });
      
        Akai.start();
        yuan.start();
      }
      

      輸出結果:

      yuan,飯好了快來吃飯...
      我在敲代碼,我馬上就來!
      yuan所在的地點:書房
      yuan,你來了...
      Akai,不好意思,剛剛沉迷于敲代碼了!
      我吃完了,我要去洗碗了
      好的,你去洗你的碗吧!
      yuan終于洗完自己的碗了
      yuan所在的地點:廚房
      Akai你吃完了,你的碗自己去洗吧,我已經在敲代碼了
      好吧,你這個懶豬!    
      

      3.CompletableFuture

      1.設計思想

      1.Future的局限性

      • 并發執行多任務:Future只提供了get()方法來獲取結果,并且是阻塞的。所以,除了等待你別無他法;
      • 無法對多個任務進行鏈式調用:如果你希望在計算任務完成后執行特定動作,比如發郵件,但Future卻沒有提供這樣的能力;
      • 無法組合多個任務:如果你運行了10個任務,并期望在它們全部執行結束后執行特定動作,那么在Future中這是無能為力的;
      • 沒有異常處理:Future接口中沒有關于異常處理的方法;

      2.Completable有哪些優勢

      CompletableFuture是Future接口的擴展和增強。
      CompletableFuture完整地繼承了Future接口,并在此基礎上進行了豐富地擴展,完美地彌補了Future上述的種種問題。更為重要的是,CompletableFuture實現了對任務的編排能力。借助這項能力,我們可以輕松地組織不同任務的運行順序、規則以及方式。
      從某種程度上說,這項能力是它的核心能力。而在以往,雖然通過CountDownLatch等工具類也可以實現任務的編排,但需要復雜的邏輯處理,不僅耗費精力且難以維護。

      2.核心設計

      我們首先來討論CompletableFuture的核心:CompletionStage
      顧名思義,根據CompletionStage名字中的"Stage",你可以把它理解為任務編排中的步驟。步驟,即任務編排的基本單元,它可以是一次純粹的計算或者是一個特定的動作。在一次編排中,會包含多個步驟,這些步驟之間會存在依賴、鏈式組合等不同的關系,也存在并行串行的關系。這種關系,類似于Pipeline或者流式計算。
      既然是編排,就需要維護任務的創建、建立計算關系。為此,CompletableFuture提供了多達50多個方法,但沒有必要全部完全理解,但我們可以通過分類的方式簡化對方法的理解,理解了類型變種,基本上我們也就掌握了CompletableFuture的核心能力。
      這些方法可以總結為以下四類,其他大部分方法都是基于這四種類型的變種:

      3.核心用法

      1.runAsync

      • runAsync()是CompletableFuture最常用的方法之一,它可以接收一個待運行的任務并返回一個CompletableFuture
      • 當我們想異步運行某個任務時,在以往需要手動實現Thread或者借助Executor實現。而通過runAsync()`就簡單多了。比如,我們可以直接傳入Runnable類型的任務:
      CompletableFuture.runAsync(new Runnable() {
          @Override
          public void run() {
              System.out.println("something");
          }
      });
      

      2.supply與supplyAsync

      • 所謂supply表示提供結果,換句話說當我們使用supply()時,就表明我們會返回一個結果,并且這個結果可以被后續的任務所使用。
      // 創建nameFuture,返回姓名
      CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {
          return "Akai-yuan";
      });
      
       // 使用thenApply()接收nameFuture的結果,并執行回調動作
      CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {
          return "love you," + name;
      });
      
      //阻塞獲得表白的結果
      System.out.println(sayLoveFuture.get()); // love you,Akai-yuan
      

      一旦理解了supply()的含義,它也就如此簡單。如果你希望用新的線程運行任務,可以使用supplyAsync().

      3.thenApply與thenApplyAsync

      • 我們已經知道supply()是用于提供結果的,并且順帶提了thenApply()。很明顯,thenApply()supply()的搭檔,用于接收supply()的執行結果,并執行特定的代碼邏輯,最后返回CompletableFuture結果。
       // 使用thenApply()接收nameFuture的結果,并執行回調動作
      CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {
          return "愛你," + name;
      });
      
      public <U> CompletableFuture <U> thenApplyAsync(
          Function <? super T, ? extends U> fn) {
          return uniApplyStage(null, fn);
      }
      

      4.thenAccept與thenAcceptAsync

      作為supply()的檔案,thenApply()并不是唯一的存在,thenAccept()也是。但與thenApply()不同,thenAccept()只接收數據,但不會返回,它的返回類型是Void.

      CompletableFuture<Void> sayLoveFuture = nameFuture.thenAccept(name -> {
           System.out.println("愛你," + name);
      });
              
      public CompletableFuture < Void > thenAccept(Consumer < ? super T > action) {
          return uniAcceptStage(null, action);
      }
      

      5.thenRun

      thenRun()就比較簡單了,不接收任務的結果,只運行特定的任務,并且也不返回結果。

      public CompletableFuture < Void > thenRun(Runnable action) {
         return uniRunStage(null, action);
      }
      

      所以,如果你在回調中不想返回任何的結果,只運行特定的邏輯,那么你可以考慮使用thenAcceptthenRun一般來說,這兩個方法會在調用鏈的最后面使用。

      6.thenCompose與 thenCombine

      以上幾種方法都是各玩各的,但thenCompose()與thenCombine()就不同了,它們可以實現對依賴非依賴兩種類型的任務的編排。
      編排兩個存在依賴關系的任務
      在前面的例子中,在接收前面任務的結果時,我們使用的是thenApply(). 也就是說,sayLoveFuture在執行時必須依賴nameFuture的完成,否則執行個錘子。

      // 創建Future
      CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {
          return "Akai-yuan";
      });
      
       // 使用thenApply()接收nameFuture的結果,并執行回調動作
      CompletableFuture <String> sayLoveFuture = nameFuture.thenApply(name -> {
          return "愛你," + name;
      });
      

      但其實,除了thenApply()之外,我們還可以使用thenCompose()來編排兩個存在依賴關系的任務。比如,上面的示例代碼可以寫成:

      // 創建Future
      CompletableFuture <String> nameFuture = CompletableFuture.supplyAsync(() -> {
          return "Akai-yuan";
      });
      
      CompletableFuture<String> sayLoveFuture2 = nameFuture.thenCompose(name -> {
          return CompletableFuture.supplyAsync(() -> "愛你," + name);
      });
      

      可以看到,thenCompose()和thenApply()的核心不同之處在于它們的返回值類型

      • thenApply():返回計算結果的原始類型,比如返回String;
      • thenCompose():返回CompletableFuture類型,比如返回CompletableFuture.

      組合兩個相互獨立的任務
      考慮一個場景,當我們在執行某個任務時,需要其他任務就緒才可以,應該怎么做?這樣的場景并不少見,我們可以使用前面學過的并發工具類實現,也可以使用thenCombine()實現。
      舉個例子,當我們計算某個勝率時,我們需要獲取她參與的總場次(rounds),以及獲勝的場次(winRounds),然后再通過winRounds / rounds來計算。對于這個計算,我們可以這么做:

      CompletableFuture < Integer > roundsFuture = CompletableFuture.supplyAsync(() -> 500);
      CompletableFuture < Integer > winRoundsFuture = CompletableFuture.supplyAsync(() -> 365);
      
      CompletableFuture < Object > winRateFuture = roundsFuture
          .thenCombine(winRoundsFuture, (rounds, winRounds) -> {
              if (rounds == 0) {
                  return 0.0;
              }
              DecimalFormat df = new DecimalFormat("0.00");
              return df.format((float) winRounds / rounds);
          });
      System.out.println(winRateFuture.get());
      

      thenCombine()將另外兩個任務的結果同時作為參數,參與到自己的計算邏輯中。在另外兩個參數未就緒時,它將會處于等待狀態。

      7.allOf與anyOf

      allOf()與anyOf()也是一對孿生兄弟,當我們需要對多個Future的運行進行組織時,就可以考慮使用它們:

      • allOf():給定一組任務,等待所有任務執行結束;
      • anyOf():給定一組任務,等待其中任一任務執行結束。

      allOf()與anyOf()的方法簽名如下:

      static CompletableFuture<Void>	 allOf(CompletableFuture<?>... cfs)
      static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
      

      需要注意的是,anyOf()將返回完任務的執行結果,但是allOf()不會返回任何結果,它的返回值是Void.
      allOf()與anyOf()的示例代碼如下所示。我們創建了roundsFuture和winRoundsFuture,并通過sleep模擬它們的執行時間。在執行時,winRoundsFuture將會先返回結果,所以當我們調用 CompletableFuture.anyOf時也會發現輸出的是365.

       CompletableFuture < Integer > roundsFuture = CompletableFuture.supplyAsync(() -> {
         try {
           Thread.sleep(200);
           return 500;
         } catch (InterruptedException e) {
           return null;
         }
       });
       CompletableFuture < Integer > winRoundsFuture = CompletableFuture.supplyAsync(() -> {
         try {
           Thread.sleep(100);
           return 365;
         } catch (InterruptedException e) {
           return null;
         }
       });
      
       CompletableFuture < Object > completedFuture = CompletableFuture.anyOf(winRoundsFuture, roundsFuture);
       System.out.println(completedFuture.get()); // 返回365
      
       CompletableFuture < Void > completedFutures = CompletableFuture.allOf(winRoundsFuture, roundsFuture);
      

      在CompletableFuture之前,如果要實現所有任務結束后執行特定的動作,我們可以考慮CountDownLatch等工具類?,F在,則多了一選項,我們也可以考慮使用CompletableFuture.allOf.

      8.異常處理

      在CompletableFuture鏈式調用中,如果某個任務發生了異常,那么后續的任務將都不會再執行。對于異常,我們有兩種處理方式:exceptionally()handle().

      1.使用exceptionally()回調處理異常

      在鏈式調用的尾部使用exceptionally(),捕獲異常并返回錯誤情況下的默認值。需要注意的是,exceptionally()僅在發生異常時才會調用。

      CompletableFuture < ? extends Serializable > winRateFuture = roundsFuture
          .thenCombine(winRoundsFuture, (rounds, winRounds) -> {
              if (rounds == 0) {
                  throw new RuntimeException("總場次錯誤");
              }
              DecimalFormat df = new DecimalFormat("0.00");
              return df.format((float) winRounds / rounds);
          }).exceptionally(ex -> {
              System.out.println("出錯:" + ex.getMessage());
              return "";
          });
      System.out.println(winRateFuture.get());
      
      2. 使用handle()處理異常

      除了exceptionally(),CompletableFuture也提供了handle()來處理異常。不過,與exceptionally()不同的是,當我們在調用鏈中使用了handle(),那么無論是否發生異常,都會調用它。所以,在handle()方法的內部,我們需要通過 if (ex != null) 來判斷是否發生了異常。

      CompletableFuture < ? extends Serializable > winRateFuture = roundsFuture
          .thenCombine(winRoundsFuture, (rounds, winRounds) -> {
              if (rounds == 0) {
                  throw new RuntimeException("總場次錯誤");
              }
              DecimalFormat df = new DecimalFormat("0.00");
              return df.format((float) winRounds / rounds);
          }).handle((res, ex) -> {
              if (ex != null) {
                  System.out.println("出錯:" + ex.getMessage());
                  return "";
              }
              return res;
          });
      System.out.println(winRateFuture.get());
      

      當然,如果我們允許某個任務發生異常而不中斷整個調用鏈路,那么可以在其內部通過try-catch消化掉。

      posted @ 2023-02-04 23:55  Akai-yuan  閱讀(148)  評論(0編輯  收藏  舉報
      欧洲黄色网页链接入口,免费A级毛片无码无遮挡久久影院,a免费黄色网址,国产一级黄色的网站
      <optgroup id="6y7f6"><small id="6y7f6"></small></optgroup>

      <code id="6y7f6"></code>

    1. <p id="6y7f6"><tbody id="6y7f6"><ins id="6y7f6"></ins></tbody></p>
      <code id="6y7f6"><form id="6y7f6"></form></code>