激情五月天婷婷,亚洲愉拍一区二区三区,日韩视频一区,a√天堂中文官网8

<ul id="buwfs"><strike id="buwfs"><strong id="buwfs"></strong></strike></ul>
    <output id="buwfs"></output>
  • <dfn id="buwfs"><source id="buwfs"></source></dfn>
      <dfn id="buwfs"><td id="buwfs"></td></dfn>
      <div id="buwfs"><small id="buwfs"></small></div>
      <dfn id="buwfs"><source id="buwfs"></source></dfn>
      1. <dfn id="buwfs"><td id="buwfs"></td></dfn>
        始創(chuàng)于2000年 股票代碼:831685
        咨詢(xún)熱線:0371-60135900 注冊(cè)有禮 登錄
        • 掛牌上市企業(yè)
        • 60秒人工響應(yīng)
        • 99.99%連通率
        • 7*24h人工
        • 故障100倍補(bǔ)償
        全部產(chǎn)品
        您的位置: 網(wǎng)站首頁(yè) > 幫助中心>文章內(nèi)容

        細(xì)說(shuō)多線程(七) —— 并行編程與PLINQ

        發(fā)布時(shí)間:  2012/9/16 6:38:20

        目錄

        一、線程的定義

        二、線程的基礎(chǔ)知識(shí)

        三、以ThreadStart方式實(shí)現(xiàn)多線程

        四、CLR線程池的工作者線程

        五、CLR線程池的I/O線程

        六、異步 SqlCommand

        七、并行編程與PLINQ

        八、計(jì)時(shí)器與鎖

         

        七、并行編程與PLINQ

        要使用多線程開(kāi)發(fā),必須非常熟悉Thread的使用,而且在開(kāi)發(fā)過(guò)程中可能會(huì)面對(duì)很多未知的問(wèn)題。為了簡(jiǎn)化開(kāi)發(fā),.NET 4.0 特別提供一個(gè)并行編程庫(kù)System.Threading.Tasks,它可以簡(jiǎn)化并行開(kāi)發(fā),你無(wú)需直接跟線程或線程池打交道,就可以簡(jiǎn)單建立多線程應(yīng)用 程序。此外,.NET還提供了新的一組擴(kuò)展方法PLINQ,它具有自動(dòng)分析查詢(xún)功能,如果并行查詢(xún)能提高系統(tǒng)效率,則同時(shí)運(yùn)行,如果查詢(xún)未能從并行查詢(xún)中 受益,則按原順序查詢(xún)。下面將詳細(xì)介紹并行操作的方式。

         

        7.1 泛型委托

        使用并行編程可以同時(shí)操作多個(gè)委托,在介紹并行編程前先簡(jiǎn)單介紹一下兩個(gè)泛型委托System.Func<>與System.Action<>。

        Func<>是一個(gè)能接受多個(gè)參數(shù)和一個(gè)返回值的泛型委托,它能接受0個(gè)到4個(gè)輸入?yún)?shù), 其中 T1,T2,T3,T4 代表自定的輸入類(lèi)型,TResult為自定義的返回值。
        public delegate TResult Func<TResult>()
        public delegate TResult Func<T1,TResult>(T1 arg1)
        public delegate TResult Func<T1,T2, TResult>(T1 arg1,T2 arg2)
        public delegate TResult Func<T1,T2, T3, TResult>(T1 arg1,T2 arg2,T3 arg3)
        public delegate TResult Func<T1,T2, T3, ,T4, TResult>(T1 arg1,T2 arg2,T3 arg3,T4 arg4)

        Action<>與Func<>十分相似,不同在于Action<>的返回值為void,Action能接受1~16個(gè)參數(shù)
        public delegate void Action<T1>()
        public delegate void Action<T1,T2>(T1 arg1,T2 arg2)
        public delegate void Action<T1,T2, T3>(T1 arg1,T2 arg2, T3 arg3)
        .............
        public delegate void Action<T1,T2, T3, ,T4, ...... ,T16>(T1 arg1,T2 arg2,T3 arg3,T4 arg4,...... ,T16 arg16)

         

        7.2 任務(wù)并行庫(kù)(TPL)

        System.Threading.Tasks中的類(lèi)被統(tǒng)稱(chēng)為任務(wù)并行庫(kù)(Task Parallel Library,TPL),TPL使用CLR線程池把工作分配到CPU,并能自動(dòng)處理工作分區(qū)、線程調(diào)度、取消支持、狀態(tài)管理以及其他低級(jí)別的細(xì)節(jié)操作,極大地簡(jiǎn)化了多線程的開(kāi)發(fā)。

        注意:TPL比Thread更具智能性,當(dāng)它判斷任務(wù)集并沒(méi)有從并行運(yùn)行中受益,就會(huì)選擇按順序運(yùn)行。但并非所有的項(xiàng)目都適合使用并行開(kāi)發(fā),創(chuàng)建過(guò)多并行任務(wù)可能會(huì)損害程序的性能,降低運(yùn)行效率。

        TPL包括常用的數(shù)據(jù)并行與任務(wù)并行兩種執(zhí)行方式:

        7.2.1 數(shù)據(jù)并行

        數(shù)據(jù)并行的核心類(lèi)就是System.Threading.Tasks.Parallel,它包含兩個(gè)靜態(tài)方法 Parallel.For 與 Parallel.ForEach, 使用方式與for、foreach相仿。通過(guò)這兩個(gè)方法可以并行處理System.Func<>、 System.Action<>委托。

        以下一個(gè)例子就是利用 public static ParallelLoopResult For( int from, int max, Action<int>) 方法對(duì)List<Person>進(jìn)行并行查詢(xún)。
        假設(shè)使用單線程方式查詢(xún)3個(gè)Person對(duì)象,需要用時(shí)大約6秒,在使用并行方式,只需使用2秒就能完成查詢(xún),而且能夠避開(kāi)Thread的繁瑣處理。

         1     class Program
         2     {
         3         static void Main(string[] args)
         4         {
         5             //設(shè)置最大線程數(shù)
        6 ThreadPool.SetMaxThreads(1000, 1000); 7 //并行查詢(xún)
        8 Parallel.For(0, 3,n => 9 { 10 Thread.Sleep(2000); //模擬查詢(xún) 11 ThreadPoolMessage(GetPersonList()[n]); 12 }); 13 Console.ReadKey(); 14 } 15 16 //模擬源數(shù)據(jù)
        17 static IList<Person> GetPersonList() 18 { 19 var personList = new List<Person>(); 20 21 var person1 = new Person(); 22 person1.ID = 1; 23 person1.Name = "Leslie"; 24 person1.Age = 30; 25 personList.Add(person1); 26 ........... 27 return personList; 28 } 29 30 //顯示線程池現(xiàn)狀
        31 static void ThreadPoolMessage(Person person) 32 { 33 int a, b; 34 ThreadPool.GetAvailableThreads(out a, out b); 35 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + 36 " CurrentThreadId is {3}\n WorkerThreads is:{4}" + 37 " CompletionPortThreads is :{5}\n", 38 person.ID, person.Name, person.Age, 39 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 40 41 Console.WriteLine(message); 42 } 43 }

        觀察運(yùn)行結(jié)果,對(duì)象并非按照原排列順序進(jìn)行查詢(xún),而是使用并行方式查詢(xún)。

         

        若想停止操作,可以利用ParallelLoopState參數(shù),下面以ForEach作為例子。
        public static ParallelLoopResult ForEach<TSource>( IEnumerable<TSource> source, Action<TSource, ParallelLoopState> action)
        其中source為數(shù)據(jù) 集,在Action<TSource,ParallelLoopState>委托的ParallelLoopState參數(shù)當(dāng)中包含有 Break()和 Stop()兩個(gè)方法都可以使迭代停止。Break的使用跟傳統(tǒng)for里面的使用方式相似,但因?yàn)樘幱诓⑿刑幚懋?dāng)中,使用Break并不能保證所有運(yùn)行能 立即停止,在當(dāng)前迭代之前的迭代會(huì)繼續(xù)執(zhí)行。若想立即停止操作,可以使用Stop方法,它能保證立即終止所有的操作,無(wú)論它們是處于當(dāng)前迭代的之前還是之 后。

             class Program
             {
                  static void Main(string[] args)
                  {
                      //設(shè)置最大線程數(shù)
        ThreadPool.SetMaxThreads(1000, 1000); //并行查詢(xún)
        Parallel.ForEach(GetPersonList(), (person, state) => { if (person.ID == 2) state.Stop(); ThreadPoolMessage(person); }); Console.ReadKey(); } //模擬源數(shù)據(jù)
        static IList<Person> GetPersonList() { var personList = new List<Person>(); var person1 = new Person(); person1.ID = 1; person1.Name = "Leslie"; person1.Age = 30; personList.Add(person1); .......... return personList; } //顯示線程池現(xiàn)狀
        static void ThreadPoolMessage(Person person) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + " CurrentThreadId is {3}\n WorkerThreads is:{4}" + " CompletionPortThreads is :{5}\n", person.ID, person.Name, person.Age, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } }

        觀察運(yùn)行結(jié)果,當(dāng)Person的ID等于2時(shí),運(yùn)行將會(huì)停止。

         

        當(dāng)要在多個(gè)線程中調(diào)用本地變量,可以使用以下方法:
        public static ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<Of TSource>, Func<Of TLocal>, Func<Of TSource,ParallelLoopState,TLocal,TLocal>, Action<Of TLocal>)
        其中第一個(gè)參數(shù)為數(shù)據(jù)集;
        第二個(gè)參數(shù)是一個(gè)Func委托,用于在每個(gè)線程執(zhí)行前進(jìn)行初始化;
        第 三個(gè)參數(shù)是委托Func<Of T1,T2,T3,TResult>,它能對(duì)數(shù)據(jù)集的每個(gè)成員進(jìn)行迭代,當(dāng)中T1是數(shù)據(jù)集的成員,T2是一個(gè)ParallelLoopState對(duì) 象,它可以控制迭代的狀態(tài),T3是線程中的本地變量;
        第四個(gè)參數(shù)是一個(gè)Action委托,用于對(duì)每個(gè)線程的最終狀態(tài)進(jìn)行最終操作。

        在以下例子中,使用ForEach計(jì)算多個(gè)Order的總體價(jià)格。在ForEach方法中,首先把參數(shù)初始化為0f,然后用把同一個(gè)Order的多 個(gè)OrderItem價(jià)格進(jìn)行累加,計(jì)算出Order的價(jià)格,最后把多個(gè)Order的價(jià)格進(jìn)行累加,計(jì)算出多個(gè)Order的總體價(jià)格。

         1     public class Order
         2     {
         3         public int ID;
         4         public float Price;
         5     }
         6 
         7     public class OrderItem
         8     {
         9         public int ID;
        10         public string Goods;
        11         public int OrderID;
        12         public float Price;
        13         public int Count;
        14     }
        15 
        16     class Program
        17     {
        18         static void Main(string[] args)
        19         {
        20             //設(shè)置最大線程數(shù)
        21 ThreadPool.SetMaxThreads(1000, 1000); 22 float totalPrice = 0f; 23 //并行查詢(xún)
        24 var parallelResult = Parallel.ForEach(GetOrderList(), 25 () => 0f, //把參數(shù)初始值設(shè)為0
        26 (order, state, orderPrice) => 27 { 28 //計(jì)算單個(gè)Order的價(jià)格
        29 orderPrice = GetOrderItem().Where(item => item.OrderID == order.ID) 30 .Sum(item => item.Price * item.Count); 31 order.Price = orderPrice; 32 ThreadPoolMessage(order); 33 34 return orderPrice; 35 }, 36 (finallyPrice) => 37 { 38 totalPrice += finallyPrice;//計(jì)算多個(gè)Order的總體價(jià)格
        39 } 40 ); 41 42 while (!parallelResult.IsCompleted) 43 Console.WriteLine("Doing Work!"); 44 45 Console.WriteLine("Total Price is:" + totalPrice); 46 Console.ReadKey(); 47 } 48 //虛擬數(shù)據(jù) 49 static IList<Order> GetOrderList() 50 { 51 IList<Order> orderList = new List<Order>(); 52 Order order1 = new Order(); 53 order1.ID = 1; 54 orderList.Add(order1); 55 ............ 56 return orderList; 57 } 58 //虛擬數(shù)據(jù) 59 static IList<OrderItem> GetOrderItem() 60 { 61 IList<OrderItem> itemList = new List<OrderItem>(); 62 63 OrderItem orderItem1 = new OrderItem(); 64 orderItem1.ID = 1; 65 orderItem1.Goods = "iPhone 4S"; 66 orderItem1.Price = 6700; 67 orderItem1.Count = 2; 68 orderItem1.OrderID = 1; 69 itemList.Add(orderItem1); 70 ........... 71 return itemList; 72 } 73 74 //顯示線程池現(xiàn)狀
        75 static void ThreadPoolMessage(Order order) 76 { 77 int a, b; 78 ThreadPool.GetAvailableThreads(out a, out b); 79 string message = string.Format("OrderID:{0} OrderPrice:{1}\n" + 80 " CurrentThreadId is {2}\n WorkerThreads is:{3}" + 81 " CompletionPortThreads is:{4}\n", 82 order.ID, order.Price, 83 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 84 85 Console.WriteLine(message); 86 } 87 }

        運(yùn)行結(jié)果

         

         7.2.2 任務(wù)并行

        在TPL當(dāng)中還可以使用Parallel.Invoke方法觸發(fā)多個(gè)異步任務(wù),其中 actions 中可以包含多個(gè)方法或者委托,parallelOptions用于配置Parallel類(lèi)的操作。
        public static void Invoke(Action[] actions )
        public static void Invoke(ParallelOptions parallelOptions, Action[] actions )
        下面例子中利用了Parallet.Invoke并行查詢(xún)多個(gè)Person,actions當(dāng)中可以綁定方法、lambda表達(dá)式或者委托,注意綁定方法時(shí)必須是返回值為void的無(wú)參數(shù)方法。

             class Program
             {
                 static void Main(string[] args)
                 {
                     //設(shè)置最大線程數(shù)
        ThreadPool.SetMaxThreads(1000, 1000); //任務(wù)并行
        Parallel.Invoke(option, PersonMessage, ()=>ThreadPoolMessage(GetPersonList()[1]), delegate(){ ThreadPoolMessage(GetPersonList()[2]); }); Console.ReadKey(); } static void PersonMessage() { ThreadPoolMessage(GetPersonList()[0]); } //顯示線程池現(xiàn)狀
        static void ThreadPoolMessage(Person person) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + " CurrentThreadId is {3}\n WorkerThreads is:{4}" + " CompletionPortThreads is :{5}\n", person.ID, person.Name, person.Age, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } //模擬源數(shù)據(jù)
        static IList<Person> GetPersonList() { var personList = new List<Person>(); var person1 = new Person(); person1.ID = 1; person1.Name = "Leslie"; person1.Age = 30; personList.Add(person1); .......... return personList; } }

        運(yùn)行結(jié)果

         


        7.3 Task簡(jiǎn)介

        以Thread創(chuàng)建的線程被默認(rèn)為前臺(tái)線程,當(dāng)然你可以把線程IsBackground屬性設(shè)置為true,但TPL為此提供了一個(gè)更簡(jiǎn)單的類(lèi)Task。
        Task存在于System.Threading.Tasks命名空間當(dāng)中,它可以作為異步委托的簡(jiǎn)單替代品。
        通過(guò)Task的Factory屬性將返回TaskFactory類(lèi),以TaskFactory.StartNew(Action)方法可以創(chuàng)建一個(gè)新線程,所創(chuàng)建的線程默認(rèn)為后臺(tái)線程。

         1     class Program
         2     {
         3         static void Main(string[] args)
         4         {
         5             ThreadPool.SetMaxThreads(1000, 1000);
         6             Task.Factory.StartNew(() => ThreadPoolMessage());
         7             Console.ReadKey();
         8         }
         9 
        10         //顯示線程池現(xiàn)狀
        11 static void ThreadPoolMessage() 12 { 13 int a, b; 14 ThreadPool.GetAvailableThreads(out a, out b); 15 string message = string.Format("CurrentThreadId is:{0}\n" + 16 "CurrentThread IsBackground:{1}\n" + 17 "WorkerThreads is:{2}\nCompletionPortThreads is:{3}\n", 18 Thread.CurrentThread.ManagedThreadId, 19 Thread.CurrentThread.IsBackground.ToString(), 20 a.ToString(), b.ToString()); 21 Console.WriteLine(message); 22 } 23 }

        運(yùn)行結(jié)果

         

         

        若要取消處理,可以利用CancellationTakenSource對(duì)象,在TaskFactory中包含有方法
        public Task StartNew( Action action, CancellationToken cancellationToken )
        在 方法中加入CancellationTakenSource對(duì)象的CancellationToken屬性,可以控制任務(wù)的運(yùn)行,調(diào)用 CancellationTakenSource.Cancel時(shí)任務(wù)就會(huì)自動(dòng)停止。下面以圖片下載為例子介紹一下TaskFactory的使用。

        服務(wù)器端頁(yè)面

         <html xmlns="http://www.w3.org/1999/xhtml">
         <head runat="server">
             <title></title>
             <script type="text/C#" runat="server">
        private static List<string> url=new List<string>();

        protected void Page_Load(object sender, EventArgs e)
        {
        if (!Page.IsPostBack)
        {
        url.Clear();
        Application["Url"] = null;
        }
        }

        protected void CheckBox_CheckedChanged(object sender, EventArgs e)
        {
        CheckBox checkBox = (CheckBox)sender;
        if (checkBox.Checked)
        url.Add(checkBox.Text);
        else
        url.Remove(checkBox.Text);
        Application["Url"]= url;
        }
        </script> </head> <body> <form id="form1" runat="server" > <div align="left"> <div align="center" style="float: left;"> <asp:Image ID="Image1" runat="server" ImageUrl="~/Images/A.jpg" /><br /> <asp:CheckBox ID="CheckBox1" runat="server" AutoPostBack="True"
        oncheckedchanged="CheckBox_CheckedChanged" Text="A.jpg" /> </div> <div align="center" style="float: left"> <asp:Image ID="Image2" runat="server" ImageUrl="~/Images/B.jpg" /><br /> <asp:CheckBox ID="CheckBox2" runat="server" AutoPostBack="True"
        oncheckedchanged="CheckBox_CheckedChanged" Text="B.jpg" /> </div> <div align="center" style="float: left"> <asp:Image ID="Image3" runat="server" ImageUrl="~/Images/C.jpg" /><br /> <asp:CheckBox ID="CheckBox3" runat="server" AutoPostBack="True"
        oncheckedchanged="CheckBox_CheckedChanged" Text="C.jpg" /> </div> <div align="center" style="float: left"> <asp:Image ID="Image4" runat="server" ImageUrl="~/Images/D.jpg" /><br /> <asp:CheckBox ID="CheckBox4" runat="server" AutoPostBack="True"
        oncheckedchanged="CheckBox_CheckedChanged" Text="D.jpg" /> </div> <div align="center" style="float: left"> <asp:Image ID="Image5" runat="server" ImageUrl="~/Images/E.jpg" /><br /> <asp:CheckBox ID="CheckBox5" runat="server" AutoPostBack="True"
        oncheckedchanged="CheckBox_CheckedChanged" Text="E.jpg" /> </div> </div> </form> </body> </html>

        首先在服務(wù)器頁(yè)面中顯示多個(gè)*.jpg圖片,每個(gè)圖片都有對(duì)應(yīng)的CheckBox檢測(cè)其選擇情況。
        所選擇圖片的路徑會(huì)記錄在Application["Url"]當(dāng)中傳遞到Handler.ashx當(dāng)中。

        注意:Application是一個(gè)全局變量,此處只是為了顯示Task的使用方式,在ASP.NET開(kāi)發(fā)應(yīng)該慎用Application。

        Handler.ashx 處理圖片的下載,它從 Application["Url"] 當(dāng)中獲取所選擇圖片的路徑,并把圖片轉(zhuǎn)化成byte[]二進(jìn)制數(shù)據(jù)。
        再把圖片的數(shù)量,每副圖片的二進(jìn)制數(shù)據(jù)的長(zhǎng)度記錄在OutputStream的頭部。
        最后把圖片的二進(jìn)制數(shù)據(jù)記入 OutputStream 一并輸出。

         1 public class Handler : IHttpHandler 
         2 {
         3     public void ProcessRequest(HttpContext context)
         4     {
         5         //獲取圖片名,把圖片數(shù)量寫(xiě)OutputStream
        6 List<String> urlList = (List<string>)context.Application["Url"]; 7 context.Response.OutputStream.Write(BitConverter.GetBytes(urlList.Count), 0, 4); 8 9 //把圖片轉(zhuǎn)換成二進(jìn)制數(shù)據(jù)
        10 List<string> imageList = GetImages(urlList); 11 12 //把每副圖片長(zhǎng)度寫(xiě)入OutputStream
        13 foreach (string image in imageList) 14 { 15 byte[] imageByte=Convert.FromBase64String(image); 16 context.Response.OutputStream.Write(BitConverter.GetBytes(imageByte.Length),0,4); 17 } 18 19 //把圖片寫(xiě)入OutputStream
        20 foreach (string image in imageList) 21 { 22 byte[] imageByte = Convert.FromBase64String(image); 23 context.Response.OutputStream.Write(imageByte,0,imageByte.Length); 24 } 25 } 26 27 //獲取多個(gè)圖片的二進(jìn)制數(shù)據(jù)
        28 private List<string> GetImages(List<string> urlList) 29 { 30 List<string> imageList = new List<string>(); 31 foreach (string url in urlList) 32 imageList.Add(GetImage(url)); 33 return imageList; 34 } 35 36 //獲取單副圖片的二進(jìn)制數(shù)據(jù)
        37 private string GetImage(string url) 38 { 39 string path = "E:/My Projects/Example/WebSite/Images/"+url; 40 FileStream stream = new FileStream(path, FileMode.Open, FileAccess.Read); 41 byte[] imgBytes = new byte[10240]; 42 int imgLength = stream.Read(imgBytes, 0, 10240); 43 return Convert.ToBase64String(imgBytes,0,imgLength); 44 } 45 46 public bool IsReusable 47 { 48 get{ return false;} 49 } 50 }

         

        客戶(hù)端

        建立一個(gè)WinForm窗口,里面加入一個(gè)WebBrowser連接到服務(wù)器端的Default.aspx頁(yè)面。
        當(dāng)按下Download按鍵時(shí),系統(tǒng)就會(huì)利用TaskFactory.StartNew的方法建立異步線程,使用WebRequest方法向Handler.ashx發(fā)送請(qǐng)求。
        接收到回傳流時(shí),就會(huì)根據(jù)頭文件的內(nèi)容判斷圖片的數(shù)量與每副圖片的長(zhǎng)度,把二進(jìn)制數(shù)據(jù)轉(zhuǎn)化為*.jpg文件保存。

        系統(tǒng)利用TaskFactory.StartNew(action,cancellationToken) 方式異步調(diào)用GetImages方法進(jìn)行圖片下載。
        當(dāng) 用戶(hù)按下Cancel按鈕時(shí),異步任務(wù)就會(huì)停止。值得注意的是,在圖片下載時(shí)調(diào)用了 CancellationToken.ThrowIfCancellationRequested方法,目的在檢查并行任務(wù)的運(yùn)行情況,在并行任務(wù)被停止 時(shí)釋放出OperationCanceledException異常,確保用戶(hù)按下Cancel按鈕時(shí),停止所有并行任務(wù)。

             public partial class Form1 : Form
             {
                 private CancellationTokenSource tokenSource = new CancellationTokenSource();
                 
                 public Form1()
                 {
                     InitializeComponent();
                     ThreadPool.SetMaxThreads(1000, 1000);
                 }
         
                 private void downloadToolStripMenuItem_Click(object sender, EventArgs e)
                 {
                      Task.Factory.StartNew(GetImages,tokenSource.Token);
                 }
         
                 private void cancelToolStripMenuItem_Click(object sender, EventArgs e)
                 {
                     tokenSource.Cancel();
                 }
         
                 private void GetImages()
                 {
                     //發(fā)送請(qǐng)求,獲取輸出流
        WebRequest webRequest = HttpWebRequest.Create("Http://localhost:5800/Handler.ashx"); Stream responseStream=webRequest.GetResponse().GetResponseStream(); byte[] responseByte = new byte[81960]; IAsyncResult result=responseStream.BeginRead(responseByte,0,81960,null,null); int responseLength = responseStream.EndRead(result); //獲取圖片數(shù)量
        int imageCount = BitConverter.ToInt32(responseByte, 0); //獲取每副圖片的長(zhǎng)度
        int[] lengths = new int[imageCount]; for (int n = 0; n < imageCount; n++) { int length = BitConverter.ToInt32(responseByte, (n + 1) * 4); lengths[n] = length; } try { //保存圖片
        for (int n = 0; n < imageCount; n++) { string path = string.Format("E:/My Projects/Example/Test/Images/pic{0}.jpg", n); FileStream file = new FileStream(path, FileMode.Create, FileAccess.ReadWrite); //計(jì)算字節(jié)偏移量
        int offset = (imageCount + 1) * 4; for (int a = 0; a < n; a++) offset += lengths[a]; file.Write(responseByte, offset, lengths[n]); file.Flush(); //模擬操作
        Thread.Sleep(1000); //檢測(cè)CancellationToken變化
        tokenSource.Token.ThrowIfCancellationRequested(); } } catch (OperationCanceledException ex) { MessageBox.Show("Download cancel!"); } } }


         

        7.4 并行查詢(xún)(PLINQ)

        并行 LINQ (PLINQ) 是 LINQ 模式的并行實(shí)現(xiàn),主要區(qū)別在于 PLINQ 嘗試充分利用系統(tǒng)中的所有處理器。 它利用所有處理器的方法,把數(shù)據(jù)源分成片段,然后在多個(gè)處理器上對(duì)單獨(dú)工作線程上的每個(gè)片段并行執(zhí)行查詢(xún), 在許多情況下,并行執(zhí)行意味著查詢(xún)運(yùn)行速度顯著提高。但這并不說(shuō)明所有PLINQ都會(huì)使用并行方式,當(dāng)系統(tǒng)測(cè)試要并行查詢(xún)會(huì)對(duì)系統(tǒng)性能造成損害時(shí),那將自動(dòng)化地使用同步執(zhí)行。
        在System.Linq.ParallelEnumerable類(lèi)中,包含了并行查詢(xún)的大部分方法。

         

        方法成員 

        說(shuō)明

        AsParallel

        PLINQ 的入口點(diǎn)。 指定如果可能,應(yīng)并行化查詢(xún)的其余部分。

        AsSequential(Of TSource)

        指定查詢(xún)的其余部分應(yīng)像非并行 LINQ 查詢(xún)一樣按順序運(yùn)行。

        AsOrdered

        指定 PLINQ 應(yīng)保留查詢(xún)的其余部分的源序列排序,直到例如通過(guò)使用 orderby(在 Visual Basic 中為 Order By)子句更改排序?yàn)橹埂?/span>

        AsUnordered(Of TSource)

        指定查詢(xún)的其余部分的 PLINQ 不需要保留源序列的排序。

        WithCancellation(Of TSource)

        指定 PLINQ 應(yīng)定期監(jiān)視請(qǐng)求取消時(shí)提供的取消標(biāo)記和取消執(zhí)行的狀態(tài)。

        WithDegreeOfParallelism(Of TSource)

        指定 PLINQ 應(yīng)當(dāng)用來(lái)并行化查詢(xún)的處理器的最大數(shù)目。

        WithMergeOptions(Of TSource)

        提供有關(guān) PLINQ 應(yīng)當(dāng)如何(如果可能)將并行結(jié)果合并回到使用線程上的一個(gè)序列的提示。

        WithExecutionMode(Of TSource)

        指定 PLINQ 應(yīng)當(dāng)如何并行化查詢(xún)(即使默認(rèn)行為是按順序運(yùn)行查詢(xún))。

        ForAll(Of TSource)

        多線程枚舉方法,與循環(huán)訪問(wèn)查詢(xún)結(jié)果不同,它允許在不首先合并回到使用者線程的情況下并行處理結(jié)果。

        Aggregate 重載

        對(duì)于 PLINQ 唯一的重載,它啟用對(duì)線程本地分區(qū)的中間聚合以及一個(gè)用于合并所有分區(qū)結(jié)果的最終聚合函數(shù)。


        7.4.1 AsParallel

        通常想要實(shí)現(xiàn)并行查詢(xún),只需向數(shù)據(jù)源添加 AsParallel 查詢(xún)操作即可。

         1     class Program
         2     {
         3         static void Main(string[] args)
         4         {
         5             var personList=GetPersonList().AsParallel() 
         6                    .Where(x=>x.Age>30);
         7             Console.ReadKey();
         8         }
         9 
        10         //模擬源數(shù)據(jù)
        11 static IList<Person> GetPersonList() 12 { 13 var personList = new List<Person>(); 14 15 var person1 = new Person(); 16 person1.ID = 1; 17 person1.Name = "Leslie"; 18 person1.Age = 30; 19 personList.Add(person1); 20 ........... 21 return personList; 22 } 23 }

         

        7.4.2 AsOrdered

        若要使查詢(xún)結(jié)果必須保留源序列排序方式,可以使用AsOrdered方法。
        AsOrdered依然使用并行方式,只是在查詢(xún)過(guò)程加入額外信息,在并行結(jié)束后把查詢(xún)結(jié)果再次進(jìn)行排列。

             class Program
             {
                 static void Main(string[] args)
                 {
                     var personList=GetPersonList().AsParallel().AsOrdered()
                         .Where(x=>x.Age<30);
                     Console.ReadKey();
                 }
         
                 static IList<Person> GetPersonList()
                 {......}
             }


        7.4.3 WithDegreeOfParallelism

        默認(rèn)情況下,PLINQ 使用主機(jī)上的所有處理器,這些處理器的數(shù)量最多可達(dá) 64 個(gè)。
        通過(guò)使用 WithDegreeOfParallelism(Of TSource) 方法,可以指示 PLINQ 使用不多于指定數(shù)量的處理器。

         1     class Program
         2     {
         3         static void Main(string[] args)
         4         {
         5             var personList=GetPersonList().AsParallel().WithDegreeOfParallelism(2)
         6                 .Where(x=>x.Age<30);
         7             Console.ReadKey();
         8         }
         9 
        10         static IList<Person> GetPersonList()
        11         {.........}
        12     }

         

        7.4.4 ForAll

        如果要對(duì)并行查詢(xún)結(jié)果進(jìn)行操作,一般會(huì)在for或foreach中執(zhí)行,執(zhí)行枚舉操作時(shí)會(huì)使用同步方式。
        有見(jiàn)及此,PLINQ中包含了ForAll方法,它可以使用并行方式對(duì)數(shù)據(jù)集進(jìn)行操作。

             class Program
             {
                 static void Main(string[] args)
                 {
                     ThreadPool.SetMaxThreads(1000, 1000);
                     GetPersonList().AsParallel().ForAll(person =>{
                         ThreadPoolMessage(person);
                     });
                     Console.ReadKey();
                 }
         
                 static IList<Person> GetPersonList()
                 {.......}
         
                  //顯示線程池現(xiàn)狀
        static void ThreadPoolMessage(Person person) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + " CurrentThreadId is {3}\n WorkerThreads is:{4}" + " CompletionPortThreads is :{5}\n", person.ID, person.Name, person.Age, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } }

        運(yùn)行結(jié)果

         

        7.4.5 WithCancellation

        如果需要停止查詢(xún),可以使用 WithCancellation(Of TSource) 運(yùn)算符并提供 CancellationToken 實(shí)例作為參數(shù)。
        與 第三節(jié)Task的例子相似,如果標(biāo)記上的 IsCancellationRequested 屬性設(shè)置為 true,則 PLINQ 將會(huì)注意到它,并停止所有線程上的處理,然后引發(fā) OperationCanceledException。這可以保證并行查詢(xún)能夠立即停止。

         1     class Program
         2     {
         3         static CancellationTokenSource tokenSource = new CancellationTokenSource();
         4 
         5         static void Main(string[] args)
         6         {
         7             Task.Factory.StartNew(Cancel);
         8             try
         9             {
        10                 GetPersonList().AsParallel().WithCancellation(tokenSource.Token)
        11                     .ForAll(person =>
        12                     {
        13                         ThreadPoolMessage(person);
        14                     });
        15             }
        16             catch (OperationCanceledException ex)
        17             { }
        18             Console.ReadKey();
        19         }
        20 
        21         //在10~50毫秒內(nèi)發(fā)出停止信號(hào)
        22 static void Cancel() 23 { 24 Random random = new Random(); 25 Thread.Sleep(random.Next(10,50)); 26 tokenSource.Cancel(); 27 } 28 29 static IList<Person> GetPersonList() 30 {......} 31 32 //顯示線程池現(xiàn)狀
        33 static void ThreadPoolMessage(Person person) 34 { 35 int a, b; 36 ThreadPool.GetAvailableThreads(out a, out b); 37 string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + 38 " CurrentThreadId is {3}\n WorkerThreads is:{4}" + 39 " CompletionPortThreads is :{5}\n", 40 person.ID, person.Name, person.Age, 41 Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); 42 Console.WriteLine(message); 43 } 44 } 億恩-天使(QQ:530997) 電話 037160135991 服務(wù)器租用,托管歡迎咨詢(xún)。

        本文出自:億恩科技【mszdt.com】

        服務(wù)器租用/服務(wù)器托管中國(guó)五強(qiáng)!虛擬主機(jī)域名注冊(cè)頂級(jí)提供商!15年品質(zhì)保障!--億恩科技[ENKJ.COM]

      2. 您可能在找
      3. 億恩北京公司:
      4. 經(jīng)營(yíng)性ICP/ISP證:京B2-20150015
      5. 億恩鄭州公司:
      6. 經(jīng)營(yíng)性ICP/ISP/IDC證:豫B1.B2-20060070
      7. 億恩南昌公司:
      8. 經(jīng)營(yíng)性ICP/ISP證:贛B2-20080012
      9. 服務(wù)器/云主機(jī) 24小時(shí)售后服務(wù)電話:0371-60135900
      10. 虛擬主機(jī)/智能建站 24小時(shí)售后服務(wù)電話:0371-60135900
      11. 專(zhuān)注服務(wù)器托管17年
        掃掃關(guān)注-微信公眾號(hào)
        0371-60135900
        Copyright© 1999-2019 ENKJ All Rights Reserved 億恩科技 版權(quán)所有  地址:鄭州市高新區(qū)翠竹街1號(hào)總部企業(yè)基地億恩大廈  法律顧問(wèn):河南亞太人律師事務(wù)所郝建鋒、杜慧月律師   京公網(wǎng)安備41019702002023號(hào)
          0
         
         
         
         

        0371-60135900
        7*24小時(shí)客服服務(wù)熱線