Saturday, 12 January 2013
Simple child tasks in TPL
This short article presents the steps necessary to create a child task in the Task Parallel Library (TPL). To create a child task, simply pass TaskCreationOptions.AttachedToParent as the TaskCreationOptions argument when the task is created inside the parent task.
The simple console application below shows this:

    using System;
    using System.Threading;
    using System.Threading.Tasks;

    class Program
    {
        static void Main()
        {
            var parent = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("Parent task starting");
                Thread.Sleep(2000);
                Console.WriteLine("Parent task done");

                // Attached child: the parent does not complete until this task has finished.
                Task.Factory.StartNew(() =>
                {
                    Console.WriteLine("Child task starting");
                    Thread.Sleep(5000);
                    Console.WriteLine("Child task done");
                }, TaskCreationOptions.AttachedToParent);
            });

            parent.Wait(); // returns only after the attached child has finished
            Console.WriteLine("Press any key to continue ...");
            Console.ReadKey();
        }
    }

The console application above waits for the parent task, and parent.Wait() does not return until both the parent's own work and its attached child tasks have finished. A task can have multiple child tasks. However, only the child tasks created with TaskCreationOptions.AttachedToParent are actually waited upon. Also note that the new tasks are created with Task.Factory.StartNew. This is the preferred way to create new tasks. One can also instantiate a Task with the new keyword, but then you have to remember to call Start on the task as well. Task.Factory.StartNew not only instantiates a new task but also schedules it to start immediately. In the next short article, the ContinueWith construct in TPL will be presented. This construct allows one to pipeline tasks, one after another. This is known as dataflow parallelism.
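The difference between an attached and a detached child task can be made visible with a small variation of the console application. This is only a sketch: the class name AttachedVersusDetached, the Run method and the release event are made up for the illustration. The detached child is deliberately held back until the parent has been waited upon, so the result does not depend on timing.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AttachedVersusDetached
{
    // Returns { attachedFinished, detachedFinished } as observed right after parent.Wait().
    public static bool[] Run()
    {
        bool attachedFinished = false;
        bool detachedFinished = false;
        var release = new ManualResetEventSlim(false);

        var parent = Task.Factory.StartNew(() =>
        {
            // Attached child: parent.Wait() also waits for this task.
            Task.Factory.StartNew(() =>
            {
                Thread.Sleep(500);
                attachedFinished = true;
            }, TaskCreationOptions.AttachedToParent);

            // Detached child (default options): the parent does not wait for it.
            Task.Factory.StartNew(() =>
            {
                release.Wait(); // blocked until the snapshot below has been taken
                detachedFinished = true;
            });
        });

        parent.Wait();
        var observed = new[] { attachedFinished, detachedFinished };
        release.Set(); // let the detached child finish
        return observed;
    }

    public static void Main()
    {
        bool[] observed = Run();
        Console.WriteLine("Attached child finished: {0}", observed[0]); // True
        Console.WriteLine("Detached child finished: {0}", observed[1]); // False
    }
}
```

The attached child has always finished when parent.Wait() returns, while the detached child is still running at that point.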
Sunday, 6 January 2013
Map and Reduce in PLINQ
This article presents some code that applies a Map and a Reduce transformation to an input array of a given type, using PLINQ.
For developers familiar with LINQ, PLINQ is the parallel version of LINQ.
First, the source code will be presented, then the code will be explained.
    using System;
    using System.Linq;

    namespace TestParallelMap
    {
        class Program
        {
            static void Main(string[] args)
            {
                var nums = Enumerable.Range(0, 1001).ToArray();

                // Map: square every number and keep the input next to the output.
                Func<int, TaggedResult<int, double>> mapFunction =
                    n => new TaggedResult<int, double>(n, Math.Pow(n, 2));
                var result = ParallelMap(nums, mapFunction);
                foreach (var r in result)
                {
                    Console.WriteLine("Got result {0} for the num {1}", r.OutputValue, r.InputtedValue);
                }
                Console.WriteLine("Press any key to continue ...");
                Console.ReadKey();

                // Reduce: sum all numbers.
                var resultSecond = ParallelReduce(nums, 0, (first, second) => first + second);
                Console.WriteLine("Got result {0}", resultSecond);
                Console.WriteLine("Press any key to continue ...");
                Console.ReadKey();
            }

            private static TValue ParallelReduce<TValue>(TValue[] source, TValue seedValue,
                Func<TValue, TValue, TValue> reduceFunction)
            {
                return source.AsParallel().AsOrdered().Aggregate(
                    seedValue,
                    (localResult, nextValue) => reduceFunction(localResult, nextValue),
                    (overallResult, localResult) => reduceFunction(overallResult, localResult),
                    overallResult => overallResult);
            }

            private static TOutput[] ParallelMap<TInput, TOutput>(TInput[] source,
                Func<TInput, TOutput> mapFunction)
            {
                return source.AsParallel().AsOrdered().Select(x => mapFunction(x)).ToArray();
            }

            private class TaggedResult<TInput, TOutput>
            {
                public TaggedResult(TInput inputtedValue, TOutput outputtedValue)
                {
                    InputtedValue = inputtedValue;
                    OutputValue = outputtedValue;
                }

                public TInput InputtedValue { get; set; }
                public TOutput OutputValue { get; set; }
            }
        }
    }

The source code presented above is a simple console application. It is written in C# and uses PLINQ to process the input array. The Map method transforms each value in the input array with the provided map function. The source code uses a Func delegate to pass in the method that is executed to produce each item. The generic arguments control the type of the resulting array:
    private static TOutput[] ParallelMap<TInput, TOutput>(TInput[] source,
        Func<TInput, TOutput> mapFunction)
    {
        return source.AsParallel().AsOrdered().Select(x => mapFunction(x)).ToArray();
    }

Note that PLINQ is very similar to LINQ; the main difference is the call to AsParallel(). Also note that AsOrdered() means that the ordering of the input is maintained in the output array. If the order does not matter, there is a performance gain from omitting the call to AsOrdered(). Further, note that the Select method of PLINQ performs the projection, and the map function is applied as part of that projection. The Reduce method is a bit more complicated. It uses PLINQ's Aggregate method, which is a bit harder to understand. Let's repeat the source code:
    private static TValue ParallelReduce<TValue>(TValue[] source, TValue seedValue,
        Func<TValue, TValue, TValue> reduceFunction)
    {
        return source.AsParallel().AsOrdered().Aggregate(
            seedValue,
            (localResult, nextValue) => reduceFunction(localResult, nextValue),
            (overallResult, localResult) => reduceFunction(overallResult, localResult),
            overallResult => overallResult);
    }

The PLINQ Aggregate method is a versatile and powerful method when you want to process an entire array and reduce it to a single value. In our case the reduction is just a summation, but it could be anything, for example a statistical calculation. The first argument is the seed value. The second argument is a lambda expression that performs the stepwise part of the reduction, folding the next value into the local result of a partition. The third argument is a lambda expression that merges such a local result into the overall result. The fourth and final argument is a simple lambda that says we do not want to do anything with the final overall result other than return it. There are more complicated patterns too, like MapReduce (which combines map and reduce), but this should be a relatively gentle introduction to the functionality that PLINQ offers. It is impressive that all the thread handling is taken care of automatically by PLINQ. With relatively simple code, one can perform a multitude of different transformations with Map and Reduce, powered by PLINQ and generics.
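As an illustration of a reduction other than a plain sum, the sketch below computes the mean of an int array with the same four-argument Aggregate call. The names ParallelMeanSketch and ParallelMean are made up for this example. The accumulator is a (sum, count) pair, and the final lambda now does real work by dividing the two. As far as I understand, every partition starts from the seed, so the seed should be a neutral value for the reduction, here (0, 0).

```csharp
using System;
using System.Linq;

public static class ParallelMeanSketch
{
    public static double ParallelMean(int[] source)
    {
        return source.AsParallel().Aggregate(
            // Seed: (sum, count) = (0, 0), a neutral starting point.
            Tuple.Create(0L, 0),
            // Stepwise part: fold the next value into the local (sum, count).
            (acc, n) => Tuple.Create(acc.Item1 + n, acc.Item2 + 1),
            // Merge two partial (sum, count) pairs.
            (a, b) => Tuple.Create(a.Item1 + b.Item1, a.Item2 + b.Item2),
            // Final step: turn (sum, count) into the mean.
            acc => acc.Item2 == 0 ? 0.0 : (double)acc.Item1 / acc.Item2);
    }

    public static void Main()
    {
        var nums = Enumerable.Range(0, 1001).ToArray();
        Console.WriteLine(ParallelMean(nums)); // 500
    }
}
```

AsOrdered() is left out here because addition does not depend on the order of the elements.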
Sunday, 18 November 2012
Tracking Prism EventAggregator traffic
Tracking Prism Events
It is possible to track the Prism events that are sent and received through the Prism EventAggregator,
but the Prism Framework does not support this out of the box. Usually, the developer must insert breakpoints
where the CompositePresentationEvent is published and subscribed.
This is satisfactory for small scenarios, but as your solution and project(s) grow, it can become an increasingly difficult task. In many cases it is desirable to see which events are published and where they are subscribed. The Prism 4.x Framework still does not support this, but luckily there is a way to achieve it.
First off, you must change your Prism Events from using the class CompositePresentationEvent to using a derived class presented next, CustomCompositePresentationEvent. I have chosen to display the Prism Event traffic in the Output window of Visual Studio. This is done using Debug.WriteLine. An alternative could be to log this event traffic, or to display it in a separate window if you are writing a WPF application.
The custom class CustomCompositePresentationEvent looks like this:
    using System;
    using System.ComponentModel;
    using System.Diagnostics;
    using System.Text;
    using Microsoft.Practices.Prism.Events;

    namespace EventAggregation.Infrastructure
    {
        /// <summary>
        /// Custom CompositePresentationEvent class, derived from this base class to allow
        /// the Prism Events to be displayed in the Debug Output Window.
        /// </summary>
        public class CustomCompositePresentationEvent<TEvent> : CompositePresentationEvent<TEvent>
            where TEvent : class
        {
            /// <summary>Override of the Publish method of CompositePresentationEvent</summary>
            public override void Publish(TEvent payload)
            {
                var declaringType = string.Empty;
                // Frame 1 is the caller of Publish; guard against a missing frame or type.
                var callingFrame = new StackTrace().GetFrame(1);
                var callingMethod = callingFrame != null ? callingFrame.GetMethod() : null;
                if (callingMethod != null && callingMethod.DeclaringType != null)
                    declaringType = callingMethod.DeclaringType.FullName;
                Debug.WriteLine(string.Format(
                    "Publishing event {0} at time: {1}. Source type: {2}. Event Payload: {3}",
                    typeof(TEvent).Name, DateTime.Now, declaringType, GetContentAsString(payload)));
                base.Publish(payload);
            }

            /// <summary>Override of the Subscribe method of CompositePresentationEvent</summary>
            /// <remarks>
            /// For this override to be executed, set keepSubscriberReferenceAlive to
            /// true in the subscribe setup method on the target.
            /// </remarks>
            public override SubscriptionToken Subscribe(Action<TEvent> action, ThreadOption threadOption,
                bool keepSubscriberReferenceAlive, Predicate<TEvent> filter)
            {
                // Target is null when the action is a static method.
                var targetName = action.Target != null
                    ? action.Target.GetType().Name
                    : action.Method.DeclaringType.Name;

                // Wrap the subscriber so a line is written each time the event is delivered to it.
                var loggingAction = new Action<TEvent>(e =>
                {
                    Debug.WriteLine(string.Format(
                        "Subscribing event {0} at time: {1}. Target type: {2}",
                        typeof(TEvent).Name, DateTime.Now, targetName));
                    action.Invoke(e);
                });
                return base.Subscribe(loggingAction, threadOption, keepSubscriberReferenceAlive, filter);
            }

            private string GetContentAsString(TEvent payload)
            {
                if (payload == null)
                    return string.Empty;

                // Dump every public property of the payload as "Name=Value".
                var sb = new StringBuilder();
                foreach (PropertyDescriptor property in TypeDescriptor.GetProperties(payload))
                {
                    sb.Append(string.Format("{0}={1} ", property.Name, property.GetValue(payload)));
                }
                return sb.ToString();
            }
        }
    }

As you can see from the source code above, the methods Subscribe and Publish are overridden. These are virtual methods of the CompositePresentationEvent base class. I have tested this derived class in the EventAggregator sample, which is included in the Prism 4.1 library with source code at the following URL: Download Prism 4.1. You will find the EventAggregator sample in the \QuickStarts\EventAggregation folder after you have extracted the Prism 4.1 library with source code. I have made some minor changes to this demo solution. To the project EventAggregation.Infrastructure.Desktop, I added the class just presented, CustomCompositePresentationEvent. Next, our Prism Events must use this CustomCompositePresentationEvent class. This is easy to change, as shown in the Prism Event FundAddedEvent, which I changed to the following, still in the project EventAggregation.Infrastructure.Desktop:
    namespace EventAggregation.Infrastructure
    {
        public class FundAddedEvent : CustomCompositePresentationEvent<FundOrder>
        {
        }
    }

As you can see, the FundAddedEvent Prism Event now inherits from the class CustomCompositePresentationEvent. No changes are needed to the way the FundAddedEvent is published, but the call to Subscribe must be changed. The boolean flag keepSubscriberReferenceAlive must be set to true. The reason is that the Subscribe override in CustomCompositePresentationEvent requires this: it wraps the subscriber's action in a new logging action and passes that wrapper on to the base class, which continues with its own Subscribe implementation. I could not make the override work, i.e. the method override was not called, without setting the boolean flag keepSubscriberReferenceAlive to true. A plausible explanation, which I have not verified, is that with the flag set to false Prism only holds a weak reference to the subscribed delegate. Since nothing else references the logging wrapper, it can be garbage collected, and the subscription then silently disappears. The change I had to make to the subscription of the FundAddedEvent is in the project ModuleB.Desktop, in the class ActivityPresenter. Line 67 was changed to this:
    subscriptionToken = fundAddedEvent.Subscribe(FundAddedEventHandler, ThreadOption.UIThread, true, FundOrderFilter);

The required changes, then, are to make the Prism Events inherit from the class CustomCompositePresentationEvent instead of CompositePresentationEvent. This custom class is basically the CompositePresentationEvent class, but with the methods Subscribe and Publish overridden. In addition, we must set the keepSubscriberReferenceAlive boolean flag to true in our Subscribe calls. This may mean that you have to switch to the Subscribe overload that takes this flag. Let's first see the GUI of the EventAggregation sample:
The output in Visual Studio is the most important feature of this CustomCompositePresentationEvent class. After all, we want to be able to see which classes publish and subscribe to events, and the content of the payload of these events. In this sample, the output looks like the following example:
    Publishing event FundOrder at time: 18.11.2012 15:31:30. Source type: ModuleA.AddFundPresenter. Event Payload: CustomerId=Customer1 TickerSymbol=FundB
    Subscribing event FundOrder at time: 18.11.2012 15:31:30. Target type: ActivityPresenter

As we can see from the output above, the event FundOrder was published by the AddFundPresenter class. The payload of this Prism event was CustomerId=Customer1 and TickerSymbol=FundB. The event FundOrder was received by the subscriber ActivityPresenter within the same second, which is the resolution of the timestamps shown (i.e. the publish-subscribe execution performs almost instantly in the Prism framework). If you think that being able to see Prism traffic is interesting and will help you as a developer, consider using this CustomCompositePresentationEvent class. You will also have to make the changes listed above, but these should be relatively easy to apply in a Prism application solution. Tracing the Prism Event traffic will help you understand the application better and catch possible errors through this more convenient debugging scenario. For security reasons, you should consider whether writing the Event Payload with Debug.WriteLine is safe or not. Last, I want to mention that Prism Publish-Subscribe scenarios took 0 ms in my test and appear to happen almost instantly. Of course they take some CPU cycles to execute, but if you wonder how fast a Publish-Subscribe really is, the answer will usually be: really fast. However, if multiple classes subscribe to a given Prism Event, it might take some milliseconds to execute. I have not tested this yet. In an MVVM scenario, it is usually the ViewModels that publish and subscribe to Prism Events, usually via an ICommand implementation (DelegateCommand) that is data bound in WPF or Silverlight to a Button or similar GUI object.
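The wrapping technique used in the Subscribe override can be tried out without Prism. The sketch below is not Prism code: LoggingWrapperSketch, WithLogging and Receiver are made-up names, and a List of strings stands in for the Debug output so the result can be inspected. It shows the core idea of replacing the subscriber's action with a new action that logs first and then calls the original, including the case where the action is a static method and has no Target.

```csharp
using System;
using System.Collections.Generic;

public static class LoggingWrapperSketch
{
    // Stand-in for the Debug output window.
    public static readonly List<string> Log = new List<string>();

    public static Action<T> WithLogging<T>(Action<T> action)
    {
        // Target is null when the delegate points at a static method.
        string targetName = action.Target != null
            ? action.Target.GetType().Name
            : action.Method.DeclaringType.Name;

        // The wrapper logs the delivery and then invokes the original handler.
        return payload =>
        {
            Log.Add(string.Format("Delivering {0} to {1}", typeof(T).Name, targetName));
            action(payload);
        };
    }

    public class Receiver
    {
        public string Last;
        public void Handle(string payload) { Last = payload; }
    }

    public static void Main()
    {
        var receiver = new Receiver();
        Action<string> wrapped = WithLogging<string>(receiver.Handle);
        wrapped("FundB");
        Console.WriteLine(Log[0]);        // Delivering String to Receiver
        Console.WriteLine(receiver.Last); // FundB
    }
}
```

Note that the wrapper is a new delegate object that only the caller of WithLogging holds on to, which is why the lifetime of the wrapper matters when it is handed to an event system.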