The BlockingCollection of Task Parallel Library or TPL supports Producer-Consumer scenarios well. This is a scenario or pattern, where you
want to start processing items as soon as they are available. BlockingCollection makes this easy, as long as you follow the convention.
In this article, a simple scenario with five producers and one consumer is presented.
We create an array of tasks (Task[]) that we wait on, using the Task.WaitAll(Task[]) method. This creates a barrier in our main method
of which we call the
CompleteAdding() method of BlockingCollection to signal that we have no more items to add. The consumer, which here
is a single Task, will use standard foreach iteration and call the method
GetConsumingEnumerable() on the BlockingCollection to get the
collection to iterate over. Note that we will be inside the foreach loop until the
CompleteAdding() method is called on the BlockingCollection,
i.e. the iteration is halted until
CompleteAdding() is called and no more items are available.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace TestBlockingCollectionSecond
{
public class Node
{
public int ManagedThreadId { get; set; }
public int Value { get; set; }
public override string ToString()
{
return string.Format("Inside ManagedThreadId {0}, Value: {1}", ManagedThreadId, Value);
}
}
class Program
{
static void Main(string[] args)
{
BlockingCollection<Node> bc = new BlockingCollection<Node>();
Console.WriteLine("Producer-consumer scenario BlockingCollection.\nFive producers, one consumer scenario");
var rnd = new Random();
var producers = new List<Task>();
for (int i = 0; i < 5; i++)
{
var producer = Task.Factory.StartNew(() =>
{
for (int j = 0; j < 5; j++)
{
Thread.Sleep(rnd.Next(100,300));
bc.Add(new Node { ManagedThreadId = Thread.CurrentThread.ManagedThreadId, Value = rnd.Next(1, 100) });
}
});
producers.Add(producer);
}
var consumer = Task.Factory.StartNew(() =>
{
foreach (Node item in bc.GetConsumingEnumerable())
{
Console.WriteLine(item);
}
Console.WriteLine("Consumer all done!");
});
Task.WaitAll(producers.ToArray());
bc.CompleteAdding();
Console.WriteLine("Hit any key to exit program");
Console.ReadKey();
}
}
}
If you are not sure when to signal
CompleteAdding(), you can keep taking items from the BlockingCollection, using one single consumer or multiple, but
remember to catch
InvalidOperationException, in case there are no more items available, that is - after
CompleteAdding() method has been called on
the BlockingCollection.
Note that the while loop below is wrapped in a consumer Task that is once again started before the Task.WaitAll call on the producers array, to start consuming right away.
Our exit condition here is the flag bc.IsAddingCompleted is set to true, i.e. a call to
CompleteAdding is performed. The benefit of using the
GetConsumingEnumerable()
here is having not to deal with exceptions and boolean flag of completed adding items in the consumer Block, since calling the
Take() method on the collection
after the
CompleteAdding() method is called will throw an InvalidOperationException.
var consumer = Task.Factory.StartNew(() => {
try{
while (!bc.IsAddingCompleted){
Console.WriteLine("BlockingCollection element: " + bc.Take());
}
}
Console.WriteLine("Consumer all done!");
catch (InvalidOperationException ioe){
//Console.WriteLine(ioe.Message);
}
});
Share this article on LinkedIn.
No comments:
Post a Comment