Wednesday, December 18, 2013

Injectable Dataflow Blocks

I really enjoy working with Dataflows, but I always want to resolve my blocks with dependency injection. Thus I have created some abstract wrapper classes around the sealed ActionBlock and TransformBlock classes. This way you can put your logic into a subclass and inject its dependencies via constructor injection. Additionally, the action method is public, making it even easier to test your code!

Update: I refactored to ditch the constructor parameters in favor of a new abstract BlockOptions.

Example Unit Tests

/// <summary>
/// Example xUnit tests showing how a block derived from
/// <c>SafeActionBlockBase</c> can be exercised both directly
/// (by calling <c>Action</c>) and through the Dataflow pipeline.
/// </summary>
public class SafeActionBlockTests
{
    /// <summary>
    /// Sample block: items ending in "0" throw, every other item is
    /// recorded in <see cref="SuccessLog"/>. Thrown exceptions are
    /// recorded in <see cref="FailureLog"/> and reported as handled.
    /// </summary>
    public class SafeActionBlock
        : SafeActionBlockBase<string>
    {
        private readonly int? _boundedCapacity;

        /// <summary>Items that were processed without throwing.</summary>
        public readonly ConcurrentStack<string> SuccessLog =
            new ConcurrentStack<string>();

        /// <summary>Messages of the exceptions raised by <see cref="Action"/>.</summary>
        public readonly ConcurrentStack<string> FailureLog =
            new ConcurrentStack<string>();

        /// <param name="boundedCapacity">
        /// Capacity to apply to the underlying block, or <c>null</c>
        /// to use the default (unbounded) options.
        /// </param>
        public SafeActionBlock(int? boundedCapacity = null)
        {
            _boundedCapacity = boundedCapacity;
        }

        /// <summary>
        /// Pushes <paramref name="item"/> onto <see cref="SuccessLog"/>,
        /// or throws when the item ends with "0".
        /// </summary>
        /// <exception cref="Exception">The item ends with "0".</exception>
        public override void Action(string item)
        {
            // Ordinal comparison: the items are machine-generated digits,
            // so the check must not depend on the current culture.
            if (item.EndsWith("0", StringComparison.Ordinal))
                throw new Exception(item);

            SuccessLog.Push(item);
        }

        /// <summary>
        /// Records the exception message and reports it as handled so
        /// the base class does not rethrow it.
        /// </summary>
        protected override bool IsExceptionHandled(Exception ex)
        {
            FailureLog.Push(ex.Message);
            return true;
        }

        /// <summary>
        /// Options carrying the requested bounded capacity, or
        /// <c>null</c> when no capacity was requested.
        /// </summary>
        protected override ExecutionDataflowBlockOptions BlockOptions
        {
            get
            {
                if (!_boundedCapacity.HasValue)
                    return null;

                // Honor the value passed to the constructor rather than
                // a hard-coded capacity.
                return new ExecutionDataflowBlockOptions
                {
                    BoundedCapacity = _boundedCapacity.Value
                };
            }
        }
    }

    [Fact]
    public void TestAction()
    {
        // The SafeActionBlock.Action method is public for testing.
        // This way you can test your logic without using Dataflow.

        var block = new SafeActionBlock();
        block.Action("1");

        Assert.Equal(1, block.SuccessLog.Count);
        Assert.True(block.SuccessLog.Contains("1"));
        Assert.Throws<Exception>(() => block.Action("0"));
    }

    [Fact]
    public void Post()
    {
        // Test the post with no bounded capacity.

        var block = new SafeActionBlock();
        Parallel.For(0, 100, i => block.Post(i.ToString()));

        block.Complete();
        block.Completion.Wait();

        Assert.Equal(90, block.SuccessLog.Count);
        Assert.Equal(10, block.FailureLog.Count);
    }

    [Fact]
    public void BoundedCapacity()
    {
        // Test the post WITH a bounded capacity.
        // Post never waits: when the block is full the item is declined
        // and Post returns false. How many items are declined depends on
        // timing, so track them and assert on the totals instead.

        var block = new SafeActionBlock(10);
        var declined = new ConcurrentStack<string>();

        Parallel.For(0, 100, i =>
        {
            var item = i.ToString();
            if (!block.Post(item))
                declined.Push(item);
        });

        block.Complete();
        block.Completion.Wait();

        Assert.True(90 >= block.SuccessLog.Count);
        Assert.True(10 >= block.FailureLog.Count);

        // Every item was either processed (success or failure) or declined.
        Assert.Equal(
            100,
            block.SuccessLog.Count + block.FailureLog.Count + declined.Count);
    }

    [Fact]
    public void BlockingPost()
    {
        // The BlockingPost extension method will block until
        // the item can be posted to bounded block.

        var block = new SafeActionBlock(10);
        Parallel.For(0, 100, i => block.BlockingPost(i.ToString()));

        block.Complete();
        block.Completion.Wait();

        Assert.Equal(90, block.SuccessLog.Count);
        Assert.Equal(10, block.FailureLog.Count);
    }
}

Extension Methods

/// <summary>
/// Extension methods for <c>ITargetBlock</c> instances.
/// </summary>
public static class TargetBlockExtensions
{
    /// <summary>
    /// Sends <paramref name="item"/> to <paramref name="targetBlock"/>
    /// via <c>SendAsync</c> and blocks the calling thread until the
    /// send operation has finished.
    /// </summary>
    /// <typeparam name="T">Type of the item being sent.</typeparam>
    /// <param name="targetBlock">Block that should receive the item.</param>
    /// <param name="item">Item to send.</param>
    /// <returns>The boolean result produced by the send operation.</returns>
    public static bool BlockingPost<T>(
        this ITargetBlock<T> targetBlock,
        T item)
    {
        // Reading Result waits for the task to finish, so no separate
        // Wait() call is required before taking the value.
        return DataflowBlock.SendAsync(targetBlock, item).Result;
    }
}

Injectable ActionBlock

/// <summary>
/// Abstract, injectable wrapper around the sealed <c>ActionBlock</c>.
/// Derived classes supply the per-item logic (<see cref="Action"/>),
/// the exception policy (<see cref="IsExceptionHandled"/>) and the
/// block options (<see cref="BlockOptions"/>); all <c>ITargetBlock</c>
/// members are forwarded to a lazily created inner block.
/// </summary>
/// <typeparam name="T">Type of the items the block consumes.</typeparam>
public abstract class SafeActionBlockBase<T>
    : ITargetBlock<T>, IDisposable
{
    #region Base

    // Created lazily so the virtual BlockOptions property is read on
    // first use instead of from this constructor, i.e. after the derived
    // constructor has had a chance to run.
    private readonly Lazy<ActionBlock<T>> _actionBlock;

    protected SafeActionBlockBase()
    {
        _actionBlock = new Lazy<ActionBlock<T>>(CreateActionBlock);
    }

    /// <summary>The inner block, exposed as a target.</summary>
    protected ITargetBlock<T> ActionBlock
    {
        get { return _actionBlock.Value; }
    }

    /// <summary>The inner block, exposed as a target.</summary>
    protected ITargetBlock<T> TargetBlock
    {
        get { return _actionBlock.Value; }
    }

    /// <summary>
    /// Builds the inner block, passing <see cref="BlockOptions"/> only
    /// when the derived class returned a non-null value.
    /// </summary>
    private ActionBlock<T> CreateActionBlock()
    {
        // Read the property exactly once: the null check and the
        // constructor call must see the same options instance.
        var blockOptions = BlockOptions;
        return blockOptions == null
            ? new ActionBlock<T>(t => SafeAction(t))
            : new ActionBlock<T>(t => SafeAction(t), blockOptions);
    }

    /// <summary>
    /// Runs <see cref="Action"/> and lets <see cref="IsExceptionHandled"/>
    /// decide whether a thrown exception is swallowed or rethrown.
    /// </summary>
    private void SafeAction(T item)
    {
        try
        {
            Action(item);
        }
        catch (Exception ex)
        {
            if (!IsExceptionHandled(ex))
                throw;
        }
    }

    /// <summary>
    /// The work performed for each item. Public so it can be unit
    /// tested without going through Dataflow.
    /// </summary>
    /// <param name="item">Item to process.</param>
    public abstract void Action(T item);

    /// <summary>
    /// Called when <see cref="Action"/> throws.
    /// </summary>
    /// <param name="ex">The exception that was thrown.</param>
    /// <returns>
    /// <c>true</c> to swallow the exception; <c>false</c> to rethrow it.
    /// </returns>
    protected abstract bool IsExceptionHandled(Exception ex);

    /// <summary>
    /// Options for the inner block, or <c>null</c> to construct the
    /// block without explicit options.
    /// </summary>
    protected abstract ExecutionDataflowBlockOptions BlockOptions { get; }

    #endregion

    #region ITargetBlock

    /// <summary>Forwards the offered message to the inner block.</summary>
    public virtual DataflowMessageStatus OfferMessage(
        DataflowMessageHeader messageHeader,
        T messageValue,
        ISourceBlock<T> source,
        bool consumeToAccept)
    {
        return TargetBlock.OfferMessage(
            messageHeader,
            messageValue,
            source,
            consumeToAccept);
    }

    /// <summary>Calls <c>Complete</c> on the inner block.</summary>
    public void Complete()
    {
        _actionBlock.Value.Complete();
    }

    /// <summary>The <c>Completion</c> task of the inner block.</summary>
    public Task Completion
    {
        get { return _actionBlock.Value.Completion; }
    }

    /// <summary>Forwards the fault to the inner block.</summary>
    public virtual void Fault(Exception exception)
    {
        TargetBlock.Fault(exception);
    }

    #endregion

    #region IDisposable

    private bool _isDisposed;

    /// <summary>
    /// Completes the inner block if it has been created. Calling this
    /// more than once has no further effect.
    /// </summary>
    public void Dispose()
    {
        if (_isDisposed)
            return;

        // Do not force creation of the block just to complete it.
        if (_actionBlock.IsValueCreated)
            _actionBlock.Value.Complete();

        _isDisposed = true;
    }

    #endregion
}

Injectable TransformBlock

/// <summary>
/// Abstract, injectable wrapper around the sealed <c>TransformBlock</c>.
/// Derived classes supply the per-item logic (<see cref="Transform"/>),
/// the exception policy (<see cref="IsExceptionHandled"/>) and the
/// block options (<see cref="BlockOptions"/>); all <c>ITargetBlock</c>
/// and <c>ISourceBlock</c> members are forwarded to a lazily created
/// inner block.
/// </summary>
/// <typeparam name="TTarget">Type of the items the block consumes.</typeparam>
/// <typeparam name="TSource">Type of the items the block produces.</typeparam>
public abstract class SafeTransformBlockBase<TTarget, TSource>
    : ITargetBlock<TTarget>, ISourceBlock<TSource>, IDisposable
{
    #region Base

    // Created lazily so the virtual BlockOptions property is read on
    // first use instead of from this constructor, i.e. after the derived
    // constructor has had a chance to run.
    private readonly Lazy<TransformBlock<TTarget, TSource>> _transformBlock;

    protected SafeTransformBlockBase()
    {
        _transformBlock = new Lazy<TransformBlock<TTarget, TSource>>(CreateTransformBlock);
    }

    /// <summary>The inner block, exposed as a target.</summary>
    protected ITargetBlock<TTarget> TargetBlock
    {
        get { return _transformBlock.Value; }
    }

    /// <summary>The inner block, exposed as a source.</summary>
    protected ISourceBlock<TSource> SourceBlock
    {
        get { return _transformBlock.Value; }
    }

    /// <summary>
    /// Builds the inner block, passing <see cref="BlockOptions"/> only
    /// when the derived class returned a non-null value.
    /// </summary>
    private TransformBlock<TTarget, TSource> CreateTransformBlock()
    {
        // Read the property exactly once: the null check and the
        // constructor call must see the same options instance.
        var blockOptions = BlockOptions;
        return blockOptions == null
            ? new TransformBlock<TTarget, TSource>(t => SafeTransform(t))
            : new TransformBlock<TTarget, TSource>(t => SafeTransform(t), blockOptions);
    }

    /// <summary>
    /// Runs <see cref="Transform"/> and lets
    /// <see cref="IsExceptionHandled"/> decide whether a thrown exception
    /// is swallowed or rethrown. When the exception is swallowed,
    /// <c>default(TSource)</c> is returned as the output for that item.
    /// </summary>
    private TSource SafeTransform(TTarget item)
    {
        try
        {
            return Transform(item);
        }
        catch (Exception ex)
        {
            if (!IsExceptionHandled(ex))
                throw;

            // A value must still be produced for the failed item.
            return default(TSource);
        }
    }

    /// <summary>
    /// Converts one input item into one output item. Public so it can
    /// be unit tested without going through Dataflow.
    /// </summary>
    /// <param name="item">Item to transform.</param>
    /// <returns>The transformed item.</returns>
    public abstract TSource Transform(TTarget item);

    /// <summary>
    /// Called when <see cref="Transform"/> throws.
    /// </summary>
    /// <param name="ex">The exception that was thrown.</param>
    /// <returns>
    /// <c>true</c> to swallow the exception; <c>false</c> to rethrow it.
    /// </returns>
    protected abstract bool IsExceptionHandled(Exception ex);

    /// <summary>
    /// Options for the inner block, or <c>null</c> to construct the
    /// block without explicit options.
    /// </summary>
    protected abstract ExecutionDataflowBlockOptions BlockOptions { get; }

    #endregion

    #region ITargetBlock

    /// <summary>Forwards the offered message to the inner block.</summary>
    public virtual DataflowMessageStatus OfferMessage(
        DataflowMessageHeader messageHeader,
        TTarget messageValue,
        ISourceBlock<TTarget> source,
        bool consumeToAccept)
    {
        return TargetBlock.OfferMessage(
            messageHeader,
            messageValue,
            source,
            consumeToAccept);
    }

    /// <summary>Calls <c>Complete</c> on the inner block.</summary>
    public void Complete()
    {
        _transformBlock.Value.Complete();
    }

    /// <summary>The <c>Completion</c> task of the inner block.</summary>
    public Task Completion
    {
        get { return _transformBlock.Value.Completion; }
    }

    /// <summary>Forwards the fault to the inner block.</summary>
    public virtual void Fault(Exception exception)
    {
        TargetBlock.Fault(exception);
    }

    #endregion

    #region ISourceBlock

    /// <summary>Forwards the consume request to the inner block.</summary>
    public virtual TSource ConsumeMessage(
        DataflowMessageHeader messageHeader,
        ITargetBlock<TSource> target,
        out bool messageConsumed)
    {
        return SourceBlock.ConsumeMessage(
            messageHeader,
            target,
            out messageConsumed);
    }

    /// <summary>Links the inner block to <paramref name="target"/>.</summary>
    /// <returns>The value returned by the inner block's <c>LinkTo</c>.</returns>
    public virtual IDisposable LinkTo(
        ITargetBlock<TSource> target,
        DataflowLinkOptions linkOptions)
    {
        return _transformBlock.Value.LinkTo(target, linkOptions);
    }

    /// <summary>Forwards the reservation release to the inner block.</summary>
    public virtual void ReleaseReservation(
        DataflowMessageHeader messageHeader,
        ITargetBlock<TSource> target)
    {
        SourceBlock.ReleaseReservation(messageHeader, target);
    }

    /// <summary>Forwards the reservation request to the inner block.</summary>
    public virtual bool ReserveMessage(
        DataflowMessageHeader messageHeader,
        ITargetBlock<TSource> target)
    {
        return SourceBlock.ReserveMessage(messageHeader, target);
    }

    #endregion

    #region IDisposable

    private bool _isDisposed;

    /// <summary>
    /// Completes the inner block if it has been created. Calling this
    /// more than once has no further effect.
    /// </summary>
    public void Dispose()
    {
        if (_isDisposed)
            return;

        // Do not force creation of the block just to complete it.
        if (_transformBlock.IsValueCreated)
            _transformBlock.Value.Complete();

        _isDisposed = true;
    }

    #endregion
}
Shout it

Enjoy,
Tom

No comments:

Post a Comment

Post a Comment

Real Time Web Analytics