Thursday 30 June 2016

Wrapping Asynchronous Programming Model (APM) to Task-based Asynchronous Pattern (TAP)

Let's look at how we can wrap classic Begin and End methods used in APM to the newer Task-based Asynchronous Pattern (TAP). Many methods of older framework Versions of .NET support such APM methods and we want to wrap or adapt them to support TAP and async await. Example code:

using System;
using System.IO;
using System.Net;
using System.Text;
using System.Threading.Tasks;

namespace ApmToTap
{
    class Program
    {

        static void Main(string[] args)
        {
            DownloadDemo();

            Console.WriteLine();
            Console.ReadKey();
        }

        private static async void DownloadDemo()
        {
            WebRequest wr = WebRequest.Create("https://t.co/UrkiLgN1BC");
            try
            {
                var response = await wr.GetResponseFromAsync();
                using (Stream stream = response.GetResponseStream())
                {
                    StreamReader reader = new StreamReader(stream, Encoding.UTF8);
                    Console.WriteLine(reader.ReadToEnd());
                }
            }
            catch (Exception err)
            {
                Console.WriteLine(err.Message);
            }
        }
    }

    public static class WebRequestExtensions
    {

        public static Task<WebResponse> GetResponseFromAsync(this WebRequest request)
        {
            return Task<WebResponse>.Factory.FromAsync(request.BeginGetResponse,
                request.EndGetResponse, null);
        }

    }

}

We use the Task<T>Factory.FromAsync method and provide the delegates for the Begin and End methods used in APM. We then provide just null as the AsyncState parameter, as this is not needed. We then can await the Task we create here and get the functionality Task provides such as information of how the asynchronous operation went, exceptions and so on. And of course we can also get the result we usually retrieve in the End method using APM. So there you have it. To use TAP With APM methods, you can use the Task<T>FromAsync method.

Wednesday 29 June 2016

High performance Producer-Consumer scenario using Nito.AsyncEx

The Nuget Library "Nito.AsyncEx" contains powerful collections that makes it possible to create asynchronous collections that support Producer-Consumer scenarios. First off, let us install the Nuget package:

Install-Package Nito.AsyncEx


using Nito.AsyncEx;
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace AsyncCollectionDemo
{
    class Program
    {

        private static readonly AsyncCollection<int> _asyncStack = new AsyncCollection<int>(new ConcurrentStack<int>(), maxCount: 1);


        static void Main(string[] args)
        {
            ProducerConsumerDemo();
        }

        private static async void ConsumerThread()
        {
            await Task.Run(async () =>
             {
                 //Consumer code 
                 while (await _asyncStack.OutputAvailableAsync())
                 {
                     Console.WriteLine(await _asyncStack.TakeAsync());
                     //Thread.Sleep(1000);
                 }
             });
        }

        private static async void ProducerConsumerDemo()
        {
            ConsumerThread();
            //Producer code 
            await _asyncStack.AddAsync(7);
            await _asyncStack.AddAsync(13);
            await _asyncStack.AddAsync(19);
            _asyncStack.CompleteAdding();


            Console.WriteLine("Press any key to continue ..");
            Console.ReadKey(); 
        }

    }
}

We can use the AsyncCollection to create one or multiple producer threads and then return the results to the consumer threads. The benefit of this Collection compared to the BlockingCollection in BCL is that since it supports async, the consumer can be an UI thread for example. So you can have code that produces results and delivers them back to the user Interface. You can ofcourse skip using the ConcurrentStack if you want a FIFO ordering instead of the stack's LIFO ordering. The creator of Nito.AsyncEx is created by Stephen Cleary, which also is the author of the good book "Concurrency in C# Cookboox" which is an O'Reilly book.

Producer-Consumer scenario in .NET using BlockingCollection

The BCL contains helpful classes to build your producer-consumer scenarios. We want to have a thread or several that produces data and one or several threads that consume that data. In this article I will show a simple single producer, single consumer scenario. BlockingCollection contains the necessary logic to support this. Consider the following code:


using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace BlockingCollection
{
    class Program
    {

        private static readonly BlockingCollection<int> _blockingQueue = new BlockingCollection<int>(boundedCapacity: 1); 


        static void Main(string[] args)
        {
            Task.Run(() =>
            {
                foreach (var item in _blockingQueue.GetConsumingEnumerable())
                {
                    Console.WriteLine("Hey I got this item: " + item + "!");
                    Thread.Sleep(1000);
                }

            });

            int[] nums = { 1, 3, 8, 11, 94, 28 };
            foreach (var n in nums)
                _blockingQueue.Add(n);
            _blockingQueue.CompleteAdding();

            Console.WriteLine("Hit the any key!");
            Console.ReadKey(); 
        }

    }
}


We create a blocking collection with a bound capacity of one item. This will only accept a single item at a time. This means that our second call to the .Add() method will actually block. We also use the .CompleteAdding() method to signal that we are finished filling up values. It is also important to start up our consumer before the blocking call. I use Task.Run()here to start up a new thread. We will inside that thread Call the .GetConsumingEnumerable() to set up our consumer. Note that I also use Thread.Sleep to make the consumer wait a bit, this is only for testing and normally you would of course not wait here. Note also that you could spawn multiple consumers with Task.Run or similar to support 1 producer, multiple consumers scenarios. For multiple producers you could use a single shared blocking collection or possibly multiple blocking collections. BlockingCollection is very flexible. It is important that you call .CompleteAdding() on your producer side to allow the consumers to finish also their execution.
What if you want to not have a FIFO ordering in your Producer-Consumer scenario, but say a LIFO scenario? As you probably can see, if you instantiate the BlockingCollection, the constructor takes overloads to somethhing called an IProducerConsumerCollection Collection. So we just adjust our instantiation like the following (note that I had to remove the boundedCapacity here set to 1 to make it work!):

   private static readonly BlockingCollection<int> _blockingQueue = new BlockingCollection<int>(
            new ConcurrentStack<int>()); 

So now we get our new output:

Hit the any key!
Hey I got this item: 28!
Hey I got this item: 94!
Hey I got this item: 11!
Hey I got this item: 8!
Hey I got this item: 3!
Hey I got this item: 1!

The BCL contains several interesting classes in the System.Collections.Concurrent namespace. Happy .NET coding!