无码视频在线观看,99人妻,国产午夜视频,久久久久国产一级毛片高清版新婚

  • 始創(chuàng)于2000年 股票代碼:831685
    咨詢熱線:0371-60135900 注冊(cè)有禮 登錄
    • 掛牌上市企業(yè)
    • 60秒人工響應(yīng)
    • 99.99%連通率
    • 7*24h人工
    • 故障100倍補(bǔ)償
    全部產(chǎn)品
    您的位置: 網(wǎng)站首頁 > 幫助中心>文章內(nèi)容

    細(xì)說多線程(七) —— 并行編程與PLINQ

    發(fā)布時(shí)間:  2012/9/16 6:38:20

    目錄

    一、線程的定義

    二、線程的基礎(chǔ)知識(shí)

    三、以ThreadStart方式實(shí)現(xiàn)多線程

    四、CLR線程池的工作者線程

    五、CLR線程池的I/O線程

    六、異步 SqlCommand

    七、并行編程與PLINQ

    八、計(jì)時(shí)器與鎖

     

    七、并行編程與PLINQ

    要使用多線程開發(fā),必須非常熟悉Thread的使用,而且在開發(fā)過程中可能會(huì)面對(duì)很多未知的問題。為了簡(jiǎn)化開發(fā),.NET 4.0 特別提供一個(gè)并行編程庫System.Threading.Tasks,它可以簡(jiǎn)化并行開發(fā),你無需直接跟線程或線程池打交道,就可以簡(jiǎn)單建立多線程應(yīng)用 程序。此外,.NET還提供了新的一組擴(kuò)展方法PLINQ,它具有自動(dòng)分析查詢功能,如果并行查詢能提高系統(tǒng)效率,則同時(shí)運(yùn)行,如果查詢未能從并行查詢中 受益,則按原順序查詢。下面將詳細(xì)介紹并行操作的方式。

     

    7.1 泛型委托

    使用并行編程可以同時(shí)操作多個(gè)委托,在介紹并行編程前先簡(jiǎn)單介紹一下兩個(gè)泛型委托System.Func<>與System.Action<>。

    Func<>是一個(gè)能接受多個(gè)參數(shù)和一個(gè)返回值的泛型委托,它能接受0個(gè)到4個(gè)輸入?yún)?shù), 其中 T1,T2,T3,T4 代表自定的輸入類型,TResult為自定義的返回值。
    public delegate TResult Func<TResult>()
    public delegate TResult Func<T1,TResult>(T1 arg1)
    public delegate TResult Func<T1,T2, TResult>(T1 arg1,T2 arg2)
    public delegate TResult Func<T1,T2, T3, TResult>(T1 arg1,T2 arg2,T3 arg3)
    public delegate TResult Func<T1,T2, T3, ,T4, TResult>(T1 arg1,T2 arg2,T3 arg3,T4 arg4)

    Action<>與Func<>十分相似,不同在于Action<>的返回值為void,Action能接受1~16個(gè)參數(shù)
    public delegate void Action<T1>()
    public delegate void Action<T1,T2>(T1 arg1,T2 arg2)
    public delegate void Action<T1,T2, T3>(T1 arg1,T2 arg2, T3 arg3)
    .............
    public delegate void Action<T1,T2, T3, ,T4, ...... ,T16>(T1 arg1,T2 arg2,T3 arg3,T4 arg4,...... ,T16 arg16)

     

    7.2 任務(wù)并行庫(TPL)

    System.Threading.Tasks中的類被統(tǒng)稱為任務(wù)并行庫(Task Parallel Library,TPL),TPL使用CLR線程池把工作分配到CPU,并能自動(dòng)處理工作分區(qū)、線程調(diào)度、取消支持、狀態(tài)管理以及其他低級(jí)別的細(xì)節(jié)操作,極大地簡(jiǎn)化了多線程的開發(fā)。

    注意:TPL比Thread更具智能性,當(dāng)它判斷任務(wù)集并沒有從并行運(yùn)行中受益,就會(huì)選擇按順序運(yùn)行。但并非所有的項(xiàng)目都適合使用并行開發(fā),創(chuàng)建過多并行任務(wù)可能會(huì)損害程序的性能,降低運(yùn)行效率。

    TPL包括常用的數(shù)據(jù)并行與任務(wù)并行兩種執(zhí)行方式:

    7.2.1 數(shù)據(jù)并行

    數(shù)據(jù)并行的核心類就是System.Threading.Tasks.Parallel,它包含兩個(gè)靜態(tài)方法 Parallel.For 與 Parallel.ForEach, 使用方式與for、foreach相仿。通過這兩個(gè)方法可以并行處理System.Func<>、 System.Action<>委托。

    以下一個(gè)例子就是利用 public static ParallelLoopResult For( int from, int max, Action<int>) 方法對(duì)List<Person>進(jìn)行并行查詢。
    假設(shè)使用單線程方式查詢3個(gè)Person對(duì)象,需要用時(shí)大約6秒,在使用并行方式,只需使用2秒就能完成查詢,而且能夠避開Thread的繁瑣處理。

     1     class Program
     2     {
     3         static void Main(string[] args)
     4         {
     5             //設(shè)置最大線程數(shù)
    6 ThreadPool.SetMaxThreads(1000, 1000); 7 //并行查詢
    8 Parallel.For(0, 3,n => 9 { 10 Thread.Sleep(2000); //模擬查詢 11 ThreadPoolMessage(GetPersonList()[n]); 12 }); 13 Console.ReadKey(); 14 } 15 16 //模擬源數(shù)據(jù)
    17 static IList<Person> GetPersonList() 18 { 19 var personList = new List<Person>(); 20 21 var person1 = new Person(); 22 person1.ID = 1; 23 person1.Name = "Leslie"; 24 person1.Age = 30; 25 personList.Add(person1); 26 ........... 27 return personList; 28 } 29 30 //顯示線程池現(xiàn)狀
    31 static void ThreadPoolMessage(Person person) 32 { 33 int a, b; 34 ThreadPool.GetAvailableThreads(out a, out b); 35 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + 36 " CurrentThreadId is {3}\n WorkerThreads is:{4}" + 37 " CompletionPortThreads is :{5}\n", 38 person.ID, person.Name, person.Age, 39 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 40 41 Console.WriteLine(message); 42 } 43 }

    觀察運(yùn)行結(jié)果,對(duì)象并非按照原排列順序進(jìn)行查詢,而是使用并行方式查詢。

     

    若想停止操作,可以利用ParallelLoopState參數(shù),下面以ForEach作為例子。
    public static ParallelLoopResult ForEach<TSource>( IEnumerable<TSource> source, Action<TSource, ParallelLoopState> action)
    其中source為數(shù)據(jù) 集,在Action<TSource,ParallelLoopState>委托的ParallelLoopState參數(shù)當(dāng)中包含有 Break()和 Stop()兩個(gè)方法都可以使迭代停止。Break的使用跟傳統(tǒng)for里面的使用方式相似,但因?yàn)樘幱诓⑿刑幚懋?dāng)中,使用Break并不能保證所有運(yùn)行能 立即停止,在當(dāng)前迭代之前的迭代會(huì)繼續(xù)執(zhí)行。若想立即停止操作,可以使用Stop方法,它能保證立即終止所有的操作,無論它們是處于當(dāng)前迭代的之前還是之 后。

         class Program
         {
              static void Main(string[] args)
              {
                  //設(shè)置最大線程數(shù)
    ThreadPool.SetMaxThreads(1000, 1000); //并行查詢
    Parallel.ForEach(GetPersonList(), (person, state) => { if (person.ID == 2) state.Stop(); ThreadPoolMessage(person); }); Console.ReadKey(); } //模擬源數(shù)據(jù)
    static IList<Person> GetPersonList() { var personList = new List<Person>(); var person1 = new Person(); person1.ID = 1; person1.Name = "Leslie"; person1.Age = 30; personList.Add(person1); .......... return personList; } //顯示線程池現(xiàn)狀
    static void ThreadPoolMessage(Person person) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + " CurrentThreadId is {3}\n WorkerThreads is:{4}" + " CompletionPortThreads is :{5}\n", person.ID, person.Name, person.Age, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } }

    觀察運(yùn)行結(jié)果,當(dāng)Person的ID等于2時(shí),運(yùn)行將會(huì)停止。

     

    當(dāng)要在多個(gè)線程中調(diào)用本地變量,可以使用以下方法:
    public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<Of TSource>, Func<Of TLocal>, Func<Of TSource,ParallelLoopState,TLocal,TLocal>, Action<Of TLocal>)
    其中第一個(gè)參數(shù)為數(shù)據(jù)集;
    第二個(gè)參數(shù)是一個(gè)Func委托,用于在每個(gè)線程執(zhí)行前進(jìn)行初始化;
    第 三個(gè)參數(shù)是委托Func<Of T1,T2,T3,TResult>,它能對(duì)數(shù)據(jù)集的每個(gè)成員進(jìn)行迭代,當(dāng)中T1是數(shù)據(jù)集的成員,T2是一個(gè)ParallelLoopState對(duì) 象,它可以控制迭代的狀態(tài),T3是線程中的本地變量;
    第四個(gè)參數(shù)是一個(gè)Action委托,用于對(duì)每個(gè)線程的最終狀態(tài)進(jìn)行最終操作。

    在以下例子中,使用ForEach計(jì)算多個(gè)Order的總體價(jià)格。在ForEach方法中,首先把參數(shù)初始化為0f,然后用把同一個(gè)Order的多 個(gè)OrderItem價(jià)格進(jìn)行累加,計(jì)算出Order的價(jià)格,最后把多個(gè)Order的價(jià)格進(jìn)行累加,計(jì)算出多個(gè)Order的總體價(jià)格。

     1     public class Order
     2     {
     3         public int ID;
     4         public float Price;
     5     }
     6 
     7     public class OrderItem
     8     {
     9         public int ID;
    10         public string Goods;
    11         public int OrderID;
    12         public float Price;
    13         public int Count;
    14     }
    15 
    16     class Program
    17     {
    18         static void Main(string[] args)
    19         {
    20             //設(shè)置最大線程數(shù)
    21 ThreadPool.SetMaxThreads(1000, 1000); 22 float totalPrice = 0f; 23 //并行查詢
    24 var parallelResult = Parallel.ForEach(GetOrderList(), 25 () => 0f, //把參數(shù)初始值設(shè)為0
    26 (order, state, orderPrice) => 27 { 28 //計(jì)算單個(gè)Order的價(jià)格
    29 orderPrice = GetOrderItem().Where(item => item.OrderID == order.ID) 30 .Sum(item => item.Price * item.Count); 31 order.Price = orderPrice; 32 ThreadPoolMessage(order); 33 34 return orderPrice; 35 }, 36 (finallyPrice) => 37 { 38 totalPrice += finallyPrice;//計(jì)算多個(gè)Order的總體價(jià)格
    39 } 40 ); 41 42 while (!parallelResult.IsCompleted) 43 Console.WriteLine("Doing Work!"); 44 45 Console.WriteLine("Total Price is:" + totalPrice); 46 Console.ReadKey(); 47 } 48 //虛擬數(shù)據(jù) 49 static IList<Order> GetOrderList() 50 { 51 IList<Order> orderList = new List<Order>(); 52 Order order1 = new Order(); 53 order1.ID = 1; 54 orderList.Add(order1); 55 ............ 56 return orderList; 57 } 58 //虛擬數(shù)據(jù) 59 static IList<OrderItem> GetOrderItem() 60 { 61 IList<OrderItem> itemList = new List<OrderItem>(); 62 63 OrderItem orderItem1 = new OrderItem(); 64 orderItem1.ID = 1; 65 orderItem1.Goods = "iPhone 4S"; 66 orderItem1.Price = 6700; 67 orderItem1.Count = 2; 68 orderItem1.OrderID = 1; 69 itemList.Add(orderItem1); 70 ........... 71 return itemList; 72 } 73 74 //顯示線程池現(xiàn)狀
    75 static void ThreadPoolMessage(Order order) 76 { 77 int a, b; 78 ThreadPool.GetAvailableThreads(out a, out b); 79 string message = string.Format("OrderID:{0} OrderPrice:{1}\n" + 80 " CurrentThreadId is {2}\n WorkerThreads is:{3}" + 81 " CompletionPortThreads is:{4}\n", 82 order.ID, order.Price, 83 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 84 85 Console.WriteLine(message); 86 } 87 }

    運(yùn)行結(jié)果

     

     7.2.2 任務(wù)并行

    在TPL當(dāng)中還可以使用Parallel.Invoke方法觸發(fā)多個(gè)異步任務(wù),其中 actions 中可以包含多個(gè)方法或者委托,parallelOptions用于配置Parallel類的操作。
    public static void Invoke(Action[] actions )
    public static void Invoke(ParallelOptions parallelOptions, Action[] actions )
    下面例子中利用了Parallet.Invoke并行查詢多個(gè)Person,actions當(dāng)中可以綁定方法、lambda表達(dá)式或者委托,注意綁定方法時(shí)必須是返回值為void的無參數(shù)方法。

         class Program
         {
             static void Main(string[] args)
             {
                 //設(shè)置最大線程數(shù)
    ThreadPool.SetMaxThreads(1000, 1000); //任務(wù)并行
    Parallel.Invoke(option, PersonMessage, ()=>ThreadPoolMessage(GetPersonList()[1]), delegate(){ ThreadPoolMessage(GetPersonList()[2]); }); Console.ReadKey(); } static void PersonMessage() { ThreadPoolMessage(GetPersonList()[0]); } //顯示線程池現(xiàn)狀
    static void ThreadPoolMessage(Person person) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + " CurrentThreadId is {3}\n WorkerThreads is:{4}" + " CompletionPortThreads is :{5}\n", person.ID, person.Name, person.Age, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } //模擬源數(shù)據(jù)
    static IList<Person> GetPersonList() { var personList = new List<Person>(); var person1 = new Person(); person1.ID = 1; person1.Name = "Leslie"; person1.Age = 30; personList.Add(person1); .......... return personList; } }

    運(yùn)行結(jié)果

     


    7.3 Task簡(jiǎn)介

    以Thread創(chuàng)建的線程被默認(rèn)為前臺(tái)線程,當(dāng)然你可以把線程IsBackground屬性設(shè)置為true,但TPL為此提供了一個(gè)更簡(jiǎn)單的類Task。
    Task存在于System.Threading.Tasks命名空間當(dāng)中,它可以作為異步委托的簡(jiǎn)單替代品。
    通過Task的Factory屬性將返回TaskFactory類,以TaskFactory.StartNew(Action)方法可以創(chuàng)建一個(gè)新線程,所創(chuàng)建的線程默認(rèn)為后臺(tái)線程。

     1     class Program
     2     {
     3         static void Main(string[] args)
     4         {
     5             ThreadPool.SetMaxThreads(1000, 1000);
     6             Task.Factory.StartNew(() => ThreadPoolMessage());
     7             Console.ReadKey();
     8         }
     9 
    10         //顯示線程池現(xiàn)狀
    11 static void ThreadPoolMessage() 12 { 13 int a, b; 14 ThreadPool.GetAvailableThreads(out a, out b); 15 string message = string.Format("CurrentThreadId is:{0}\n" + 16 "CurrentThread IsBackground:{1}\n" + 17 "WorkerThreads is:{2}\nCompletionPortThreads is:{3}\n", 18 Thread.CurrentThread.ManagedThreadId, 19 Thread.CurrentThread.IsBackground.ToString(), 20 a.ToString(), b.ToString()); 21 Console.WriteLine(message); 22 } 23 }

    運(yùn)行結(jié)果

     

     

    若要取消處理,可以利用CancellationTakenSource對(duì)象,在TaskFactory中包含有方法
    public Task StartNew( Action action, CancellationToken cancellationToken )
    在 方法中加入CancellationTakenSource對(duì)象的CancellationToken屬性,可以控制任務(wù)的運(yùn)行,調(diào)用 CancellationTakenSource.Cancel時(shí)任務(wù)就會(huì)自動(dòng)停止。下面以圖片下載為例子介紹一下TaskFactory的使用。

    服務(wù)器端頁面

     <html xmlns="http://www.w3.org/1999/xhtml">
     <head runat="server">
         <title></title>
         <script type="text/C#" runat="server">
    private static List<string> url=new List<string>();

    protected void Page_Load(object sender, EventArgs e)
    {
    if (!Page.IsPostBack)
    {
    url.Clear();
    Application["Url"] = null;
    }
    }

    protected void CheckBox_CheckedChanged(object sender, EventArgs e)
    {
    CheckBox checkBox = (CheckBox)sender;
    if (checkBox.Checked)
    url.Add(checkBox.Text);
    else
    url.Remove(checkBox.Text);
    Application["Url"]= url;
    }
    </script> </head> <body> <form id="form1" runat="server" > <div align="left"> <div align="center" style="float: left;"> <asp:Image ID="Image1" runat="server" ImageUrl="~/Images/A.jpg" /><br /> <asp:CheckBox ID="CheckBox1" runat="server" AutoPostBack="True"
    oncheckedchanged="CheckBox_CheckedChanged" Text="A.jpg" /> </div> <div align="center" style="float: left"> <asp:Image ID="Image2" runat="server" ImageUrl="~/Images/B.jpg" /><br /> <asp:CheckBox ID="CheckBox2" runat="server" AutoPostBack="True"
    oncheckedchanged="CheckBox_CheckedChanged" Text="B.jpg" /> </div> <div align="center" style="float: left"> <asp:Image ID="Image3" runat="server" ImageUrl="~/Images/C.jpg" /><br /> <asp:CheckBox ID="CheckBox3" runat="server" AutoPostBack="True"
    oncheckedchanged="CheckBox_CheckedChanged" Text="C.jpg" /> </div> <div align="center" style="float: left"> <asp:Image ID="Image4" runat="server" ImageUrl="~/Images/D.jpg" /><br /> <asp:CheckBox ID="CheckBox4" runat="server" AutoPostBack="True"
    oncheckedchanged="CheckBox_CheckedChanged" Text="D.jpg" /> </div> <div align="center" style="float: left"> <asp:Image ID="Image5" runat="server" ImageUrl="~/Images/E.jpg" /><br /> <asp:CheckBox ID="CheckBox5" runat="server" AutoPostBack="True"
    oncheckedchanged="CheckBox_CheckedChanged" Text="E.jpg" /> </div> </div> </form> </body> </html>

    首先在服務(wù)器頁面中顯示多個(gè)*.jpg圖片,每個(gè)圖片都有對(duì)應(yīng)的CheckBox檢測(cè)其選擇情況。
    所選擇圖片的路徑會(huì)記錄在Application["Url"]當(dāng)中傳遞到Handler.ashx當(dāng)中。

    注意:Application是一個(gè)全局變量,此處只是為了顯示Task的使用方式,在ASP.NET開發(fā)應(yīng)該慎用Application。

    Handler.ashx 處理圖片的下載,它從 Application["Url"] 當(dāng)中獲取所選擇圖片的路徑,并把圖片轉(zhuǎn)化成byte[]二進(jìn)制數(shù)據(jù)。
    再把圖片的數(shù)量,每副圖片的二進(jìn)制數(shù)據(jù)的長(zhǎng)度記錄在OutputStream的頭部。
    最后把圖片的二進(jìn)制數(shù)據(jù)記入 OutputStream 一并輸出。

     1 public class Handler : IHttpHandler 
     2 {
     3     public void ProcessRequest(HttpContext context)
     4     {
     5         //獲取圖片名,把圖片數(shù)量寫OutputStream
    6 List<String> urlList = (List<string>)context.Application["Url"]; 7 context.Response.OutputStream.Write(BitConverter.GetBytes(urlList.Count), 0, 4); 8 9 //把圖片轉(zhuǎn)換成二進(jìn)制數(shù)據(jù)
    10 List<string> imageList = GetImages(urlList); 11 12 //把每副圖片長(zhǎng)度寫入OutputStream
    13 foreach (string image in imageList) 14 { 15 byte[] imageByte=Convert.FromBase64String(image); 16 context.Response.OutputStream.Write(BitConverter.GetBytes(imageByte.Length),0,4); 17 } 18 19 //把圖片寫入OutputStream
    20 foreach (string image in imageList) 21 { 22 byte[] imageByte = Convert.FromBase64String(image); 23 context.Response.OutputStream.Write(imageByte,0,imageByte.Length); 24 } 25 } 26 27 //獲取多個(gè)圖片的二進(jìn)制數(shù)據(jù)
    28 private List<string> GetImages(List<string> urlList) 29 { 30 List<string> imageList = new List<string>(); 31 foreach (string url in urlList) 32 imageList.Add(GetImage(url)); 33 return imageList; 34 } 35 36 //獲取單副圖片的二進(jìn)制數(shù)據(jù)
    37 private string GetImage(string url) 38 { 39 string path = "E:/My Projects/Example/WebSite/Images/"+url; 40 FileStream stream = new FileStream(path, FileMode.Open, FileAccess.Read); 41 byte[] imgBytes = new byte[10240]; 42 int imgLength = stream.Read(imgBytes, 0, 10240); 43 return Convert.ToBase64String(imgBytes,0,imgLength); 44 } 45 46 public bool IsReusable 47 { 48 get{ return false;} 49 } 50 }

     

    客戶端

    建立一個(gè)WinForm窗口,里面加入一個(gè)WebBrowser連接到服務(wù)器端的Default.aspx頁面。
    當(dāng)按下Download按鍵時(shí),系統(tǒng)就會(huì)利用TaskFactory.StartNew的方法建立異步線程,使用WebRequest方法向Handler.ashx發(fā)送請(qǐng)求。
    接收到回傳流時(shí),就會(huì)根據(jù)頭文件的內(nèi)容判斷圖片的數(shù)量與每副圖片的長(zhǎng)度,把二進(jìn)制數(shù)據(jù)轉(zhuǎn)化為*.jpg文件保存。

    系統(tǒng)利用TaskFactory.StartNew(action,cancellationToken) 方式異步調(diào)用GetImages方法進(jìn)行圖片下載。
    當(dāng) 用戶按下Cancel按鈕時(shí),異步任務(wù)就會(huì)停止。值得注意的是,在圖片下載時(shí)調(diào)用了 CancellationToken.ThrowIfCancellationRequested方法,目的在檢查并行任務(wù)的運(yùn)行情況,在并行任務(wù)被停止 時(shí)釋放出OperationCanceledException異常,確保用戶按下Cancel按鈕時(shí),停止所有并行任務(wù)。

         public partial class Form1 : Form
         {
             private CancellationTokenSource tokenSource = new CancellationTokenSource();
             
             public Form1()
             {
                 InitializeComponent();
                 ThreadPool.SetMaxThreads(1000, 1000);
             }
     
             private void downloadToolStripMenuItem_Click(object sender, EventArgs e)
             {
                  Task.Factory.StartNew(GetImages,tokenSource.Token);
             }
     
             private void cancelToolStripMenuItem_Click(object sender, EventArgs e)
             {
                 tokenSource.Cancel();
             }
     
             private void GetImages()
             {
                 //發(fā)送請(qǐng)求,獲取輸出流
    WebRequest webRequest = HttpWebRequest.Create("Http://localhost:5800/Handler.ashx"); Stream responseStream=webRequest.GetResponse().GetResponseStream(); byte[] responseByte = new byte[81960]; IAsyncResult result=responseStream.BeginRead(responseByte,0,81960,null,null); int responseLength = responseStream.EndRead(result); //獲取圖片數(shù)量
    int imageCount = BitConverter.ToInt32(responseByte, 0); //獲取每副圖片的長(zhǎng)度
    int[] lengths = new int[imageCount]; for (int n = 0; n < imageCount; n++) { int length = BitConverter.ToInt32(responseByte, (n + 1) * 4); lengths[n] = length; } try { //保存圖片
    for (int n = 0; n < imageCount; n++) { string path = string.Format("E:/My Projects/Example/Test/Images/pic{0}.jpg", n); FileStream file = new FileStream(path, FileMode.Create, FileAccess.ReadWrite); //計(jì)算字節(jié)偏移量
    int offset = (imageCount + 1) * 4; for (int a = 0; a < n; a++) offset += lengths[a]; file.Write(responseByte, offset, lengths[n]); file.Flush(); //模擬操作
    Thread.Sleep(1000); //檢測(cè)CancellationToken變化
    tokenSource.Token.ThrowIfCancellationRequested(); } } catch (OperationCanceledException ex) { MessageBox.Show("Download cancel!"); } } }


     

    7.4 并行查詢(PLINQ)

    并行 LINQ (PLINQ) 是 LINQ 模式的并行實(shí)現(xiàn),主要區(qū)別在于 PLINQ 嘗試充分利用系統(tǒng)中的所有處理器。 它利用所有處理器的方法,把數(shù)據(jù)源分成片段,然后在多個(gè)處理器上對(duì)單獨(dú)工作線程上的每個(gè)片段并行執(zhí)行查詢, 在許多情況下,并行執(zhí)行意味著查詢運(yùn)行速度顯著提高。但這并不說明所有PLINQ都會(huì)使用并行方式,當(dāng)系統(tǒng)測(cè)試要并行查詢會(huì)對(duì)系統(tǒng)性能造成損害時(shí),那將自動(dòng)化地使用同步執(zhí)行。
    在System.Linq.ParallelEnumerable類中,包含了并行查詢的大部分方法。

     

    方法成員 

    說明

    AsParallel

    PLINQ 的入口點(diǎn)。 指定如果可能,應(yīng)并行化查詢的其余部分。

    AsSequential(Of TSource)

    指定查詢的其余部分應(yīng)像非并行 LINQ 查詢一樣按順序運(yùn)行。

    AsOrdered

    指定 PLINQ 應(yīng)保留查詢的其余部分的源序列排序,直到例如通過使用 orderby(在 Visual Basic 中為 Order By)子句更改排序?yàn)橹埂?/span>

    AsUnordered(Of TSource)

    指定查詢的其余部分的 PLINQ 不需要保留源序列的排序。

    WithCancellation(Of TSource)

    指定 PLINQ 應(yīng)定期監(jiān)視請(qǐng)求取消時(shí)提供的取消標(biāo)記和取消執(zhí)行的狀態(tài)。

    WithDegreeOfParallelism(Of TSource)

    指定 PLINQ 應(yīng)當(dāng)用來并行化查詢的處理器的最大數(shù)目。

    WithMergeOptions(Of TSource)

    提供有關(guān) PLINQ 應(yīng)當(dāng)如何(如果可能)將并行結(jié)果合并回到使用線程上的一個(gè)序列的提示。

    WithExecutionMode(Of TSource)

    指定 PLINQ 應(yīng)當(dāng)如何并行化查詢(即使默認(rèn)行為是按順序運(yùn)行查詢)。

    ForAll(Of TSource)

    多線程枚舉方法,與循環(huán)訪問查詢結(jié)果不同,它允許在不首先合并回到使用者線程的情況下并行處理結(jié)果。

    Aggregate 重載

    對(duì)于 PLINQ 唯一的重載,它啟用對(duì)線程本地分區(qū)的中間聚合以及一個(gè)用于合并所有分區(qū)結(jié)果的最終聚合函數(shù)。


    7.4.1 AsParallel

    通常想要實(shí)現(xiàn)并行查詢,只需向數(shù)據(jù)源添加 AsParallel 查詢操作即可。

     1     class Program
     2     {
     3         static void Main(string[] args)
     4         {
     5             var personList=GetPersonList().AsParallel() 
     6                    .Where(x=>x.Age>30);
     7             Console.ReadKey();
     8         }
     9 
    10         //模擬源數(shù)據(jù)
    11 static IList<Person> GetPersonList() 12 { 13 var personList = new List<Person>(); 14 15 var person1 = new Person(); 16 person1.ID = 1; 17 person1.Name = "Leslie"; 18 person1.Age = 30; 19 personList.Add(person1); 20 ........... 21 return personList; 22 } 23 }

     

    7.4.2 AsOrdered

    若要使查詢結(jié)果必須保留源序列排序方式,可以使用AsOrdered方法。
    AsOrdered依然使用并行方式,只是在查詢過程加入額外信息,在并行結(jié)束后把查詢結(jié)果再次進(jìn)行排列。

         class Program
         {
             static void Main(string[] args)
             {
                 var personList=GetPersonList().AsParallel().AsOrdered()
                     .Where(x=>x.Age<30);
                 Console.ReadKey();
             }
     
             static IList<Person> GetPersonList()
             {......}
         }


    7.4.3 WithDegreeOfParallelism

    默認(rèn)情況下,PLINQ 使用主機(jī)上的所有處理器,這些處理器的數(shù)量最多可達(dá) 64 個(gè)。
    通過使用 WithDegreeOfParallelism(Of TSource) 方法,可以指示 PLINQ 使用不多于指定數(shù)量的處理器。

     1     class Program
     2     {
     3         static void Main(string[] args)
     4         {
     5             var personList=GetPersonList().AsParallel().WithDegreeOfParallelism(2)
     6                 .Where(x=>x.Age<30);
     7             Console.ReadKey();
     8         }
     9 
    10         static IList<Person> GetPersonList()
    11         {.........}
    12     }

     

    7.4.4 ForAll

    如果要對(duì)并行查詢結(jié)果進(jìn)行操作,一般會(huì)在for或foreach中執(zhí)行,執(zhí)行枚舉操作時(shí)會(huì)使用同步方式。
    有見及此,PLINQ中包含了ForAll方法,它可以使用并行方式對(duì)數(shù)據(jù)集進(jìn)行操作。

         class Program
         {
             static void Main(string[] args)
             {
                 ThreadPool.SetMaxThreads(1000, 1000);
                 GetPersonList().AsParallel().ForAll(person =>{
                     ThreadPoolMessage(person);
                 });
                 Console.ReadKey();
             }
     
             static IList<Person> GetPersonList()
             {.......}
     
              //顯示線程池現(xiàn)狀
    static void ThreadPoolMessage(Person person) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + " CurrentThreadId is {3}\n WorkerThreads is:{4}" + " CompletionPortThreads is :{5}\n", person.ID, person.Name, person.Age, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } }

    運(yùn)行結(jié)果

     

    7.4.5 WithCancellation

    如果需要停止查詢,可以使用 WithCancellation(Of TSource) 運(yùn)算符并提供 CancellationToken 實(shí)例作為參數(shù)。
    與 第三節(jié)Task的例子相似,如果標(biāo)記上的 IsCancellationRequested 屬性設(shè)置為 true,則 PLINQ 將會(huì)注意到它,并停止所有線程上的處理,然后引發(fā) OperationCanceledException。這可以保證并行查詢能夠立即停止。

     1     class Program
     2     {
     3         static CancellationTokenSource tokenSource = new CancellationTokenSource();
     4 
     5         static void Main(string[] args)
     6         {
     7             Task.Factory.StartNew(Cancel);
     8             try
     9             {
    10                 GetPersonList().AsParallel().WithCancellation(tokenSource.Token)
    11                     .ForAll(person =>
    12                     {
    13                         ThreadPoolMessage(person);
    14                     });
    15             }
    16             catch (OperationCanceledException ex)
    17             { }
    18             Console.ReadKey();
    19         }
    20 
    21         //在10~50毫秒內(nèi)發(fā)出停止信號(hào)
    22 static void Cancel() 23 { 24 Random random = new Random(); 25 Thread.Sleep(random.Next(10,50)); 26 tokenSource.Cancel(); 27 } 28 29 static IList<Person> GetPersonList() 30 {......} 31 32 //顯示線程池現(xiàn)狀
    33 static void ThreadPoolMessage(Person person) 34 { 35 int a, b; 36 ThreadPool.GetAvailableThreads(out a, out b); 37 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + 38 " CurrentThreadId is {3}\n WorkerThreads is:{4}" + 39 " CompletionPortThreads is :{5}\n", 40 person.ID, person.Name, person.Age, 41 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 42 Console.WriteLine(message); 43 } 44 } 億恩-天使(QQ:530997) 電話 037160135991 服務(wù)器租用,托管歡迎咨詢。

    本文出自:億恩科技【mszdt.com】

    服務(wù)器租用/服務(wù)器托管中國(guó)五強(qiáng)!虛擬主機(jī)域名注冊(cè)頂級(jí)提供商!15年品質(zhì)保障!--億恩科技[ENKJ.COM]

  • 您可能在找
  • 億恩北京公司:
  • 經(jīng)營(yíng)性ICP/ISP證:京B2-20150015
  • 億恩鄭州公司:
  • 經(jīng)營(yíng)性ICP/ISP/IDC證:豫B1.B2-20060070
  • 億恩南昌公司:
  • 經(jīng)營(yíng)性ICP/ISP證:贛B2-20080012
  • 服務(wù)器/云主機(jī) 24小時(shí)售后服務(wù)電話:0371-60135900
  • 虛擬主機(jī)/智能建站 24小時(shí)售后服務(wù)電話:0371-60135900
  • 專注服務(wù)器托管17年
    掃掃關(guān)注-微信公眾號(hào)
    0371-60135900
    Copyright© 1999-2019 ENKJ All Rights Reserved 億恩科技 版權(quán)所有  地址:鄭州市高新區(qū)翠竹街1號(hào)總部企業(yè)基地億恩大廈  法律顧問:河南亞太人律師事務(wù)所郝建鋒、杜慧月律師   京公網(wǎng)安備41019702002023號(hào)
      1
     
     
     
     

    0371-60135900
    7*24小時(shí)客服服務(wù)熱線