Right, no messing about this time, straight to the code.
Construction
This is pretty straight forward and I don’t want to detract from the important bits of this post, the only thing
of note is the cancellationToken which is initialized to a default value using the defaultArg function if the
optional parameter cancellationToken is not supplied. The TDF construct that we to use to perform most of the hard
work is incomingMessages which is a BufferBlock<'Msg>.
1 2 3 4 5 6 7 | |
Error
This is the public facing part for the Error event. The [<CLIEvent>] attribute exposes the event in a friendly manner to other .Net languages by adding the add_Error and remove_Error event handler properties to allow subscription to take place. The Error event fires when an exception is thrown in the initial asynchronous workflow.
1 2 | |
Start
This is implemented the same as the MailboxProcessor. An exception is thrown if the agent has already started as this is not valid operation. We set the mutable field started to true and proceed to start the initial asynchronous workflow. This workflow is wrapped in a try with block so that if an exception is thrown we catch it and trigger the Error event. The computation is then started with Async.Start(...).
1 2 3 4 5 6 7 8 | |
Receive
The Receive member is used by the agent as a way of waiting for a message to arrive without blocking. Because the TDF functionality is all TPL Task based we use the the Async helper functions. In this instance we utilise the Async.AwaitTask passing in the incomingMessages ReceiveAsync method to wait for
a message to arrive. The integration between F# async and TDF is nice and succinct here.
1 2 | |
Post
The Post member allows a message to be sent to the agents, this member simply calls the incomingMessages Post method passing in the item. We raise an exception if there is a problem posting (i.e. the incomingMessages internal queue is full).
1 2 3 4 | |
PostAndTryAsyncReply / PostAndAsyncReply
I’m grouping both of these together as they are related in functionality. In the previous post I purposely left
out some ancillary code as it added unnecessary complexity to the introduction. There are a two types we need to be able to replicate the PostAndTryAsyncReply and PostAndAsyncReply members of the MailboxProcessor.
AsyncReplyChannel
The first type we need is the AsyncReplyChannel<'Reply>. This type takes a function that accepts a generic 'Reply and returns a unit. It is used as a way of communicating back to the caller of the PostAndTryAsyncReply and PostAndAsyncReply members via its single member Reply. This should become a little clearer when we see it used in context.
An AsyncRepyChannel does actually exist in F# under the Microsoft.FSharp.Control namespace and is used my the MailboxPRocessor, unfortunately its constructor is marked as internal so we are not able to reuse it here.
1 2 | |
AsyncResultCell
The next type we need is the AsyncResultCell<'a>. We use this as a way to await for the results of an asynchronous operation. We create a TaskCompletionSource (source), which is a TPL type that we use as a way of signalling to a callback / lambda expression when a message has arrived.
RegisterResult is used as a way of notifying when a message has been arrived, this is used internally by our agent as a result of a reply being made to the AsyncReplyChannel.
AsyncWaitResult is a continuation wrapper, it is called when we want to wait indefinitely for the result to be returned. It wraps a successful completion with a call to task.Result which then returns the result.
GetWaitHandle is used as a mechanism to force the asynchronous result to return within a specified timeout interval. If a result is not returned within the timeout then this function will return false.
GrabResult returns the result from the TaskCompletionSource object source. This is set earlier by the RegisterResult member.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | |
PostAndTryAsyncReply
This one is a little more tricky and I have added a few line number references to try and make it easier. On line 3 we
declare an resultCell to collect the result of the asynchronous operation. This is used on line 4 when we create a msg
to post to incomingMessages on line 5. The replyChannelMsg is a function that takes an AsyncReplyChannel and returns
a message, so we create an AsyncReplyChannel with a lambda expression that registers the reply with the resultCell. This
is the key to how this works, you have to remember that will be done the other side of the operation which will be within the
asynchronous processing loop of the agent when Reply is called on the AsyncReplyChannel.
Finally pattern matching is used on line 7 to call either AsyncWaitResult or GetWaitHandle on the resultCell. The AsyncWaitResult function is used to wait indefinitely and the GetWaitHandle function is used if we want to use a timeout. Both of these are asynchronous workflows that either return a result or return an option type containing the result.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | |
PostAndAsyncReply
This member uses the same functionality as PostAndTryAsyncReply, creating a message using the AsyncReplyChannel. The main
difference is that an asynchronous workflow is created that wraps a call to PostAndTryAsyncReply if the timeout is specified.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | |
Static Start
The static Start function is used as a way to construct and start the agent than using the constructor and then calling the Start function. This is really just a simple short cut for this common use case.
1 2 3 4 | |
Until next time…