
Sunday, September 25, 2016

.NET Asynchronous Parallel Batch Processor

Last year, I wrote about how to handle dynamically sized batches of data in an asynchronous manner. That original implementation used an abstract base class and only supported a single background processing thread. I recently updated that implementation to support lambdas rather than requiring inheritance, and to support a configurable number of background threads.

...basically, this is a ConcurrentQueue that takes a lambda and a thread count, and asynchronously processes enqueued items.
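
Before the tests, here is a minimal sketch of how such a processor might be put together. This is an illustration rather than the post's actual implementation: the constructor shape matches the tests below, but the worker loop, the _draining flag, and the (absent) error handling are my own. The BatchParallelProcessor used by the third test would follow the same pattern, with each worker dequeuing up to a maximum batch size before invoking the lambda.

public class ParallelProcessor<T> : IDisposable
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);
    private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
    private readonly Func<T, CancellationToken, Task> _processor;
    private readonly Task[] _workers;
    private readonly int _disposeTimeoutMs;
    private volatile bool _draining;
 
    public ParallelProcessor(
        int maxParallelism,
        Func<T, CancellationToken, Task> processor,
        int disposeTimeoutMs = Timeout.Infinite)
    {
        _processor = processor;
        _disposeTimeoutMs = disposeTimeoutMs;
 
        // Start one long-running worker loop per degree of parallelism.
        _workers = new Task[maxParallelism];
        for (var i = 0; i < maxParallelism; i++)
            _workers[i] = Task.Run(() => WorkerLoopAsync());
    }
 
    public void Enqueue(T item)
    {
        _queue.Enqueue(item);
        _signal.Release(); // Wake one worker.
    }
 
    public void Dispose()
    {
        // Wake every worker so each can observe the drained queue and exit.
        _draining = true;
        _signal.Release(_workers.Length);
 
        // Give queued work a chance to finish, then cancel anything left.
        // A production version would also handle faulted worker tasks here.
        Task.WaitAll(_workers, _disposeTimeoutMs);
        _cancelSource.Cancel();
    }
 
    private async Task WorkerLoopAsync()
    {
        T item;
 
        while (!_cancelSource.IsCancellationRequested)
        {
            await _signal.WaitAsync(_cancelSource.Token).ConfigureAwait(false);
 
            if (_queue.TryDequeue(out item))
                await _processor(item, _cancelSource.Token).ConfigureAwait(false);
            else if (_draining)
                return; // Dispose woke us and the queue is empty.
        }
    }
}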

Unit Tests

public class ParallelProcessorTests
{
    [Fact]
    public async Task NoDisposeTimeout()
    {
        var results = new ConcurrentQueue<int>();
 
        using (var processor = new ParallelProcessor<int>(2, async (i, token) =>
        {
            await Task.Delay(200, token).ConfigureAwait(false);
            results.Enqueue(i);
        }, disposeTimeoutMs: 0))
        {
            processor.Enqueue(1);
            processor.Enqueue(2);
            processor.Enqueue(3);
            processor.Enqueue(4);
            processor.Enqueue(5);
 
            await Task.Delay(300).ConfigureAwait(false);
        }
 
        Assert.Equal(2, results.Count);
    }
 
    [Fact]
    public void MaxParallelizationLimit()
    {
        const int parallelism = 3;
        var results = new ConcurrentQueue<Tuple<int, int>>();
        var active = 0;
 
        using (var processor = new ParallelProcessor<int>(parallelism, async (i, token) =>
        {
            Interlocked.Increment(ref active);
            await Task.Delay(200, token).ConfigureAwait(false);
            var currentActive = Interlocked.Decrement(ref active) + 1;
 
            var tuple = Tuple.Create(currentActive, i);
            results.Enqueue(tuple);
        }))
        {
            processor.Enqueue(1);
            processor.Enqueue(2);
            processor.Enqueue(3);
            processor.Enqueue(4);
            processor.Enqueue(5);
        }
 
        Assert.Equal(5, results.Count);
 
        var maxParallelism = results.Max(t => t.Item1);
        Assert.Equal(parallelism, maxParallelism);
    }
 
    [Fact]
    public void BatchProcessor()
    {
        var results = new List<Tuple<long, List<int>>>();
        var sw = Stopwatch.StartNew();
 
        using (var processor = new BatchParallelProcessor<int>(1, 2, async (ints, token) =>
        {
            await Task.Delay(100, token).ConfigureAwait(false);
            var tuple = Tuple.Create(sw.ElapsedMilliseconds, ints);
            results.Add(tuple);
        }))
        {
            processor.Enqueue(1);
            processor.Enqueue(2);
            processor.Enqueue(3);
            processor.Enqueue(4);
            processor.Enqueue(5);
        }
 
        Assert.Equal(3, results.Count);
 
        Assert.Equal(2, results[0].Item2.Count);
        Assert.Equal(1, results[0].Item2[0]);
        Assert.Equal(2, results[0].Item2[1]);
 
        Assert.True(results[0].Item1 < results[1].Item1);
        Assert.Equal(2, results[1].Item2.Count);
        Assert.Equal(3, results[1].Item2[0]);
        Assert.Equal(4, results[1].Item2[1]);
 
        Assert.True(results[1].Item1 < results[2].Item1);
        Assert.Equal(1, results[2].Item2.Count);
        Assert.Equal(5, results[2].Item2[0]);
    }
}

Wednesday, May 4, 2016

IResourceLoader: Balancing Semaphores

Recently I needed to balance requests for resources across a restricted number of sources. So, for example...

I am getting resource R, and I have factories A, B, and C creating those Rs. Each of those factories has a very limited capacity and can only create two Rs at a time. It is easy to put each factory behind a semaphore and limit how many threads can request resources from it at a time.

The challenge is evenly balancing the workload between all three factories. Note that you can't just round-robin between the semaphores, because there is no way to ensure that each operation will complete in the same amount of time.

To do this I created a generic IResourceLoader interface, and made two implementations: one to wrap a semaphore, and the other to wrap and balance a collection of IResourceLoaders. Below is the implementation, complete with unit tests; let's take a look!

Interface

public interface IResourceLoader<T>
{
    int Available { get; }
    int Count { get; }
    int MaxConcurrency { get; }
 
    Task<T> GetAsync(CancellationToken cancelToken = default(CancellationToken));
    bool TryGet(out Task<T> resource, CancellationToken cancelToken = default(CancellationToken));
}
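
Only the interface survives in this archive view, but the semaphore-wrapping implementation might look roughly like this. This is a sketch, not the original code: I am assuming Count tracks the total number of loads requested, and the TryGet check is intentionally simple.

public class SemaphoreResourceLoader<T> : IResourceLoader<T>
{
    private readonly Func<CancellationToken, Task<T>> _loader;
    private readonly SemaphoreSlim _semaphore;
    private int _count;
 
    public SemaphoreResourceLoader(
        Func<CancellationToken, Task<T>> loader,
        int maxConcurrency)
    {
        _loader = loader;
        _semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
        MaxConcurrency = maxConcurrency;
    }
 
    public int Available { get { return _semaphore.CurrentCount; } }
    public int Count { get { return _count; } }
    public int MaxConcurrency { get; private set; }
 
    public async Task<T> GetAsync(
        CancellationToken cancelToken = default(CancellationToken))
    {
        Interlocked.Increment(ref _count);
 
        // Wait until one of this factory's slots is free.
        await _semaphore.WaitAsync(cancelToken).ConfigureAwait(false);
 
        try
        {
            return await _loader(cancelToken).ConfigureAwait(false);
        }
        finally
        {
            _semaphore.Release();
        }
    }
 
    public bool TryGet(
        out Task<T> resource,
        CancellationToken cancelToken = default(CancellationToken))
    {
        // Note: this check-then-act is not atomic; a balancing caller
        // would be responsible for retrying against another source.
        if (_semaphore.CurrentCount == 0)
        {
            resource = null;
            return false;
        }
 
        resource = GetAsync(cancelToken);
        return true;
    }
}

The balancing implementation would then hold a collection of these loaders and, for each request, TryGet from whichever loader currently has the most availability, falling back to a GetAsync on the least-loaded one when all are busy.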

Wednesday, March 30, 2016

Why methods that return a Task should end with Async

I have spoken before about using the Fleck WebSocket server library for .NET, but recently I experienced a small problem with it. Pull request #126 changed several public methods to become async and return a Task instead of void.

This did not follow the recommended naming convention of giving all async methods an Async suffix.

In my case, this caused us to introduce a bug to a project that was using Fleck. We were ignoring the Task returned from IWebSocketConnection.Send, and under high load the task scheduler would get backed up.

Even though I do not think that they are going to use it, I created a pull request that updated the public methods to include the Async suffix. In order to maintain backwards compatibility, I also added obsolete methods to the interfaces without the suffix.

What did I do with the tasks? Rather than write in parallel, I used a concurrent queue and had a background task serially write messages to the connection.
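A simplified sketch of that pattern (hypothetical names, not the project's actual code): the connection's send method is passed in as a delegate, and a single background loop drains the queue so writes never overlap.

public class SerialMessageWriter : IDisposable
{
    private readonly ConcurrentQueue<string> _queue = new ConcurrentQueue<string>();
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);
    private readonly CancellationTokenSource _cancelSource = new CancellationTokenSource();
    private readonly Func<string, Task> _sendAsync;
    private readonly Task _writeLoop; // Kept so the loop could be awaited on shutdown.
 
    public SerialMessageWriter(Func<string, Task> sendAsync)
    {
        _sendAsync = sendAsync; // e.g. message => connection.Send(message)
        _writeLoop = Task.Run(() => WriteLoopAsync());
    }
 
    public void Enqueue(string message)
    {
        _queue.Enqueue(message);
        _signal.Release(); // Wake the write loop.
    }
 
    public void Dispose()
    {
        _cancelSource.Cancel();
    }
 
    private async Task WriteLoopAsync()
    {
        string message;
 
        // Cancellation during the wait ends the loop via OperationCanceledException.
        while (!_cancelSource.IsCancellationRequested)
        {
            await _signal.WaitAsync(_cancelSource.Token).ConfigureAwait(false);
 
            if (_queue.TryDequeue(out message))
                await _sendAsync(message).ConfigureAwait(false);
        }
    }
}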

Enjoy,
Tom

Sunday, October 4, 2015

Throttles - Delay vs Semaphore

A while back I talked about how to await an interval with a simple Throttle class that I had made. This is a very easy way to control how often you start an operation, but it does not limit how many operations are in flight at a given time.

Problem: You want to only make 5 requests per second to a remote service.

With the throttle class set to only allow one new call to start every 200 milliseconds, you will only be able to start 5 new requests per second. However, if each of those calls takes two seconds to complete, then one second in you will have 10 requests in flight at the same time.

Solution: Set your throttle to 200 milliseconds and add a semaphore with a count of 5.

You can solve this problem by combining your throttle with a semaphore. This ensures that you only start a new operation at your scheduled frequency, and also that you never have more than a predetermined number in flight at the same time.

Please note that using only a semaphore would not solve the problem: if, in the same example, the calls complete in under 200 milliseconds, then more than 5 requests will start each second.

Below is a set of helper classes (and tests) to help extend the throttle class to support this functionality within a using block.

Interfaces

public interface IUsableSemaphore : IDisposable
{
    Task<IUsableSemaphoreWrapper> WaitAsync();
}
 
public interface IUsableSemaphoreWrapper : IDisposable
{
    TimeSpan Elapsed { get; }
}
 
public interface IThrottle
{
    Task WaitAsync();
}
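
Only the interfaces are shown above; one way the semaphore wrapper might be implemented is sketched below. Waiting on the semaphore first and the throttle second is what preserves the even spacing: an in-flight slot is reserved before the timed gate is taken. The wrapper releases the semaphore when the using block disposes it, and Elapsed reports how long the slot was held. This is my illustration of the idea, not necessarily the post's exact code.

public class UsableSemaphore : IUsableSemaphore
{
    private readonly SemaphoreSlim _semaphore;
    private readonly IThrottle _throttle;
 
    public UsableSemaphore(SemaphoreSlim semaphore, IThrottle throttle)
    {
        _semaphore = semaphore;
        _throttle = throttle;
    }
 
    public async Task<IUsableSemaphoreWrapper> WaitAsync()
    {
        // Reserve an in-flight slot first, then wait for the next
        // evenly spaced start time.
        await _semaphore.WaitAsync().ConfigureAwait(false);
        await _throttle.WaitAsync().ConfigureAwait(false);
 
        return new UsableSemaphoreWrapper(_semaphore);
    }
 
    public void Dispose()
    {
        _semaphore.Dispose();
    }
 
    private class UsableSemaphoreWrapper : IUsableSemaphoreWrapper
    {
        private readonly SemaphoreSlim _semaphore;
        private readonly Stopwatch _stopwatch;
        private bool _isDisposed;
 
        public UsableSemaphoreWrapper(SemaphoreSlim semaphore)
        {
            _semaphore = semaphore;
            _stopwatch = Stopwatch.StartNew();
        }
 
        public TimeSpan Elapsed { get { return _stopwatch.Elapsed; } }
 
        public void Dispose()
        {
            if (_isDisposed)
                return;
 
            _stopwatch.Stop();
            _semaphore.Release(); // Free the slot for the next operation.
            _isDisposed = true;
        }
    }
}

Usage is then just: using (var wrapper = await semaphore.WaitAsync()) { /* make the call */ }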

Sunday, June 28, 2015

.NET Asynchronous Batch Processor

The .NET Framework offers a series of Thread-Safe Collections that allow you to share collections across threads. Processing the contents of these collections still requires a thread, and while there is a BlockingCollection, there is unfortunately no built-in class that supports consuming these collections in an asynchronous fashion. (Please note that the always awesome Stephen Cleary did actually implement an AsyncCollection.)

What if you want to handle dynamically sized batches of data in an asynchronous manner?

You could use a series of Dataflow blocks, or, if you are looking for a simple solution, you can write a small class that uses an async loop to process a ConcurrentQueue. Below is an abstract base class that can help you implement this:

Base Class

public abstract class BatchProcessorBase<T> : IDisposable
{
    protected readonly int MaxBatchSize;
    private readonly ConcurrentQueue<T> _queue;
    private readonly CancellationTokenSource _cancelSource;
    private readonly object _queueTaskLock;
    private Task _queueTask;
    private bool _isDisposed;
 
    protected BatchProcessorBase(int maxBatchSize)
    {
        MaxBatchSize = maxBatchSize;
        _queue = new ConcurrentQueue<T>();
        _cancelSource = new CancellationTokenSource();
        _queueTaskLock = new object();
        _queueTask = Task.FromResult(true);
    }
        
    public void Enqueue(T item)
    {
        _queue.Enqueue(item);
        TryStartProcessLoop();
    }
 
    public void Dispose()
    {
        if (_isDisposed)
            return;
 
        _cancelSource.Cancel();
        _isDisposed = true;
    }
 
    protected abstract Task ProcessBatchAsync(
        IList<T> list, 
        CancellationToken cancelToken);
 
    private void TryStartProcessLoop()
    {
        // Lock so only one thread can manipulate the queue task.
        lock (_queueTaskLock)
        {
            // If cancellation has been requested, do not start.
            if (_cancelSource.IsCancellationRequested)
                return;
 
            // If the loop is still active, do not start.
            if (!_queueTask.IsCompleted)
                return;
 
            // If the queue is empty, do not start.
            if (_queue.Count == 0)
                return;
 
            // Start a new task to process the queue.
            _queueTask = Task.Run(() => ProcessQueue(), _cancelSource.Token);
 
            // When the process queue task completes, check to see if
            // the queue has been populated again and needs to restart.
            _queueTask.ContinueWith(t => TryStartProcessLoop());
        }
    }
 
    private async Task ProcessQueue()
    {
        // Stay alive until the queue is empty or cancellation is requested.
        while (!_cancelSource.IsCancellationRequested && _queue.Count > 0)
        {
            var list = new List<T>();
            T item;
 
            // Dequeue up to a full batch from the queue.
            while (list.Count < MaxBatchSize && _queue.TryDequeue(out item))
                list.Add(item);
 
            // Process the dequeued items.
            await ProcessBatchAsync(list, _cancelSource.Token);
        }
    }
}
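
To use the base class, you inherit from it and implement ProcessBatchAsync. Here is a hypothetical subclass, purely for illustration, that writes each batch to the console:

public class ConsoleBatchProcessor : BatchProcessorBase<string>
{
    public ConsoleBatchProcessor(int maxBatchSize)
        : base(maxBatchSize)
    {
    }
 
    protected override async Task ProcessBatchAsync(
        IList<string> list,
        CancellationToken cancelToken)
    {
        // Simulate an async I/O call that handles the whole batch at once.
        await Task.Delay(10, cancelToken);
 
        Console.WriteLine(
            "Processed batch of {0}: {1}",
            list.Count,
            string.Join(", ", list));
    }
}
 
public static class BatchProcessorExample
{
    public static void Run()
    {
        using (var processor = new ConsoleBatchProcessor(2))
        {
            processor.Enqueue("a");
            processor.Enqueue("b");
            processor.Enqueue("c");
 
            // Dispose only cancels the loop, so give the background
            // task a moment to drain the queue before leaving the block.
            Thread.Sleep(100);
        }
    }
}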

Sunday, February 22, 2015

Await an Interval with a Throttle Class in .NET

Are you writing async code but need to control how often a call can be made? Just use this thread-safe implementation of a throttle that returns an evenly spaced Task.Delay each time it is invoked. This will allow you to throttle your application and control how often it makes calls.

Throttle Class

public interface IThrottle
{
    Task GetNext();
    Task GetNext(out TimeSpan delay);
}
 
public class Throttle : IThrottle
{
    private readonly object _lock = new object();
 
    private readonly TimeSpan _interval;
 
    private DateTime _nextTime;
 
    public Throttle(TimeSpan interval)
    {
        _interval = interval;
        _nextTime = DateTime.Now.Subtract(interval);
    }
 
    public Task GetNext()
    {
        TimeSpan delay;
        return GetNext(out delay);
    }
 
    public Task GetNext(out TimeSpan delay)
    {
        lock (_lock)
        {
            var now = DateTime.Now;
 
            _nextTime = _nextTime.Add(_interval);
                
            if (_nextTime > now)
            {
                delay = _nextTime - now;
                return Task.Delay(delay);
            }
 
            _nextTime = now;
 
            delay = TimeSpan.Zero;
            return Task.FromResult(true);
        }
    }
}
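
Usage is a single await per operation; the first call completes immediately, and each subsequent call is delayed so that starts stay evenly spaced:

public static async Task MakeThrottledCalls()
{
    var throttle = new Throttle(TimeSpan.FromMilliseconds(200));
 
    for (var i = 0; i < 5; i++)
    {
        // Completes immediately the first time, then roughly
        // every 200 milliseconds after that.
        await throttle.GetNext();
 
        Console.WriteLine("Call {0} started at {1:HH:mm:ss.fff}", i, DateTime.Now);
    }
}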

Saturday, September 20, 2014

await await Task.WhenAny

The await operator in C# automatically unwraps faulted tasks and rethrows their exceptions. You can await the completion of multiple tasks by using Task.WhenAll; if any of those tasks fault, their exceptions are all aggregated on the combined task, and awaiting it will rethrow the first of them.

However, Task.WhenAny does not work the same way as Task.WhenAll. Task.WhenAny returns an awaitable Task<Task> where the child Task represents whichever Task completed first. A key difference is that the container task will not throw when awaited, even if the winning task faulted!

If an exception occurs inside the first task to complete, you can automatically rethrow it by writing (and I realize how weird this sounds) await await Task.WhenAny.

Sample Tests

[Fact]
public async Task AwaitWhenAnyWait()
{
    var t1 = Task.Run(async () =>
    {
        await Task.Delay(100);
 
        throw new InvalidOperationException();
    });
 
    // This await will NOT throw.
    var whenAny = await Task.WhenAny(t1);
 
    Assert.True(whenAny.IsFaulted);
    Assert.NotNull(whenAny.Exception);
    Assert.IsType<InvalidOperationException>(whenAny.Exception.InnerException);
}
 
[Fact]
public async Task AwaitWhenAll()
{
    var t1 = Task.Run(async () =>
    {
        await Task.Delay(100);
 
        throw new InvalidOperationException();
    });
 
    try
    {
        // This await WILL throw.
        await Task.WhenAll(t1);
 
        throw new AssertException();
    }
    catch (InvalidOperationException)
    {
    }
}
 
[Fact]
public async Task AwaitAwaitWhenAny()
{
    var t1 = Task.Run(async () =>
    {
        await Task.Delay(100);
 
        throw new InvalidOperationException();
    });
 
    try
    {
        // This await await WILL throw.
        await await Task.WhenAny(t1);
 
        throw new AssertException();
    }
    catch (InvalidOperationException)
    {
    }
}

Enjoy,
Tom

Saturday, June 21, 2014

Waiting for Events with Tasks in .NET

Would you like to just await the next time that an event fires? It takes a little setup, but you can!

Migrating your code from the Event-based Asynchronous Pattern to the Task-based Asynchronous Pattern can be very tricky. You can use a TaskCompletionSource to manage your Task, and then you just need to wire up the registering and unregistering of your event handler. Unfortunately this process is not nearly as generic as I would like.

Event-based Asynchronous Pattern Tests

Here is a way of waiting for an event to fire by simply sleeping while we wait.

This is a terrible solution because we cannot know how long we will have to wait for the event. This means we either have to wait one long time period, or we have to periodically poll to see if the event has fired.

public delegate void SingleParamTestDelegate(int key, string value);
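
The rest of the original tests are cut off in this archive view, but the Task-based approach for the delegate above looks roughly like this. EventSource and WhenFired are hypothetical names for illustration; the pattern is the TaskCompletionSource wire-up described earlier.

public class EventSource
{
    public event SingleParamTestDelegate Fired;
 
    public void Fire(int key, string value)
    {
        var handler = Fired;
        if (handler != null)
            handler(key, value);
    }
}
 
public static class EventTasks
{
    public static Task<Tuple<int, string>> WhenFired(EventSource source)
    {
        var tcs = new TaskCompletionSource<Tuple<int, string>>();
 
        SingleParamTestDelegate handler = null;
        handler = (key, value) =>
        {
            // Unregister first so the task only completes once.
            source.Fired -= handler;
            tcs.TrySetResult(Tuple.Create(key, value));
        };
 
        source.Fired += handler;
        return tcs.Task;
    }
}

Now awaiting WhenFired(source) completes the moment Fire is called, with no sleeping or polling.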
 