Right, no messing about this time, straight to the code.
This is pretty straightforward and I don’t want to detract from the important bits of this post. The only thing of note is the cancellationToken, which is initialized to a default value using the defaultArg function if a cancellation token is not supplied. The TDF construct that we use to perform most of the hard work is incomingMessages, the agent's internal message queue.
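As a rough illustration of the constructor state described above, here is a minimal sketch. The type name SketchAgent, the member names, and the choice of a BufferBlock for incomingMessages are assumptions made for the example, not necessarily what the full agent uses.

```fsharp
open System.Threading
open System.Threading.Tasks.Dataflow

// Hypothetical, cut-down agent: only the constructor state discussed above.
type SketchAgent<'Msg>(?cancelToken: CancellationToken) =
    // defaultArg returns the supplied value, or the fallback when the
    // optional argument was omitted.
    let cancellationToken = defaultArg cancelToken Async.DefaultCancellationToken
    // Assumption: a BufferBlock acts as the internal queue of pending messages.
    let incomingMessages = BufferBlock<'Msg>()

    member this.CancellationToken = cancellationToken
    member this.QueuedCount = incomingMessages.Count
    member this.Enqueue(item: 'Msg) = incomingMessages.Post(item)

// No token supplied, so the default token is used.
let sketch = SketchAgent<int>()
sketch.Enqueue(1) |> ignore
```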
This is the public-facing part of the Error event. The [<CLIEvent>] attribute exposes the event in a friendly manner to other .NET languages by adding the add_Error and remove_Error event handler methods, which allow subscription to take place. The Error event fires when an exception is thrown in the initial asynchronous workflow.
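To make the event plumbing concrete, here is a small sketch of a type that publishes an Error event through [<CLIEvent>]. The ErrorSource type and its Fail member are hypothetical and exist only so the event can be fired on demand.

```fsharp
open System

type ErrorSource() =
    let errorEvent = Event<Exception>()

    // Compiled as a standard .NET event, so other .NET languages see
    // add_Error / remove_Error and can subscribe with their usual syntax.
    [<CLIEvent>]
    member this.Error = errorEvent.Publish

    // Hypothetical helper so the sketch can trigger the event.
    member this.Fail(ex: Exception) = errorEvent.Trigger(ex)

let source = ErrorSource()
let seen = ResizeArray<string>()
source.Error.Add(fun ex -> seen.Add(ex.Message))
source.Fail(InvalidOperationException("boom"))
```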
This is implemented the same way as in the MailboxProcessor. An exception is thrown if the agent has already been started, as this is not a valid operation. Otherwise we set the mutable field started to true and proceed to start the initial asynchronous workflow. This workflow is wrapped in a try with block so that if an exception is thrown we catch it and trigger the Error event. The computation is then started.
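The start-up logic described above might look roughly like the following sketch. StartableAgent is a hypothetical cut-down type, and starting the computation with Async.Start and the agent's cancellation token is an assumption on my part.

```fsharp
open System
open System.Threading

type StartableAgent(initial: StartableAgent -> Async<unit>, ?cancelToken: CancellationToken) =
    let cancellationToken = defaultArg cancelToken Async.DefaultCancellationToken
    let mutable started = false
    let errorEvent = Event<Exception>()

    [<CLIEvent>]
    member this.Error = errorEvent.Publish

    member this.Start() =
        if started then
            raise (InvalidOperationException("Already started."))
        else
            started <- true
            // Wrap the initial workflow so that failures surface via Error.
            let comp =
                async {
                    try
                        do! initial this
                    with error ->
                        errorEvent.Trigger(error)
                }
            // Assumption: the workflow is started with Async.Start.
            Async.Start(comp, cancellationToken = cancellationToken)

let agent = StartableAgent(fun _ -> async { failwith "boom" })
agent.Error.Add(fun ex -> printfn "agent failed: %s" ex.Message)
agent.Start()
```

Calling Start a second time on the same agent raises an InvalidOperationException.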
The Receive member is used by the agent as a way of waiting for a message to arrive without blocking. Because the TDF functionality is all TPL Task based, we use the Async helper functions. In this instance we use Async.AwaitTask, passing in the task returned by the ReceiveAsync method, to wait for a message to arrive. The integration between F# async and TDF is nice and succinct here.
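A minimal sketch of that bridge between a TDF block and an F# async workflow, assuming incomingMessages is a BufferBlock (the standalone receive function is just for illustration):

```fsharp
open System.Threading.Tasks.Dataflow

// Assumption: a BufferBlock stands in for the agent's incomingMessages.
let incomingMessages = BufferBlock<string>()

// ReceiveAsync returns a Task<string>; Async.AwaitTask turns that into an
// Async<string> that can be used with let! inside a workflow.
let receive () = Async.AwaitTask(incomingMessages.ReceiveAsync())

incomingMessages.Post("hello") |> ignore
let received = receive () |> Async.RunSynchronously
```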
The Post member allows a message to be sent to the agent. It simply calls the Post method of incomingMessages, passing in the item. We raise an exception if there is a problem posting (e.g. the incomingMessages internal queue is full).
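Here is a small sketch of the failure case. The bounded capacity of 1 is an assumption chosen only to make a full queue easy to provoke, and the exception type and message are illustrative.

```fsharp
open System
open System.Threading.Tasks.Dataflow

// Assumption: capacity 1, purely so that the second post is declined.
let options = DataflowBlockOptions(BoundedCapacity = 1)
let incomingMessages = BufferBlock<int>(options)

let post (item: int) =
    // Post returns false when the block declines the message.
    let posted = incomingMessages.Post(item)
    if not posted then
        raise (InvalidOperationException("Incoming message buffer full."))

post 1 // accepted, the queue is now at capacity

let secondFailed =
    try
        post 2
        false
    with :? InvalidOperationException -> true
```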
PostAndTryAsyncReply / PostAndAsyncReply
I’m grouping both of these together as they are related in functionality. In the previous post I purposely left out some ancillary code as it added unnecessary complexity to the introduction. There are two types we need in order to replicate the PostAndTryAsyncReply and PostAndAsyncReply members of the MailboxProcessor.
The first type we need is AsyncReplyChannel<'Reply>. This type takes a function that accepts a generic 'Reply and returns unit. It is used as a way of communicating back to the caller of the PostAndAsyncReply members via its single member, Reply. This should become a little clearer when we see it used in context.
AsyncReplyChannel does actually exist in F# under the Microsoft.FSharp.Control namespace and is used by the MailboxProcessor. Unfortunately its constructor is marked as internal, so we are not able to reuse it here.
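A stand-in along those lines can be very small. This is a sketch based on the description above; the constructor parameter name replyf is an assumption.

```fsharp
// Sketch of a replacement for the built-in type, whose constructor is internal.
type AsyncReplyChannel<'Reply>(replyf: 'Reply -> unit) =
    // Reply simply hands the value to the function supplied at construction.
    member this.Reply(reply: 'Reply) = replyf reply

let replies = ResizeArray<int>()
let channel = AsyncReplyChannel<int>(fun reply -> replies.Add(reply))
channel.Reply(42)
```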
The next type we need is AsyncResultCell<'a>. We use this as a way to await the result of an asynchronous operation. We create a TaskCompletionSource (source), a TPL type that we use to signal to a callback / lambda expression that a result has arrived.
RegisterResult is used to signal that a result has arrived. It is called internally by our agent when a reply is made on the AsyncReplyChannel.
AsyncWaitResult is a continuation wrapper that is used when we want to wait indefinitely for the result. It wraps successful completion with a call to task.Result, which returns the result.
GetWaitHandle is used to force the asynchronous wait to return within a specified timeout interval. If a result is not available within the timeout, it returns false.
GrabResult returns the result from the TaskCompletionSource object source. This is set earlier by RegisterResult.
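Putting those four members together, a sketch of the type might look like this. It follows the descriptions above, but the exact signatures are my assumption.

```fsharp
open System
open System.Threading.Tasks

type AsyncResultCell<'a>() =
    let source = TaskCompletionSource<'a>()

    // Completes the underlying task, releasing anyone waiting on it.
    member this.RegisterResult(result: 'a) = source.SetResult(result)

    // Waits indefinitely: resumes the workflow with task.Result once the
    // task has completed.
    member this.AsyncWaitResult : Async<'a> =
        Async.FromContinuations(fun (cont, _, _) ->
            let apply = Action<Task<'a>>(fun task -> cont (task.Result))
            source.Task.ContinueWith(apply) |> ignore)

    // Waits up to timeout milliseconds; false means no result arrived in time.
    member this.GetWaitHandle(timeout: int) : Async<bool> =
        async { return source.Task.Wait(timeout) }

    // Only safe to call once a result has been registered.
    member this.GrabResult() = source.Task.Result

let emptyCell = AsyncResultCell<string>()
let timedOut = emptyCell.GetWaitHandle(50) |> Async.RunSynchronously |> not

let cell = AsyncResultCell<string>()
cell.RegisterResult("done")
let value = cell.AsyncWaitResult |> Async.RunSynchronously
```

Note that GetWaitHandle blocks a thread while it waits, which keeps the sketch simple at the cost of tying up a thread for the duration of the timeout.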
This one is a little more tricky, so let's take it a step at a time. First we declare a resultCell to collect the result of the asynchronous operation. This is used when we create the message to post to incomingMessages. The replyChannelMsg parameter is a function that takes an AsyncReplyChannel and returns a message, so we create an AsyncReplyChannel with a lambda expression that registers the reply with the resultCell. This is the key to how this works: you have to remember that the registration happens on the other side of the operation, within the asynchronous processing loop of the agent, when Reply is called on the AsyncReplyChannel.
Finally, pattern matching on the timeout is used to call either AsyncWaitResult or GetWaitHandle on the resultCell. The AsyncWaitResult function is used to wait indefinitely and the GetWaitHandle function is used if we want a timeout. Both branches produce asynchronous workflows, and the member as a whole returns an option type containing the result, or None if no reply arrives in time.
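The flow described above can be sketched end to end as follows. ReplyAgent, the Query message type, and the simplified AsyncResultCell (which uses Async.AwaitTask instead of a hand-written continuation) are all assumptions made to keep the example self-contained.

```fsharp
open System.Threading
open System.Threading.Tasks
open System.Threading.Tasks.Dataflow

type AsyncReplyChannel<'Reply>(replyf: 'Reply -> unit) =
    member this.Reply(reply: 'Reply) = replyf reply

// Simplified result cell, just enough for this sketch.
type AsyncResultCell<'a>() =
    let source = TaskCompletionSource<'a>()
    member this.RegisterResult(result: 'a) = source.SetResult(result)
    member this.AsyncWaitResult : Async<'a> = Async.AwaitTask(source.Task)
    member this.GetWaitHandle(timeout: int) = async { return source.Task.Wait(timeout) }
    member this.GrabResult() = source.Task.Result

type ReplyAgent<'Msg>() =
    let incomingMessages = BufferBlock<'Msg>()

    member this.Receive() = Async.AwaitTask(incomingMessages.ReceiveAsync())

    member this.PostAndTryAsyncReply(replyChannelMsg: AsyncReplyChannel<'Reply> -> 'Msg, ?timeout: int) : Async<'Reply option> =
        let timeout = defaultArg timeout Timeout.Infinite
        let resultCell = AsyncResultCell<'Reply>()
        // The lambda runs later, on the agent's side, when Reply is called.
        let msg = replyChannelMsg (AsyncReplyChannel<'Reply>(fun reply -> resultCell.RegisterResult(reply)))
        if incomingMessages.Post(msg) then
            match timeout with
            | Timeout.Infinite ->
                async {
                    let! result = resultCell.AsyncWaitResult
                    return Some result
                }
            | _ ->
                async {
                    let! ok = resultCell.GetWaitHandle(timeout)
                    return (if ok then Some(resultCell.GrabResult()) else None)
                }
        else
            async { return None }

// Hypothetical message type carrying a reply channel.
type Query = Length of string * AsyncReplyChannel<int>

let agent = ReplyAgent<Query>()

// Stand-in for the agent's processing loop: handle one message and reply.
async {
    let! (Length(text, channel)) = agent.Receive()
    channel.Reply(text.Length)
} |> Async.Start

let reply =
    agent.PostAndTryAsyncReply((fun channel -> Length("hello", channel)), 1000)
    |> Async.RunSynchronously
```

If nothing ever calls Reply, the timeout branch yields None rather than waiting forever.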
PostAndAsyncReply uses the same functionality as PostAndTryAsyncReply, creating a message using the AsyncReplyChannel. The main difference is that, if a timeout is specified, an asynchronous workflow is created that wraps a call to PostAndTryAsyncReply.
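The wrapping step can be sketched in isolation. The orTimeout helper is hypothetical, and raising a TimeoutException when no reply arrives is an assumption that mirrors how MailboxProcessor.PostAndAsyncReply behaves.

```fsharp
open System

// Hypothetical helper: unwrap the option produced by a PostAndTryAsyncReply
// style workflow, raising if the timeout elapsed without a reply.
let orTimeout (tryReply: Async<'Reply option>) : Async<'Reply> =
    async {
        let! res = tryReply
        match res with
        | Some reply -> return reply
        | None -> return raise (TimeoutException("PostAndAsyncReply timed out"))
    }

let replied = orTimeout (async { return Some 1 }) |> Async.RunSynchronously

let raisedOnTimeout =
    try
        orTimeout (async { return (None: int option) })
        |> Async.RunSynchronously
        |> ignore
        false
    with :? TimeoutException -> true
```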
The static Start function is used as a way to construct and start the agent in one step, rather than using the constructor and then calling the Start function. This is really just a simple shortcut for this common use case.
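For illustration, here is a sketch of the shortcut on a hypothetical MiniAgent type, with everything else about the agent stripped away.

```fsharp
type MiniAgent(initial: MiniAgent -> Async<unit>) =
    let mutable started = false

    member this.IsStarted = started

    member this.Start() =
        if started then invalidOp "Already started."
        started <- true
        Async.Start(initial this)

    // Construct and start in one call, then hand back the running agent.
    static member Start(initial: MiniAgent -> Async<unit>) =
        let agent = MiniAgent(initial)
        agent.Start()
        agent

let running = MiniAgent.Start(fun _ -> async { return () })
```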
Until next time…