Thursday, 30 June 2016

Creating TPL Dataflow meshes to construct pipelines of computations

The TPL DataFlow Library allows the creation of simple and more complex data meshes that propagate data computations and exceptions using the Nuget package Microsoft.Tpl.DataFlow Let's look at how we can create a compound mesh to do three calculations that is considered as a single mesh. These simple examples appear to give simple computations as these a huge overhead in complexity. Of course, you would use Microsoft.Tpl.DataFlow for more complex scenarios, the simple example is just used for clarity. Consider the following code: First off, make sure you add a reference to Microsoft.Tpl.Dataflow, since TPL Dataflow is not part of the base class Library BCL in .NET. In the Nuget Package Explorer commandline in VS:
Install-Package Microsoft.Tpl.DataFlow

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataFlowDemo
{

    class Program
    {

        static void Main(string[] args)
        {
            //TplDataDemo();
            SecondTplDataDemo();
            Console.WriteLine("Press any key to continue ..");
            Console.ReadKey();
        }

        private static async void SecondTplDataDemo()
        {
            int[] nums = { 1, 13, 26, 14, 29, 15 };
            Console.WriteLine("Input numbers: ");
            foreach (var n in nums)
                Console.WriteLine(n);
            IPropagatorBlock<int, int> compountBlock = GetPropagatorBlock();
            Console.WriteLine("Pipeline: " + "x = (x * 2) => (x + 2) => (x / 2)");
            foreach (var num in nums)
            {
                compountBlock.Post(num);
            }
            try
            {

                while (true)
                {
                    try
                    {
                        Task<int> f = compountBlock.ReceiveAsync(TimeSpan.FromSeconds(1));
                        await f;
                        await Task.Delay(1000);
                        Console.WriteLine(f.Result);
                    }
                    catch (TimeoutException err)
                    {
                        //Console.WriteLine(err.Message);
                        break;
                    }
                    catch (Exception err)
                    {
                        //Console.WriteLine(err.Message);
                        throw err;
                    }
                }

            }
            catch (Exception err)
            {
                Console.WriteLine(err.Message);
            }
        }

        private static IPropagatorBlock<int, int> GetPropagatorBlock()
        {
            var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
            var addBlock = new TransformBlock<int, int>(item => item + 2);
            var divideBlock = new TransformBlock<int, int>(item => item / 2);

            var flowCompletion = new DataflowLinkOptions { PropagateCompletion = true };
            multiplyBlock.LinkTo(addBlock, flowCompletion);
            addBlock.LinkTo(divideBlock, flowCompletion);

            return DataflowBlock.Encapsulate(multiplyBlock, divideBlock);
        }
  }

We build up the steps of the computation pipeline as a TransformBlock. The multiplyblock is linked to the addBlock and the divideBlock is then linked to the addBlock. We got a pipeline like this: multiplyBlock-addBlock-divideBlock. Each computation will then be: y = (x * 2) => z = y + 2 => w = z / 2. We also use the Encapsulate method to glue together the start step and the end step. We then get the following output:
Input numbers:
1
13
26
14
29
15
Pipeline: x = (x * 2) => (x + 2) => (x / 2)
2
14
27
15
30
16
Press any key to continue ..
Test out TPL Dataflow sample above (VS 2015 solution here: VS Solution With sample code above (.zip)

1 comment:

  1. Thank you for sharing your article. Great efforts put it to find the list of articles which is very useful to know, Definitely will share the same to other forums.

    appvn app

    ReplyDelete