Sunday, December 15, 2013

Throttling Dataflow and the Task Parallel Library

The Task Parallel Library is an amazingly powerful and versatile library. However, knowing how dataflow blocks process their data is vital to using them correctly. Linking source and target blocks together without fully understanding them is like throwing a live grenade into your app domain: at some point it will tear it down!

I recently experienced a bug where linking two blocks together without managing their BoundedCapacity caused them to queue actions at an unsustainable rate, and eventually the dataflow literally ate all of the available memory on the server. This could easily have been avoided by throttling the source and target blocks.

How do you throttle your source block based on your target block?

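Once linked together, a source block will push messages to its target block as fast as the target will accept them, and an unbounded target accepts everything, whether or not it can keep up. To prevent a source block from being too greedy, you want to restrict the bounded capacity of both it and its consumer. Even then, you still need to understand that setting a bounded capacity can cause message producers either to block or to fail to post: Post returns false when a bounded block is full, while SendAsync returns a task that only completes once the block accepts (or declines) the message, so a producer that waits on it is held back.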
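To make the difference between Post and SendAsync concrete, here is a minimal sketch that is not part of the original post; the ThrottleSketch class and its method names are hypothetical. It assumes the System.Threading.Tasks.Dataflow package and .NET 4.6 or later (for TaskCreationOptions.RunContinuationsAsynchronously). The first method holds every item behind a gate so the bounded ActionBlock stays full, which makes the number of declined Post calls predictable; the second awaits SendAsync instead of calling .Wait() on it, so the producer is throttled without tying up a thread.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical sketch class; not part of the original tests.
public static class ThrottleSketch
{
    // Posts 'total' items to a bounded ActionBlock whose delegate waits on a gate,
    // so no slot frees up during the loop. Returns how many Post calls were declined.
    public static async Task<int> CountDeclinedPostsAsync(int total, int boundedCapacity)
    {
        var gate = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        var block = new ActionBlock<int>(
            async i => await gate.Task, // hold each item until the gate opens
            new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity });

        var declined = 0;
        for (var i = 0; i < total; i++)
        {
            // Post never waits: it returns false when the block is full.
            if (!block.Post(i))
                declined++;
        }

        gate.SetResult(true); // let the accepted items finish
        block.Complete();
        await block.Completion;
        return declined;
    }

    // Sends 'total' items by awaiting SendAsync. While the block is full the
    // returned task stays pending, so the producer is held back without
    // blocking a thread and nothing is dropped. Returns the processed count.
    public static async Task<int> SendAllAsync(int total, int boundedCapacity)
    {
        var processed = 0;

        var block = new ActionBlock<int>(
            async i =>
            {
                await Task.Delay(1); // simulate slow work
                Interlocked.Increment(ref processed);
            },
            new ExecutionDataflowBlockOptions { BoundedCapacity = boundedCapacity });

        for (var i = 0; i < total; i++)
        {
            // Completes with true once the block accepts the item.
            if (!await block.SendAsync(i))
                throw new InvalidOperationException("The block declined an item.");
        }

        block.Complete();
        await block.Completion;
        return processed;
    }
}
```

Called with total = 100 and boundedCapacity = 20, the first method should report 80 declined posts, because items still being processed count toward BoundedCapacity and the gate keeps 20 of them in flight; the second should process all 100 items.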

...I'm sorry, this subject is very complex, but I think code will explain these details best! Below is a detailed set of tests to take you through all the basic scenarios for throttling a dataflow:

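using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Xunit;

[Fact]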
public void Post()
{
    // We post 100 messages to an unbounded block.
    // All of the messages arrive and get processed.
 
    var count = 0;
            
    var actionBlock = new ActionBlock<int>(
        i => Interlocked.Increment(ref count));
            
    Parallel.For(0, 100, i => actionBlock.Post(i));
 
    actionBlock.Complete();
    actionBlock.Completion.Wait();
 
    Assert.Equal(100, count);
}
 
[Fact]
public void BoundedCapacity()
{
    // We try to post 100 messages to a bounded queue.
    // Post returns false whenever the block is full,
    // so many of the messages are never queued.
            
    var count = 0;
 
    var actionBlock = new ActionBlock<int>(
        i => Interlocked.Increment(ref count),
        new ExecutionDataflowBlockOptions {BoundedCapacity = 20});
 
    Parallel.For(0, 100, i => actionBlock.Post(i));
 
    actionBlock.Complete();
    actionBlock.Completion.Wait();
 
    AssertAlmostEqual(20, count);
}
 
[Fact]
public void SendAsyncWait()
{
    // The queue is bounded, but SendAsync(...).Wait() blocks the
    // posting thread until the block accepts each message,
    // thus all messages still get processed.
 
    var count = 0;
 
    var actionBlock = new ActionBlock<int>(
        i => Interlocked.Increment(ref count),
        new ExecutionDataflowBlockOptions { BoundedCapacity = 20 });
 
    Parallel.For(0, 100, i => actionBlock.SendAsync(i).Wait());
 
    actionBlock.Complete();
    actionBlock.Completion.Wait();
 
    Assert.Equal(100, count);
}
 
[Fact]
public void LinkTo()
{
    // This is where we start to see the power of dataflow.
    // We link two unbounded queues together, and all 100
    // messages get processed.
 
    var transformCount = 0;
    var actionCount = 0;
    var log = new ConcurrentStack<string>();
 
    var transformBlock = new TransformBlock<int, string>(
        i =>
        {
            log.Push(i + " - Transform");
            Interlocked.Increment(ref transformCount);
            return i.ToString();
        });
 
    var actionBlock = new ActionBlock<string>(
        s =>
        {
            log.Push(s + " - Action");
            Task.Delay(50).Wait();
            Interlocked.Increment(ref actionCount);
        });
 
    transformBlock.LinkTo(
        actionBlock,
        new DataflowLinkOptions { PropagateCompletion = true });
 
    Parallel.For(0, 20,
        new ParallelOptions { MaxDegreeOfParallelism = 3 },
        i => transformBlock.Post(i));
 
    transformBlock.Complete();
    actionBlock.Completion.Wait();
 
    Assert.Equal(20, transformCount);
    Assert.Equal(20, actionCount);
}
 
[Fact]
public void LinkToBounded()
{
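    // We link an unbounded queue to a bounded queue.
    // But why do all messages get processed?
    // The first block is unbounded, so every Post is accepted.
    // Messages passed along a link are not dropped: when the
    // bounded action block is full it postpones them, and the
    // transform block holds them in its output buffer until the
    // action block has room. PropagateCompletion then makes the
    // action block complete only after all of its predecessor's
    // messages have been delivered.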
 
    var transformCount = 0;
    var actionCount = 0;
    var log = new ConcurrentStack<string>();
 
    var transformBlock = new TransformBlock<int, string>(
        i =>
        {
            log.Push(i + " - Transform");
            Interlocked.Increment(ref transformCount);
            return i.ToString();
        });
 
    var actionBlock = new ActionBlock<string>(
        s =>
        {
            log.Push(s + " - Action");
            Task.Delay(50).Wait();
            Interlocked.Increment(ref actionCount);
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1,
            BoundedCapacity = 2
        });
 
    transformBlock.LinkTo(
        actionBlock,
        new DataflowLinkOptions { PropagateCompletion = true });
 
    Parallel.For(0, 20,
        new ParallelOptions { MaxDegreeOfParallelism = 3 },
        i => transformBlock.Post(i));
 
    transformBlock.Complete();
    actionBlock.Completion.Wait();
 
    Assert.Equal(20, transformCount);
    Assert.Equal(20, actionCount);
}
 
[Fact]
public void BoundedLinkToBounded()
{
    // Here we link a bounded queue to a bounded queue.
    // Just as we saw above in the BoundedCapacity test,
    // Post fails to queue the messages once the first
    // block is full, thus not all of them are processed.
 
    var transformCount = 0;
    var actionCount = 0;
    var log = new ConcurrentStack<string>();
 
    var transformBlock = new TransformBlock<int, string>(
        i =>
        {
            log.Push(i + " - Transform");
            Interlocked.Increment(ref transformCount);
            return i.ToString();
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1,
            BoundedCapacity = 2
        });
 
    var actionBlock = new ActionBlock<string>(
        s =>
        {
            log.Push(s + " - Action");
            Task.Delay(50).Wait();
            Interlocked.Increment(ref actionCount);
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1,
            BoundedCapacity = 2
        });
 
    transformBlock.LinkTo(
        actionBlock,
        new DataflowLinkOptions { PropagateCompletion = true });
 
    Parallel.For(0, 20,
        new ParallelOptions { MaxDegreeOfParallelism = 3 },
        i => transformBlock.Post(i));
 
    transformBlock.Complete();
    actionBlock.Completion.Wait();
 
    Assert.Equal(2, transformCount);
    Assert.Equal(2, actionCount);
}
 
[Fact]
public void BufferLinkToBoundedLinkToBounded()
{
    // So how do we throttle processing so that...
    // 1) The primary thread is not blocked.
    // 2) The transform (source) block is throttled by
    //    the action (target) block.

    // Great question, and it's not that hard!
    // Add an unbounded buffer block to the front of
    // the blocks. This accomplishes the first goal.

    // Then link the bounded transform and bounded action
    // blocks together; this makes the transform block
    // be throttled by the action block.

    // Finally, when we go to shut down, call Complete
    // directly on the transform block, so that it will
    // stop processing without trying to complete all
    // of the items queued in the buffer.
 
    var transformCount = 0;
    var actionCount = 0;
    var log = new ConcurrentStack<string>();
 
    var bufferBlock = new BufferBlock<int>();
 
    var transformBlock = new TransformBlock<int, string>(
        i =>
        {
            log.Push(i + " - Transform");
            Interlocked.Increment(ref transformCount);
            return i.ToString();
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1,
            BoundedCapacity = 2
        });
 
    var actionBlock = new ActionBlock<string>(
        s =>
        {
            log.Push(s + " - Action");
            Task.Delay(50).Wait();
            Interlocked.Increment(ref actionCount);
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 1,
            BoundedCapacity = 2
        });
 
    bufferBlock.LinkTo(
        transformBlock,
        new DataflowLinkOptions { PropagateCompletion = false });
 
    transformBlock.LinkTo(
        actionBlock,
        new DataflowLinkOptions { PropagateCompletion = true });
 
    Parallel.For(0, 20,
        new ParallelOptions { MaxDegreeOfParallelism = 3 },
        i => bufferBlock.Post(i));
 
    Task.Delay(500).Wait();
 
    bufferBlock.Complete();
    transformBlock.Complete();
    actionBlock.Completion.Wait();
 
    AssertAlmostEqual(10, transformCount);
    AssertAlmostEqual(10, actionCount);
}
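The BoundedCapacity and BufferLinkToBoundedLinkToBounded tests call an AssertAlmostEqual helper that the post doesn't show. Because those counts depend on thread timing, an exact Assert.Equal would be flaky. Below is a minimal hypothetical version; the AlmostEqualAssert class name and the default tolerance of 5 are assumptions, not the original helper. Bring it into the test class with `using static AlmostEqualAssert;` (C# 6 or later) so the unqualified calls compile.

```csharp
using System;

// Hypothetical stand-in for the helper the tests call; the original is not shown.
public static class AlmostEqualAssert
{
    // Assumed tolerance; tune it to how much timing noise your machine produces.
    public const int DefaultTolerance = 5;

    // Throws (which xUnit reports as a test failure) unless 'actual'
    // lies within expected +/- tolerance, inclusive.
    public static void AssertAlmostEqual(int expected, int actual, int tolerance = DefaultTolerance)
    {
        if (tolerance < 0)
            throw new ArgumentOutOfRangeException(nameof(tolerance));

        if (Math.Abs(expected - actual) > tolerance)
            throw new Exception(
                $"Expected {expected} +/- {tolerance}, but the actual value was {actual}.");
    }
}
```

Throwing a plain exception keeps the helper free of any xUnit dependency; any exception escaping a [Fact] method fails the test.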

I hope that helped,
Tom
