Sunday, September 25, 2016

.NET Asynchronous Parallel Batch Processor

Last year, I wrote about how to handle dynamically sized batches of data in an asynchronous manner. That original implementation used an abstract base class and supported only a single background processing thread. I recently updated that implementation to accept lambdas rather than requiring inheritance, and to support a configurable number of background threads.

...basically, this is a wrapper around a ConcurrentQueue that takes a lambda and a thread count, and asynchronously processes enqueued items on background threads.
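For example, here is a minimal usage sketch (the handler body is purely illustrative; it delays and then writes each item to the console):

using (var processor = new ParallelProcessor<string>(
    maxParallelization: 4,
    processHandler: async (item, token) =>
    {
        // Simulate asynchronous work per item.
        await Task.Delay(100, token).ConfigureAwait(false);
        Console.WriteLine(item);
    }))
{
    processor.Enqueue("Hello");
    processor.Enqueue("World");

    // Dispose blocks (up to disposeTimeoutMs, 30 seconds by default)
    // to give queued work a chance to finish before canceling.
}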

Unit Tests

public class ParallelProcessorTests
{
    [Fact]
    public async Task NoDisposeTimeout()
    {
        var results = new ConcurrentQueue<int>();
 
        using (var processor = new ParallelProcessor<int>(2, async (i, token) =>
        {
            await Task.Delay(200, token).ConfigureAwait(false);
            results.Enqueue(i);
        }, disposeTimeoutMs: 0))
        {
            processor.Enqueue(1);
            processor.Enqueue(2);
            processor.Enqueue(3);
            processor.Enqueue(4);
            processor.Enqueue(5);
 
            await Task.Delay(300).ConfigureAwait(false);
        }
 
        Assert.Equal(2, results.Count);
    }
 
    [Fact]
    public void MaxParallelizationLimit()
    {
        const int parallelism = 3;
        var results = new ConcurrentQueue<Tuple<int, int>>();
        var active = 0;
 
        using (var processor = new ParallelProcessor<int>(parallelism, async (i, token) =>
        {
            Interlocked.Increment(ref active);
            await Task.Delay(200, token).ConfigureAwait(false);
            // Decrement returns the number of workers still active after this
            // one finishes; adding one includes this worker in the snapshot.
            var currentActive = Interlocked.Decrement(ref active) + 1;
 
            var tuple = Tuple.Create(currentActive, i);
            results.Enqueue(tuple);
        }))
        {
            processor.Enqueue(1);
            processor.Enqueue(2);
            processor.Enqueue(3);
            processor.Enqueue(4);
            processor.Enqueue(5);
        }
 
        Assert.Equal(5, results.Count);
 
        var maxParallelism = results.Max(t => t.Item1);
        Assert.Equal(parallelism, maxParallelism);
    }
 
    [Fact]
    public void BatchProcessor()
    {
        var results = new List<Tuple<long, List<int>>>();
        var sw = Stopwatch.StartNew();
 
        using (var processor = new BatchParallelProcessor<int>(1, 2, async (ints, token) =>
        {
            await Task.Delay(100, token).ConfigureAwait(false);
            var tuple = Tuple.Create(sw.ElapsedMilliseconds, ints);
            results.Add(tuple);
        }))
        {
            processor.Enqueue(1);
            processor.Enqueue(2);
            processor.Enqueue(3);
            processor.Enqueue(4);
            processor.Enqueue(5);
        }
 
        Assert.Equal(3, results.Count);
 
        Assert.Equal(2, results[0].Item2.Count);
        Assert.Equal(1, results[0].Item2[0]);
        Assert.Equal(2, results[0].Item2[1]);
 
        Assert.True(results[0].Item1 < results[1].Item1);
        Assert.Equal(2, results[1].Item2.Count);
        Assert.Equal(3, results[1].Item2[0]);
        Assert.Equal(4, results[1].Item2[1]);
 
        Assert.True(results[1].Item1 < results[2].Item1);
        Assert.Equal(1, results[2].Item2.Count);
        Assert.Equal(5, results[2].Item2[0]);
    }
}

ParallelProcessor

public sealed class ParallelProcessor<T> : ProcessorBase<T>
{
    private readonly Func<T, CancellationToken, Task> _processHandler;
    private readonly Action<T, Exception> _exceptionHandler;
        
    public ParallelProcessor(
        int maxParallelization, 
        Func<T, CancellationToken, Task> processHandler, 
        Action<T, Exception> exceptionHandler = null,
        int disposeTimeoutMs = 30000,
        int? maxQueueSize = null)
        : base(maxParallelization, disposeTimeoutMs, maxQueueSize)
    {
        if (maxParallelization < 1)
            throw new ArgumentException(
                $"{nameof(maxParallelization)} must be greater than zero",
                nameof(maxParallelization));
 
        _processHandler = processHandler;
        _exceptionHandler = exceptionHandler;
    }
 
    protected override async Task ProcessLoopAsync()
    {
        T item;
        while (!CancelSource.IsCancellationRequested && Queue.TryDequeue(out item))
        {
            try
            {
                await _processHandler(item, CancelSource.Token).ConfigureAwait(false);
            }
            catch (TaskCanceledException) when (CancelSource.IsCancellationRequested)
            {
                // Cancellation was requested, ignore and exit.
                return;
            }
            catch (Exception ex)
            {
                _exceptionHandler?.Invoke(item, ex);
            }
        }
    }
}
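
One detail worth calling out: an exception thrown by your handler does not fault the background loop. It is passed to the optional exceptionHandler (if one was provided) and processing moves on to the next item. A quick sketch:

using (var processor = new ParallelProcessor<string>(
    maxParallelization: 2,
    processHandler: async (item, token) =>
    {
        await Task.Delay(10, token).ConfigureAwait(false);
        throw new InvalidOperationException(item);
    },
    exceptionHandler: (item, ex) => Console.WriteLine($"Failed {item}: {ex.Message}")))
{
    processor.Enqueue("this item will fail, but the loop keeps running");
}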

BatchParallelProcessor

public sealed class BatchParallelProcessor<T> : ProcessorBase<T>
{
    private readonly int _batchSize;
    private readonly Func<List<T>, CancellationToken, Task> _processHandler;
    private readonly Action<List<T>, Exception> _exceptionHandler;
        
    public BatchParallelProcessor(
        int maxParallelization,
        int batchSize,
        Func<List<T>, CancellationToken, Task> processHandler,
        Action<List<T>, Exception> exceptionHandler = null,
        int disposeTimeoutMs = 30000,
        int? maxQueueSize = null)
        : base(maxParallelization, disposeTimeoutMs, maxQueueSize)
    {
        if (batchSize < 1)
            throw new ArgumentException(
                $"{nameof(batchSize)} must be greater than zero",
                nameof(batchSize));
 
        _batchSize = batchSize;
        _processHandler = processHandler;
        _exceptionHandler = exceptionHandler;
    }
 
    protected override async Task ProcessLoopAsync()
    {
        while (!CancelSource.IsCancellationRequested)
        {
            // The count is only a capacity hint; the queue may grow or
            // shrink concurrently while the batch is being collected.
            var count = Math.Min(_batchSize, Queue.Count + 1);
            var list = new List<T>(count);
 
            T item;
            while (list.Count < _batchSize && Queue.TryDequeue(out item))
                list.Add(item);
 
            if (list.Count == 0)
                return;
 
            try
            {
                await _processHandler(list, CancelSource.Token).ConfigureAwait(false);
            }
            catch (TaskCanceledException) when (CancelSource.IsCancellationRequested)
            {
                // Cancellation was requested, ignore and exit.
                return;
            }
            catch (Exception ex)
            {
                _exceptionHandler?.Invoke(list, ex);
            }
        }
    }
}
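
As a hypothetical example of where batching helps, here is a sketch that flushes messages on a single background thread in groups of up to 100 (the Task.Delay stands in for a real bulk write to a database, file, or HTTP endpoint):

using (var processor = new BatchParallelProcessor<string>(
    maxParallelization: 1,
    batchSize: 100,
    processHandler: async (messages, token) =>
    {
        // Stand-in for a real bulk write.
        await Task.Delay(10, token).ConfigureAwait(false);
        Console.WriteLine($"Flushed {messages.Count} messages");
    }))
{
    processor.Enqueue("first message");
    processor.Enqueue("second message");
}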

ProcessorBase

public abstract class ProcessorBase<T> : IDisposable
{
    protected readonly ConcurrentQueue<T> Queue = new ConcurrentQueue<T>();
    protected readonly CancellationTokenSource CancelSource = new CancellationTokenSource();
 
    private readonly object _lock = new object();
    private readonly Task[] _tasks;
    private readonly int _disposeTimeoutMs;
    private readonly int? _maxQueueSize;
 
    private bool _isDisposed;
 
    protected ProcessorBase(int maxParallelization, int disposeTimeoutMs, int? maxQueueSize)
    {
        _tasks = new Task[maxParallelization];
        _disposeTimeoutMs = disposeTimeoutMs;
        _maxQueueSize = maxQueueSize;
    }
 
    public void Dispose()
    {
        if (_isDisposed)
            return;
 
        _isDisposed = true;
 
        if (_disposeTimeoutMs > 0)
        {
            var tasks = _tasks.Where(t => t != null).ToArray();
            var allTask = Task.WhenAll(tasks);
            var delayTask = Task.Delay(_disposeTimeoutMs);
            Task.WaitAny(allTask, delayTask);
        }
 
        CancelSource.Cancel();
    }
 
    public void Enqueue(T item)
    {
        if (_isDisposed)
            throw new InvalidOperationException("Processor has been disposed");
 
        if (_maxQueueSize.HasValue && Queue.Count >= _maxQueueSize)
            throw new InvalidOperationException("Queue is full");
 
        Queue.Enqueue(item);
        TryStartProcessLoop();
    }
 
    public bool TryEnqueue(T item)
    {
        if (_isDisposed)
            return false;
 
        if (_maxQueueSize.HasValue && Queue.Count >= _maxQueueSize)
            return false;
 
        Queue.Enqueue(item);
        TryStartProcessLoop();
        return true;
    }
 
    protected abstract Task ProcessLoopAsync();
 
    private void TryStartProcessLoop()
    {
        // Another thread is in the lock, bail out.
        if (!Monitor.TryEnter(_lock))
            return;
 
        // Declare the task outside of the lock so that the continuation
        // can be attached after the lock has been released.
        Task task;
 
        try
        {
            // If cancellation has been requested, do not start.
            if (CancelSource.IsCancellationRequested)
                return;
 
            // If the queue is empty, do not start.
            if (Queue.Count == 0)
                return;
 
            var freeIndex = 0;
            var activeCount = 0;
 
            // Find the last free slot and count the active tasks.
            for (var i = 0; i < _tasks.Length; i++)
            {
                if (_tasks[i] == null || _tasks[i].IsCompleted)
                    freeIndex = i;
                else
                    activeCount++;
            }
 
            // All tasks are active, do not start.
            if (activeCount == _tasks.Length)
                return;
 
            // Only one in queue, at least one thread is active, do not start additional thread.
            if (activeCount > 0 && Queue.Count <= 1)
                return;
 
            // Start a new task to process the queue.
            task = _tasks[freeIndex] = Task.Run(ProcessLoopAsync, CancelSource.Token);
        }
        finally 
        {
            Monitor.Exit(_lock);
        }
 
        // When the process queue task completes, check to see if
        // the queue has been populated again and needs to restart.
        task.ContinueWith(t => TryStartProcessLoop());
    }
}
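
Finally, a design note on the two enqueue methods: Enqueue throws when the processor has been disposed or the optional maxQueueSize has been reached, while TryEnqueue returns false instead. The non-throwing variant enables a simple backpressure sketch like this:

using (var processor = new ParallelProcessor<int>(
    maxParallelization: 2,
    processHandler: (i, token) => Task.Delay(50, token),
    maxQueueSize: 1000))
{
    for (var i = 0; i < 5000; i++)
    {
        if (!processor.TryEnqueue(i))
        {
            // The queue is full (or the processor was disposed); handle overflow.
            Console.WriteLine($"Dropped item {i}");
        }
    }
}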

Enjoy,
Tom

2 comments:

  1. Nice bit of code, thanks!

    Whenever I see this sort of need now, I tend to think of the Actor Model and Akka.NET. The logic of the lambda/action becomes a message and an actor's response to it. You can then automatically put them behind a router of x size and have things processed simultaneously. I find it makes such things much easier to reason about.

    This is not to discount your solution by any means, but it seemed worth mentioning here.

    1. Thanks for bringing this up! I have used dataflow blocks from the TPL in the past to solve these sorts of problems, but I created this because I just wanted something small and lightweight. If you are doing anything at scale, then I would absolutely encourage people to look at other, more powerful solutions, such as Akka.
