Wednesday, 15 July 2015

Data block types in Task Parallel Library

The following code sample shows data block types in Task Parallel Libary (TPL). The code is written in C# and can be run in a simple Console Application. Dataflow in TPL makes it possible to build actor-based programming and orchestrate coarse-grained dataflow and pipelining tasks, maintaining robustness and supporting concurrency-enabled applications. This makes it easier to construct high-performance, low latency systems.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace TplDataFlowSimpleTests
{
    class Program
    {

        static void Main(string[] args)
        {
            ActionBlockPostingAndFaulting();

            //BufferBlockPostingAndReceiving();

            //BroadCastPostAndMultipleReceive();

            //WriteOnceBlockParallellPostsAndSingleReceive();

            //ActionBlockPostingCompleting();

            //TransformBlockPostingAndProducingOutput(); 

            //TransformManyBlockPostingsAndReceive(); 

            //BatchBlockSeveralBatches();

            //JoinBlockAritmeticCombinations(); 

            //BatchedJoinBlockPropagatingValuesAndException();

            Console.WriteLine("Press the any key to continue ..");
            Console.ReadKey();

        }

        private static void BatchedJoinBlockPropagatingValuesAndException()
        {
            Func<int, int> doWork = n =>
            {
                if (n < 0)
                    throw new ArgumentOutOfRangeException();
                return n;
            };

            var batchedJoinBlock = new BatchedJoinBlock<int, Exception>(7);

            foreach (int i in new int[] {5, 6, -7, -22, 13, 55, 0})
            {
                try
                {
                    batchedJoinBlock.Target1.Post(doWork(i));
                }
                catch (ArgumentOutOfRangeException e)
                {
                    batchedJoinBlock.Target2.Post(e);
                }
            }

            var results = batchedJoinBlock.Receive();

            foreach (int n in results.Item1)
            {
                Console.WriteLine(n);
            }

            foreach (Exception e in results.Item2)
            {
                Console.WriteLine(e.Message);
            }
        }

        private static void JoinBlockAritmeticCombinations()
        {
            var joinBlock = new JoinBlock<int, int, char>(new GroupingDataflowBlockOptions{ Greedy = true});

            joinBlock.Target1.Post(3);
            joinBlock.Target1.Post(6);

            joinBlock.Target2.Post(5);
            joinBlock.Target2.Post(4);

            joinBlock.Target3.Post('+');
            joinBlock.Target3.Post('-');

            for (int i = 0; i < 2; i++)
            {
                var data = joinBlock.Receive();

                switch (data.Item3)
                {
                    case '+':
                        Console.WriteLine("{0} + {1} = {2}", data.Item1, data.Item2, data.Item1 + data.Item2);
                        break;
                    case '-':
                        Console.WriteLine("{0} - {1} = {2}", data.Item1, data.Item2, data.Item1 - data.Item2);
                        break;
                    default:
                        Console.WriteLine("Unknown operator '{0}'.", data.Item3);
                        break;
                } //switch 
            } //for 

        }

        private static void BatchBlockSeveralBatches()
        {
            var batchBlock = new BatchBlock<int>(10);

            for (int i = 0; i < 13; i++)
            {
                batchBlock.Post(i);
            }

            batchBlock.Complete();


            Console.WriteLine("The elements of the first batch are: [{0}] ", string.Join(",", batchBlock.Receive()));
            Console.WriteLine("The elements of the second batch are: [{0}]", string.Join(",", batchBlock.Receive()));
        }

        private static void TransformManyBlockPostingsAndReceive()
        {
            var transformManyBlock = new TransformManyBlock<string, char>(s => s.ToCharArray());

            transformManyBlock.Post("Hello");
            transformManyBlock.Post("World");

            for (int i = 0; i < ("Hello" + "World").Length; i++)
            {
                Console.WriteLine(transformManyBlock.Receive());
            }
        }

        private static void TransformBlockPostingAndProducingOutput()
        {
            var transformBlock = new TransformBlock<int, double>(n => Math.Sqrt(n));

            transformBlock.Post(10);
            transformBlock.Post(20);
            transformBlock.Post(30);

            for (int i = 0; i < 3; i++)
            {
                Console.WriteLine(transformBlock.Receive());
            }
        }

        private static void ActionBlockPostingCompleting()
        {
            var cts = new CancellationTokenSource();
            var actionBlock = new ActionBlock<int>(n =>
            {
                Console.WriteLine(n);
                Thread.Sleep(1000);
                if (n > 50)
                    cts.Cancel();
            }, new ExecutionDataflowBlockOptions{ MaxDegreeOfParallelism = 2, MaxMessagesPerTask = 1, CancellationToken = cts.Token });

            for (int i = 0; i < 10; i++)
            {
                actionBlock.Post(i*10); 
         
            }

            actionBlock.Complete();
            try
            {
                actionBlock.Completion.Wait(cts.Token);
            }
            catch (OperationCanceledException ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

        private static void WriteOnceBlockParallellPostsAndSingleReceive()
        {
            var writeOnceBlock = new WriteOnceBlock<string>(null);

            Parallel.Invoke(
                () => writeOnceBlock.Post("Message 1"),
                () => writeOnceBlock.Post("Message 2"),
                () => writeOnceBlock.Post("Message 3"));

            Console.WriteLine(writeOnceBlock.Receive());
        }

        private static void BroadCastPostAndMultipleReceive()
        {
            var broadcastBlock = new BroadcastBlock<double>(null);

            broadcastBlock.Post(Math.PI);

            for (int i = 0; i < 3; i++)
            {
                Console.WriteLine(broadcastBlock.Receive());
            }
        }

        private static void BufferBlockPostingAndReceiving()
        {
            var bufferBlock = new BufferBlock<int>();

            for (int i = 0; i < 3; i++)
            {
                bufferBlock.Post(i);
            }

            for (int i = 0; i < 4; i++)
            {
                try
                {
                    Console.WriteLine(bufferBlock.Receive(new TimeSpan(0, 0, 2)));
                }
                catch (TimeoutException tie)
                {
                    Console.WriteLine("Exception of type: {0} with message: {1}", tie.GetType().Name, tie.Message);
                }
            }
        }

        private static void ActionBlockPostingAndFaulting()
        {
            var throwIfNegative = new ActionBlock<int>(n =>
            {
                Console.WriteLine("n = {0}", n);
                if (n < 0)
                    throw new ArgumentOutOfRangeException();
            });

            throwIfNegative.Completion.ContinueWith(
                task => { Console.WriteLine("The status of the completion task is '{0}'", task.Status); });

            throwIfNegative.Post(0);
            throwIfNegative.Post(-1);
            throwIfNegative.Post(1);
            throwIfNegative.Post(2);

            throwIfNegative.Complete();

            try
            {
                throwIfNegative.Completion.Wait();
            }
            catch (AggregateException ae)
            {
                ae.Handle(e =>
                {
                    Console.WriteLine("Encountered {0}: {1}", e.GetType().Name, e.Message);
                    return true;
                });
            }

          
        }
    }
}

No comments:

Post a Comment