Sunday, June 28, 2015

.NET Asynchronous Batch Processor

The .NET Framework offers a set of thread-safe collections that let you share a collection across threads. Processing the contents of these collections still requires a thread, however: BlockingCollection blocks the consuming thread, and unfortunately the framework has no built-in class that supports this in an asynchronous fashion. (Please note that the always awesome Stephen Cleary did actually implement an AsyncCollection.)

What if you want to handle dynamically sized batches of data in an asynchronous manner?

You could use a series of Dataflow blocks, or if you are looking for a simple solution you can write a small class that uses an async loop to process a ConcurrentQueue. Below is an abstract base class that can help you implement this:

Base Class

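using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public abstract class BatchProcessorBase<T> : IDisposable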
{
    protected readonly int MaxBatchSize;
    private readonly ConcurrentQueue<T> _queue;
    private readonly CancellationTokenSource _cancelSource;
    private readonly object _queueTaskLock;
    private Task _queueTask;
    private bool _isDisposed;
 
    protected BatchProcessorBase(int maxBatchSize)
    {
        MaxBatchSize = maxBatchSize;
        _queue = new ConcurrentQueue<T>();
        _cancelSource = new CancellationTokenSource();
        _queueTaskLock = new object();
        _queueTask = Task.FromResult(true);
    }
        
    public void Enqueue(T item)
    {
        _queue.Enqueue(item);
        TryStartProcessLoop();
    }
 
    public void Dispose()
    {
        if (_isDisposed)
            return;
 
        // Signal the processing loop to stop; items still queued are not processed.
        _cancelSource.Cancel();
        _isDisposed = true;
    }
 
    protected abstract Task ProcessBatchAsync(
        IList<T> list, 
        CancellationToken cancelToken);
 
    private void TryStartProcessLoop()
    {
        // Lock so only one thread can manipulate the queue task.
        lock (_queueTaskLock)
        {
            // If cancellation has been requested, do not start.
            if (_cancelSource.IsCancellationRequested)
                return;
 
            // If the loop is still active, do not start.
            if (!_queueTask.IsCompleted)
                return;
 
            // If the queue is empty, do not start.
            if (_queue.Count == 0)
                return;
 
            // Start a new task to process the queue.
            _queueTask = Task.Run(() => ProcessQueue(), _cancelSource.Token);
 
            // When the process queue task completes, check to see if
            // the queue has been populated again and needs to restart.
            _queueTask.ContinueWith(t => TryStartProcessLoop());
        }
    }
 
    private async Task ProcessQueue()
    {
        // Stay alive until the queue is empty or cancellation is requested.
        while (!_cancelSource.IsCancellationRequested && _queue.Count > 0)
        {
            var list = new List<T>();
            T item;
 
            // Dequeue up to a full batch from the queue.
            while (list.Count < MaxBatchSize && _queue.TryDequeue(out item))
                list.Add(item);
 
            // Process the dequeued items.
            await ProcessBatchAsync(list, _cancelSource.Token);
        }
    }
}
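
Sample Usage

To show how the base class might be consumed, here is a minimal sketch. It assumes the BatchProcessorBase<T> class above is compiled into the same project. The names ConsoleBatchProcessor and ProcessedCount are hypothetical and exist only for this illustration, and the Task.Delay call stands in for real asynchronous work such as a database or HTTP call.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical subclass (not part of the original post): writes each batch
// to the console and keeps a running total of processed items.
public sealed class ConsoleBatchProcessor : BatchProcessorBase<int>
{
    private int _processed;

    public ConsoleBatchProcessor(int maxBatchSize) : base(maxBatchSize)
    {
    }

    public int ProcessedCount
    {
        get { return _processed; }
    }

    protected override async Task ProcessBatchAsync(
        IList<int> list,
        CancellationToken cancelToken)
    {
        // Stand-in for real asynchronous work.
        await Task.Delay(100, cancelToken);

        Interlocked.Add(ref _processed, list.Count);
        Console.WriteLine(
            "Processed {0} item(s): {1}",
            list.Count,
            string.Join(", ", list));
    }
}

public static class Program
{
    public static void Main()
    {
        using (var processor = new ConsoleBatchProcessor(10))
        {
            for (var i = 0; i < 25; i++)
                processor.Enqueue(i);

            // The base class exposes no way to await the loop,
            // so this demo simply waits before disposing.
            Thread.Sleep(1000);

            Console.WriteLine("Total processed: {0}", processor.ProcessedCount);
        }
    }
}
```

The size of each batch depends on timing: the loop dequeues whatever is in the queue when it wakes up, up to MaxBatchSize, so the first batch may be smaller than the later ones.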

Wednesday, June 24, 2015

Capture xUnit Test Output with NLog and Common Logging

I recently blogged about How To Capture Test Output in xUnit 2.0. This is great, but how can we pass the ITestOutputHelper into our code to capture log output?

You could just wrap the xUnit helper in an ILog or ILogger, but we can also take it a step further and get all of the NLog features too! By creating an NLog target that wraps the ITestOutputHelper, we can use multiple targets, layouts, variables, verbosity levels, and more.

Sample Unit Test

public class NLogTests : IDisposable
{
    private readonly ILogger _logger;
 
    public NLogTests(ITestOutputHelper outputHelper)
    {
        _logger = outputHelper.GetNLogLogger();
    }
 
    public void Dispose()
    {
        _logger.RemoveTestOutputHelper();
    }
 
    [Fact]
    public void Hello()
    {
        _logger.Trace("World Trace");
        _logger.Debug("World Debug");
        _logger.Warn("World Warn");
        _logger.Error("World Error");
    }
}
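
Sample Target (Sketch)

The GetNLogLogger extension used above is not shown in this post, so the following is only a sketch of how such a target and extension method could look, assuming NLog 4.x and xUnit 2.x. The names TestOutputTarget and TestOutputHelperExtensions, and the choice of a separate LogFactory per test, are assumptions made for this illustration and may differ from the real implementation. The RemoveTestOutputHelper cleanup method is left out.

```csharp
using NLog;
using NLog.Config;
using NLog.Targets;
using Xunit.Abstractions;

// Hypothetical target: forwards every rendered log event to xUnit's output.
public sealed class TestOutputTarget : TargetWithLayout
{
    private readonly ITestOutputHelper _outputHelper;

    public TestOutputTarget(ITestOutputHelper outputHelper)
    {
        _outputHelper = outputHelper;
    }

    protected override void Write(LogEventInfo logEvent)
    {
        // Render the event with the target's layout, then hand it to xUnit.
        _outputHelper.WriteLine(Layout.Render(logEvent));
    }
}

// Hypothetical extension method matching the call in the sample test.
public static class TestOutputHelperExtensions
{
    public static ILogger GetNLogLogger(this ITestOutputHelper outputHelper)
    {
        var target = new TestOutputTarget(outputHelper) { Name = "xunit" };

        var config = new LoggingConfiguration();
        config.AddTarget("xunit", target);
        config.LoggingRules.Add(new LoggingRule("*", LogLevel.Trace, target));

        // Assumption: a dedicated factory per test keeps this configuration
        // separate from the global LogManager.
        var factory = new LogFactory { Configuration = config };
        return factory.GetLogger("Test");
    }
}
```

Because the target derives from TargetWithLayout, its Layout property can be changed like any other NLog target's layout.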

Monday, June 22, 2015

How To: Kill child process when parent process is killed

UPDATE (2/15/2016): There is now a v2 of this class!

Killing all child processes spawned by a parent process is an extremely useful trick that is not directly supported by the .NET Framework. Fortunately the Windows operating system, more specifically Kernel32.dll, does support the ability to link one process to another on shutdown. A huge thanks to Matt Howell for sharing this solution on Stack Overflow!

I took the liberty of cleaning up a few small things in the code and creating a demo:
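
One common way to get this behavior from Kernel32.dll is a Windows job object with the kill-on-close limit: when the parent exits, the operating system closes the job handle and terminates every process assigned to the job. The following is a minimal sketch under that assumption and is not the class referred to above. The names ChildProcessTracker and AddProcess are hypothetical, and notepad.exe is just a placeholder child process.

```csharp
using System;
using System.ComponentModel;
using System.Diagnostics;
using System.Runtime.InteropServices;

public static class ChildProcessTracker
{
    private const int JobObjectExtendedLimitInformation = 9;
    private const uint KillOnJobClose = 0x2000; // JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE

    [StructLayout(LayoutKind.Sequential)]
    private struct JOBOBJECT_BASIC_LIMIT_INFORMATION
    {
        public long PerProcessUserTimeLimit;
        public long PerJobUserTimeLimit;
        public uint LimitFlags;
        public UIntPtr MinimumWorkingSetSize;
        public UIntPtr MaximumWorkingSetSize;
        public uint ActiveProcessLimit;
        public UIntPtr Affinity;
        public uint PriorityClass;
        public uint SchedulingClass;
    }

    [StructLayout(LayoutKind.Sequential)]
    private struct IO_COUNTERS
    {
        public ulong ReadOperationCount, WriteOperationCount, OtherOperationCount;
        public ulong ReadTransferCount, WriteTransferCount, OtherTransferCount;
    }

    [StructLayout(LayoutKind.Sequential)]
    private struct JOBOBJECT_EXTENDED_LIMIT_INFORMATION
    {
        public JOBOBJECT_BASIC_LIMIT_INFORMATION BasicLimitInformation;
        public IO_COUNTERS IoInfo;
        public UIntPtr ProcessMemoryLimit;
        public UIntPtr JobMemoryLimit;
        public UIntPtr PeakProcessMemoryUsed;
        public UIntPtr PeakJobMemoryUsed;
    }

    [DllImport("kernel32.dll", CharSet = CharSet.Unicode, SetLastError = true)]
    private static extern IntPtr CreateJobObject(IntPtr lpJobAttributes, string lpName);

    [DllImport("kernel32.dll", SetLastError = true)]
    private static extern bool SetInformationJobObject(IntPtr hJob, int infoClass,
        ref JOBOBJECT_EXTENDED_LIMIT_INFORMATION info, uint cbInfoLength);

    [DllImport("kernel32.dll", SetLastError = true)]
    private static extern bool AssignProcessToJobObject(IntPtr hJob, IntPtr hProcess);

    // Deliberately never closed: the OS closes it when this process exits,
    // which is what triggers the kill-on-close behavior.
    private static readonly IntPtr JobHandle = CreateJob();

    private static IntPtr CreateJob()
    {
        IntPtr job = CreateJobObject(IntPtr.Zero, null);
        if (job == IntPtr.Zero)
            throw new Win32Exception();

        var info = new JOBOBJECT_EXTENDED_LIMIT_INFORMATION();
        info.BasicLimitInformation.LimitFlags = KillOnJobClose;
        uint length = (uint)Marshal.SizeOf(typeof(JOBOBJECT_EXTENDED_LIMIT_INFORMATION));

        if (!SetInformationJobObject(job, JobObjectExtendedLimitInformation, ref info, length))
            throw new Win32Exception();

        return job;
    }

    // Ties the given process to the job so it dies with the current process.
    public static void AddProcess(Process process)
    {
        if (!AssignProcessToJobObject(JobHandle, process.Handle))
            throw new Win32Exception();
    }
}

public static class Demo
{
    public static void Main()
    {
        Process child = Process.Start("notepad.exe"); // placeholder child
        ChildProcessTracker.AddProcess(child);
        Console.WriteLine("Kill this process and the child should exit too.");
        Console.ReadLine();
    }
}
```

On versions of Windows before Windows 8, AssignProcessToJobObject fails if the child already belongs to another job, so the sketch throws a Win32Exception in that case rather than silently continuing.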
