Last year, I wrote about how to handle dynamically sized batches of data in an asynchronous manner. That original implementation used an abstract base class and supported only a single background processing thread. I recently updated that implementation to support lambdas rather than requiring inheritance, and to support a dynamic number of background threads.
Basically, this is a ConcurrentQueue that takes a lambda and a thread count, and uses them to process enqueued items asynchronously.
Unit Tests
/// <summary>
/// xUnit tests exercising <c>ParallelProcessor&lt;T&gt;</c> and
/// <c>BatchParallelProcessor&lt;T&gt;</c> through their public surface:
/// construction with a processing lambda, <c>Enqueue</c>, and disposal.
/// </summary>
/// <remarks>
/// NOTE(review): every test here relies on real <see cref="Task.Delay(int)"/>
/// timings, so results may be sensitive to scheduler load on a busy machine.
/// </remarks>
public class ParallelProcessorTests
{
    /// <summary>
    /// Enqueues five items that each take 200 ms to process, waits 300 ms, then
    /// disposes a processor created with <c>disposeTimeoutMs: 0</c> and expects
    /// exactly two results to have been recorded.
    /// </summary>
    /// <remarks>
    /// NOTE(review): presumably a zero dispose timeout means Dispose does not wait
    /// for in-flight items, so only the first wave (2 workers) finishes — confirm
    /// against the ParallelProcessor implementation.
    /// </remarks>
    [Fact]
    public async Task NoDisposeTimeout()
    {
        var results = new ConcurrentQueue<int>();

        using (var processor = new ParallelProcessor<int>(2, async (i, token) =>
        {
            await Task.Delay(200, token).ConfigureAwait(false);
            results.Enqueue(i);
        }, disposeTimeoutMs: 0))
        {
            for (var item = 1; item <= 5; item++)
            {
                processor.Enqueue(item);
            }

            // Long enough for the first 200 ms wave to finish, too short for the second.
            await Task.Delay(300).ConfigureAwait(false);
        }

        Assert.Equal(2, results.Count);
    }

    /// <summary>
    /// Processes five items with a parallelism argument of three and verifies
    /// that all five complete and that the highest number of simultaneously
    /// active callbacks observed equals that argument.
    /// </summary>
    [Fact]
    public void MaxParallelizationLimit()
    {
        const int parallelism = 3;
        var results = new ConcurrentQueue<Tuple<int, int>>();
        var active = 0;

        using (var processor = new ParallelProcessor<int>(parallelism, async (i, token) =>
        {
            Interlocked.Increment(ref active);
            await Task.Delay(200, token).ConfigureAwait(false);

            // Decrement returns the new value; +1 recovers the count of callbacks
            // that were active just before this one finished.
            var currentActive = Interlocked.Decrement(ref active) + 1;
            var tuple = Tuple.Create(currentActive, i);
            results.Enqueue(tuple);
        }))
        {
            for (var item = 1; item <= 5; item++)
            {
                processor.Enqueue(item);
            }
        }

        Assert.Equal(5, results.Count);
        var maxParallelism = results.Max(t => t.Item1);
        Assert.Equal(parallelism, maxParallelism);
    }

    /// <summary>
    /// Enqueues five items into a <c>BatchParallelProcessor&lt;int&gt;</c> created
    /// with arguments <c>(1, 2, ...)</c> and verifies that the callback receives
    /// three batches — [1, 2], [3, 4], [5] — recorded at strictly increasing
    /// elapsed times.
    /// </summary>
    /// <remarks>
    /// NOTE(review): presumably the first argument is the worker count and the
    /// second the maximum batch size — confirm against BatchParallelProcessor.
    /// The plain <see cref="List{T}"/> is only safe if callbacks never overlap,
    /// which this test appears to assume.
    /// </remarks>
    [Fact]
    public void BatchProcessor()
    {
        var results = new List<Tuple<long, List<int>>>();
        var sw = Stopwatch.StartNew();

        using (var processor = new BatchParallelProcessor<int>(1, 2, async (ints, token) =>
        {
            await Task.Delay(100, token).ConfigureAwait(false);
            var tuple = Tuple.Create(sw.ElapsedMilliseconds, ints);
            results.Add(tuple);
        }))
        {
            for (var item = 1; item <= 5; item++)
            {
                processor.Enqueue(item);
            }
        }

        Assert.Equal(3, results.Count);

        // First batch: items 1 and 2.
        Assert.Equal(2, results[0].Item2.Count);
        Assert.Equal(1, results[0].Item2[0]);
        Assert.Equal(2, results[0].Item2[1]);
        Assert.True(results[0].Item1 < results[1].Item1);

        // Second batch: items 3 and 4.
        Assert.Equal(2, results[1].Item2.Count);
        Assert.Equal(3, results[1].Item2[0]);
        Assert.Equal(4, results[1].Item2[1]);
        Assert.True(results[1].Item1 < results[2].Item1);

        // Final, partial batch: Assert.Single checks the size is exactly one and
        // returns that element, replacing the Count == 1 comparison.
        Assert.Equal(5, Assert.Single(results[2].Item2));
    }
}