Skip Navigation LinksHome > View Post

Getting good at Parallel: 1 - Cancellation

This is part of a short series of posts following my presentation ‘Getting Good at Parallel’ for the NxtGenUG Southampton user group. This post is all about the subtleties of Task cancallation in the TPL. The expectation is that you’re already at least vaguely familiar with Tasks (maybe because you attended my session) but to recap quickly.

The TPL provides a CancellationTokenSource that can be used to cancel Tasks. The code might look something like this:

CancellationTokenSource cts = new CancellationTokenSource();
Task taskA = Task.Factory.StartNew(() =>
    {
        // do some work
        Thread.Sleep(1000);
    }, cts
.Token);


Task taskB = Task.Factory.StartNew(() =>
    {
        // this task decides to cancel
        cts.Cancel();
    }, cts
.Token);

In this very basic sample we have two tasks and both share the Token provided by our CancellationTokenSource. Notice that taskB decides (for whatever reason you like) to cancel the Token and any tasks that are sharing that token. But what actually happens when Tasks are cancelled? running Tasks plucked from the processor and brutally stopped? Do I get an exception? This post hopefully answers a few of these questions with the aid of some simple diagrams that represent the flow of tasks, like this one:

clip_image002

Here we have two tasks, A and B. The arrow represents the time the Task is Running (that is, actively undergoing execution). As you can see, both tasks begin executing at approximately the same time. Task A will run for significantly longer than Task B, which calls Cancel() on the CancellationTokenSource. Both tasks share the same token from this CancellationTokenSource.

So what happens when Task B’s body calls cts.Cancel(). With the code above, pretty much nothing.

Note that Task A ISN’T going to be plucked from existence and aborted. Task A will happily continue to its natural conclusion unless the developer has coded for co-operative cancellation inside the body of Task A, i.e. the code checks to see if cancellation has been requested and exits the method (via return or an exception).

The behaviour of cancellation gets more interesting when we try to do some co-ordination by waiting on Task A or B.

Task.WaitAll(taskA, taskB);

If, for example, we call Task.WaitAll and specify both Task A and Task B, the WaitAll invocation would return at time t3 shown on the diagram; that is, when all tasks have completed.

Task.WaitAny(taskA, taskB);

If we call Task.WaitAny and specify both Task A and Task B, the WaitAny invocation would return at time t2 shown on the diagram. This is when the first task completes. Note that it isn’t when the cancellation is invoked. Task 1 would continue to process in the background until time t3.

OperationCanceledException

Both WaitAll and WaitAny (static methods of the Task type used above) support an overload that accepts a CancellationTokenSource parameter. If this token source is cancelled (as above) then WaitAll and WaitAny will throw an OperationCanceledException the moment the Cancel() is called. One exception (ha!) to this rule of course is if a task is completes before the CancellationTokenSource is cancelled. In that case, WaitAny will return the index of that task.

What happens if we add a third task, C, that is scheduled (taskC.Status == TaskStatus.WaitingToRun) but doesn’t get executed until around the time Task B completes, as shown below:

clip_image002[9]

How does this change the behaviour of WaitAll  and WaitAny?

Task.WaitAll(taskA, taskB, taskC);

If we call Task.WaitAll and specify all three tasks, the WaitAll invocation would throw an invocation exception at time t3. The AggregateException would contain a TaskCancelledException created for Task C – because its execution never took place.

Task.WaitAny(taskA, taskB, taskC);

If we call Task.WaitAny and specify all three tasks the WaitAny invocation would not throw any exception – it would return 1 to indicate that taskB completed and released the wait at time t2 on the diagram. If you’d need to know about cancellation at this point – you would simply use the overload that allows you to specify a CancellationTokenSource as discussed in the OperationCancelledException callout above, e.g.

try
{
    Task.WaitAny(taskA, taskB, taskC, cts);
}
catch (OperationCanceledException)
{
    // operation was cancelled
}

Exceptions, exceptions everywhere

