Friday, March 8, 2013

Fluent Dataflow

Fluent Dataflow:

'via Blog this'

Here's how fluent extension methods might be defined:
public static class Extensions
{
    public static TransformBlock AddTransform (
        this ISourceBlock source,
        Func transform,
        ExecutionDataflowBlockOptions options = null)
    {
        var transformBlock = new TransformBlock (transform, options ?? new ExecutionDataflowBlockOptions());
        source.LinkTo (transformBlock);
        source.Completion.ContinueWith (_ => transformBlock.Complete());
        return transformBlock;
    }
   
    public static TransformBlock AddTransform (
        this ISourceBlock source,
        Func transform,
        int maxParallelism,
        int boundedCapacity = -1,
        CancellationToken cancelToken = default (CancellationToken),
        TaskScheduler scheduler = null)
    {
        return AddTransform (source, transform, GetExecutionOptions (maxParallelism, boundedCapacity, cancelToken, scheduler));
    }
   
    public static TransformBlock AddTransform (
        this ISourceBlock source,
        Func> transform,
        ExecutionDataflowBlockOptions options = null)
    {
        var transformBlock = new TransformBlock (transform, options ?? new ExecutionDataflowBlockOptions());
        source.LinkTo (transformBlock);
        source.Completion.ContinueWith (_ => transformBlock.Complete());
        return transformBlock;
    }
   
    public static TransformBlock AddTransform (
        this ISourceBlock source,
        Func> transform,
        int maxParallelism,
        int boundedCapacity = -1,
        CancellationToken cancelToken = default (CancellationToken),
        TaskScheduler scheduler = null)
    {
        return AddTransform (source, transform, GetExecutionOptions (maxParallelism, boundedCapacity, cancelToken, scheduler));
    }
   
    public static ActionBlock AddAction (
        this ISourceBlock source,
        Action action,
        ExecutionDataflowBlockOptions options = null)
    {
        var actionBlock = new ActionBlock(action, options ?? new ExecutionDataflowBlockOptions());
        source.LinkTo (actionBlock);
        source.Completion.ContinueWith (_ => actionBlock.Complete());
        return actionBlock;
    }
   
    public static ActionBlock AddAction (
        this ISourceBlock source,
        Action action,
        int maxParallelism,
        int boundedCapacity = -1,
        CancellationToken cancelToken = default (CancellationToken),
        TaskScheduler scheduler = null)
    {
        return AddAction (source, action, GetExecutionOptions (maxParallelism, boundedCapacity, cancelToken, scheduler));
    }
   
    public static BufferBlock AddBuffer (this ISourceBlock source, DataflowBlockOptions options = null)
    {
        var bufferBlock = new BufferBlock (options ?? new ExecutionDataflowBlockOptions());
        source.LinkTo (bufferBlock);
        source.Completion.ContinueWith (_ => bufferBlock.Complete());
        return bufferBlock;
    }
   
    public static BufferBlock AddBuffer (this ISourceBlock source, int boundedCapacity = -1)
    {
        return AddBuffer (source, new System.Threading.Tasks.Dataflow.DataflowBlockOptions { BoundedCapacity = boundedCapacity });
    }
   
    public static ExecutionDataflowBlockOptions GetExecutionOptions (
        int maxParallelism = 1,
        int boundedCapacity = -1,
        CancellationToken cancelToken = default (CancellationToken),
        TaskScheduler scheduler = null)
    {   
        var options = new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = boundedCapacity,
            MaxDegreeOfParallelism = maxParallelism,
            CancellationToken = cancelToken
        };
        if (scheduler != null) options.TaskScheduler = scheduler;
        return options;
    }
}

No comments: