Sunday, June 28, 2015

.NET Asynchronous Batch Processor

The .NET Framework offers a series of Thread-Safe Collections that allow you to share collections across threads. Consuming the contents of these collections still requires a dedicated thread, and while there is a BlockingCollection, there is unfortunately no equivalent class that supports consuming a collection in an asynchronous fashion. (Please note that the always awesome Stephen Cleary did actually implement an AsyncCollection.)
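
For context, here is a minimal sketch of the blocking consumer pattern that BlockingCollection already supports; GetConsumingEnumerable ties up a thread while it waits for items, which is exactly the cost an asynchronous alternative would avoid:

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class BlockingCollectionDemo
{
    public static void Run()
    {
        var collection = new BlockingCollection<int>();

        // The consumer blocks a thread while waiting for items.
        var consumer = Task.Run(() =>
        {
            foreach (var item in collection.GetConsumingEnumerable())
                Console.WriteLine(item);
        });

        for (var i = 0; i < 5; i++)
            collection.Add(i);

        // Signal that no more items are coming so the enumerable completes.
        collection.CompleteAdding();
        consumer.Wait();
    }
}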

What if you want to handle dynamically sized batches of data in an asynchronous manner?

You could wire together a series of TPL Dataflow blocks (see the sketch below), or if you are looking for a simpler solution, you can write a small class that uses an async loop to process a ConcurrentQueue.
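
If Dataflow fits your needs, a rough sketch of that approach might look like the following (this uses the TPL Dataflow NuGet package; note that BatchBlock emits fixed-size batches, plus a final partial batch on completion, rather than the dynamically sized batches produced by the class below):

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowBatchingDemo
{
    public static async Task RunAsync()
    {
        // Group posted items into arrays of up to two items each.
        var batchBlock = new BatchBlock<int>(2);

        // Process each batch as it becomes available.
        var actionBlock = new ActionBlock<int[]>(
            batch => Console.WriteLine(string.Join(", ", batch)));

        batchBlock.LinkTo(
            actionBlock,
            new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 1; i <= 5; i++)
            batchBlock.Post(i);

        // Complete the pipeline and wait for all batches to drain.
        batchBlock.Complete();
        await actionBlock.Completion;
    }
}

If you prefer the simpler hand-rolled approach, below is an abstract base class that can help you implement it: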

Base Class

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public abstract class BatchProcessorBase<T> : IDisposable
{
    protected readonly int MaxBatchSize;
    private readonly ConcurrentQueue<T> _queue;
    private readonly CancellationTokenSource _cancelSource;
    private readonly object _queueTaskLock;
    private Task _queueTask;
    private bool _isDisposed;
 
    protected BatchProcessorBase(int maxBatchSize)
    {
        MaxBatchSize = maxBatchSize;
        _queue = new ConcurrentQueue<T>();
        _cancelSource = new CancellationTokenSource();
        _queueTaskLock = new object();
        _queueTask = Task.FromResult(true);
    }
        
    public void Enqueue(T item)
    {
        _queue.Enqueue(item);
        TryStartProcessLoop();
    }
 
    public void Dispose()
    {
        if (_isDisposed)
            return;
 
        _cancelSource.Cancel();
        _isDisposed = true;
    }
 
    protected abstract Task ProcessBatchAsync(
        IList<T> list, 
        CancellationToken cancelToken);
 
    private void TryStartProcessLoop()
    {
        // Lock so only one thread can manipulate the queue task.
        lock (_queueTaskLock)
        {
            // If cancellation has been requested, do not start.
            if (_cancelSource.IsCancellationRequested)
                return;
 
            // If the loop is still active, do not start.
            if (!_queueTask.IsCompleted)
                return;
 
            // If the queue is empty, do not start.
            if (_queue.Count == 0)
                return;
 
            // Start a new task to process the queue.
            _queueTask = Task.Run(() => ProcessQueue(), _cancelSource.Token);
 
            // When the process queue task completes, check to see if
            // the queue has been populated again and needs to restart.
            _queueTask.ContinueWith(t => TryStartProcessLoop());
        }
    }
 
    private async Task ProcessQueue()
    {
        // Stay alive until the queue is empty or cancellation is requested.
        while (!_cancelSource.IsCancellationRequested && _queue.Count > 0)
        {
            var list = new List<T>();
            T item;
 
            // Dequeue up to a full batch from the queue.
            while (list.Count < MaxBatchSize && _queue.TryDequeue(out item))
                list.Add(item);
 
            // Process the dequeued items.
            await ProcessBatchAsync(list, _cancelSource.Token);
        }
    }
}

Sample and Test

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

public class SampleBatchProcessor<T> : BatchProcessorBase<T>
{
    private readonly TimeSpan _processDelay;
 
    public SampleBatchProcessor(TimeSpan processDelay, int maxListSize)
        : base(maxListSize)
    {
        _processDelay = processDelay;
        LogLines = new ConcurrentQueue<string>();
    }
 
    public ConcurrentQueue<string> LogLines { get; private set; }
 
    protected override async Task ProcessBatchAsync(
        IList<T> list,
        CancellationToken cancelToken)
    {
        var items = string.Join(", ", list);
        LogLines.Enqueue(items);
        await Task.Delay(_processDelay, cancelToken);
    }
}
 
public class BatchProcessorTests
{
    [Fact]
    public async Task Enqueue()
    {
        var processDelay = TimeSpan.FromMilliseconds(20);
        var batchDelay = TimeSpan.FromMilliseconds(100);
 
        using (var batchProcessor = new SampleBatchProcessor<int>(
            processDelay, 
            2))
        {
            // 1
            batchProcessor.Enqueue(1);
            batchProcessor.Enqueue(2);
 
            // 2
            batchProcessor.Enqueue(3);
            batchProcessor.Enqueue(4);
 
            await Task.Delay(batchDelay);
 
            // 3
            batchProcessor.Enqueue(5);
 
            await Task.Delay(batchDelay);
 
            // 4
            batchProcessor.Enqueue(6);
            batchProcessor.Enqueue(7);
 
            // 5
            batchProcessor.Enqueue(8);
 
            await Task.Delay(batchDelay);
 
            Assert.Equal(5, batchProcessor.LogLines.Count);
 
            string logLine;
 
            batchProcessor.LogLines.TryDequeue(out logLine);
            Assert.Equal("1, 2", logLine);
 
            batchProcessor.LogLines.TryDequeue(out logLine);
            Assert.Equal("3, 4", logLine);
 
            batchProcessor.LogLines.TryDequeue(out logLine);
            Assert.Equal("5", logLine);
 
            batchProcessor.LogLines.TryDequeue(out logLine);
            Assert.Equal("6, 7", logLine);
 
            batchProcessor.LogLines.TryDequeue(out logLine);
            Assert.Equal("8", logLine);
        }
    }
}

Enjoy,
Tom

2 comments:


  1. What do you think about ring buffers? Actually, I mean the LMAX Disruptor (the .NET port has not been updated since 2013 :( ). It was created with batch processing in mind. When you set the size of the ring buffer, you automatically limit the number of unprocessed messages. With a ConcurrentQueue you can easily run out of memory.

    You would not need TryStartProcessLoop: the only thing required to insert a new item into the ring buffer is an Interlocked.Increment of an int counter.

    1. You are right; there is no memory cap on the ConcurrentQueue, and that could lead to problems with a very large backlog. A cheap fix would be to add a TryEnqueue method that checks the count of the queue before inserting (sketched below).

      I am not familiar with the LMAX Disruptor, but I found the GitHub page and I'll have to check it out! :)
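
      For illustration, a minimal sketch of that TryEnqueue idea (a hypothetical addition to BatchProcessorBase<T>, where _maxQueueSize is an assumed new field set from a constructor argument):

      // Hypothetical addition to BatchProcessorBase<T>: a soft memory cap.
      // The count check is not atomic with the enqueue, so the queue can
      // briefly exceed the cap under concurrency; it is a bound, not a guarantee.
      public bool TryEnqueue(T item)
      {
          if (_queue.Count >= _maxQueueSize)
              return false;

          _queue.Enqueue(item);
          TryStartProcessLoop();
          return true;
      }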

