This will be the last post on rebuilding the
MailboxProcessor using TDF (TPL Dataflow).
Here’s a quick discussion of the missing pieces…
First, let’s start with the simple ones; these don’t really require much discussion.
This simply provides a mutable property using
Timeout.Infinite as a default setting.
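As a rough illustration, here is a minimal sketch of such a property. The type name AgentSketch is hypothetical, and the member name DefaultTimeout is an assumption borrowed from the built-in MailboxProcessor rather than taken from the original listing.

```fsharp
open System.Threading

// Hypothetical stand-in for the agent type built up in this series.
type AgentSketch() =
    // Timeout.Infinite (-1) means "wait forever" unless the caller overrides it.
    let mutable defaultTimeout = Timeout.Infinite

    // Assumed member name, mirroring MailboxProcessor.DefaultTimeout.
    member x.DefaultTimeout
        with get () = defaultTimeout
        and set (value: int) = defaultTimeout <- value
```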
Another simple one: this member reaches into the underlying
BufferBlock to extract its current queue length.
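A sketch of how this might look, assuming the System.Threading.Tasks.Dataflow assembly is referenced. The type name QueueLengthSketch and the field name incomingMessages are hypothetical, the member name CurrentQueueLength is assumed from the MailboxProcessor API, and reading the length through BufferBlock's Count property is my guess at what the original listing did.

```fsharp
open System.Threading.Tasks.Dataflow

// Hypothetical cut-down agent that only exposes posting and the queue length.
type QueueLengthSketch<'Msg>() =
    // Stand-in for the agent's internal message queue.
    let incomingMessages = BufferBlock<'Msg>()

    // Post returns true when the block accepts the item.
    member x.Post(item: 'Msg) = incomingMessages.Post(item)

    // Count reports how many items are currently buffered in the block.
    member x.CurrentQueueLength = incomingMessages.Count
```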
Here we get a little help from the TPL to apply a continuation on completion using
ContinueWith. We use a lambda to return either
None, in a time out condition, or
Some tt.Result when we successfully receive an item.
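The continuation described above can be sketched roughly as follows. This is a free-standing function rather than the member from the original listing; the names tryReceive, incoming, timeoutMs and toOption are hypothetical. I check both IsCanceled and IsFaulted so the sketch does not depend on exactly how a timed-out ReceiveAsync task completes.

```fsharp
open System
open System.Threading.Tasks
open System.Threading.Tasks.Dataflow

// `incoming` stands in for the agent's internal BufferBlock (an assumption).
let tryReceive (incoming: BufferBlock<'Msg>) (timeoutMs: int) : Async<'Msg option> =
    let timeout = TimeSpan.FromMilliseconds(float timeoutMs)
    // Map the finished task to an option: None when the receive did not
    // produce an item (time out or failure), Some item otherwise.
    let toOption =
        Func<Task<'Msg>, 'Msg option>(fun tt ->
            if tt.IsCanceled || tt.IsFaulted then None else Some tt.Result)
    incoming.ReceiveAsync(timeout).ContinueWith(toOption)
    |> Async.AwaitTask
```

Running it against an empty buffer with a short time out should give None, while a buffer holding an item should give Some of that item.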
Things get a little more interesting from here on in. Firstly we need to add a new synchronisation member,
TryWaitResultSynchronously. We again enlist the help of the TPL primitives to check for early completion using
source.Task.IsCompleted, returning the result if it is there; otherwise we use the
Wait method to check whether the item returns within the time out interval. In the usual manner,
Some source.Task.Result is returned on success, or
None on failure.
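A minimal sketch of that check, written as a free-standing function. I am assuming source is a TaskCompletionSource, which fits the source.Task calls mentioned above; the parameter name timeoutMs is hypothetical.

```fsharp
open System.Threading.Tasks

// Assumes `source` is a TaskCompletionSource that the replying side completes.
let tryWaitResultSynchronously (source: TaskCompletionSource<'T>) (timeoutMs: int) : 'T option =
    // Fast path: the reply has already arrived.
    if source.Task.IsCompleted then Some source.Task.Result
    // Otherwise block for up to timeoutMs; Wait returns true if the task finished in time.
    elif source.Task.Wait(timeoutMs) then Some source.Task.Result
    else None
    // Note: this sketch does not handle faulted or cancelled tasks;
    // reading Result (or calling Wait) on those would throw.
```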
This one wraps a call to
TryPostAndReply with some pattern matching. In the event of a time out,
None is returned from
TryPostAndReply and we raise a
TimeoutException; otherwise we unwrap the result from the option using
| Some result -> result.
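The wrapping pattern looks roughly like this. To keep the sketch self-contained, the TryPostAndReply call is passed in as a hypothetical function argument instead of being a member on the agent, and the exception message is my own.

```fsharp
open System

// `tryPostAndReply` is a hypothetical stand-in for the agent's TryPostAndReply call.
let postAndReply (tryPostAndReply: unit -> 'Reply option) : 'Reply =
    match tryPostAndReply () with
    // No reply arrived within the time out.
    | None -> raise (TimeoutException("PostAndReply timed out"))
    // Unwrap the reply from the option.
    | Some result -> result
```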
This one also uses the same
ContinueWith functionality in the recursive
loopForMsg function. Perhaps some
of these functions could be extracted out and refactored, but I prefer to keep the code like this to better explain what’s going
on. The code is available on GitHub anyway, so feel free to clean up any detritus and send me a pull request. Again we use pattern matching to keep calling the
loopForMsg function until the result is returned or a time out occurs.
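Here is a sketch of how such a recursive loop might be put together. Only the loopForMsg name comes from the text; the function name tryScan, its parameters and the scanner signature (borrowed from MailboxProcessor.TryScan) are assumptions. In this sketch the time out restarts for every message received, which may differ from the original.

```fsharp
open System
open System.Threading.Tasks
open System.Threading.Tasks.Dataflow

// `incoming` stands in for the agent's internal BufferBlock (an assumption).
let tryScan (incoming: BufferBlock<'Msg>) (scanner: 'Msg -> Async<'T> option) (timeoutMs: int) : Async<'T option> =
    let timeout = TimeSpan.FromMilliseconds(float timeoutMs)
    // Same continuation idea as before: None when no item was received in time.
    let toOption =
        Func<Task<'Msg>, 'Msg option>(fun tt ->
            if tt.IsCanceled || tt.IsFaulted then None else Some tt.Result)
    let rec loopForMsg () =
        async {
            let! msg = incoming.ReceiveAsync(timeout).ContinueWith(toOption) |> Async.AwaitTask
            match msg with
            | None -> return None // time out: give up
            | Some m ->
                match scanner m with
                | Some work ->
                    // The scanner accepted the message; run its workflow.
                    let! result = work
                    return Some result
                // Unmatched message: it is dropped here and we keep looking.
                | None -> return! loopForMsg ()
        }
    loopForMsg ()
```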
Finally we have Scan. This is much like PostAndReply in that it just acts as a wrapper around
TryScan, making use of
pattern matching to throw an exception on a time out.
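The asynchronous flavour of the same wrapper could be sketched like this. As before, the TryScan call is passed in as a hypothetical function argument, and raising a TimeoutException is an assumption based on how PostAndReply behaves.

```fsharp
open System

// `tryScan` is a hypothetical stand-in for the agent's TryScan call.
let scan (tryScan: unit -> Async<'T option>) : Async<'T> =
    async {
        let! result = tryScan ()
        match result with
        // No matching message arrived within the time out.
        | None -> return raise (TimeoutException("Scan timed out"))
        | Some value -> return value
    }
```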
That sums up the last few pieces, completing the TDF implementation of the
MailboxProcessor. I think this series of posts has shown the elegance of F#’s asynchronous workflows. The use of recursive functions and the compositional nature of asynchronous workflows really helps when you are doing this type of programming. It’s also very nice on the eye, each section being clearly defined.
The more astute of you may have noticed something a little different.
TryScan are destructive in this implementation: the unmatched messages are purged from the internal queue. Although I could have mirrored the same functionality of the
MailboxProcessor by using an internal list to keep track of unmatched messages, this leads to performing checks during
Scan and their derivatives to make sure that this list is used first when switching from
I think the separation of concerns is a little fuzzy in the
scan function seems like an afterthought: even if you don’t use
Scan, you still pay a price for it, as there are numerous checks between the internal queue and the unmatched messages list. You can also run into issues while using
TryScan that can result in out-of-memory conditions due to the inherently unbounded nature. I will briefly describe and explore the conditions that can lead to that in the next post. In the implementation presented here we can get bounded checking by passing in an optional
DataflowBlockOptions that puts an upper bound on the queue.
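As an illustration of bounding the queue, here is a small sketch that builds a BufferBlock from a DataflowBlockOptions. I am assuming the relevant setting is BoundedCapacity; the helper name makeBoundedQueue is hypothetical.

```fsharp
open System.Threading.Tasks.Dataflow

// Hypothetical helper: builds a queue that holds at most `capacity` items.
let makeBoundedQueue<'Msg> (capacity: int) =
    // BoundedCapacity caps how many items the block will buffer.
    let options = DataflowBlockOptions(BoundedCapacity = capacity)
    BufferBlock<'Msg>(options)
```

With a capacity of 2, the first two calls to Post are accepted and a third is declined (Post returns false) until an item has been taken off the queue.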
EDIT: The code for this series of articles is now available on GitHub: FSharpDataflow
Until next time…