'via Blog this'
// Here's how fluent extension methods might be defined:

/// <summary>
/// Fluent helpers for building TPL Dataflow pipelines: each <c>Add*</c> method creates a new
/// block, links it after <c>source</c>, and returns the new block so calls can be chained.
/// </summary>
public static class Extensions
{
    /// <summary>
    /// Appends a <see cref="TransformBlock{TInput,TOutput}"/> that runs a synchronous
    /// <paramref name="transform"/> on every item produced by <paramref name="source"/>.
    /// </summary>
    /// <param name="source">The upstream block to link from.</param>
    /// <param name="transform">The function applied to each item.</param>
    /// <param name="options">Execution options; default options are used when null.</param>
    /// <returns>The newly created, already linked transform block.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static TransformBlock<TIn, TOut> AddTransform<TIn, TOut>(
        this ISourceBlock<TIn> source,
        Func<TIn, TOut> transform,
        ExecutionDataflowBlockOptions options = null)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        var transformBlock = new TransformBlock<TIn, TOut>(
            transform, options ?? new ExecutionDataflowBlockOptions());
        LinkWithCompletion(source, transformBlock);
        return transformBlock;
    }

    /// <summary>
    /// Appends a synchronous transform block, building its execution options from the
    /// individual settings (see <see cref="GetExecutionOptions"/>).
    /// </summary>
    public static TransformBlock<TIn, TOut> AddTransform<TIn, TOut>(
        this ISourceBlock<TIn> source,
        Func<TIn, TOut> transform,
        int maxParallelism,
        int boundedCapacity = -1,
        CancellationToken cancelToken = default(CancellationToken),
        TaskScheduler scheduler = null)
    {
        return AddTransform<TIn, TOut>(
            source,
            transform,
            GetExecutionOptions(maxParallelism, boundedCapacity, cancelToken, scheduler));
    }

    /// <summary>
    /// Appends a <see cref="TransformBlock{TInput,TOutput}"/> that runs an asynchronous
    /// (task-returning) <paramref name="transform"/> on every item produced by
    /// <paramref name="source"/>.
    /// </summary>
    /// <param name="source">The upstream block to link from.</param>
    /// <param name="transform">The task-returning function applied to each item.</param>
    /// <param name="options">Execution options; default options are used when null.</param>
    /// <returns>The newly created, already linked transform block.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static TransformBlock<TIn, TOut> AddTransform<TIn, TOut>(
        this ISourceBlock<TIn> source,
        Func<TIn, Task<TOut>> transform,
        ExecutionDataflowBlockOptions options = null)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        var transformBlock = new TransformBlock<TIn, TOut>(
            transform, options ?? new ExecutionDataflowBlockOptions());
        LinkWithCompletion(source, transformBlock);
        return transformBlock;
    }

    /// <summary>
    /// Appends an asynchronous transform block, building its execution options from the
    /// individual settings (see <see cref="GetExecutionOptions"/>).
    /// </summary>
    public static TransformBlock<TIn, TOut> AddTransform<TIn, TOut>(
        this ISourceBlock<TIn> source,
        Func<TIn, Task<TOut>> transform,
        int maxParallelism,
        int boundedCapacity = -1,
        CancellationToken cancelToken = default(CancellationToken),
        TaskScheduler scheduler = null)
    {
        // Explicit type arguments keep this bound to the task-returning overload.
        return AddTransform<TIn, TOut>(
            source,
            transform,
            GetExecutionOptions(maxParallelism, boundedCapacity, cancelToken, scheduler));
    }

    /// <summary>
    /// Appends an <see cref="ActionBlock{TInput}"/> that invokes <paramref name="action"/>
    /// for every item produced by <paramref name="source"/>.
    /// </summary>
    /// <param name="source">The upstream block to link from.</param>
    /// <param name="action">The delegate invoked for each item.</param>
    /// <param name="options">Execution options; default options are used when null.</param>
    /// <returns>The newly created, already linked action block.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static ActionBlock<T> AddAction<T>(
        this ISourceBlock<T> source,
        Action<T> action,
        ExecutionDataflowBlockOptions options = null)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        var actionBlock = new ActionBlock<T>(
            action, options ?? new ExecutionDataflowBlockOptions());
        LinkWithCompletion(source, actionBlock);
        return actionBlock;
    }

    /// <summary>
    /// Appends an action block, building its execution options from the individual
    /// settings (see <see cref="GetExecutionOptions"/>).
    /// </summary>
    public static ActionBlock<T> AddAction<T>(
        this ISourceBlock<T> source,
        Action<T> action,
        int maxParallelism,
        int boundedCapacity = -1,
        CancellationToken cancelToken = default(CancellationToken),
        TaskScheduler scheduler = null)
    {
        return AddAction<T>(
            source,
            action,
            GetExecutionOptions(maxParallelism, boundedCapacity, cancelToken, scheduler));
    }

    /// <summary>
    /// Appends a <see cref="BufferBlock{T}"/> after <paramref name="source"/>.
    /// </summary>
    /// <param name="source">The upstream block to link from.</param>
    /// <param name="options">Block options; default options are used when null.</param>
    /// <returns>The newly created, already linked buffer block.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public static BufferBlock<T> AddBuffer<T>(
        this ISourceBlock<T> source,
        DataflowBlockOptions options = null)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        var bufferBlock = new BufferBlock<T>(options ?? new DataflowBlockOptions());
        LinkWithCompletion(source, bufferBlock);
        return bufferBlock;
    }

    /// <summary>
    /// Appends a <see cref="BufferBlock{T}"/> with the given bounded capacity.
    /// </summary>
    /// <remarks>
    /// <paramref name="boundedCapacity"/> deliberately has no default value: with two
    /// overloads that both have an optional second parameter, <c>source.AddBuffer()</c>
    /// would be an ambiguous call. The parameterless form binds to the options overload.
    /// </remarks>
    public static BufferBlock<T> AddBuffer<T>(this ISourceBlock<T> source, int boundedCapacity)
    {
        return AddBuffer<T>(
            source,
            new DataflowBlockOptions { BoundedCapacity = boundedCapacity });
    }

    /// <summary>
    /// Builds an <see cref="ExecutionDataflowBlockOptions"/> from individual settings.
    /// </summary>
    /// <param name="maxParallelism">Value assigned to <c>MaxDegreeOfParallelism</c>.</param>
    /// <param name="boundedCapacity">Value assigned to <c>BoundedCapacity</c>.</param>
    /// <param name="cancelToken">Value assigned to <c>CancellationToken</c>.</param>
    /// <param name="scheduler">
    /// Assigned to <c>TaskScheduler</c> only when non-null; otherwise the option keeps
    /// whatever scheduler a freshly constructed options object has.
    /// </param>
    /// <returns>The populated options object.</returns>
    public static ExecutionDataflowBlockOptions GetExecutionOptions(
        int maxParallelism = 1,
        int boundedCapacity = -1,
        CancellationToken cancelToken = default(CancellationToken),
        TaskScheduler scheduler = null)
    {
        var options = new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = boundedCapacity,
            MaxDegreeOfParallelism = maxParallelism,
            CancellationToken = cancelToken
        };

        if (scheduler != null)
        {
            options.TaskScheduler = scheduler;
        }

        return options;
    }

    // Links source to target and asks the link itself to forward the end of the source
    // to the target, so an upstream fault reaches the downstream block instead of being
    // reported as a normal completion.
    private static void LinkWithCompletion<T>(ISourceBlock<T> source, ITargetBlock<T> target)
    {
        source.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true });
    }
}
No comments:
Post a Comment