目錄
一、線程的定義
二、線程的基礎知識
三、以ThreadStart方式實現(xiàn)多線程
四、CLR線程池的工作者線程
五、CLR線程池的I/O線程
六、異步 SqlCommand
七、并行編程與PLINQ
八、計時器與鎖
七、并行編程與PLINQ
要使用多線程開發(fā),必須非常熟悉Thread的使用,而且在開發(fā)過程中可能會面對很多未知的問題。為了簡化開發(fā),.NET 4.0 特別提供一個并行編程庫System.Threading.Tasks,它可以簡化并行開發(fā),你無需直接跟線程或線程池打交道,就可以簡單建立多線程應用 程序。此外,.NET還提供了新的一組擴展方法PLINQ,它具有自動分析查詢功能,如果并行查詢能提高系統(tǒng)效率,則同時運行,如果查詢未能從并行查詢中 受益,則按原順序查詢。下面將詳細介紹并行操作的方式。
7.1 泛型委托
使用并行編程可以同時操作多個委托,在介紹并行編程前先簡單介紹一下兩個泛型委托System.Func<>與System.Action<>。
Func<>是一個能接受多個參數(shù)和一個返回值的泛型委托,它能接受0個到4個輸入?yún)?shù), 其中 T1,T2,T3,T4 代表自定的輸入類型,TResult為自定義的返回值。
public delegate TResult Func<TResult>()
public delegate TResult Func<T1,TResult>(T1 arg1)
public delegate TResult Func<T1,T2, TResult>(T1 arg1,T2 arg2)
public delegate TResult Func<T1,T2, T3, TResult>(T1 arg1,T2 arg2,T3 arg3)
public delegate TResult Func<T1,T2, T3, ,T4, TResult>(T1 arg1,T2 arg2,T3 arg3,T4 arg4)
Action<>與Func<>十分相似,不同在于Action<>的返回值為void,Action能接受1~16個參數(shù)
public delegate void Action<T1>()
public delegate void Action<T1,T2>(T1 arg1,T2 arg2)
public delegate void Action<T1,T2, T3>(T1 arg1,T2 arg2, T3 arg3)
.............
public delegate void Action<T1,T2, T3, ,T4, ...... ,T16>(T1 arg1,T2 arg2,T3 arg3,T4 arg4,...... ,T16 arg16)
7.2 任務并行庫(TPL)
System.Threading.Tasks中的類被統(tǒng)稱為任務并行庫(Task Parallel Library,TPL),TPL使用CLR線程池把工作分配到CPU,并能自動處理工作分區(qū)、線程調度、取消支持、狀態(tài)管理以及其他低級別的細節(jié)操作,極大地簡化了多線程的開發(fā)。
注意:TPL比Thread更具智能性,當它判斷任務集并沒有從并行運行中受益,就會選擇按順序運行。但并非所有的項目都適合使用并行開發(fā),創(chuàng)建過多并行任務可能會損害程序的性能,降低運行效率。
TPL包括常用的數(shù)據(jù)并行與任務并行兩種執(zhí)行方式:
7.2.1 數(shù)據(jù)并行
數(shù)據(jù)并行的核心類就是System.Threading.Tasks.Parallel,它包含兩個靜態(tài)方法 Parallel.For 與 Parallel.ForEach, 使用方式與for、foreach相仿。通過這兩個方法可以并行處理System.Func<>、 System.Action<>委托。
以下一個例子就是利用 public static ParallelLoopResult For( int from, int max, Action<int>) 方法對List<Person>進行并行查詢。
假設使用單線程方式查詢3個Person對象,需要用時大約6秒,在使用并行方式,只需使用2秒就能完成查詢,而且能夠避開Thread的繁瑣處理。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 //設置最大線程數(shù) 6 ThreadPool.SetMaxThreads(1000, 1000);
7 //并行查詢 8 Parallel.For(0, 3,n =>
9 {
10 Thread.Sleep(2000); //模擬查詢
11 ThreadPoolMessage(GetPersonList()[n]);
12 });
13 Console.ReadKey();
14 }
15
16 //模擬源數(shù)據(jù) 17 static IList<Person> GetPersonList()
18 {
19 var personList = new List<Person>();
20
21 var person1 = new Person();
22 person1.ID = 1;
23 person1.Name = "Leslie";
24 person1.Age = 30;
25 personList.Add(person1);
26 ...........
27 return personList;
28 }
29
30 //顯示線程池現(xiàn)狀 31 static void ThreadPoolMessage(Person person)
32 {
33 int a, b;
34 ThreadPool.GetAvailableThreads(out a, out b);
35 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" +
36 " CurrentThreadId is {3}\n WorkerThreads is:{4}" +
37 " CompletionPortThreads is :{5}\n",
38 person.ID, person.Name, person.Age,
39 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
40
41 Console.WriteLine(message);
42 }
43 }
觀察運行結果,對象并非按照原排列順序進行查詢,而是使用并行方式查詢。
若想停止操作,可以利用ParallelLoopState參數(shù),下面以ForEach作為例子。
public static ParallelLoopResult ForEach<TSource>( IEnumerable<TSource> source, Action<TSource, ParallelLoopState> action)
其中source為數(shù)據(jù) 集,在Action<TSource,ParallelLoopState>委托的ParallelLoopState參數(shù)當中包含有 Break()和 Stop()兩個方法都可以使迭代停止。Break的使用跟傳統(tǒng)for里面的使用方式相似,但因為處于并行處理當中,使用Break并不能保證所有運行能 立即停止,在當前迭代之前的迭代會繼續(xù)執(zhí)行。若想立即停止操作,可以使用Stop方法,它能保證立即終止所有的操作,無論它們是處于當前迭代的之前還是之 后。
class Program
{
static void Main(string[] args)
{
//設置最大線程數(shù) ThreadPool.SetMaxThreads(1000, 1000);
//并行查詢 Parallel.ForEach(GetPersonList(), (person, state) =>
{
if (person.ID == 2)
state.Stop();
ThreadPoolMessage(person);
});
Console.ReadKey();
}
//模擬源數(shù)據(jù) static IList<Person> GetPersonList()
{
var personList = new List<Person>();
var person1 = new Person();
person1.ID = 1;
person1.Name = "Leslie";
person1.Age = 30;
personList.Add(person1);
..........
return personList;
}
//顯示線程池現(xiàn)狀 static void ThreadPoolMessage(Person person)
{
int a, b;
ThreadPool.GetAvailableThreads(out a, out b);
string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" +
" CurrentThreadId is {3}\n WorkerThreads is:{4}" +
" CompletionPortThreads is :{5}\n",
person.ID, person.Name, person.Age,
Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
Console.WriteLine(message);
}
}
觀察運行結果,當Person的ID等于2時,運行將會停止。
當要在多個線程中調用本地變量,可以使用以下方法:
public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<Of TSource>, Func<Of TLocal>, Func<Of TSource,ParallelLoopState,TLocal,TLocal>, Action<Of TLocal>)
其中第一個參數(shù)為數(shù)據(jù)集;
第二個參數(shù)是一個Func委托,用于在每個線程執(zhí)行前進行初始化;
第 三個參數(shù)是委托Func<Of T1,T2,T3,TResult>,它能對數(shù)據(jù)集的每個成員進行迭代,當中T1是數(shù)據(jù)集的成員,T2是一個ParallelLoopState對 象,它可以控制迭代的狀態(tài),T3是線程中的本地變量;
第四個參數(shù)是一個Action委托,用于對每個線程的最終狀態(tài)進行最終操作。
在以下例子中,使用ForEach計算多個Order的總體價格。在ForEach方法中,首先把參數(shù)初始化為0f,然后用把同一個Order的多 個OrderItem價格進行累加,計算出Order的價格,最后把多個Order的價格進行累加,計算出多個Order的總體價格。
1 public class Order
2 {
3 public int ID;
4 public float Price;
5 }
6
7 public class OrderItem
8 {
9 public int ID;
10 public string Goods;
11 public int OrderID;
12 public float Price;
13 public int Count;
14 }
15
16 class Program
17 {
18 static void Main(string[] args)
19 {
20 //設置最大線程數(shù) 21 ThreadPool.SetMaxThreads(1000, 1000);
22 float totalPrice = 0f;
23 //并行查詢 24 var parallelResult = Parallel.ForEach(GetOrderList(),
25 () => 0f, //把參數(shù)初始值設為0 26 (order, state, orderPrice) =>
27 {
28 //計算單個Order的價格 29 orderPrice = GetOrderItem().Where(item => item.OrderID == order.ID)
30 .Sum(item => item.Price * item.Count);
31 order.Price = orderPrice;
32 ThreadPoolMessage(order);
33
34 return orderPrice;
35 },
36 (finallyPrice) =>
37 {
38 totalPrice += finallyPrice;//計算多個Order的總體價格 39 }
40 );
41
42 while (!parallelResult.IsCompleted)
43 Console.WriteLine("Doing Work!");
44
45 Console.WriteLine("Total Price is:" + totalPrice);
46 Console.ReadKey();
47 }
48 //虛擬數(shù)據(jù)
49 static IList<Order> GetOrderList()
50 {
51 IList<Order> orderList = new List<Order>();
52 Order order1 = new Order();
53 order1.ID = 1;
54 orderList.Add(order1);
55 ............
56 return orderList;
57 }
58 //虛擬數(shù)據(jù)
59 static IList<OrderItem> GetOrderItem()
60 {
61 IList<OrderItem> itemList = new List<OrderItem>();
62
63 OrderItem orderItem1 = new OrderItem();
64 orderItem1.ID = 1;
65 orderItem1.Goods = "iPhone 4S";
66 orderItem1.Price = 6700;
67 orderItem1.Count = 2;
68 orderItem1.OrderID = 1;
69 itemList.Add(orderItem1);
70 ...........
71 return itemList;
72 }
73
74 //顯示線程池現(xiàn)狀 75 static void ThreadPoolMessage(Order order)
76 {
77 int a, b;
78 ThreadPool.GetAvailableThreads(out a, out b);
79 string message = string.Format("OrderID:{0} OrderPrice:{1}\n" +
80 " CurrentThreadId is {2}\n WorkerThreads is:{3}" +
81 " CompletionPortThreads is:{4}\n",
82 order.ID, order.Price,
83 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
84
85 Console.WriteLine(message);
86 }
87 }
運行結果
7.2.2 任務并行
在TPL當中還可以使用Parallel.Invoke方法觸發(fā)多個異步任務,其中 actions 中可以包含多個方法或者委托,parallelOptions用于配置Parallel類的操作。
public static void Invoke(Action[] actions )
public static void Invoke(ParallelOptions parallelOptions, Action[] actions )
下面例子中利用了Parallet.Invoke并行查詢多個Person,actions當中可以綁定方法、lambda表達式或者委托,注意綁定方法時必須是返回值為void的無參數(shù)方法。
class Program
{
static void Main(string[] args)
{
//設置最大線程數(shù) ThreadPool.SetMaxThreads(1000, 1000);
//任務并行 Parallel.Invoke(option,
PersonMessage,
()=>ThreadPoolMessage(GetPersonList()[1]),
delegate(){
ThreadPoolMessage(GetPersonList()[2]);
});
Console.ReadKey();
}
static void PersonMessage()
{
ThreadPoolMessage(GetPersonList()[0]);
}
//顯示線程池現(xiàn)狀 static void ThreadPoolMessage(Person person)
{
int a, b;
ThreadPool.GetAvailableThreads(out a, out b);
string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" +
" CurrentThreadId is {3}\n WorkerThreads is:{4}" +
" CompletionPortThreads is :{5}\n",
person.ID, person.Name, person.Age,
Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
Console.WriteLine(message);
}
//模擬源數(shù)據(jù) static IList<Person> GetPersonList()
{
var personList = new List<Person>();
var person1 = new Person();
person1.ID = 1;
person1.Name = "Leslie";
person1.Age = 30;
personList.Add(person1);
..........
return personList;
}
}
運行結果
7.3 Task簡介
以Thread創(chuàng)建的線程被默認為前臺線程,當然你可以把線程IsBackground屬性設置為true,但TPL為此提供了一個更簡單的類Task。
Task存在于System.Threading.Tasks命名空間當中,它可以作為異步委托的簡單替代品。
通過Task的Factory屬性將返回TaskFactory類,以TaskFactory.StartNew(Action)方法可以創(chuàng)建一個新線程,所創(chuàng)建的線程默認為后臺線程。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 ThreadPool.SetMaxThreads(1000, 1000);
6 Task.Factory.StartNew(() => ThreadPoolMessage());
7 Console.ReadKey();
8 }
9
10 //顯示線程池現(xiàn)狀 11 static void ThreadPoolMessage()
12 {
13 int a, b;
14 ThreadPool.GetAvailableThreads(out a, out b);
15 string message = string.Format("CurrentThreadId is:{0}\n" +
16 "CurrentThread IsBackground:{1}\n" +
17 "WorkerThreads is:{2}\nCompletionPortThreads is:{3}\n",
18 Thread.CurrentThread.ManagedThreadId,
19 Thread.CurrentThread.IsBackground.ToString(),
20 a.ToString(), b.ToString());
21 Console.WriteLine(message);
22 }
23 }
運行結果
若要取消處理,可以利用CancellationTakenSource對象,在TaskFactory中包含有方法
public Task StartNew( Action action, CancellationToken cancellationToken )
在 方法中加入CancellationTakenSource對象的CancellationToken屬性,可以控制任務的運行,調用 CancellationTakenSource.Cancel時任務就會自動停止。下面以圖片下載為例子介紹一下TaskFactory的使用。
服務器端頁面
<html xmlns="http://www.w3.org/1999/xhtml">
<head runat="server">
<title></title>
<script type="text/C#" runat="server"> private static List<string> url=new List<string>(); protected void Page_Load(object sender, EventArgs e) { if (!Page.IsPostBack) { url.Clear(); Application["Url"] = null; } } protected void CheckBox_CheckedChanged(object sender, EventArgs e) { CheckBox checkBox = (CheckBox)sender; if (checkBox.Checked) url.Add(checkBox.Text); else url.Remove(checkBox.Text); Application["Url"]= url; } </script>
</head>
<body>
<form id="form1" runat="server" >
<div align="left">
<div align="center" style="float: left;">
<asp:Image ID="Image1" runat="server" ImageUrl="~/Images/A.jpg" /><br />
<asp:CheckBox ID="CheckBox1" runat="server" AutoPostBack="True" oncheckedchanged="CheckBox_CheckedChanged" Text="A.jpg" />
</div>
<div align="center" style="float: left">
<asp:Image ID="Image2" runat="server" ImageUrl="~/Images/B.jpg" /><br />
<asp:CheckBox ID="CheckBox2" runat="server" AutoPostBack="True" oncheckedchanged="CheckBox_CheckedChanged" Text="B.jpg" />
</div>
<div align="center" style="float: left">
<asp:Image ID="Image3" runat="server" ImageUrl="~/Images/C.jpg" /><br />
<asp:CheckBox ID="CheckBox3" runat="server" AutoPostBack="True" oncheckedchanged="CheckBox_CheckedChanged" Text="C.jpg" />
</div>
<div align="center" style="float: left">
<asp:Image ID="Image4" runat="server" ImageUrl="~/Images/D.jpg" /><br />
<asp:CheckBox ID="CheckBox4" runat="server" AutoPostBack="True" oncheckedchanged="CheckBox_CheckedChanged" Text="D.jpg" />
</div>
<div align="center" style="float: left">
<asp:Image ID="Image5" runat="server" ImageUrl="~/Images/E.jpg" /><br />
<asp:CheckBox ID="CheckBox5" runat="server" AutoPostBack="True" oncheckedchanged="CheckBox_CheckedChanged" Text="E.jpg" />
</div>
</div>
</form>
</body>
</html>
首先在服務器頁面中顯示多個*.jpg圖片,每個圖片都有對應的CheckBox檢測其選擇情況。
所選擇圖片的路徑會記錄在Application["Url"]當中傳遞到Handler.ashx當中。
注意:Application是一個全局變量,此處只是為了顯示Task的使用方式,在ASP.NET開發(fā)應該慎用Application。
Handler.ashx 處理圖片的下載,它從 Application["Url"] 當中獲取所選擇圖片的路徑,并把圖片轉化成byte[]二進制數(shù)據(jù)。
再把圖片的數(shù)量,每副圖片的二進制數(shù)據(jù)的長度記錄在OutputStream的頭部。
最后把圖片的二進制數(shù)據(jù)記入 OutputStream 一并輸出。
1 public class Handler : IHttpHandler
2 {
3 public void ProcessRequest(HttpContext context)
4 {
5 //獲取圖片名,把圖片數(shù)量寫OutputStream 6 List<String> urlList = (List<string>)context.Application["Url"];
7 context.Response.OutputStream.Write(BitConverter.GetBytes(urlList.Count), 0, 4);
8
9 //把圖片轉換成二進制數(shù)據(jù) 10 List<string> imageList = GetImages(urlList);
11
12 //把每副圖片長度寫入OutputStream 13 foreach (string image in imageList)
14 {
15 byte[] imageByte=Convert.FromBase64String(image);
16 context.Response.OutputStream.Write(BitConverter.GetBytes(imageByte.Length),0,4);
17 }
18
19 //把圖片寫入OutputStream 20 foreach (string image in imageList)
21 {
22 byte[] imageByte = Convert.FromBase64String(image);
23 context.Response.OutputStream.Write(imageByte,0,imageByte.Length);
24 }
25 }
26
27 //獲取多個圖片的二進制數(shù)據(jù) 28 private List<string> GetImages(List<string> urlList)
29 {
30 List<string> imageList = new List<string>();
31 foreach (string url in urlList)
32 imageList.Add(GetImage(url));
33 return imageList;
34 }
35
36 //獲取單副圖片的二進制數(shù)據(jù) 37 private string GetImage(string url)
38 {
39 string path = "E:/My Projects/Example/WebSite/Images/"+url;
40 FileStream stream = new FileStream(path, FileMode.Open, FileAccess.Read);
41 byte[] imgBytes = new byte[10240];
42 int imgLength = stream.Read(imgBytes, 0, 10240);
43 return Convert.ToBase64String(imgBytes,0,imgLength);
44 }
45
46 public bool IsReusable
47 {
48 get{ return false;}
49 }
50 }
客戶端
建立一個WinForm窗口,里面加入一個WebBrowser連接到服務器端的Default.aspx頁面。
當按下Download按鍵時,系統(tǒng)就會利用TaskFactory.StartNew的方法建立異步線程,使用WebRequest方法向Handler.ashx發(fā)送請求。
接收到回傳流時,就會根據(jù)頭文件的內容判斷圖片的數(shù)量與每副圖片的長度,把二進制數(shù)據(jù)轉化為*.jpg文件保存。
系統(tǒng)利用TaskFactory.StartNew(action,cancellationToken) 方式異步調用GetImages方法進行圖片下載。
當 用戶按下Cancel按鈕時,異步任務就會停止。值得注意的是,在圖片下載時調用了 CancellationToken.ThrowIfCancellationRequested方法,目的在檢查并行任務的運行情況,在并行任務被停止 時釋放出OperationCanceledException異常,確保用戶按下Cancel按鈕時,停止所有并行任務。
public partial class Form1 : Form
{
private CancellationTokenSource tokenSource = new CancellationTokenSource();
public Form1()
{
InitializeComponent();
ThreadPool.SetMaxThreads(1000, 1000);
}
private void downloadToolStripMenuItem_Click(object sender, EventArgs e)
{
Task.Factory.StartNew(GetImages,tokenSource.Token);
}
private void cancelToolStripMenuItem_Click(object sender, EventArgs e)
{
tokenSource.Cancel();
}
private void GetImages()
{
//發(fā)送請求,獲取輸出流 WebRequest webRequest = HttpWebRequest.Create("Http://localhost:5800/Handler.ashx");
Stream responseStream=webRequest.GetResponse().GetResponseStream();
byte[] responseByte = new byte[81960];
IAsyncResult result=responseStream.BeginRead(responseByte,0,81960,null,null);
int responseLength = responseStream.EndRead(result);
//獲取圖片數(shù)量 int imageCount = BitConverter.ToInt32(responseByte, 0);
//獲取每副圖片的長度 int[] lengths = new int[imageCount];
for (int n = 0; n < imageCount; n++)
{
int length = BitConverter.ToInt32(responseByte, (n + 1) * 4);
lengths[n] = length;
}
try
{
//保存圖片 for (int n = 0; n < imageCount; n++)
{
string path = string.Format("E:/My Projects/Example/Test/Images/pic{0}.jpg", n);
FileStream file = new FileStream(path, FileMode.Create, FileAccess.ReadWrite);
//計算字節(jié)偏移量 int offset = (imageCount + 1) * 4;
for (int a = 0; a < n; a++)
offset += lengths[a];
file.Write(responseByte, offset, lengths[n]);
file.Flush();
//模擬操作 Thread.Sleep(1000);
//檢測CancellationToken變化 tokenSource.Token.ThrowIfCancellationRequested();
}
}
catch (OperationCanceledException ex)
{
MessageBox.Show("Download cancel!");
}
}
}
7.4 并行查詢(PLINQ)
并行 LINQ (PLINQ) 是 LINQ 模式的并行實現(xiàn),主要區(qū)別在于 PLINQ 嘗試充分利用系統(tǒng)中的所有處理器。 它利用所有處理器的方法,把數(shù)據(jù)源分成片段,然后在多個處理器上對單獨工作線程上的每個片段并行執(zhí)行查詢, 在許多情況下,并行執(zhí)行意味著查詢運行速度顯著提高。但這并不說明所有PLINQ都會使用并行方式,當系統(tǒng)測試要并行查詢會對系統(tǒng)性能造成損害時,那將自動化地使用同步執(zhí)行。
在System.Linq.ParallelEnumerable類中,包含了并行查詢的大部分方法。
方法成員
|
說明 |
AsParallel
|
PLINQ 的入口點。 指定如果可能,應并行化查詢的其余部分。
|
AsSequential(Of TSource)
|
指定查詢的其余部分應像非并行 LINQ 查詢一樣按順序運行。
|
AsOrdered
|
指定 PLINQ 應保留查詢的其余部分的源序列排序,直到例如通過使用 orderby(在 Visual Basic 中為 Order By)子句更改排序為止。
|
AsUnordered(Of TSource)
|
指定查詢的其余部分的 PLINQ 不需要保留源序列的排序。
|
WithCancellation(Of TSource)
|
指定 PLINQ 應定期監(jiān)視請求取消時提供的取消標記和取消執(zhí)行的狀態(tài)。
|
WithDegreeOfParallelism(Of TSource)
|
指定 PLINQ 應當用來并行化查詢的處理器的最大數(shù)目。
|
WithMergeOptions(Of TSource)
|
提供有關 PLINQ 應當如何(如果可能)將并行結果合并回到使用線程上的一個序列的提示。
|
WithExecutionMode(Of TSource)
|
指定 PLINQ 應當如何并行化查詢(即使默認行為是按順序運行查詢)。
|
ForAll(Of TSource)
|
多線程枚舉方法,與循環(huán)訪問查詢結果不同,它允許在不首先合并回到使用者線程的情況下并行處理結果。
|
Aggregate 重載
|
對于 PLINQ 唯一的重載,它啟用對線程本地分區(qū)的中間聚合以及一個用于合并所有分區(qū)結果的最終聚合函數(shù)。
|
7.4.1 AsParallel
通常想要實現(xiàn)并行查詢,只需向數(shù)據(jù)源添加 AsParallel 查詢操作即可。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 var personList=GetPersonList().AsParallel()
6 .Where(x=>x.Age>30);
7 Console.ReadKey();
8 }
9
10 //模擬源數(shù)據(jù) 11 static IList<Person> GetPersonList()
12 {
13 var personList = new List<Person>();
14
15 var person1 = new Person();
16 person1.ID = 1;
17 person1.Name = "Leslie";
18 person1.Age = 30;
19 personList.Add(person1);
20 ...........
21 return personList;
22 }
23 }
7.4.2 AsOrdered
若要使查詢結果必須保留源序列排序方式,可以使用AsOrdered方法。
AsOrdered依然使用并行方式,只是在查詢過程加入額外信息,在并行結束后把查詢結果再次進行排列。
class Program
{
static void Main(string[] args)
{
var personList=GetPersonList().AsParallel().AsOrdered()
.Where(x=>x.Age<30);
Console.ReadKey();
}
static IList<Person> GetPersonList()
{......}
}
7.4.3 WithDegreeOfParallelism
默認情況下,PLINQ 使用主機上的所有處理器,這些處理器的數(shù)量最多可達 64 個。
通過使用 WithDegreeOfParallelism(Of TSource) 方法,可以指示 PLINQ 使用不多于指定數(shù)量的處理器。
1 class Program
2 {
3 static void Main(string[] args)
4 {
5 var personList=GetPersonList().AsParallel().WithDegreeOfParallelism(2)
6 .Where(x=>x.Age<30);
7 Console.ReadKey();
8 }
9
10 static IList<Person> GetPersonList()
11 {.........}
12 }
7.4.4 ForAll
如果要對并行查詢結果進行操作,一般會在for或foreach中執(zhí)行,執(zhí)行枚舉操作時會使用同步方式。
有見及此,PLINQ中包含了ForAll方法,它可以使用并行方式對數(shù)據(jù)集進行操作。
class Program
{
static void Main(string[] args)
{
ThreadPool.SetMaxThreads(1000, 1000);
GetPersonList().AsParallel().ForAll(person =>{
ThreadPoolMessage(person);
});
Console.ReadKey();
}
static IList<Person> GetPersonList()
{.......}
//顯示線程池現(xiàn)狀 static void ThreadPoolMessage(Person person)
{
int a, b;
ThreadPool.GetAvailableThreads(out a, out b);
string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" +
" CurrentThreadId is {3}\n WorkerThreads is:{4}" +
" CompletionPortThreads is :{5}\n",
person.ID, person.Name, person.Age,
Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
Console.WriteLine(message);
}
}
運行結果
7.4.5 WithCancellation
如果需要停止查詢,可以使用 WithCancellation(Of TSource) 運算符并提供 CancellationToken 實例作為參數(shù)。
與 第三節(jié)Task的例子相似,如果標記上的 IsCancellationRequested 屬性設置為 true,則 PLINQ 將會注意到它,并停止所有線程上的處理,然后引發(fā) OperationCanceledException。這可以保證并行查詢能夠立即停止。
1 class Program
2 {
3 static CancellationTokenSource tokenSource = new CancellationTokenSource();
4
5 static void Main(string[] args)
6 {
7 Task.Factory.StartNew(Cancel);
8 try
9 {
10 GetPersonList().AsParallel().WithCancellation(tokenSource.Token)
11 .ForAll(person =>
12 {
13 ThreadPoolMessage(person);
14 });
15 }
16 catch (OperationCanceledException ex)
17 { }
18 Console.ReadKey();
19 }
20
21 //在10~50毫秒內發(fā)出停止信號 22 static void Cancel()
23 {
24 Random random = new Random();
25 Thread.Sleep(random.Next(10,50));
26 tokenSource.Cancel();
27 }
28
29 static IList<Person> GetPersonList()
30 {......}
31
32 //顯示線程池現(xiàn)狀 33 static void ThreadPoolMessage(Person person)
34 {
35 int a, b;
36 ThreadPool.GetAvailableThreads(out a, out b);
37 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" +
38 " CurrentThreadId is {3}\n WorkerThreads is:{4}" +
39 " CompletionPortThreads is :{5}\n",
40 person.ID, person.Name, person.Age,
41 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString());
42 Console.WriteLine(message);
43 }
44 }
億恩-天使(QQ:530997) 電話 037160135991 服務器租用,托管歡迎咨詢。
本文出自:億恩科技【mszdt.com】
服務器租用/服務器托管中國五強!虛擬主機域名注冊頂級提供商!15年品質保障!--億恩科技[ENKJ.COM]
|