Monday, 13 July 2015

Producer-consumer scenario with BlockingCollection of the Task Parallel Library

The BlockingCollection of the Task Parallel Library (TPL) supports producer-consumer scenarios well. In this pattern, you want to start processing items as soon as they become available. BlockingCollection makes this easy, as long as you follow its conventions. This article presents a simple scenario with five producers and one consumer.

We collect the producer tasks in a list, convert it to an array (Task[]), and wait on it with the Task.WaitAll(Task[]) method. This creates a barrier in our Main method, after which we call the CompleteAdding() method of BlockingCollection to signal that there are no more items to add. The consumer, which here is a single Task, uses a standard foreach loop over the sequence returned by GetConsumingEnumerable() on the BlockingCollection. Note that the consumer stays inside the foreach loop, blocking whenever the collection is empty, until CompleteAdding() has been called and all remaining items have been consumed.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace TestBlockingCollectionSecond
{

    public class Node
    {

        public int ManagedThreadId { get; set; }

        public int Value { get; set; }

        public override string ToString()
        {
            return string.Format("Inside ManagedThreadId {0}, Value: {1}", ManagedThreadId, Value);
        }

    }

    class Program
    {
        static void Main(string[] args)
        {

            BlockingCollection<Node> bc = new BlockingCollection<Node>();

            Console.WriteLine("Producer-consumer scenario BlockingCollection.\nFive producers, one consumer scenario");

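            var rnd = new Random();
            var rndLock = new object(); // Random is not thread-safe, so access to rnd is guarded by a lock

            var producers = new List<Task>();

            for (int i = 0; i < 5; i++)
            {
                // Each producer adds five items, pausing for a random interval before each one
                var producer = Task.Factory.StartNew(() =>
                {
                    for (int j = 0; j < 5; j++)
                    {
                        int delay, value;
                        lock (rndLock)
                        {
                            delay = rnd.Next(100, 300);
                            value = rnd.Next(1, 100);
                        }

                        Thread.Sleep(delay);
                        bc.Add(new Node { ManagedThreadId = Thread.CurrentThread.ManagedThreadId, Value = value });
                    }
                });

                producers.Add(producer);
            }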

            var consumer = Task.Factory.StartNew(() =>
            {
                foreach (Node item in bc.GetConsumingEnumerable())
                {
                    Console.WriteLine(item);
                }

                Console.WriteLine("Consumer all done!");             
            });

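            // Barrier: wait until every producer has added all of its items
            Task.WaitAll(producers.ToArray());

            // Signal that no more items will be added, which lets the consumer's foreach loop end
            bc.CompleteAdding();

            // Wait for the consumer to drain the remaining items before prompting for exit
            consumer.Wait();

            Console.WriteLine("Hit any key to exit program");
            Console.ReadKey();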


        }
    }
}


If you are not sure when CompleteAdding() will be signaled, you can keep taking items from the BlockingCollection with Take(), using a single consumer or several. Remember to catch InvalidOperationException, which Take() throws when no more items can arrive, that is, when the collection is empty and CompleteAdding() has been called. Note that the while loop below is wrapped in a consumer Task that is once again started before the Task.WaitAll call on the producers array, so that consuming starts right away. The exit condition is the IsCompleted flag, which becomes true once CompleteAdding() has been called and the collection is empty. Checking IsAddingCompleted instead would end the loop as soon as CompleteAdding() is called, even if items are still waiting in the collection. The benefit of GetConsumingEnumerable() is that the consumer does not have to deal with the exception or the completion flag at all.

            var consumer = Task.Factory.StartNew(() =>
            {
                try
                {
                    // Loop until adding is completed and the collection has been drained
                    while (!bc.IsCompleted)
                    {
                        Console.WriteLine("BlockingCollection element: " + bc.Take());
                    }
                }
                catch (InvalidOperationException)
                {
                    // Expected: Take() found the collection empty after CompleteAdding() was called
                }

                Console.WriteLine("Consumer all done!");
            });
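
The text above mentions that more than one consumer can take from the same collection. The following is a minimal, self-contained sketch of that variation, not part of the original program: the class name MultipleConsumersSketch, the choice of two consumers, the single producing loop with ten integers, and the consumed counter are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical example class; not part of the original article's program.
class MultipleConsumersSketch
{
    static void Main()
    {
        var bc = new BlockingCollection<int>();
        int consumed = 0; // total number of items taken by all consumers

        // Start two consumers (an arbitrary number chosen for illustration).
        Task[] consumers = Enumerable.Range(0, 2).Select(id => Task.Factory.StartNew(() =>
        {
            try
            {
                // IsCompleted is true only when adding is completed AND the collection is empty.
                while (!bc.IsCompleted)
                {
                    int item = bc.Take(); // blocks until an item is available
                    Interlocked.Increment(ref consumed);
                    Console.WriteLine("Consumer {0} took {1}", id, item);
                }
            }
            catch (InvalidOperationException)
            {
                // Take() throws when the collection is empty and CompleteAdding() has been called.
                // With several consumers this is expected: another consumer may take the last item
                // between the IsCompleted check and the Take() call.
            }

            Console.WriteLine("Consumer {0} all done!", id);
        })).ToArray();

        // A single producing loop keeps the sketch short.
        for (int i = 0; i < 10; i++)
        {
            bc.Add(i);
        }

        bc.CompleteAdding();

        Task.WaitAll(consumers);
        Console.WriteLine("Total consumed: {0}", consumed);
    }
}
```

Which consumer receives which item varies from run to run, but each item is taken exactly once, so the total is always 10. The try/catch is still needed even with the IsCompleted check, because the check and the Take() call are two separate steps.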
