Sunday, October 4, 2015

Throttles - Delay vs Semaphore

A while back I had talked about how to await an interval with a simple Throttle class that I had made. This is a very easy way to control how often you start an operation, but it does not ensure a limit to how many operations are happening at a given time.

Problem: You want to only make 5 requests per second to a remote service.

With the throttle class set to only allow one new call to start every 200 milliseconds you will only be able to start 5 new requests per second. However, if those calls take longer than two seconds to complete, then during the next second you will have 10 requests in flight at the same time.

Solution: Set your throttle to 200 millisecond and add a semaphore with a count of 5.

You can solve this problem by combining your throttle with a semaphore. This will ensure that you only start a new operation at your schedule frequency, and also that you never have more than a predetermined number in flight at the same time.

Please note that only using a semaphore would not solve the problem because if, in the same example, the calls take sub 200 milliseconds to complete then more than 5 requests will start each second.

Below is a set of helper classes (and tests) to help extend the throttle class to support this functionality within a using block.

Interfaces

public interface IUsableSemaphore : IDisposable
{
    Task<IUsableSemaphoreWrapper> WaitAsync();
}
 
public interface IUsableSemaphoreWrapper : IDisposable
{
    TimeSpan Elapsed { get; }
}
 
public interface IThrottle
{
    Task WaitAsync();
}

UsableSemaphoreThrottle

public class UsableSemaphoreThrottle : IUsableSemaphore
{
    private readonly IThrottle _throttle;
 
    private readonly IUsableSemaphore _semaphore;
        
    public UsableSemaphoreThrottle(TimeSpan interval, int initialCount)
    {
        _throttle = new Throttle(interval);
        _semaphore = new UsableSemaphoreSlim(initialCount);
    }
 
    public UsableSemaphoreThrottle(TimeSpan interval, int initialCount, int maxCount)
    {
        _throttle = new Throttle(interval);
        _semaphore = new UsableSemaphoreSlim(initialCount, maxCount);
    }
 
    public UsableSemaphoreThrottle(IThrottle throttle, IUsableSemaphore semaphore)
    {
        _throttle = throttle;
        _semaphore = semaphore;
    }
        
    public async Task<IUsableSemaphoreWrapper> WaitAsync()
    {
        IUsableSemaphoreWrapper wrapper = null;
 
        try
        {
            wrapper = await _semaphore.WaitAsync().ConfigureAwait(false);
            await _throttle.WaitAsync().ConfigureAwait(false);
            return wrapper;
        }
        catch (Exception)
        {
            if (wrapper != null)
                wrapper.Dispose();
 
            throw;
        }
    }
 
    public void Dispose()
    {
        _semaphore.Dispose();
    }
}

UsableSemaphoreSlim

public class UsableSemaphoreSlim : IUsableSemaphore
{
    private readonly SemaphoreSlim _semaphore;
 
    public UsableSemaphoreSlim(int initialCount)
    {
        _semaphore = new SemaphoreSlim(initialCount);
    }
 
    public UsableSemaphoreSlim(int initialCount, int maxCount)
    {
        _semaphore = new SemaphoreSlim(initialCount, maxCount);
    }
 
    public async Task<IUsableSemaphoreWrapper> WaitAsync()
    {
        var wrapper = new UsableSemaphoreWrapper(_semaphore);
 
        try
        {
            await wrapper.WaitAsync().ConfigureAwait(false);
            return wrapper;
        }
        catch (Exception)
        {
            wrapper.Dispose();
            throw;
        }
    }
 
    public void Dispose()
    {
        _semaphore.Dispose();
    }
 
    private class UsableSemaphoreWrapper : IUsableSemaphoreWrapper
    {
        private readonly SemaphoreSlim _semaphore;
 
        private readonly Stopwatch _stopwatch;
 
        private bool _isDisposed;
 
        public UsableSemaphoreWrapper(SemaphoreSlim semaphore)
        {
            _semaphore = semaphore;
            _stopwatch = new Stopwatch();
        }
 
        public TimeSpan Elapsed
        {
            get
            {
                return _stopwatch.Elapsed;
            }
        }
 
        public Task WaitAsync()
        {
            if (_stopwatch.IsRunning)
                throw new InvalidOperationException("Already Initialized");
 
            _stopwatch.Start();
 
            return _semaphore.WaitAsync();
        }
 
        public void Dispose()
        {
            if (_isDisposed)
                return;
 
            if (_stopwatch.IsRunning)
            {
                _stopwatch.Stop();
                _semaphore.Release();
            }
 
            _isDisposed = true;
        }
    }
}

Throttle

public class Throttle : IThrottle
{
    private readonly object _lock = new object();
 
    private readonly TimeSpan _interval;
        
    private DateTime _nextTime;
 
    public Throttle(TimeSpan interval)
    {
        _interval = interval;
        _nextTime = DateTime.Now.Subtract(interval);
    }
 
    public Task WaitAsync()
    {
        lock (_lock)
        {
            var now = DateTime.Now;
 
            _nextTime = _nextTime.Add(_interval);
 
            if (_nextTime > now)
            {
                var delay = _nextTime - now;
                return Task.Delay(delay);
            }
 
            _nextTime = now;
        }
 
        return Task.FromResult(true);
    }
}

UsableSemaphoreThrottleTests

public class UsableSemaphoreThrottleTests
{
    [Theory]
    [InlineData(500, 100)]
    [InlineData(100, 500)]
    public async Task WaitSemaphoreThrottleAsync(
        int semaphoreMilliseconds,
        int taskMilliseconds)
    {
        var semaphoreTimeSpan = TimeSpan.FromMilliseconds(semaphoreMilliseconds);
        var taskTimeSpan = TimeSpan.FromMilliseconds(taskMilliseconds);
 
        using (var throttle = new UsableSemaphoreThrottle(semaphoreTimeSpan, 2))
        {
            var taskA = DoThings(throttle, taskTimeSpan);
            var taskB = DoThings(throttle, taskTimeSpan);
            var taskC = DoThings(throttle, taskTimeSpan);
 
            await Task.WhenAll(taskA, taskB, taskC);
 
            Assert.True(taskA.Result > taskTimeSpan);
            Assert.True(taskA.Result < taskB.Result);
 
            Assert.True(taskB.Result > taskTimeSpan + semaphoreTimeSpan);
            Assert.True(taskB.Result < taskC.Result);
        }
    }
 
    private static async Task<TimeSpan> DoThings(
        IUsableSemaphore throttle, 
        TimeSpan taskTimeSpan)
    {
        using (var wrapper = await throttle.WaitAsync())
        {
            await Task.Delay(taskTimeSpan);
            return wrapper.Elapsed;
        }
    }
}

Enjoy,
Tom

1 comment:

  1. I'm so glad to see you are still blogging, Tom. Keep it up!

    ReplyDelete

Real Time Web Analytics