Thursday, 30 June 2016

Creating TPL Dataflow meshes to construct pipelines of computations

The TPL Dataflow library lets you build simple or complex meshes of blocks that propagate data and exceptions between computations. It ships as the NuGet package Microsoft.Tpl.Dataflow. Let's look at how to create a compound mesh that performs three calculations and is exposed as a single block. For computations this simple, a dataflow mesh adds a lot of overhead and complexity. You would of course use TPL Dataflow for more complex scenarios; the simple example is used only for clarity.

First off, make sure you add a reference to Microsoft.Tpl.Dataflow, since TPL Dataflow is not part of the Base Class Library (BCL) in .NET. In the NuGet Package Manager Console in Visual Studio, run:
Install-Package Microsoft.Tpl.Dataflow

Consider the following code:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataFlowDemo
{

    class Program
    {

        static void Main(string[] args)
        {
            //TplDataDemo();
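            // Block until the asynchronous demo has finished, so that its output
            // is printed before the "Press any key" prompt.
            SecondTplDataDemo().GetAwaiter().GetResult();
            Console.WriteLine("Press any key to continue ..");
            Console.ReadKey();
        }

        private static async Task SecondTplDataDemo()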
        {
            int[] nums = { 1, 13, 26, 14, 29, 15 };
            Console.WriteLine("Input numbers: ");
            foreach (var n in nums)
                Console.WriteLine(n);
            IPropagatorBlock<int, int> compoundBlock = GetPropagatorBlock();
            Console.WriteLine("Pipeline: " + "x = (x * 2) => (x + 2) => (x / 2)");
            foreach (var num in nums)
            {
                compoundBlock.Post(num);
            }
            try
            {

                while (true)
                {
                    try
                    {
                        // Wait at most one second for the next result.
                        Task<int> f = compoundBlock.ReceiveAsync(TimeSpan.FromSeconds(1));
                        await f;
                        await Task.Delay(1000);
                        Console.WriteLine(f.Result);
                    }
                    catch (TimeoutException)
                    {
                        // No more data arrived within the timeout, so stop receiving.
                        break;
                    }
                    catch (Exception)
                    {
                        // Rethrow without resetting the stack trace.
                        throw;
                    }
                }

            }
            catch (Exception err)
            {
                Console.WriteLine(err.Message);
            }
        }

        private static IPropagatorBlock<int, int> GetPropagatorBlock()
        {
            var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
            var addBlock = new TransformBlock<int, int>(item => item + 2);
            var divideBlock = new TransformBlock<int, int>(item => item / 2);

            var flowCompletion = new DataflowLinkOptions { PropagateCompletion = true };
            multiplyBlock.LinkTo(addBlock, flowCompletion);
            addBlock.LinkTo(divideBlock, flowCompletion);

            return DataflowBlock.Encapsulate(multiplyBlock, divideBlock);
        }
    }
}

Each step of the computation pipeline is a TransformBlock. The multiplyBlock is linked to the addBlock, and the addBlock is then linked to the divideBlock, which gives the pipeline multiplyBlock -> addBlock -> divideBlock. Each computation is then: y = x * 2 => z = y + 2 => w = z / 2. The Encapsulate method glues the first step and the last step together into a single propagator block. Since nothing marks the block as complete, the receive loop ends when ReceiveAsync times out after one second without new data. We then get the following output:
Input numbers:
1
13
26
14
29
15
Pipeline: x = (x * 2) => (x + 2) => (x / 2)
2
14
27
15
30
16
Press any key to continue ..
You can test the TPL Dataflow sample above with the VS 2015 solution: VS Solution with sample code above (.zip)
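
As a variation on the sample above, here is a minimal sketch that ends the receive loop without relying on a timeout. It assumes that it is acceptable to call Complete() on the encapsulated block once all input has been posted. The class name CompletionSketch and the shortened input array are made up for illustration; the blocks and links are the same as in GetPropagatorBlock.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataFlowDemo
{
    // Hypothetical class name, used only for this sketch.
    class CompletionSketch
    {
        static void Main()
        {
            RunAsync().GetAwaiter().GetResult();
        }

        private static async Task RunAsync()
        {
            var multiplyBlock = new TransformBlock<int, int>(item => item * 2);
            var addBlock = new TransformBlock<int, int>(item => item + 2);
            var divideBlock = new TransformBlock<int, int>(item => item / 2);

            // PropagateCompletion forwards completion (and faults) down the chain.
            var flowCompletion = new DataflowLinkOptions { PropagateCompletion = true };
            multiplyBlock.LinkTo(addBlock, flowCompletion);
            addBlock.LinkTo(divideBlock, flowCompletion);

            IPropagatorBlock<int, int> pipeline =
                DataflowBlock.Encapsulate(multiplyBlock, divideBlock);

            foreach (var n in new[] { 1, 13, 26 })
            {
                pipeline.Post(n);
            }

            // Signal that no more input will arrive.
            pipeline.Complete();

            // OutputAvailableAsync returns false once the last block has
            // completed and all of its output has been received.
            while (await pipeline.OutputAvailableAsync())
            {
                Console.WriteLine(await pipeline.ReceiveAsync());
            }

            // Awaiting Completion rethrows any exception raised inside the blocks.
            await pipeline.Completion;
        }
    }
}
```

With the three inputs above this should print 2, 14 and 27, one per line. The trade-off is that the block cannot accept more input after Complete() has been called, so this approach fits batch-style input better than an open-ended stream.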

Wrapping Asynchronous Programming Model (APM) to Task-based Asynchronous Pattern (TAP)

Let's look at how we can wrap the classic Begin and End methods used in APM in the newer Task-based Asynchronous Pattern (TAP). Many methods in older versions of the .NET Framework only offer such APM methods, and we want to wrap or adapt them so that they support TAP and async/await. Example code:

using System;
using System.IO;
using System.Net;
using System.Text;
using System.Threading.Tasks;

namespace ApmToTap
{
    class Program
    {

        static void Main(string[] args)
        {
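            // Block until the download has finished (or failed) before waiting for a key.
            DownloadDemo().GetAwaiter().GetResult();

            Console.WriteLine();
            Console.ReadKey();
        }

        private static async Task DownloadDemo()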
        {
            WebRequest wr = WebRequest.Create("https://t.co/UrkiLgN1BC");
            try
            {
                // Dispose the response, the stream and the reader when done.
                using (WebResponse response = await wr.GetResponseFromAsync())
                using (Stream stream = response.GetResponseStream())
                using (StreamReader reader = new StreamReader(stream, Encoding.UTF8))
                {
                    Console.WriteLine(reader.ReadToEnd());
                }
            }
            catch (Exception err)
            {
                Console.WriteLine(err.Message);
            }
        }
    }

    public static class WebRequestExtensions
    {

        public static Task<WebResponse> GetResponseFromAsync(this WebRequest request)
        {
            return Task<WebResponse>.Factory.FromAsync(request.BeginGetResponse,
                request.EndGetResponse, null);
        }

    }

}

We use the Task<WebResponse>.Factory.FromAsync method and pass the Begin and End methods of the APM pair as delegates. We pass null as the state argument, since it is not needed here. We can then await the resulting Task and get everything a Task provides, such as the status of the asynchronous operation and any exception it threw. And of course we also get the result that we would otherwise retrieve from the End method in APM. So there you have it: to use TAP with APM methods, you can use the Task<T>.Factory.FromAsync method.
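
The same approach works for APM methods that take arguments. Below is a minimal sketch that wraps Stream.BeginRead and Stream.EndRead. The extension method name ReadFromAsync, the class names and the in-memory test data are assumptions made for illustration. Note that Stream already has a built-in ReadAsync method, so this is only meant to show the shape of the FromAsync overload that forwards arguments to the Begin method.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

namespace ApmToTap
{
    public static class StreamExtensions
    {
        // Hypothetical helper name. The three arguments after the End method
        // are passed on to BeginRead; the final null is the unused state object.
        public static Task<int> ReadFromAsync(this Stream stream, byte[] buffer, int offset, int count)
        {
            return Task<int>.Factory.FromAsync(stream.BeginRead, stream.EndRead,
                buffer, offset, count, null);
        }
    }

    class ReadSketch
    {
        static void Main()
        {
            RunAsync().GetAwaiter().GetResult();
        }

        private static async Task RunAsync()
        {
            // In-memory data, so the sketch runs without network or file access.
            byte[] source = Encoding.UTF8.GetBytes("hello");
            using (var stream = new MemoryStream(source))
            {
                var buffer = new byte[16];
                int bytesRead = await stream.ReadFromAsync(buffer, 0, buffer.Length);
                Console.WriteLine(Encoding.UTF8.GetString(buffer, 0, bytesRead));
            }
        }
    }
}
```

This should print hello. The return value of the End method (the number of bytes read) becomes the result of the Task.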

Wednesday, 29 June 2016

High performance Producer-Consumer scenario using Nito.AsyncEx

The NuGet library "Nito.AsyncEx" contains powerful asynchronous collections that support Producer-Consumer scenarios. First off, let us install the NuGet package:

Install-Package Nito.AsyncEx


using Nito.AsyncEx;
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace AsyncCollectionDemo
{
    class Program
    {

        private static readonly AsyncCollection<int> _asyncStack = new AsyncCollection<int>(new ConcurrentStack<int>(), maxCount: 1);


        static void Main(string[] args)
        {
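            // Block until the demo has finished; otherwise Main can return and
            // end the process while the producer is still waiting to add items.
            ProducerConsumerDemo().GetAwaiter().GetResult();
        }

        private static async void ConsumerThread()
        {
            await Task.Run(async () =>
             {
                 //Consumer code 
                 while (await _asyncStack.OutputAvailableAsync())
                 {
                     Console.WriteLine(await _asyncStack.TakeAsync());
                     //Thread.Sleep(1000);
                 }
             });
        }

        private static async Task ProducerConsumerDemo()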
        {
            ConsumerThread();
            //Producer code 
            await _asyncStack.AddAsync(7);
            await _asyncStack.AddAsync(13);
            await _asyncStack.AddAsync(19);
            _asyncStack.CompleteAdding();


            Console.WriteLine("Press any key to continue ..");
            Console.ReadKey(); 
        }

    }
}

With AsyncCollection, one or more producers can add items and hand them over to one or more consumers. The benefit of this collection compared to the BlockingCollection in the BCL is that it supports async, so the consumer can for example be a UI thread, since waiting for an item does not block it. So you can have code that produces results and delivers them back to the user interface. You can of course skip the ConcurrentStack if you want FIFO ordering instead of the stack's LIFO ordering. Nito.AsyncEx was created by Stephen Cleary, who is also the author of the good book "Concurrency in C# Cookbook", published by O'Reilly.
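
To illustrate the FIFO variant, here is a minimal sketch that passes a ConcurrentQueue to the same constructor used above. The class name, the field name _asyncQueue, the method name ConsumeAsync and the maxCount value of 10 are assumptions chosen for illustration. Unlike the sample above, the consumer returns a Task so that the producer can await it before the program ends.

```csharp
using Nito.AsyncEx;
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace AsyncCollectionDemo
{
    // Hypothetical class name, used only for this sketch.
    class FifoSketch
    {
        // A ConcurrentQueue as the backing store gives FIFO ordering.
        private static readonly AsyncCollection<int> _asyncQueue =
            new AsyncCollection<int>(new ConcurrentQueue<int>(), maxCount: 10);

        static void Main()
        {
            RunAsync().GetAwaiter().GetResult();
        }

        private static async Task ConsumeAsync()
        {
            // Loop until the producer has called CompleteAdding and the queue is empty.
            while (await _asyncQueue.OutputAvailableAsync())
            {
                Console.WriteLine(await _asyncQueue.TakeAsync());
            }
        }

        private static async Task RunAsync()
        {
            // Start the consumer on the thread pool and keep its Task.
            Task consumer = Task.Run(() => ConsumeAsync());

            // Producer code
            await _asyncQueue.AddAsync(7);
            await _asyncQueue.AddAsync(13);
            await _asyncQueue.AddAsync(19);
            _asyncQueue.CompleteAdding();

            // Wait until the consumer has drained the collection.
            await consumer;
        }
    }
}
```

With a single consumer this should print 7, 13 and 19 in the order they were added.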