This will be the last post on rebuilding the MailboxProcessor using TDF (TPL Dataflow).
Here’s a quick discussion of the missing pieces.
First, let’s start with the simple ones; these don’t really require much discussion.
DefaultTimeout
This simply provides a mutable property using Timeout.Infinite as a default setting.
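As a rough sketch of the idea, a mutable property defaulting to Timeout.Infinite might look like the following. The type name `AgentSketch` is hypothetical and only stands in for the agent type built in this series.

```fsharp
open System.Threading

/// Hypothetical stand-in for the agent type; only the property is shown.
type AgentSketch() =
    // Timeout.Infinite is -1, meaning "wait forever" unless overridden.
    let mutable defaultTimeout = Timeout.Infinite

    member x.DefaultTimeout
        with get () = defaultTimeout
        and set (value: int) = defaultTimeout <- value
```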
CurrentQueueLength
Another simple one: this member reaches into the underlying BufferBlock and reads its current queue length from the Count property.
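A minimal sketch of that member, assuming the agent holds its messages in a private BufferBlock. The names `QueueLengthSketch` and `incomingMessages` are made up for illustration, and on .NET Framework the System.Threading.Tasks.Dataflow NuGet package would need to be referenced.

```fsharp
open System.Threading.Tasks.Dataflow

/// Hypothetical agent fragment exposing the length of its internal queue.
type QueueLengthSketch<'Msg>() =
    // Unbounded buffer holding messages that have not been received yet.
    let incomingMessages = BufferBlock<'Msg>()

    /// Queues a message; returns false if the block declined it.
    member x.Post(msg: 'Msg) = incomingMessages.Post(msg)

    /// Number of messages currently waiting in the buffer.
    member x.CurrentQueueLength = incomingMessages.Count
```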
TryReceive
Here we get a little help from TPL to apply a continuation on completion
using ContinueWith. We use a lambda to return either None, in a time out condition, or Some tt.Result when we successfully receive an item.
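The continuation approach can be sketched roughly as below. This is a free-standing function rather than the member from the original code, and the name `tryReceive` and its parameter order are assumptions. It treats any receive task that did not run to completion as a time-out.

```fsharp
open System
open System.Threading.Tasks
open System.Threading.Tasks.Dataflow

/// Hypothetical free-standing version of TryReceive over a BufferBlock.
let tryReceive (timeoutMs: int) (buffer: BufferBlock<'Msg>) : Async<'Msg option> =
    // -1 ms maps to an infinite wait, matching Timeout.Infinite.
    let timeout = TimeSpan.FromMilliseconds(float timeoutMs)
    // Explicit delegate so the ContinueWith overload is unambiguous.
    let continuation =
        Func<Task<'Msg>, 'Msg option>(fun (tt: Task<'Msg>) ->
            if tt.Status = TaskStatus.RanToCompletion then Some tt.Result
            else None) // timed out (or otherwise failed): no message
    buffer.ReceiveAsync(timeout).ContinueWith(continuation)
    |> Async.AwaitTask
```

Calling `tryReceive 100 buffer |> Async.RunSynchronously` on an empty buffer should give None once the interval elapses, and Some of the message if one is already queued.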
TryPostAndReply
Things get a little more interesting from here on in. First, we need to add a new synchronisation member to the AsyncResultCell<'a> type: TryWaitResultSynchronously. We again enlist the help of the TPL primitives, checking for early completion with source.Task.IsCompleted and returning the result if it is already there; otherwise we use the Wait method of the Task property to check whether the item arrives within the time-out interval. In the usual manner, Some source.Task.Result is returned on success and None on failure.
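Here is a minimal sketch of such a cell, assuming it is backed by a TaskCompletionSource called `source`. The `RegisterResult` member is an assumed name for whatever the reply channel calls to deliver the answer; only TryWaitResultSynchronously is named in the text above.

```fsharp
open System.Threading.Tasks

/// Sketch of a reply cell; not the original implementation.
type AsyncResultCell<'a>() =
    let source = TaskCompletionSource<'a>()

    /// Assumed name: stores the reply and completes the underlying task.
    member x.RegisterResult(result: 'a) = source.SetResult(result)

    /// Blocks for at most `timeout` milliseconds waiting for the reply.
    member x.TryWaitResultSynchronously(timeout: int) : 'a option =
        // Fast path: the reply has already arrived.
        if source.Task.IsCompleted then Some source.Task.Result
        // Task.Wait returns false if the interval elapsed first.
        elif source.Task.Wait(timeout) then Some source.Task.Result
        else None
```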
PostAndReply
This one wraps a call to TryPostAndReply with some pattern matching. In the event of a time-out, TryPostAndReply returns None and we raise a TimeoutException; otherwise we unwrap the result from the option using | Some result -> result.
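The wrapping pattern on its own can be sketched as follows. To keep the example self-contained, the call to TryPostAndReply is represented by a function argument; `postAndReply`, `tryPostAndReply` and the exception message are illustrative choices, not the original code.

```fsharp
open System

/// Unwraps an optional reply, raising TimeoutException when there is none.
/// `tryPostAndReply` stands in for the real TryPostAndReply call.
let postAndReply (tryPostAndReply: unit -> 'Reply option) : 'Reply =
    match tryPostAndReply () with
    | None -> raise (TimeoutException("PostAndReply timed out"))
    | Some result -> result
```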
TryScan
This one also uses the same ContinueWith functionality in the recursive loopForMsg function. Perhaps some
of these functions could be extracted and refactored, but I prefer to keep the code like this to better explain what’s going
on. The code is available on GitHub anyway, so feel free to clean up any detritus and send me a pull request. Again we use pattern matching to keep calling the loopForMsg function until a result is returned or a time-out occurs.
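A rough sketch of the recursive loop, written as a free-standing function over a BufferBlock. The name `tryScan` and the parameter order are assumptions. As a simplification, the time-out here applies to each individual receive rather than to the scan as a whole, which may differ from the original code. Messages the scanner rejects are simply dropped.

```fsharp
open System
open System.Threading.Tasks
open System.Threading.Tasks.Dataflow

/// Hypothetical destructive scan: unmatched messages are discarded.
let tryScan (timeoutMs: int)
            (scanner: 'Msg -> Async<'T> option)
            (buffer: BufferBlock<'Msg>) : Async<'T option> =
    let timeout = TimeSpan.FromMilliseconds(float timeoutMs)
    // Maps a finished receive task to Some message, or None on time-out.
    let continuation =
        Func<Task<'Msg>, 'Msg option>(fun (tt: Task<'Msg>) ->
            if tt.Status = TaskStatus.RanToCompletion then Some tt.Result
            else None)
    let rec loopForMsg () = async {
        let! received =
            buffer.ReceiveAsync(timeout).ContinueWith(continuation)
            |> Async.AwaitTask
        match received with
        | None -> return None                  // timed out
        | Some msg ->
            match scanner msg with
            | Some work ->                     // matched: run the handler
                let! result = work
                return Some result
            | None -> return! loopForMsg () }  // unmatched: drop, try next
    loopForMsg ()
```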
Scan
Finally we have Scan. This is much like PostAndReply in that it just acts as a wrapper around TryScan, using
pattern matching to throw an exception on a time-out.
That sums up the last few pieces, completing the TDF implementation of the MailboxProcessor. I think this series of posts has shown the elegance of F#’s asynchronous workflows. The use of recursive functions and the compositional nature of asynchronous workflows really helps when you are doing this type of programming. It’s also very nice on the eye, each section being clearly defined.
The more astute of you may have noticed something a little different: Scan and TryScan are destructive in this implementation, so unmatched messages are purged from the internal queue. I could have mirrored the functionality of the MailboxProcessor by using an internal list to keep track of unmatched messages, but that means performing checks during Receive and Scan, and their derivatives, to make sure this list is consulted first when switching between Scan and Receive.
I think the separation of concerns is a little fuzzy in the MailboxProcessor. The scan function seems like an afterthought: even if you don’t use Scan you still pay a price for it, as there are numerous checks between the internal queue and the unmatched messages list. You can also run into issues with Scan and TryScan that result in out-of-memory conditions, because the queue is inherently unbounded. I will briefly describe and explore the conditions that can lead to that in the next post. In the implementation presented here we can bound the queue by passing in an optional DataflowBlockOptions and setting a value for the BoundedCapacity property.
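To illustrate the bounding behaviour in isolation, here is a small sketch that uses a bare BufferBlock rather than the agent itself. The capacity of 2 is an arbitrary value picked for the example.

```fsharp
open System.Threading.Tasks.Dataflow

// Arbitrary capacity of 2, chosen only for illustration.
let options = DataflowBlockOptions(BoundedCapacity = 2)
let bounded = BufferBlock<int>(options)

// Post returns false once the block is full and nothing is consuming.
let accepted = [ for i in 1 .. 3 -> bounded.Post(i) ]
```

With no consumer attached, the first two posts should be accepted and the third declined, so the queue never grows beyond the configured capacity. A producer that would rather wait for space than be turned away can use SendAsync instead of Post.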
EDIT: The code for this series of articles is now available on GitHub: FSharpDataflow
Until next time…