During my session one of the attendees raised the point that if feels, to some degree, that the TPL tends toward driving control flow with exceptions. I can see why one would think this but I've still not had a chance to decide whether I really agree. After all, lots of the framework requires the handling of exceptions that would, to some degree, drive your business flow. The important thing is that your business logic sufficiently abstracts that I guess. Anyway, the good news is there are ways to avoid using exceptions as shown above if you prefer, e.g.

public static void WaitUntilTasksCompleteOrCancellation1
    (Task[] tasks, CancellationToken cancellationToken)
{
    var mres = new ManualResetEventSlim();
    using (cancellationToken.Register(() => mres.Set()))
    {
        Task.Factory.ContinueWhenAll(tasks, _ => mres.Set());
        mres.Wait();
    }
}

public static void WaitUntilTasksCompleteOrCancellation2
    (Task[] tasks, CancellationToken cancellationToken)
{
    var allCompleted = Task.Factory.ContinueWhenAll(tasks, delegate { });
    var allCompletedHandle = ((IAsyncResult)allCompleted).AsyncWaitHandle;
    WaitHandle.WaitAny(
        new WaitHandle[]
        {allCompletedHandle, cancellationToken.WaitHandle} );
}

Not pretty, but if you have to avoid exceptions – there are options.

Next up in the series – cancelling a long running ‘async’ task. Huge thanks to Stephen Toub for all his help and support in dealing with my questions. Thanks!

Tags:

 
Josh Post By Josh Twist
11:46 AM
03 Nov 2010

» Next Post: My last blog post working for Microsoft UK
« Previous Post: Getting Good at Parallel (with NxtGenUG)

Comments are closed for this post.

Posted by Terry Carvin @ 12 Jan 2011 4:07 PM
Hi Josh, Great post as ever :)

On the subject of waiting until tasks are complete, im wondering how this translates to the Parallel.ForEach construct.

I currently have an iteration i have enclosed within a Parallel.ForEach. In the Example below I have posed some questions in the comments about how best to handle the gracefully termination of the loop;
"
private void myFunction()
{

IList<string> iListOfItems = new List<string>();
// populate iListOfItems

CancellationTokenSource cts = new CancellationTokenSource();

ParallelOptions po = new ParallelOptions();
po.MaxDegreeOfParallelism = 20; // max threads
po.CancellationToken = cts.Token;

try
{
var myWcfProxy = new myWcfClientSoapClient();

if (Parallel.ForEach(iListOfItems, po, (item, loopsate) =>
{
try
{
if (_requestedToStop)
loopsate.Stop();
// long running blocking WS call, check before and after
var response = myWcfProxy.ProcessIntervalConfiguration(item);
if (_requestedToStop)
loopsate.Stop();

// perform some local processing of the response object
}
catch (Exception ex)
{
// cannot continue game over.
if (myWcfProxy.State == CommunicationState.Faulted)
{
loopsate.Stop();
throw;
}
}

// else carry on..
// raise some events and other actions that could all risk an unhandled error.

}
).IsCompleted)
{
RaiseAllItemsCompleteEvent();
}
}
catch (Exception ex)
{
// if an unhandled eror is raised within one if the Parallel.ForEach threads, do all theards in the
// ForEach abort or run to comleteion but the net affect is as soon as an Exception is raised
// an equeivent to loopsate.Stop() is allso called by the underlying famworke loop exception managment
// so do i need to call cts.Cancel here?

// i want to wait for all the threads to termainate before i continue at this point.

// do i need to call cts.Dispose()

MessageBox.Show(Logging.FormatException(ex));
}
finally
{

if (myWcfProxy != null)
{
// possible race condition with the foreach therads here
if (myWcfProxy.State == System.ServiceModel.CommunicationState.Faulted)
myWcfProxy.Abort();

myWcfProxy.Close();
}

// possible race condition with the foreach therads here
_requestedToStop = false;

}

}
"

Cheers
Terry.

Posted by Terry @ 12 Jan 2011 4:10 PM
... and sorry about the spelling in the comments lol. woops

© 2005 - 2014 Josh Twist - All Rights Reserved.