This is going to be a new series on using TPL Dataflow with F#. First, a little history and background.
TPL Dataflow's heritage and background
TPL Dataflow (TDF) has been around for quite a while. It first surfaced more than a year ago as the successor to the Concurrency and Coordination Runtime (CCR), and with the coming release of .NET 4.5 it will be part of the System.Threading.Tasks.Dataflow namespace. Elements of the now-halted Axum project are also present in the design of TDF.
Concurrency and Coordination Runtime (CCR)
CCR is a library that handles asynchrony, concurrency, and coordination between blocks of asynchronous code so that the programmer doesn't have to. All of the low-level details of synchronization and error propagation are taken care of in a consistent fashion. CCR is still included in Microsoft Robotics Studio, where it is used extensively to exploit parallel hardware and to deal with partial failure of systems.
Axum was another interesting Microsoft research project. It also used the actor model, embracing the principles of isolation and message passing, and it made extensive use of symbolic operators as a terse shorthand for operations between actors. For example, <-- passed a message to an actor. There was also a similarity to CCR, as Axum used the concepts of ports and channels in a similar way. It was a very interesting project, and it was a shame it was put on hold.
TPL Dataflow (TDF)
TDF is built around a number of different blocks which can be combined or linked together. The blocks fall into three categories:
Buffering blocks simply buffer data in various ways before passing the data on to another block.
BufferBlock<'T> - The BufferBlock acts as a first-in-first-out (FIFO) queue, buffering each input.
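BroadcastBlock<'T> - The BroadcastBlock links to multiple targets and offers a copy of each item to every connected block. It only retains the most recent item, so a target that is slow to accept may miss earlier values.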
WriteOnceBlock<'T> - The WriteOnceBlock acts like an immutable target: it accepts only the first item passed to it and after that effectively becomes read-only.
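A minimal sketch of the three buffering blocks, assuming the System.Threading.Tasks.Dataflow assembly is referenced (on .NET Framework it ships as a NuGet package). The values are arbitrary and only illustrate each block's behaviour:

```fsharp
open System.Threading.Tasks.Dataflow

// BufferBlock: a FIFO queue; items are received in the order they were posted.
let buffer = BufferBlock<int>()
for i in 1 .. 3 do
    buffer.Post i |> ignore
let fifo = [ for _ in 1 .. 3 -> buffer.Receive() ]

// WriteOnceBlock: only the first posted value is accepted; later posts are declined.
// The constructor takes a cloning function; the identity function shares the value.
let once = WriteOnceBlock<string>(fun s -> s)
let firstAccepted = once.Post "first"
let secondAccepted = once.Post "second"
let onceValue = once.Receive()   // receiving does not remove the stored value

// BroadcastBlock: keeps only the latest value and offers it to every linked target.
let broadcast = BroadcastBlock<int>(fun x -> x)
let targetA = BufferBlock<int>()
let targetB = BufferBlock<int>()
broadcast.LinkTo(targetA) |> ignore
broadcast.LinkTo(targetB) |> ignore
broadcast.Post 42 |> ignore
let fromA, fromB = targetA.Receive(), targetB.Receive()

printfn "%A %b %b %s %d %d" fifo firstAccepted secondAccepted onceValue fromA fromB
```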
Executor blocks run user-supplied code, in the form of a lambda expression or a delegate, for each item they receive.
ActionBlock<'TInput> - The ActionBlock acts like the Action<'T> delegate, performing an action on each datum posted to it.
TransformBlock<'TInput,'TOutput> - The TransformBlock acts just like the ActionBlock except that the function it runs produces an output; this output is buffered and behaves just like a BufferBlock.
TransformManyBlock<'TInput,'TOutput> - The TransformManyBlock is just like a TransformBlock except that it can produce zero or more outputs for a given datum.
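The executor blocks can be chained with LinkTo. The following is a small, illustrative pipeline (the block names and values are made up for this sketch); the explicit Func and Action wrappers simply make it unambiguous which constructor overload is meant:

```fsharp
open System
open System.Threading.Tasks.Dataflow

// TransformBlock: exactly one output per input.
let square = TransformBlock<int, int>(Func<int, int>(fun x -> x * x))
// TransformManyBlock: zero or more outputs per input (here each value is emitted twice).
let twice = TransformManyBlock<int, int>(Func<int, seq<int>>(fun x -> seq { yield x; yield x }))
// ActionBlock: runs an action per input and produces no output; here it records values.
let results = ResizeArray<int>()
let collect = ActionBlock<int>(Action<int>(fun x -> results.Add x))

// PropagateCompletion lets Complete() flow down the pipeline.
let propagate = DataflowLinkOptions(PropagateCompletion = true)
square.LinkTo(twice, propagate) |> ignore
twice.LinkTo(collect, propagate) |> ignore

for i in 1 .. 3 do
    square.Post i |> ignore
square.Complete()          // signal that no more input is coming
collect.Completion.Wait()  // wait until every item has reached the ActionBlock

printfn "%A" (List.ofSeq results)
```

Because each block runs with a degree of parallelism of one by default, the ActionBlock appends to the ResizeArray from one task at a time.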
Joining blocks combine or join data together in different ways.
BatchBlock<'T> - The BatchBlock combines individual items into batches, each represented as an array of elements, and then passes each batch on to another block.
JoinBlock<'T1,'T2,…> - The JoinBlock acts as a form of Enumerable.Zip<'T1,'T2,'TResult>, except that the zip is performed over the items arriving at each of its targets, producing a tuple from the next item of each.
BatchedJoinBlock<'T1,'T2,…> - As the name suggests, this block simply combines the JoinBlock and the BatchBlock: it gathers items from each of its targets into batches and outputs them as a tuple of lists.
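A short sketch of two of the joining blocks; again the inputs are arbitrary:

```fsharp
open System.Threading.Tasks.Dataflow

// BatchBlock: groups incoming items into arrays of (at most) batchSize elements.
let batcher = BatchBlock<int>(3)
for i in 1 .. 7 do
    batcher.Post i |> ignore
batcher.Complete()   // completing flushes the final, partial batch
let batches = [ for _ in 1 .. 3 -> batcher.Receive() ]

// JoinBlock: pairs the next item from each target into a tuple, much like Zip.
let join = JoinBlock<int, string>()
join.Target1.Post 1 |> ignore
join.Target1.Post 2 |> ignore
join.Target2.Post "one" |> ignore
join.Target2.Post "two" |> ignore
let pairs = [ for _ in 1 .. 2 -> join.Receive() ]

printfn "%A %A" batches pairs
```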
That's an ultra-high-level tour that only just scratches the surface. I recommend you check out the Introduction to TPL Dataflow document to read up on the details. There are a few more resources in the DevLabs area that you might find useful. Hopefully this series will also shed a bit more light on TDF as we go along…
F# Asynchronous Workflows and Agents
So where does that leave us in F#?
In F# we have Asynchronous Workflows and agents, and they help immensely with concurrency and message passing, but that doesn't mean we can't take advantage of TDF's new features and refinements, in much the same way as we can use Asynchronous Workflows to take advantage of Tasks.
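As a sketch of that bridge, a block's ReceiveAsync task can be awaited inside an async workflow with Async.AwaitTask. The sumNext function is a hypothetical helper, not part of TDF:

```fsharp
open System.Threading.Tasks.Dataflow

// Hypothetical helper: read `count` items from any source block and add them up.
// ReceiveAsync returns a Task<int>; Async.AwaitTask turns it into an Async<int>.
let sumNext (source: ISourceBlock<int>) count =
    let rec loop remaining total =
        async {
            if remaining = 0 then
                return total
            else
                let! item = source.ReceiveAsync() |> Async.AwaitTask
                return! loop (remaining - 1) (total + item)
        }
    loop count 0

let buffer = BufferBlock<int>()
for x in [ 10; 20; 30 ] do
    buffer.Post x |> ignore

let total = sumNext buffer 3 |> Async.RunSynchronously
printfn "%d" total
```

A recursive loop is used rather than a mutable accumulator because mutable locals cannot be captured by the closures an async workflow is compiled into.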
This post is going to be centered around F# agents, but with a twist. First of all, we are going to reimplement the MailboxProcessor using TDF for the underlying processing. This will allow us to use all of our existing agent code and examples while staying within the F# agent paradigm. Following this approach we could also make use of the DataflowBlockOptions type, which has some interesting properties that we will look at in future posts.
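To give a flavour of those options, here is a sketch using one of them, BoundedCapacity, on a BufferBlock together with the standard Post and SendAsync extension methods:

```fsharp
open System.Threading.Tasks.Dataflow

// BoundedCapacity caps how many items the block will hold at once.
let options = DataflowBlockOptions(BoundedCapacity = 2)
let bounded = BufferBlock<string>(options)

// Post returns immediately: it yields false once the buffer is full.
let accepted = [ for msg in [ "a"; "b"; "c" ] -> bounded.Post msg ]

// SendAsync instead returns a Task<bool> that completes once the item is accepted.
let pending = bounded.SendAsync("c")
let first = bounded.Receive()   // taking "a" frees a slot, so "c" can now be accepted
let sent = pending.Result

printfn "%A %s %b" accepted first sent
```

A bounded mailbox like this gives a form of back-pressure that the unbounded F# Mailbox does not offer out of the box.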
In this post we are going to replicate the MailboxProcessor using Tomas Petricek's caching agent example from FSSnip. I have made a couple of modifications to Tomas's code.
I replaced the Dictionary type with a ConcurrentDictionary so that the caching agent can be called several times in succession without the dictionary throwing an exception because it already contains a key from a previously cached result. I also changed the example code so that it requests cached HTML from the caching agent ten times, with a 400 ms interval between requests.
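The difference behind the first change can be seen in isolation. This sketch uses a hypothetical URL as the cache key and is not taken from Tomas's code:

```fsharp
open System
open System.Collections.Generic
open System.Collections.Concurrent

let url = "http://example.com/"   // hypothetical cache key

// Dictionary.Add throws ArgumentException when the key is already present.
let plain = Dictionary<string, string>()
plain.Add(url, "<html>first</html>")
let addTwiceThrows =
    try
        plain.Add(url, "<html>second</html>")
        false
    with :? ArgumentException -> true

// ConcurrentDictionary.TryAdd reports a duplicate key by returning false instead,
// and GetOrAdd returns the existing entry; both are safe to call from several threads.
let cache = ConcurrentDictionary<string, string>()
let firstAdd = cache.TryAdd(url, "<html>first</html>")
let secondAdd = cache.TryAdd(url, "<html>second</html>")
let cached = cache.GetOrAdd(url, fun _ -> "<html>not used</html>")

printfn "%b %b %b %s" addTwiceThrows firstAdd secondAdd cached
```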
Looking at the implementation above, you can see that we need to implement the following members:
- unit -> unit
- ?int -> Async<'Msg>
- 'Msg -> unit
- (AsyncReplyChannel<'Reply> -> 'Msg) * ?int -> Async<'Reply option>
- (AsyncReplyChannel<'Reply> -> 'Msg) * int option -> Async<'Reply>
- static member Start: (MailboxProcessor<'Msg> -> Async<unit>) * ?CancellationToken -> MailboxProcessor<'Msg>
These are the only members we need to complete the caching agent example. I didn't want to bamboozle everyone with an explosion of code from the outset, so the remaining members will be implemented as and when we need them. When we have implemented all the members from MailboxProcessor I'll post the full source on my GitHub account.
The following members are still outstanding, but they should be fairly trivial to implement once we have completed the code here:
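For reference, the behaviour these members have to reproduce can be exercised against the stock F# MailboxProcessor. The counter agent below is a hypothetical example, not part of the caching agent; the instance Start is invoked for us by the static Start:

```fsharp
// Hypothetical message type for a counter agent (illustration only).
type CounterMsg =
    | Increment of int
    | Get of AsyncReplyChannel<int>

// static Start + Receive: the agent loop processes one message at a time.
let counter =
    MailboxProcessor.Start(fun inbox ->
        let rec loop total =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Increment n -> return! loop (total + n)
                | Get reply ->
                    reply.Reply total
                    return! loop total
            }
        loop 0)

// Post: fire and forget.
counter.Post(Increment 5)
counter.Post(Increment 7)

// PostAndAsyncReply waits for the reply; PostAndTryAsyncReply gives None on timeout.
let total = counter.PostAndAsyncReply(Get) |> Async.RunSynchronously
let maybeTotal = counter.PostAndTryAsyncReply(Get, 1000) |> Async.RunSynchronously

printfn "%d %A" total maybeTotal
```

A Dataflow-backed replacement should be able to run this same code unchanged, which makes it a handy regression check.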
- (AsyncReplyChannel<'Reply> -> 'Msg) * int option -> 'Reply
- ('Msg -> Async<'T> option) * ?int -> Async<'T>
- (AsyncReplyChannel<'Reply> -> 'Msg) * ?int -> 'Reply option
- ?int -> Async<'Msg option>
- ('Msg -> Async<'T> option) * ?int -> Async<'T option>
- int with get, set
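As an illustration of how one of these could be layered on a BufferBlock, here is a sketch of a TryReceive-style read (?int -> Async<'Msg option>). The tryReceive helper is hypothetical, and it catches the TimeoutException either directly or wrapped in an AggregateException, since how the fault surfaces depends on Async.AwaitTask:

```fsharp
open System
open System.Threading.Tasks.Dataflow

// Hypothetical helper: wait up to timeoutMs (-1 = forever) for the next message,
// returning None instead of raising when nothing arrives in time.
let tryReceive (mailbox: BufferBlock<'Msg>) (timeoutMs: int) : Async<'Msg option> =
    async {
        try
            let! msg =
                mailbox.ReceiveAsync(TimeSpan.FromMilliseconds(float timeoutMs))
                |> Async.AwaitTask
            return Some msg
        with
        | :? TimeoutException -> return None
        | :? AggregateException as ae when (ae.InnerException :? TimeoutException) -> return None
    }

let mailbox = BufferBlock<string>()
let empty = tryReceive mailbox 100 |> Async.RunSynchronously      // nothing posted yet
mailbox.Post "hello" |> ignore
let received = tryReceive mailbox 100 |> Async.RunSynchronously

printfn "%A %A" empty received
```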
So here we go; this is the Dataflow implementation of the MailboxProcessor:
The crux of the implementation from TDF's point of view is the use of the BufferBlock. This is one of the most fundamental blocks within TDF. It's the equivalent of the corresponding type in CCR and of the Mailbox type in F#, which is used internally by the MailboxProcessor. As mentioned above, the BufferBlock type is a first-in-first-out (FIFO) buffer and is responsible for buffering any data that is posted to it.
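To make the BufferBlock's role concrete, here is a cut-down sketch of an agent whose queue is a BufferBlock. It is an independent sketch, not the code referred to above: the DataflowAgent name is hypothetical, only Post, Receive and the two Start members are shown, and there is no error handling, so an exception thrown by the agent body is not surfaced through an Error event as it would be with MailboxProcessor.

```fsharp
open System
open System.Threading
open System.Threading.Tasks
open System.Threading.Tasks.Dataflow

/// Hypothetical, MailboxProcessor-style agent whose message queue is a BufferBlock.
type DataflowAgent<'Msg>(body: DataflowAgent<'Msg> -> Async<unit>, ?cancellationToken: CancellationToken) =
    let token = defaultArg cancellationToken CancellationToken.None
    // FIFO buffer standing in for the F# Mailbox.
    let mailbox = BufferBlock<'Msg>()

    /// 'Msg -> unit: enqueue a message without waiting.
    member this.Post(msg: 'Msg) : unit = mailbox.Post msg |> ignore

    /// ?int -> Async<'Msg>: wait up to timeout ms (default: forever) for the next message.
    member this.Receive(?timeout: int) : Async<'Msg> =
        let wait = TimeSpan.FromMilliseconds(float (defaultArg timeout Timeout.Infinite))
        mailbox.ReceiveAsync(wait, token) |> Async.AwaitTask

    /// unit -> unit: run the agent body on the thread pool.
    member this.Start() : unit = Async.Start(body this, token)

    /// Mirrors MailboxProcessor.Start: construct and start in one step.
    static member Start(body: DataflowAgent<'Msg> -> Async<unit>, ?cancellationToken: CancellationToken) : DataflowAgent<'Msg> =
        let agent = DataflowAgent<'Msg>(body, ?cancellationToken = cancellationToken)
        agent.Start()
        agent

// Usage: an agent that collects three messages and then completes a task.
let received = TaskCompletionSource<int list>()
let collector =
    DataflowAgent<int>.Start(fun (inbox: DataflowAgent<int>) ->
        let rec loop acc =
            async {
                let! msg = inbox.Receive()
                let acc = msg :: acc
                if List.length acc = 3 then
                    received.SetResult(List.rev acc)
                else
                    return! loop acc
            }
        loop [])

for i in 1 .. 3 do
    collector.Post i
let messages = received.Task.Result
printfn "%A" messages
```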
OK, I’m going to leave it at that for now while you digest the code presented here.
In Part II I will drill into the detail of what's going on internally and also describe more of the TDF model, so tune in soon.
Until next time…