Monday, November 30, 2015

.NET Semaphore Slim that Supports Keys

While making a HUGE update to my CacheRepository project, I needed a way to have a dynamic number of semaphores that would lock on a specified cache key. The SemaphoreSlim is great, but I needed a wrapper around it that allowed me have one for each unique cache key being fetched.

The easiest solution was just to have a concurrent dictionary of string to semaphore, but at high load that would grow in size and I did not want to waste memory. Instead I created a class that does keep a dictionary of semaphores, but then removes them from the dictionary and stores them in a queue for reuse once there is nothing locking off on them.

Enough talking! Below is the code, and as always it comes with unit tests! :)

KeyedSemaphoreSlim Implementation

public class KeyedSemaphoreSlim : IDisposable
{
    private readonly object _lock = new object();
 
    private readonly Queue<SemaphoreWrapper> _wrapperQueue 
        = new Queue<SemaphoreWrapper>();
 
    private readonly Dictionary<string, SemaphoreWrapper> _wrapperMap 
        = new Dictionary<string, SemaphoreWrapper>();
 
    private bool _isDisposed;
 
    public Task<IDisposable> WaitAsync(
        string key, 
        CancellationToken cancelToken = default(CancellationToken))
    {
        lock (_lock)
        {
            SemaphoreWrapper wrapper;
 
            if (_wrapperMap.ContainsKey(key))
                wrapper = _wrapperMap[key];
            else
            {
                wrapper = _wrapperMap[key] = _wrapperQueue.Count == 0
                    ? new SemaphoreWrapper(Release)
                    : _wrapperQueue.Dequeue();
 
                wrapper.Key = key;
            }
 
            return wrapper.WaitAsync(cancelToken);
        }
    }
 
    public void Dispose()
    {
        if (_isDisposed)
            return;
 
        lock (_lock)
            foreach (var value in _wrapperMap.Values)
                value.InternalDispose();
 
        _isDisposed = true;
    }
 
    private void Release(SemaphoreWrapper wrapper)
    {
        lock (_lock)
        {
            var isEmpty = wrapper.Release();
            if (!isEmpty) 
                return;
 
            _wrapperMap.Remove(wrapper.Key);
            _wrapperQueue.Enqueue(wrapper);
        }
    }
        
    private class SemaphoreWrapper : IDisposable
    {
        private readonly Action<SemaphoreWrapper> _parentRelease;
        private readonly SemaphoreSlim _semaphoreSlim;
 
        private int _useCount;
 
        public SemaphoreWrapper(Action<SemaphoreWrapper> parentRelease)
        {
            _parentRelease = parentRelease;
            _semaphoreSlim = new SemaphoreSlim(1, 1);
        }
 
        public string Key { get; set; }
 
        public async Task<IDisposable> WaitAsync(CancellationToken cancelToken)
        {
            _useCount++;
            await _semaphoreSlim.WaitAsync(cancelToken).ConfigureAwait(false);
            return this;
        }
 
        public bool Release()
        {
            _semaphoreSlim.Release();
            _useCount--;
            return _useCount == 0;
        }
 
        public void Dispose()
        {
            _parentRelease(this);
        }
 
        public void InternalDispose()
        {
            _semaphoreSlim.Dispose();
        }
    }
}

KeyedSemaphoreSlim Tests

public class KeyedSemaphoreSlimTests
{
    [Fact]
    public async Task WaitAsync()
    {
        using (var semaphore = new KeyedSemaphoreSlim())
        {
            // Use three keys to create three semaphores.
            var ta1 = semaphore.WaitAsync("A");
            var tb1 = semaphore.WaitAsync("B");
            var ta2 = semaphore.WaitAsync("A");
            var tb2 = semaphore.WaitAsync("B");
            var ta3 = semaphore.WaitAsync("A");
            var tc1 = semaphore.WaitAsync("C");
 
            // Assert that first entry for each key is complete.
            Assert.True(ta1.IsCompleted);
            Assert.True(tb1.IsCompleted);
            Assert.False(ta2.IsCompleted);
            Assert.False(tb2.IsCompleted);
            Assert.False(ta3.IsCompleted);
            Assert.True(tc1.IsCompleted);
 
            // Complete the first entry by disposing.
            var ra1 = await ta1;
            ra1.Dispose();
            var rb1 = await tb1;
            rb1.Dispose();
            var rc1 = await tc1;
            rc1.Dispose();
            await Task.Delay(20);
 
            // Show that the second entry is now complete.
            Assert.True(ta2.IsCompleted);
            Assert.True(tb2.IsCompleted);
            Assert.False(ta3.IsCompleted);
 
            // Complete the second entry by disposing.
            var ra2 = await ta2;
            ra2.Dispose();
            var rb2 = await tb2;
            rb2.Dispose();
            await Task.Delay(20);
 
            // Show that the third entry is now complete.
            Assert.True(ta3.IsCompleted);
 
            // Complete the third entry by disposing.
            var ra3 = await ta3;
            ra3.Dispose();
            await Task.Delay(20);
 
            // Assert that each key shares a unique semaphore instance.
            Assert.Same(ra1, ra2);
            Assert.Same(ra2, ra3);
            Assert.Same(rb1, rb2);
            Assert.NotSame(ra1, rb1);
            Assert.NotSame(ra1, rc1);
 
            // Get four new keys.
            var td1 = semaphore.WaitAsync("D");
            var te1 = semaphore.WaitAsync("E");
            var tf1 = semaphore.WaitAsync("F");
            var tg1 = semaphore.WaitAsync("G");
 
            // Assert that they are all complete.
            Assert.True(td1.IsCompleted);
            Assert.True(te1.IsCompleted);
            Assert.True(tf1.IsCompleted);
            Assert.True(tg1.IsCompleted);
 
            // Complete the first = entry for each.
            var rd1 = await td1;
            rd1.Dispose();
            var re1 = await te1;
            re1.Dispose();
            var rf1 = await tf1;
            rf1.Dispose();
            var rg1 = await tg1;
            rg1.Dispose();
            await Task.Delay(20);
 
            // Assert that the first three reuse the orginal 
            // instances in order of their disposal.
            Assert.Same(rc1, rd1);
            Assert.Same(rb2, re1);
            Assert.Same(ra1, rf1);
 
            // Show that the fourth key is a new semaphore.
            Assert.NotSame(rg1, rd1);
            Assert.NotSame(rg1, re1);
            Assert.NotSame(rg1, rf1);
        }
    }
}

Enjoy,
Tom

1 comment:

  1. Hi Tom, great code, but why are you not disposing _wrapperQueue in KeyedSemaphoreSlim.Dispose?

    ReplyDelete

Real Time Web Analytics