<optgroup id="6y7f6"><small id="6y7f6"></small></optgroup>

<code id="6y7f6"></code>

  • <p id="6y7f6"><tbody id="6y7f6"><ins id="6y7f6"></ins></tbody></p>
    <code id="6y7f6"><form id="6y7f6"></form></code>

      通過一個示例形象地理解C# async await 非并行異步、并行異步、并行異步的并發量控制

      前言

      接上一篇 通過一個示例形象地理解C# async await異步
      我在 .NET與大數據 中吐槽前同事在雙層循環體中(肯定是單線程了)頻繁請求es,導致接口的總耗時很長。這不能怪前同事,確實難寫,會使代碼復雜度增加。
      評論區有人說他的理解是使用異步增加了系統吞吐能力,這個理解是正確的,但對于單個接口的單次請求而言,它是單線程的,耗時反而可能比同步還慢。如何縮短單個接口的單次請求的時間呢(要求:盡量不增加代碼復雜度)?請看下文。

      注意:在本文最后補充了最佳實踐

      示例的測試步驟

      先直接測試,看結果,下面再放代碼

      1. 點擊VS2022的啟動按鈕,啟動程序,它會先啟動Server工程,再啟動AsyncAwaitDemo2工程
      2. 分別點擊三個button
      3. 觀察思考輸出結果

      測試截圖

      非并行異步(順序執行的異步)


      截圖說明:單次請求耗時約0.5秒,共10次請求,耗時約 0.5秒×10=5秒

      并行異步


      截圖說明:單次請求耗時約0.5秒,共10次請求,耗時約 0.5秒

      并行異步(控制并發數量)


      截圖說明:單次請求耗時約0.5秒,共10次請求,并發數是5,耗時約 0.5秒×10÷5=1秒

      服務端
      服務端和客戶端是兩個獨立的工程,測試時在一起跑,但其實可以分開部署,部署到不同的機器上
      服務端是一個web api接口,用.NET 6、VS2022開發,代碼如下:

      [ApiController]
      [Route("[controller]")]
      public class TestController : ControllerBase
      {
          [HttpGet]
          [Route("[action]")]
          public async Task<Dictionary<int, int>> Get(int i)
          {
              var result = new Dictionary<int, int>();
      
              await Task.Delay(500); //模擬耗時操作
      
              if (i == 0)
              {
                  result.Add(0, 5);
                  result.Add(1, 4);
                  result.Add(2, 3);
                  result.Add(3, 2);
                  result.Add(4, 1);
              }
              else if (i == 1)
              {
                  result.Add(0, 10);
                  result.Add(1, 9);
                  result.Add(2, 8);
                  result.Add(3, 7);
                  result.Add(4, 6);
              }
      
              return result;
          }
      }
      

      客戶端
      大家看客戶端代碼時,不需要關心服務端怎么寫
      客戶端是一個Winform工程,用.NET 6、VS2022開發,代碼如下:

      public partial class Form1 : Form
      {
          private readonly string _url = "http://localhost:5028/Test/Get";
      
          public Form1()
          {
              InitializeComponent();
          }
      
          private async void Form1_Load(object sender, EventArgs e)
          {
              //預熱
              HttpClient httpClient = HttpClientFactory.GetClient();
              await (await httpClient.GetAsync(_url)).Content.ReadAsStringAsync();
          }
      
          //非并行異步(順序執行的異步)
          private async void button3_Click(object sender, EventArgs e)
          {
              await Task.Run(async () =>
              {
                  Log($"==== 非并行異步 開始,線程ID={Thread.CurrentThread.ManagedThreadId} ========================");
                  Stopwatch sw = Stopwatch.StartNew();
                  HttpClient httpClient = HttpClientFactory.GetClient();
                  var tasks = new Dictionary<string, Task<string>>();
                  StringBuilder sb = new StringBuilder();
                  for (int i = 0; i < 2; i++)
                  {
                      int sum = 0;
                      for (int j = 0; j < 5; j++)
                      {
                          Dictionary<int, int> dict = await RequestAsync(_url, i);
                          if (dict.ContainsKey(j))
                          {
                              int num = dict[j];
                              sum += num;
                              sb.Append($"{num}, ");
                          }
                      }
                      Log($"輸出:sum={sum}");
                  }
                  Log($"輸出:{sb}");
                  sw.Stop();
                  Log($"==== 結束,線程ID={Thread.CurrentThread.ManagedThreadId},耗時:{sw.Elapsed.TotalSeconds:0.000}秒 ========================");
              });
          }
      
          // 并行異步
          private async void button4_Click(object sender, EventArgs e)
          {
              await Task.Run(async () =>
              {
                  Log($"==== 并行異步 開始,線程ID={Thread.CurrentThread.ManagedThreadId} ========================");
                  Stopwatch sw = Stopwatch.StartNew();
                  HttpClient httpClient = HttpClientFactory.GetClient();
                  var tasks = new Dictionary<string, Task<Dictionary<int, int>>>();
                  StringBuilder sb = new StringBuilder();
                  //雙層循環寫第一遍
                  for (int i = 0; i < 2; i++)
                  {
                      for (int j = 0; j < 5; j++)
                      {
                          var task = RequestAsync(_url, i);
                          tasks.Add($"{i}_{j}", task);
                      }
                  }
                  //雙層循環寫第二遍
                  for (int i = 0; i < 2; i++)
                  {
                      int sum = 0;
                      for (int j = 0; j < 5; j++)
                      {
                          Dictionary<int, int> dict = await tasks[$"{i}_{j}"];
                          if (dict.ContainsKey(j))
                          {
                              int num = dict[j];
                              sum += num;
                              sb.Append($"{num}, ");
                          }
                      }
                      Log($"輸出:sum={sum}");
                  }
                  Log($"輸出:{sb}");
                  sw.Stop();
                  Log($"==== 結束,線程ID={Thread.CurrentThread.ManagedThreadId},耗時:{sw.Elapsed.TotalSeconds:0.000}秒 ========================");
              });
          }
      
          // 并行異步(控制并發數量)
          private async void button5_Click(object sender, EventArgs e)
          {
              await Task.Run(async () =>
              {
                  Log($"==== 并行異步(控制并發數量) 開始,線程ID={Thread.CurrentThread.ManagedThreadId} ===================");
                  Stopwatch sw = Stopwatch.StartNew();
                  HttpClient httpClient = HttpClientFactory.GetClient();
                  var tasks = new Dictionary<string, Task<Dictionary<int, int>>>();
                  Semaphore sem = new Semaphore(5, 5);
                  StringBuilder sb = new StringBuilder();
                  //雙層循環寫第一遍
                  for (int i = 0; i < 2; i++)
                  {
                      for (int j = 0; j < 5; j++)
                      {
                          var task = RequestAsync(_url, i, sem);
                          tasks.Add($"{i}_{j}", task);
                      }
                  }
                  //雙層循環寫第二遍
                  for (int i = 0; i < 2; i++)
                  {
                      int sum = 0;
                      for (int j = 0; j < 5; j++)
                      {
                          Dictionary<int, int> dict = await tasks[$"{i}_{j}"];
                          if (dict.ContainsKey(j))
                          {
                              int num = dict[j];
                              sum += num;
                              sb.Append($"{num}, ");
                          }
                      }
                      Log($"輸出:sum={sum}");
                  }
                  sem.Dispose(); //別忘了釋放
                  Log($"輸出:{sb}");
                  sw.Stop();
                  Log($"==== 結束,線程ID={Thread.CurrentThread.ManagedThreadId},耗時:{sw.Elapsed.TotalSeconds:0.000}秒 ========================");
              });
          }
      
          private async Task<Dictionary<int, int>> RequestAsync(string url, int i)
          {
              Stopwatch sw = Stopwatch.StartNew();
              HttpClient httpClient = HttpClientFactory.GetClient();
              var result = await (await httpClient.GetAsync($"{url}?i={i}")).Content.ReadAsStringAsync();
              sw.Stop();
              Log($"線程ID={Thread.CurrentThread.ManagedThreadId},請求耗時:{sw.Elapsed.TotalSeconds:0.000}秒");
              return JsonSerializer.Deserialize<Dictionary<int, int>>(result);
          }
      
          private async Task<Dictionary<int, int>> RequestAsync(string url, int i, Semaphore semaphore)
          {
              semaphore.WaitOne();
              try
              {
                  Stopwatch sw = Stopwatch.StartNew();
                  HttpClient httpClient = HttpClientFactory.GetClient();
                  var result = await (await httpClient.GetAsync($"{url}?i={i}")).Content.ReadAsStringAsync();
                  sw.Stop();
                  Log($"線程ID={Thread.CurrentThread.ManagedThreadId},請求耗時:{sw.Elapsed.TotalSeconds:0.000}秒");
                  return JsonSerializer.Deserialize<Dictionary<int, int>>(result);
              }
              catch (Exception ex)
              {
                  Log($"錯誤:{ex}");
                  throw;
              }
              finally
              {
                  semaphore.Release();
              }
          }
      
          #region Log
          private void Log(string msg)
          {
              msg = $"{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")}  {msg}\r\n";
      
              if (this.InvokeRequired)
              {
                  this.BeginInvoke(new Action(() =>
                  {
                      txtLog.AppendText(msg);
                  }));
              }
              else
              {
                  txtLog.AppendText(msg);
              }
          }
          #endregion
      
          private void button6_Click(object sender, EventArgs e)
          {
              txtLog.Text = string.Empty;
          }
      }
      

      思考

      1. 使用Semaphore的注意事項

      1. 如果是Winform程序,可以在button事件方法中定義它的局部變量。如果是WebAPI接口服務,請在接口方法中定義Semaphore的局部變量。注意,別定義成全局的,或者定義成靜態的,或者定義成Controller的成員變量,那樣會嚴重限制使用它的接口的吞吐能力!
      2. 用完調用Dispose釋放

      2. 盡量不增加代碼復雜度

      請思考代碼中的注釋"雙層循環寫第一遍""雙層循環寫第二遍",這個寫法盡量不增加代碼復雜度,試想一下,如果你用Task.Run,且不說占用線程,就問你怎么寫能簡單?
      有人說,這題我會,這樣寫不就行了:

      Dictionary<int, int>[] result = await Task.WhenAll(tasks.Values);
      

      那請問,你接下來怎么寫?我相信你肯定會寫,但問題是,代碼的邏輯結構變了,代碼復雜度增加了!
      所以"雙層循環寫第一遍""雙層循環寫第二遍"是什么意思?你即能方便合并,又能方便拆分,代碼邏輯結構沒變,只是復制了一份。

      3. RequestAsync的復雜度可控

      RequestAsync的復雜度并沒有因為Semaphore的引入變得更復雜,增加的代碼可以接受。

      我寫這篇博客不只是寫個Demo,我確實有實際項目中的問題需要解決,代碼如下:

      WebAPI的Controller層:

      [HttpPost]
      [Route("[action]")]
      public async Task<List<NightActivitiesResultItem>> Get([FromBody] NightActivitiesPostData data)
      {
          return await ServiceFactory.Get<NightActivitiesService>().Get(data.startDate, data.endDate, data.startTime, data.endTime, data.threshold, data.peopleClusters);
      }
      

      WebAPI的Service層:

      public async Task<List<NightActivitiesResultItem>> Get(string strStartDate, string strEndDate, string strStartTime, string strEndTime, decimal threshold, List<PeopleCluster> peopleClusterList)
      {
          List<NightActivitiesResultItem> result = new List<NightActivitiesResultItem>();
      
          DateTime startDate = DateTime.ParseExact(strStartDate, "yyyyMMdd", CultureInfo.InvariantCulture);
          DateTime endDate = DateTime.ParseExact(strEndDate, "yyyyMMdd", CultureInfo.InvariantCulture);
          string[][] strTimes;
          if (string.Compare(strStartTime, strEndTime) > 0)
          {
              strTimes = new string[2][] { new string[2], new string[2] };
              strTimes[0][0] = strStartTime;
              strTimes[0][1] = "235959";
              strTimes[1][0] = "000000";
              strTimes[1][1] = strEndTime;
          }
          else
          {
              strTimes = new string[1][] { new string[2] };
              strTimes[0][0] = strStartTime;
              strTimes[0][1] = strEndTime;
          }
      
          foreach (PeopleCluster peopleCluster in peopleClusterList)
          {
              for (DateTime day = startDate; day <= endDate; day = day.AddDays(1))
              {
                  string strDate = day.ToString("yyyyMMdd");
                  int sum = 0;
                  foreach (string[] timeArr in strTimes)
                  {
                      List<PeopleFeatureAgg> list = await ServiceFactory.Get<PeopleFeatureQueryService>().QueryAgg(strDate + timeArr[0], strDate + timeArr[1], peopleCluster.ClusterIds);
                      Dictionary<string, int> agg = list.ToLookup(a => a.ClusterId).ToDictionary(a => a.Key, a => a.First().Count);
      
                      foreach (string clusterId in peopleCluster.ClusterIds)
                      {
                          if (agg.TryGetValue(clusterId, out int count))
                          {
                              sum += count;
                          }
                      }
                  }
                  if (sum >= threshold) //大于或等于閾值
                  {
                      NightActivitiesResultItem item = new NightActivitiesResultItem();
                      item.peopleCluster = peopleCluster;
                      item.date = strDate;
                      item.count = sum;
                      foreach (string[] timeArr in strTimes)
                      {
                          PeopleFeatureQueryResult featureList = await ServiceFactory.Get<PeopleFeatureQueryService>().Query(strDate + timeArr[0], strDate + timeArr[1], peopleCluster.ClusterIds, 10000);
                          item.list.AddRange(featureList.list);
                      }
                      item.dataType = "xxx";
                      result.Add(item);
                  }
              }
          }
      
          var clusters = result.ConvertAll<PeopleCluster>(a => a.peopleCluster);
          await ServiceFactory.Get<PersonScoreService>().Set(OpeType.Xxx, peopleClusterList, clusters, startDate.ToString("yyyyMMddHHmmss"), endDate.ToString("yyyyMMddHHmmss"));
      
          return result;
      }
      

      思考

      上述接口代碼,它有三層循環,在第三層循環體中await,第一層循環的數量會達到1000甚至10000,第二層循環的數量會達到30(一個月30天),甚至90(三個月),第三層循環的數量很少。
      那么總請求次數會達到3萬甚至90萬,如果不使用并行異步請求,那耗時將會很長。

      請問:在盡量不增加代碼復雜度的前提下,怎么優化,能縮短該服務接口的執行時間?

      我知道肯定有人要說我了,你傻啊,請求3萬次?你可以改寫一下,只請求一次,或者按天來,每天的數據只請求一次,那最多也才90次。然后在內存中計算,這不就快了?
      確實是這樣的,確實不應該請求3萬次。但問題沒這么簡單:

      1. 且不說代碼的復雜度,你寫的不是一個接口,可能會有幾十個這樣的接口要寫,復雜度增加一點這么多接口都要寫死人。
      2. 這3萬請求,可都是精確查詢,es強大的緩存機制,肯定會命中緩存,也就是這些請求實際上基本是直接從內存中拿數據,連遍歷集合都不需要,直接命中索引。只是網絡往返次數太多。
      3. 這1次請求,或30次請求,對es來說,變成了范圍查詢,es要遍歷,要給你查詢并組織數據,返回集合給你。當然es集群的運算速度肯定很快。
      4. 這1次請求,或30次請求,結果返回后,你就要在內存中計算了,有的接口我就是這樣寫的,但要多寫代碼,比如在內存中計算,為了提高效率,先創建字典,相當于建索引。
      5. 只是邏輯復雜了嗎?你還要多定義一些臨時的變量??!還可能要多定義一些實體類,哪怕是匿名對象。
      6. 代碼寫著寫著就變懶了,對于每個接口,先組織好數據,再進行1次請求,然后在內存中再遍歷再計算,心智負擔好重
      7. 我在網上看到es集群默認最多支持10000個并發查詢,需要請求es的業務程序肯定不止一個,對一個業務程序而言,確實要控制并發量
      8. 根據我的觀察,一個WebAPI程序,線程數一般也就幾十,多的時候上百,在沒有異步的時候,并發請求數量實際上受限于物理線程。
      9. 使用異步之后,并發請求數量實際上受限于虛擬線程。確實會增加請求es的并發數量,壓力大的時候,這個并發數量可能會很大。

      怎么查看并發請求數

      windows的cmd命令:
      netstat -ano | findstr 5028

      還有兩個問題,博客中沒有體現

      1. 客戶端程序執行請求時,客戶端線程數量

      通過任務管理器查看,非并行異步,線程數很少,請求開始后只增加了一兩個線程。并行異步線程數較多。并行異步控制并發數量,線程數少很多。

      2. Semaphore會阻塞當前線程

      semaphore.WaitOne()阻塞線程一直阻塞到semaphore.Release(),使用了Semaphore的接口,被請求一次,阻塞一個線程,不過問題不是很大。

      思考

      .NET只有一個CLR線程池和一個異步線程池(完成端口線程池),當線程池中線程數量不夠用時,.NET每秒才增加1到2個線程,線程增加的速度非常緩慢。結合異步,考慮一下這是為什么?
      我認為(不一定對):

      1. 異步不需要大量物理線程,少量即可
      2. 如果線程增加速度很快,以異步的吞吐量,怕不是要把es請求掛!因為并發請求數太多了。

      總結

      1. 并行異步,會有并發量太大,導致諸如數據庫或者es集群抗不住的問題,謹慎使用。
      2. 并行異步(控制并發數量),這個目前是最佳實踐。

      完整測試源碼

      注意是AsyncParallel分支
      https://gitee.com/s0611163/AsyncAwaitDemo2/tree/AsyncParallel/

      最后

      上述我寫的實際接口,可優化也可不優化,耗時長沒有問題,還有很多服務接口,它們通過定時任務在凌晨錯開時間跑,結果存儲在數據庫中供前端查詢。這是離線分析。
      前同事寫的接口是實時的,所以他覺得es慢了,如果只請求一次呢,可能es的查詢語句也不好寫,所以用ClickHouse,利用SQL靈活性,只查詢一次,然后在內存中計算。

      后續

      又寫了個測試程序,測試大量請求,并限制請求并發量。
      注意是AsyncParallel2分支
      https://gitee.com/s0611163/AsyncAwaitDemo2/tree/AsyncParallel2/

      怎么測試?

      1. 啟動服務端后,再啟動客戶端
      2. 點擊第一個按鈕,觀察輸出,打開Windows的資源管理器,查看Server.exe進程和AsyncAwaitDemo2.exe進程的線程數量,然后客戶端可以關了,因為跑完至少要半小時。
      3. 點擊第二個按鈕,觀察輸出,打開Windows的資源管理器同上,觀察工作線程數和異步線程數占用,能看到數據明顯變化,大概幾秒后就可以跑完。
      4. 點擊第三個按鈕,觀察輸出,打開Windows的資源管理器同上,觀察工作線程數和異步線程數占用,能看到數據明顯變化,大概20秒能跑完。0.065秒/每次請求×3萬次請求÷100并發量≈20秒。

      注意觀察服務端線程數量

      1. 并行異步請求
        并行異步請求時,請求3萬次只需要幾秒,第二次點擊需要的時間更短,僅需大約2.5秒。注意觀察服務端線程數量,50不到!
        我把服務端修改成同步接口,客戶端代碼不動,試了一下,3萬個請求,客戶端報異常:由于目標計算機積極拒絕,無法連接。
        我把服務端的線程池改大一些,ThreadPool.SetMinThreads(200, 200),客戶還是報異常:遠程主機強迫關閉了一個現有的連接。

      2. 并行異步請求(控制并發數量)
        3萬次請求,耗時大約60秒,很顯然服務端的吞吐量較低。
        把服務端的線程池改大一些,ThreadPool.SetMinThreads(200, 200),可以達到異步接口同樣的吞吐量。

      后續:使用Parallel.ForEachAsync實現異步方法的并行執行(最佳實踐)

      private async void button4_Click(object sender, EventArgs e)
      {
          await Task.Run(async () =>
          {
              Log($"==== 并行異步 開始,線程ID={Thread.CurrentThread.ManagedThreadId} ========================");
              Stopwatch sw = Stopwatch.StartNew();
              HttpClient httpClient = HttpClientFactory.GetClient();
              var tasks = new Dictionary<string, Task<Dictionary<int, int>>>();
              ConcurrentQueue<string> strs = new ConcurrentQueue<string>();
      
              await Parallel.ForEachAsync(Enumerable.Range(0, m), new ParallelOptions() { MaxDegreeOfParallelism = 100 }, async (i, c) =>
              {
                  int sum = 0;
                  await Parallel.ForEachAsync(Enumerable.Range(0, n), new ParallelOptions() { MaxDegreeOfParallelism = 30 }, async (j, c) =>
                  {
                      Dictionary<int, int> dict = await RequestAsync(_url, i);
                      if (dict.ContainsKey(j))
                      {
                          int num = dict[j];
                          Interlocked.Exchange(ref sum, sum + num);
                          strs.Enqueue($"{num}");
                      }
                  });
                  Log($"輸出:sum={sum}");
              });
      
              Log($"輸出:{string.Join(",", strs.ToArray())}");
              sw.Stop();
              Log($"==== 結束,線程ID={Thread.CurrentThread.ManagedThreadId},耗時:{sw.Elapsed.TotalSeconds:0.000}秒 ========================");
          });
      }
      

      上述代碼說明

      1. 在并行執行的異步方法中操作集合,要使用線程安全的集合:
      ConcurrentQueue<string> strs = new ConcurrentQueue<string>();
      
      1. 在并行執行的異步方法中計算數量,要使用Interlocked:
      Interlocked.Exchange(ref sum, sum + num);
      
      posted @ 2023-02-04 11:58  0611163  閱讀(1319)  評論(2編輯  收藏  舉報
      欧洲黄色网页链接入口,免费A级毛片无码无遮挡久久影院,a免费黄色网址,国产一级黄色的网站
      <optgroup id="6y7f6"><small id="6y7f6"></small></optgroup>

      <code id="6y7f6"></code>

    1. <p id="6y7f6"><tbody id="6y7f6"><ins id="6y7f6"></ins></tbody></p>
      <code id="6y7f6"><form id="6y7f6"></form></code>