using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace TplDataFlowSimpleTests
{
class Program
{
/// <summary>
/// Entry point: runs the currently selected dataflow demo, then keeps the
/// console open until a key is pressed.
/// </summary>
/// <param name="args">Command-line arguments; not read by this method.</param>
static void Main(string[] args)
{
    // Exactly one demo runs per launch. To try another block type, assign
    // one of these methods to selectedDemo instead:
    //   BufferBlockPostingAndReceiving
    //   BroadCastPostAndMultipleReceive
    //   WriteOnceBlockParallellPostsAndSingleReceive
    //   ActionBlockPostingCompleting
    //   TransformBlockPostingAndProducingOutput
    //   TransformManyBlockPostingsAndReceive
    //   BatchBlockSeveralBatches
    //   JoinBlockAritmeticCombinations
    //   BatchedJoinBlockPropagatingValuesAndException
    Action selectedDemo = ActionBlockPostingAndFaulting;
    selectedDemo();

    // Hold the console window open so the demo output stays visible.
    Console.WriteLine("Press the any key to continue ..");
    Console.ReadKey();
}
/// <summary>
/// Demonstrates <see cref="BatchedJoinBlock{T1, T2}"/>: successful results are
/// posted to the first target and the exceptions raised for rejected inputs
/// to the second, then both groups are received as one batch and printed.
/// </summary>
private static void BatchedJoinBlockPropagatingValuesAndException()
{
    // Returns non-negative values unchanged and throws for negative ones.
    Func<int, int> passThroughNonNegative = candidate =>
    {
        if (candidate >= 0)
        {
            return candidate;
        }

        throw new ArgumentOutOfRangeException();
    };

    int[] inputs = { 5, 6, -7, -22, 13, 55, 0 };

    // The batch size equals the number of inputs, so every outcome (value or
    // exception) ends up in the single batch received below.
    var outcomes = new BatchedJoinBlock<int, Exception>(inputs.Length);

    for (int index = 0; index < inputs.Length; index++)
    {
        try
        {
            int accepted = passThroughNonNegative(inputs[index]);
            outcomes.Target1.Post(accepted);
        }
        catch (ArgumentOutOfRangeException rejection)
        {
            outcomes.Target2.Post(rejection);
        }
    }

    Tuple<IList<int>, IList<Exception>> batch = outcomes.Receive();

    // Values first, then the messages of the collected exceptions.
    foreach (int accepted in batch.Item1)
    {
        Console.WriteLine(accepted);
    }

    foreach (Exception rejection in batch.Item2)
    {
        Console.WriteLine(rejection.Message);
    }
}
/// <summary>
/// Demonstrates a three-way <see cref="JoinBlock{T1, T2, T3}"/>: two operands and
/// an operator character are posted to separate targets, and each joined tuple
/// is evaluated and printed as an arithmetic expression.
/// </summary>
private static void JoinBlockAritmeticCombinations()
{
    var greedyOptions = new GroupingDataflowBlockOptions { Greedy = true };
    var expressions = new JoinBlock<int, int, char>(greedyOptions);

    // Left operands, then right operands, then operators.
    foreach (int left in new[] { 3, 6 })
    {
        expressions.Target1.Post(left);
    }

    foreach (int right in new[] { 5, 4 })
    {
        expressions.Target2.Post(right);
    }

    foreach (char symbol in new[] { '+', '-' })
    {
        expressions.Target3.Post(symbol);
    }

    // Two items were posted to each target, so two tuples are received.
    int remaining = 2;
    while (remaining > 0)
    {
        Tuple<int, int, char> expression = expressions.Receive();
        int lhs = expression.Item1;
        int rhs = expression.Item2;
        char operation = expression.Item3;

        if (operation == '+')
        {
            Console.WriteLine("{0} + {1} = {2}", lhs, rhs, lhs + rhs);
        }
        else if (operation == '-')
        {
            Console.WriteLine("{0} - {1} = {2}", lhs, rhs, lhs - rhs);
        }
        else
        {
            Console.WriteLine("Unknown operator '{0}'.", operation);
        }

        remaining--;
    }
}
/// <summary>
/// Demonstrates <see cref="BatchBlock{T}"/>: posts 13 integers into a block with
/// a batch size of 10, completes the block, and prints the two batches received.
/// </summary>
private static void BatchBlockSeveralBatches()
{
    var batcher = new BatchBlock<int>(10);

    foreach (int value in Enumerable.Range(0, 13))
    {
        batcher.Post(value);
    }

    // Only 13 values were posted, so the second Receive below depends on the
    // completed block handing out the leftover items as a shorter batch.
    batcher.Complete();

    int[] firstBatch = batcher.Receive();
    Console.WriteLine("The elements of the first batch are: [{0}] ", string.Join(",", firstBatch));

    int[] secondBatch = batcher.Receive();
    Console.WriteLine("The elements of the second batch are: [{0}]", string.Join(",", secondBatch));
}
/// <summary>
/// Demonstrates <see cref="TransformManyBlock{TInput, TOutput}"/>: each posted
/// string is split into its characters, which are received and printed one
/// per line.
/// </summary>
private static void TransformManyBlockPostingsAndReceive()
{
    string[] words = { "Hello", "World" };
    var splitter = new TransformManyBlock<string, char>(text => text.ToCharArray());

    // Count the characters while posting so the receive loop knows how many
    // outputs to read.
    int pendingChars = 0;
    foreach (string word in words)
    {
        splitter.Post(word);
        pendingChars += word.Length;
    }

    while (pendingChars > 0)
    {
        Console.WriteLine(splitter.Receive());
        pendingChars--;
    }
}
/// <summary>
/// Demonstrates <see cref="TransformBlock{TInput, TOutput}"/>: posts three
/// integers and prints the square root produced for each.
/// </summary>
private static void TransformBlockPostingAndProducingOutput()
{
    Func<int, double> squareRoot = value => Math.Sqrt(value);
    var rooter = new TransformBlock<int, double>(squareRoot);

    int[] inputs = { 10, 20, 30 };
    foreach (int input in inputs)
    {
        rooter.Post(input);
    }

    // One output is read per posted input.
    for (int received = 0; received < inputs.Length; received++)
    {
        double root = rooter.Receive();
        Console.WriteLine(root);
    }
}
/// <summary>
/// Demonstrates cancelling an <see cref="ActionBlock{TInput}"/> from inside its
/// own handler: the multiples of ten from 0 to 90 are posted, each handled
/// value is printed, and the first value above 50 cancels the block's token.
/// The resulting <see cref="OperationCanceledException"/> message is printed.
/// </summary>
private static void ActionBlockPostingCompleting()
{
    // CancellationTokenSource is IDisposable; the using block releases it
    // once nothing can touch it any more (see the drain step below).
    using (var cts = new CancellationTokenSource())
    {
        var actionBlock = new ActionBlock<int>(n =>
        {
            Console.WriteLine(n);
            Thread.Sleep(1000);
            if (n > 50)
            {
                cts.Cancel();
            }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2, MaxMessagesPerTask = 1, CancellationToken = cts.Token });

        for (int i = 0; i < 10; i++)
        {
            actionBlock.Post(i * 10);
        }

        actionBlock.Complete();

        try
        {
            // Waiting with the token lets the wait itself be cancelled as
            // soon as a handler calls cts.Cancel().
            actionBlock.Completion.Wait(cts.Token);
        }
        catch (OperationCanceledException ex)
        {
            Console.WriteLine(ex.Message);
        }

        // Drain step: with MaxDegreeOfParallelism = 2 another handler may
        // still be sleeping and about to call cts.Cancel(). Calling Cancel()
        // on a disposed source throws ObjectDisposedException, so wait for
        // the block to finish before leaving the using block.
        try
        {
            actionBlock.Completion.Wait();
        }
        catch (AggregateException ae)
        {
            // A cancelled completion task surfaces as a cancellation
            // exception; anything else is rethrown by Handle.
            ae.Handle(inner => inner is OperationCanceledException);
        }
    }
}
/// <summary>
/// Demonstrates <see cref="WriteOnceBlock{T}"/>: three messages are posted via
/// <see cref="Parallel.Invoke(Action[])"/> and the value the block ends up
/// holding is received and printed.
/// </summary>
private static void WriteOnceBlockParallellPostsAndSingleReceive()
{
    // No cloning function is supplied (null).
    var singleSlot = new WriteOnceBlock<string>(null);

    Action[] competingPosts =
    {
        () => singleSlot.Post("Message 1"),
        () => singleSlot.Post("Message 2"),
        () => singleSlot.Post("Message 3")
    };
    Parallel.Invoke(competingPosts);

    string stored = singleSlot.Receive();
    Console.WriteLine(stored);
}
/// <summary>
/// Demonstrates <see cref="BroadcastBlock{T}"/>: posts a single value
/// (<see cref="Math.PI"/>) and receives and prints it three times.
/// </summary>
private static void BroadCastPostAndMultipleReceive()
{
    // No cloning function is supplied (null).
    var broadcaster = new BroadcastBlock<double>(null);
    broadcaster.Post(Math.PI);

    int reads = 0;
    while (reads < 3)
    {
        double observed = broadcaster.Receive();
        Console.WriteLine(observed);
        reads++;
    }
}
/// <summary>
/// Demonstrates <see cref="BufferBlock{T}"/>: posts three integers, then makes
/// four receive attempts with a two-second timeout each, printing either the
/// received value or the details of the <see cref="TimeoutException"/>.
/// </summary>
private static void BufferBlockPostingAndReceiving()
{
    var queue = new BufferBlock<int>();

    foreach (int value in Enumerable.Range(0, 3))
    {
        queue.Post(value);
    }

    var receiveTimeout = new TimeSpan(0, 0, 2);

    // One more attempt than there are posted items, so the timeout branch
    // is reachable.
    for (int attempt = 1; attempt <= 4; attempt++)
    {
        try
        {
            int item = queue.Receive(receiveTimeout);
            Console.WriteLine(item);
        }
        catch (TimeoutException timeout)
        {
            Console.WriteLine("Exception of type: {0} with message: {1}", timeout.GetType().Name, timeout.Message);
        }
    }
}
/// <summary>
/// Demonstrates a faulting <see cref="ActionBlock{TInput}"/>: the handler prints
/// each value and throws for negative ones. A continuation prints the final
/// status of the completion task, and the exceptions surfaced by waiting on
/// the completion task are printed.
/// </summary>
private static void ActionBlockPostingAndFaulting()
{
    Action<int> rejectNegatives = value =>
    {
        Console.WriteLine("n = {0}", value);
        if (value < 0)
        {
            throw new ArgumentOutOfRangeException();
        }
    };
    var faultingBlock = new ActionBlock<int>(rejectNegatives);

    // The continuation is not awaited here; it runs once the completion
    // task has finished.
    faultingBlock.Completion.ContinueWith(
        completion => Console.WriteLine("The status of the completion task is '{0}'", completion.Status));

    foreach (int input in new[] { 0, -1, 1, 2 })
    {
        faultingBlock.Post(input);
    }

    faultingBlock.Complete();

    try
    {
        faultingBlock.Completion.Wait();
    }
    catch (AggregateException aggregate)
    {
        // Every inner exception is reported and treated as handled.
        foreach (Exception inner in aggregate.InnerExceptions)
        {
            Console.WriteLine("Encountered {0}: {1}", inner.GetType().Name, inner.Message);
        }
    }
}
}
}
Wednesday, 15 July 2015
Data block types in Task Parallel Library
The following code sample shows the data block types in the Task Parallel Library (TPL). The code is written in C# and can be run in a simple Console Application.
Dataflow in TPL supports actor-based programming and makes it possible to orchestrate coarse-grained dataflow and pipelining tasks, while maintaining robustness
and supporting concurrency-enabled applications. This makes it easier to construct high-performance, low-latency systems.
Subscribe to:
Post Comments (Atom)
No comments:
Post a Comment