using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace TplDataFlowSimpleTests
{
    /// <summary>
    /// Console walkthrough of the TPL Dataflow block types. Each private method is a
    /// self-contained demo of one block type; <see cref="Main"/> runs the one that is
    /// not commented out.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Runs the selected demo, then waits for a key press so the console window
        /// stays open. Uncomment a different call to run another demo.
        /// </summary>
        /// <param name="args">Command-line arguments (not used).</param>
        static void Main(string[] args)
        {
            ActionBlockPostingAndFaulting();
            //BufferBlockPostingAndReceiving();
            //BroadCastPostAndMultipleReceive();
            //WriteOnceBlockParallelPostsAndSingleReceive();
            //ActionBlockPostingCompleting();
            //TransformBlockPostingAndProducingOutput();
            //TransformManyBlockPostingsAndReceive();
            //BatchBlockSeveralBatches();
            //JoinBlockArithmeticCombinations();
            //BatchedJoinBlockPropagatingValuesAndException();

            Console.WriteLine("Press the any key to continue ..");
            Console.ReadKey();
        }

        /// <summary>
        /// Feeds a mix of valid and negative numbers through a worker function and
        /// routes results to Target1 and thrown exceptions to Target2 of a
        /// <see cref="BatchedJoinBlock{T1,T2}"/>, then prints both collections.
        /// </summary>
        private static void BatchedJoinBlockPropagatingValuesAndException()
        {
            Func<int, int> doWork = n =>
            {
                if (n < 0)
                {
                    throw new ArgumentOutOfRangeException();
                }

                return n;
            };

            int[] inputs = { 5, 6, -7, -22, 13, 55, 0 };

            // The batch size counts items across both targets. Deriving it from the
            // input length guarantees that exactly one batch is produced; a mismatch
            // would leave Receive() below blocked forever.
            var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(inputs.Length);

            foreach (int i in inputs)
            {
                try
                {
                    batchedJoinBlock.Target1.Post(doWork(i));
                }
                catch (ArgumentOutOfRangeException e)
                {
                    batchedJoinBlock.Target2.Post(e);
                }
            }

            var results = batchedJoinBlock.Receive();

            foreach (int n in results.Item1)
            {
                Console.WriteLine(n);
            }

            foreach (Exception e in results.Item2)
            {
                Console.WriteLine(e.Message);
            }
        }

        /// <summary>
        /// Posts two left operands, two right operands and two operators to a greedy
        /// three-way <see cref="JoinBlock{T1,T2,T3}"/> and prints the arithmetic
        /// result of each joined (left, right, operator) tuple.
        /// </summary>
        private static void JoinBlockArithmeticCombinations()
        {
            var joinBlock = new JoinBlock<int, int, char>(
                new GroupingDataflowBlockOptions { Greedy = true });

            joinBlock.Target1.Post(3);
            joinBlock.Target1.Post(6);

            joinBlock.Target2.Post(5);
            joinBlock.Target2.Post(4);

            joinBlock.Target3.Post('+');
            joinBlock.Target3.Post('-');

            // Two items were posted to every target, so exactly two tuples are available.
            for (int i = 0; i < 2; i++)
            {
                var data = joinBlock.Receive();
                switch (data.Item3)
                {
                    case '+':
                        Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2);
                        break;
                    case '-':
                        Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2);
                        break;
                    default:
                        Console.WriteLine("Unknown operator '{0}'.", data.Item3);
                        break;
                }
            }
        }

        /// <summary>
        /// Posts 13 integers to a <see cref="BatchBlock{T}"/> with batch size 10 and
        /// prints the two resulting batches.
        /// </summary>
        private static void BatchBlockSeveralBatches()
        {
            var batchBlock = new BatchBlock<int>(10);

            for (int i = 0; i < 13; i++)
            {
                batchBlock.Post(i);
            }

            // Completing the block releases the remaining three items as a final,
            // smaller batch; without it the second Receive() would not return.
            batchBlock.Complete();

            Console.WriteLine("The elements of the first batch are: [{0}] ",
                string.Join(",", batchBlock.Receive()));
            Console.WriteLine("The elements of the second batch are: [{0}]",
                string.Join(",", batchBlock.Receive()));
        }

        /// <summary>
        /// Posts two strings to a <see cref="TransformManyBlock{TInput,TOutput}"/>
        /// that splits each string into characters, and prints every character.
        /// </summary>
        private static void TransformManyBlockPostingsAndReceive()
        {
            var transformManyBlock = new TransformManyBlock<string, char>(s => s.ToCharArray());

            transformManyBlock.Post("Hello");
            transformManyBlock.Post("World");

            // One output per character of the two posted strings.
            for (int i = 0; i < ("Hello" + "World").Length; i++)
            {
                Console.WriteLine(transformManyBlock.Receive());
            }
        }

        /// <summary>
        /// Posts three integers to a <see cref="TransformBlock{TInput,TOutput}"/>
        /// that computes square roots, and prints the three results.
        /// </summary>
        private static void TransformBlockPostingAndProducingOutput()
        {
            var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

            transformBlock.Post(10);
            transformBlock.Post(20);
            transformBlock.Post(30);

            for (int i = 0; i < 3; i++)
            {
                Console.WriteLine(transformBlock.Receive());
            }
        }

        /// <summary>
        /// Posts the values 0, 10, ..., 90 to an <see cref="ActionBlock{TInput}"/>
        /// that processes up to two messages at a time and cancels its own
        /// cancellation token once it sees a value above 50. The resulting
        /// <see cref="OperationCanceledException"/> message is printed.
        /// </summary>
        private static void ActionBlockPostingCompleting()
        {
            // CancellationTokenSource is IDisposable; scope it so it is always released.
            using (var cts = new CancellationTokenSource())
            {
                var actionBlock = new ActionBlock<int>(n =>
                {
                    Console.WriteLine(n);
                    Thread.Sleep(1000);
                    if (n > 50)
                    {
                        cts.Cancel();
                    }
                },
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 2,
                    MaxMessagesPerTask = 1,
                    CancellationToken = cts.Token
                });

                for (int i = 0; i < 10; i++)
                {
                    actionBlock.Post(i * 10);
                }

                actionBlock.Complete();

                try
                {
                    actionBlock.Completion.Wait(cts.Token);
                }
                catch (OperationCanceledException ex)
                {
                    Console.WriteLine(ex.Message);
                }

                // The wait above returns as soon as the token is cancelled, while a
                // second delegate may still be running and about to call cts.Cancel().
                // Cancel() throws ObjectDisposedException on a disposed source, so let
                // the block finish before leaving the using scope.
                try
                {
                    actionBlock.Completion.Wait();
                }
                catch (AggregateException ae)
                {
                    // A cancelled block surfaces as a TaskCanceledException here; that
                    // is the expected outcome. Anything else is rethrown by Handle.
                    ae.Handle(e => e is OperationCanceledException);
                }
            }
        }

        /// <summary>
        /// Posts three messages in parallel to a <see cref="WriteOnceBlock{T}"/> and
        /// prints the single value the block ends up holding (whichever post won).
        /// </summary>
        private static void WriteOnceBlockParallelPostsAndSingleReceive()
        {
            // A null cloning function means the stored value is handed out as-is.
            var writeOnceBlock = new WriteOnceBlock<string>(null);

            Parallel.Invoke(
                () => writeOnceBlock.Post("Message 1"),
                () => writeOnceBlock.Post("Message 2"),
                () => writeOnceBlock.Post("Message 3"));

            Console.WriteLine(writeOnceBlock.Receive());
        }

        /// <summary>
        /// Posts a single value to a <see cref="BroadcastBlock{T}"/> and receives it
        /// three times, showing that reading does not remove the value from the block.
        /// </summary>
        private static void BroadCastPostAndMultipleReceive()
        {
            // A null cloning function means the stored value is handed out as-is.
            var broadcastBlock = new BroadcastBlock<double>(null);

            broadcastBlock.Post(Math.PI);

            for (int i = 0; i < 3; i++)
            {
                Console.WriteLine(broadcastBlock.Receive());
            }
        }

        /// <summary>
        /// Posts three integers to a <see cref="BufferBlock{T}"/> and attempts four
        /// receives with a two-second timeout; the fourth attempt times out and the
        /// <see cref="TimeoutException"/> details are printed.
        /// </summary>
        private static void BufferBlockPostingAndReceiving()
        {
            var bufferBlock = new BufferBlock<int>();

            for (int i = 0; i < 3; i++)
            {
                bufferBlock.Post(i);
            }

            // One more receive than there are items, to demonstrate the timeout path.
            for (int i = 0; i < 4; i++)
            {
                try
                {
                    Console.WriteLine(bufferBlock.Receive(new TimeSpan(0, 0, 2)));
                }
                catch (TimeoutException tie)
                {
                    Console.WriteLine("Exception of type: {0} with message: {1}",
                        tie.GetType().Name, tie.Message);
                }
            }
        }

        /// <summary>
        /// Posts values to an <see cref="ActionBlock{TInput}"/> whose delegate throws
        /// for negative input, then prints the exception(s) surfaced by the block's
        /// completion task together with that task's final status.
        /// </summary>
        private static void ActionBlockPostingAndFaulting()
        {
            var throwIfNegative = new ActionBlock<int>(n =>
            {
                Console.WriteLine("n = {0}", n);
                if (n < 0)
                {
                    throw new ArgumentOutOfRangeException();
                }
            });

            // Keep a handle on the continuation so it can be awaited below; otherwise
            // its output could appear after this method has already returned.
            Task statusReport = throwIfNegative.Completion.ContinueWith(task =>
            {
                Console.WriteLine("The status of the completion task is '{0}'", task.Status);
            });

            throwIfNegative.Post(0);
            throwIfNegative.Post(-1);
            throwIfNegative.Post(1);
            throwIfNegative.Post(2);
            throwIfNegative.Complete();

            try
            {
                throwIfNegative.Completion.Wait();
            }
            catch (AggregateException ae)
            {
                ae.Handle(e =>
                {
                    Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message);
                    return true;
                });
            }

            // The continuation only writes to the console and does not throw.
            statusReport.Wait();
        }
    }
}
Wednesday, 15 July 2015
Data block types in Task Parallel Library
The following code sample shows the data block types in the Task Parallel Library (TPL). The code is written in C# and can be run in a simple Console Application.
Dataflow in TPL makes it possible to apply actor-based programming and to orchestrate coarse-grained dataflow and pipelining tasks, while maintaining robustness
and supporting concurrency-enabled applications. This makes it easier to construct high-performance, low-latency systems.
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment