
Sunday, June 28, 2015

.NET Asynchronous Batch Processor

The .NET Framework offers a series of Thread-Safe Collections that allow you to share collections across threads. Processing the contents of these collections still requires a dedicated thread, and while there is a BlockingCollection, there is unfortunately no equivalent class that supports consumption in an asynchronous fashion. (Please note that the always awesome Stephen Cleary did actually implement an AsyncCollection.)
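For context, the standard BlockingCollection consumption pattern ties up a thread for the lifetime of the consumer; the sketch below (the item type and console output are arbitrary choices for illustration) shows the blocking loop that an asynchronous approach would replace.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class BlockingConsumerSketch
{
    public static void Run()
    {
        var collection = new BlockingCollection<string>();

        // The consumer blocks its thread while waiting for items, which is
        // why a dedicated (long running) task or thread is required.
        var consumer = Task.Factory.StartNew(() =>
        {
            foreach (var item in collection.GetConsumingEnumerable())
                Console.WriteLine(item);
        }, TaskCreationOptions.LongRunning);

        collection.Add("hello");
        collection.CompleteAdding();
        consumer.Wait();
    }
}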

What if you want to handle dynamically sized batches of data in an asynchronous manner?

You could use a series of Dataflow blocks (a rough sketch follows), or, if you are looking for a simple solution, you can write a small class that uses an async loop to process a ConcurrentQueue. Below is an abstract base class that can help you implement this:
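The Dataflow version is roughly a BatchBlock linked to an ActionBlock. The sketch below is only an assumption of how that wiring might look; it requires the System.Threading.Tasks.Dataflow package, and the batch size and delay are arbitrary.

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowBatchSketch
{
    public static async Task RunAsync()
    {
        // Group incoming items into batches of up to 10.
        var batchBlock = new BatchBlock<int>(10);

        // Process each completed batch asynchronously.
        var actionBlock = new ActionBlock<int[]>(async batch =>
        {
            Console.WriteLine("Processing {0} items", batch.Length);
            await Task.Delay(100);
        });

        batchBlock.LinkTo(
            actionBlock,
            new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 0; i < 25; i++)
            batchBlock.Post(i);

        // Complete flushes any partial batch; awaiting the ActionBlock's
        // Completion waits for all batches to finish processing.
        batchBlock.Complete();
        await actionBlock.Completion;
    }
}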

Base Class

public abstract class BatchProcessorBase<T> : IDisposable
{
    protected readonly int MaxBatchSize;
    private readonly ConcurrentQueue<T> _queue;
    private readonly CancellationTokenSource _cancelSource;
    private readonly object _queueTaskLock;
    private Task _queueTask;
    private bool _isDisposed;
 
    protected BatchProcessorBase(int maxBatchSize)
    {
        MaxBatchSize = maxBatchSize;
        _queue = new ConcurrentQueue<T>();
        _cancelSource = new CancellationTokenSource();
        _queueTaskLock = new object();
        _queueTask = Task.FromResult(true);
    }
        
    public void Enqueue(T item)
    {
        _queue.Enqueue(item);
        TryStartProcessLoop();
    }
 
    public void Dispose()
    {
        if (_isDisposed)
            return;
 
        _cancelSource.Cancel();
        _isDisposed = true;
    }
 
    protected abstract Task ProcessBatchAsync(
        IList<T> list, 
        CancellationToken cancelToken);
 
    private void TryStartProcessLoop()
    {
        // Lock so only one thread can manipulate the queue task.
        lock (_queueTaskLock)
        {
            // If cancellation has been requested, do not start.
            if (_cancelSource.IsCancellationRequested)
                return;
 
            // If the loop is still active, do not start.
            if (!_queueTask.IsCompleted)
                return;
 
            // If the queue is empty, do not start.
            if (_queue.Count == 0)
                return;
 
            // Start a new task to process the queue.
            _queueTask = Task.Run(() => ProcessQueue(), _cancelSource.Token);
 
            // When the process queue task completes, check to see if
            // the queue has been populated again and needs to restart.
            _queueTask.ContinueWith(t => TryStartProcessLoop());
        }
    }
 
    private async Task ProcessQueue()
    {
        // Stay alive until the queue is empty or cancellation is requested.
        while (!_cancelSource.IsCancellationRequested && _queue.Count > 0)
        {
            var list = new List<T>();
            T item;
 
            // Dequeue up to a full batch from the queue.
            while (list.Count < MaxBatchSize && _queue.TryDequeue(out item))
                list.Add(item);
 
            // Process the dequeued items.
            await ProcessBatchAsync(list, _cancelSource.Token);
        }
    }
}
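
To make that concrete, a subclass only needs to pick a batch size and implement ProcessBatchAsync. The sketch below is purely illustrative; the LogBatchProcessor name and the console output are placeholders for whatever async I/O you actually need.

// Hypothetical example subclass: batches up log lines and writes them out.
public class LogBatchProcessor : BatchProcessorBase<string>
{
    public LogBatchProcessor() : base(maxBatchSize: 100) { }
 
    protected override async Task ProcessBatchAsync(
        IList<string> list,
        CancellationToken cancelToken)
    {
        // Substitute your own async I/O here (HTTP call, database write, etc.).
        foreach (var line in list)
            await Console.Out.WriteLineAsync(line);
    }
}

Usage is just constructing the processor and calling Enqueue; the base class starts and stops the processing loop on its own.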

Saturday, June 16, 2012

.NET BlockingQueue

Task.Factory.StartNew is great, but often abused.

Do you need to make a call that could take a long time to complete, but you don't care about the results? If so, then you need to make an async call. Should you make these calls by constantly creating and starting new Tasks? No, as this could use up a lot of resources, exhaust your thread pool, or possibly even tear down your app domain.

I was recently introduced to System.Collections.Concurrent.BlockingCollection, and I absolutely love that class. However, 99% of my use cases with BlockingCollections are actually more specific to queuing. My solution: create a generic BlockingQueue!

Simple File Example

public class SimpleFile
{
    public string Path { get; set; }
    public string Contents { get; set; }
}
 
public class SimpleFileQueue : BlockingQueue<SimpleFile>
{
    public SimpleFileQueue(int threadCount) : base(threadCount) { }
 
    protected override void ProcessModel(SimpleFile model)
    {
        System.IO.File.WriteAllText(model.Path, model.Contents);
    }
 
    protected override void HandleException(Exception ex)
    {
        // TODO: Log me!
    }
}
 
public static class SimpleFileExample
{
    public static readonly SimpleFileQueue Queue = new SimpleFileQueue(3);
 
    public static void EnqueueSimpleFile(string path, string content)
    {
        Queue.Enqueue(new SimpleFile
        {
            Path = path,
            Contents = content
        });
    }
}

BlockingQueue<T> Implementation

public abstract class BlockingQueue<T> : IDisposable
{
    #region Private Members
 
    private const int Timeout = 60000;
 
    private bool _disposed;
    private readonly CancellationTokenSource _tokenSource;
    private readonly BlockingCollection<T> _collection;
    private readonly Task[] _tasks;
 
    #endregion
 
    #region Public Properties
 
    public int Count
    {
        get { return _collection.Count; }
    }
 
    public bool IsCanceled
    {
        get { return _tokenSource.IsCancellationRequested; }
    }
 
    public bool IsCompleted
    {
        get { return _tasks.All(t => t.IsCompleted); }
    }
 
    #endregion
 
    #region Constructor & Destructor
 
    protected BlockingQueue(int threadCount)
    {
        _tokenSource = new CancellationTokenSource();
            
        var queue = new ConcurrentQueue<T>();
        _collection = new BlockingCollection<T>(queue);
 
        _tasks = new Task[threadCount];
        for (var i = 0; i < threadCount; i++)
            _tasks[i] = Task.Factory.StartNew(ProcessQueue);
    }
 
    ~BlockingQueue()
    {
        Dispose(true);
    }
 
    #endregion
 
    #region Abstracts
 
    protected abstract void HandleException(Exception ex);
 
    protected abstract void ProcessModel(T model);
 
    #endregion
 
    #region Methods
 
    public void Enqueue(T model)
    {
        if (IsCompleted)
            throw new Exception("BlockingQueue has been Completed");
 
        if (IsCanceled)
            throw new Exception("BlockingQueue has been Canceled");
 
        _collection.Add(model);
    }
 
    public void Cancel()
    {
        if (!IsCanceled)
            _tokenSource.Cancel(false);
    }
 
    public void CancelAndWait()
    {
        Cancel();
        Task.WaitAll(_tasks);
    }
 
    private void ProcessQueue()
    {
        while (!IsCanceled)
        {
            try
            {
                T model;
                var result = _collection.TryTake(out model, Timeout, _tokenSource.Token);
 
                if (result && model != null)
                    ProcessModel(model);
            }
            catch (OperationCanceledException)
            {
                break;
            }
            catch (Exception ex)
            {
                HandleException(ex);
            }
        }
    }
 
    #endregion
 
    #region IDisposable
 
    public void Dispose()
    {
        Dispose(false);
    }
 
    private void Dispose(bool finalizing)
    {
        if (_disposed)
            return;
 
        Cancel();
 
        if (!finalizing)
            GC.SuppressFinalize(this);
 
        _disposed = true;
    }
 
    #endregion
}
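
As a usage note, shutting the queue down looks something like the sketch below (the path and contents are arbitrary). Keep in mind that Cancel stops the workers without draining the collection, so anything still queued may go unprocessed.

public static class SimpleFileShutdownExample
{
    public static void Run()
    {
        SimpleFileExample.EnqueueSimpleFile(@"C:\Temp\example.txt", "Hello, world!");

        // Cancel the token and block until all worker tasks have exited.
        // Items still sitting in the underlying collection are not drained.
        SimpleFileExample.Queue.CancelAndWait();
    }
}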

Enjoy,
Tom
