Moirae

Software Engineering Ltd

The Lurking Horror

Deep in the darkest depths lurks an ancient horror. When the time is right it will rise forth and leave you screaming for mercy and begging for forgiveness…

OK, I have a penchant for being overdramatic, but in this post I am going to reveal some little-known caveats in a well-known and much-celebrated area of F#: agents, aka the MailboxProcessor. Gasp!

First let me give you a demonstration:

open System
open System.Diagnostics
type internal BadAgentMessage =
  | Message of string * int
  | Lock
  | Unlock

type BadAgent() =

  let agent = MailboxProcessor.Start(fun agent ->
    let sw = Stopwatch()
    let rec waiting () =
      agent.Scan(function
        | Unlock -> Some(working ())
        | _ -> None)

    and working() = async {
      let! msg = agent.Receive()
      match msg with
      | Lock ->   return! waiting()
      | Unlock -> return! working()
      | Message (msg, iter) ->
          if iter = 0 then sw.Start()
          if iter % 10000 = 0
            then sw.Stop()
                 printfn "%s : %i in: %fms" msg iter sw.Elapsed.TotalMilliseconds
                 sw.Restart()
          return! working() }
    working())

  member x.Msg(msg) = agent.Post(Message msg)
  member x.Lock() = agent.Post(Lock)
  member x.Unlock() = agent.Post(Unlock)

The BadAgentMessage type defines a discriminated union that we are going to use as the agent's message interface. It comprises three cases:

  • Message: This will just be a simple string-based message and an int used as a counter.
  • Lock: This is used to stop message processing within the agent by causing it to wait for an Unlock message to arrive.
  • Unlock: This message is used to resume the processing within the agent, effectively exiting the locked state.

We have two main sections in the agent's body, which I will describe below.

working

The purpose of the working function is to dequeue messages from the agent and process them with pattern matching. let! msg = agent.Receive() gets the next message, which is then matched against the three cases of BadAgentMessage. When a Lock message is encountered, return! waiting() places the agent in a state where it waits for an Unlock message to arrive. An Unlock message simply resumes processing by calling return! working(); its only real purpose is to exit the locked state introduced by the Lock message. The Message case uses its counter to start a Stopwatch on the first message. On every 10,000th message it stops the Stopwatch, prints the elapsed time to the console, and restarts the Stopwatch before resuming the main processing loop with return! working().

waiting

This function uses the agent's Scan function to wait for an Unlock message to arrive. Once it does, the agent is put back into normal operation by returning Some(working()) from the Scan function. If the message is not an Unlock message then None is returned and the agent simply waits for the next message before trying again.

The rest of the agent is just ancillary member functions to allow easy sending of the three message types.

Test Harness

And here’s a very simple test harness:

let ba = BadAgent()

printfn "Press Enter to start"
Console.ReadLine() |> ignore
let dump number =
    for i in 0 .. number do
        ba.Msg("A message", i)

// Lock the agent, queue the messages behind the lock, then release it
ba.Lock()
dump 200000
ba.Unlock()

Console.ReadLine() |> ignore

OK, so this is a very synthetic test but I just wanted to highlight some of the internal behaviour. If I run this code I get the following console output:

You can see that the time to process the first 10,000 messages is 3083ms, and it then steadily decreases until the last 10,000 messages are processed in 94ms. Processing 10,000 messages is about 33 times slower at the beginning than it is at the end. Why?

Opening it up

Let’s take a look at some of the internals of the MailboxProcessor to understand what’s going on. First of all, the core functionality is actually contained within the Mailbox type, with the MailboxProcessor acting as an augmenter. TryPostAndReply, PostAndReply, PostAndTryAsyncReply, and PostAndAsyncReply all add a single piece of functionality to the Mailbox type: the ability to wait, synchronously or asynchronously, for a reply to a posted message. TryPostAndReply and PostAndReply both wait synchronously for the reply, whereas PostAndTryAsyncReply and PostAndAsyncReply both wait asynchronously. This functionality is achieved with the use of the ResultCell and AsyncReplyChannel types. For an in-depth discussion on this you might want to refer to my earlier series which describes implementing the MailboxProcessor with TPL Dataflow (see Part 1, Part 2 and Part 3).

Below are some snippets of code from the Mailbox type. You might want to take a peek yourself at the FSharp repository over at GitHub for a closer inspection. Be warned though, there is a lot of code in there!

Here’s the initial type definition for the Mailbox, you can see that there are two mutable fields:

type Mailbox<'Msg>() =
    let mutable inboxStore  = null
    let mutable arrivals = new Queue<'Msg>()

inboxStore is a generic list, System.Collections.Generic.List<T>, and arrivals is a System.Collections.Generic.Queue<T>.

For now the inboxStore is null and is only ever assigned via Scan or TryScan and this is done indirectly via the inbox member shown here:

inbox
member x.inbox =
    match inboxStore with
    | null -> inboxStore <- new System.Collections.Generic.List<'Msg>(1) // ResizeArray
    | _ -> ()
    inboxStore

Understanding the code in the Mailbox can be difficult given the amount of code, so I’ll highlight the key functions in the sections below to make it a little easier.

Scan / TryScan

Scan is just an async wrapper around TryScan. If TryScan returns None an exception is raised, if not then the result from TryScan is returned.
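To make that relationship concrete, here is a minimal sketch of the same idea written as a free-standing helper over the public API. scanViaTryScan is a hypothetical name of my own, not the FSharp.Core source, and I am assuming the exception raised on timeout is a TimeoutException.

```fsharp
open System

// Hypothetical helper (not part of FSharp.Core): Scan expressed in terms of TryScan.
let scanViaTryScan (agent: MailboxProcessor<'Msg>)
                   (scanner: 'Msg -> Async<'T> option)
                   (timeout: int) : Async<'T> =
    async {
        let! result = agent.TryScan(scanner, timeout)
        match result with
        | Some value -> return value
        // Assumption: a timeout surfaces as a TimeoutException
        | None -> return raise (TimeoutException "Scan timed out") }

// Usage: the agent is never started, so this call is the only reader.
let scanDemoAgent = new MailboxProcessor<int>(fun _ -> async.Return ())
scanDemoAgent.Post 1
scanDemoAgent.Post 2

// Picks out the first even message; the unmatched 1 stays behind in the mailbox.
let firstEven =
    scanViaTryScan scanDemoAgent (fun n -> if n % 2 = 0 then Some(async.Return n) else None) 1000
    |> Async.RunSynchronously
```

The unmatched message is not lost, it is simply moved aside, which is exactly the behaviour the following sections dig into.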

So now let’s take a look at the source of TryScan.

TryScan
member x.TryScan ((f: 'Msg -> (Async<'T>) option), timeout) : Async<'T option> =
    let rec scan() =
        async { match x.scanArrivals(f) with
                | None -> // Deschedule and wait for a message. When it comes, rescan the arrivals
                          let! ok = waitOne(timeout)
                          if ok then return! scan() else return None
                | Some resP -> let! res = resP
                               return Some(res) }
    // Look in the inbox first
    async { match x.scanInbox(f,0) with
            | None  -> return! scan()
            | Some resP -> let! res = resP
                           return Some(res) }

You can see here that an async workflow is declared that first pattern matches on x.scanInbox, passing in the predicate function f and the literal 0. If None is returned then there is no match in the inbox and control passes to the recursive function scan. This time the function x.scanArrivals is called, again passing in the predicate function f.

  • An interesting point to note is that each message that arrives and doesn’t match the predicate f resets the timer (let! ok = waitOne(timeout)). This means that any number of trivial messages arriving will keep the TryScan function running. This was also mentioned by Jon Harrop in a Stack Overflow question entitled How to use TryScan in F# properly. Jon also mentions locking, which I will address in the scanArrivals section below.

So what’s the difference between scanArrivals and scanInbox?

scanInbox operates on the inboxStore, which you might recall is a List<T>, whereas scanArrivals operates on arrivals, which is a Queue<T>. The big difference between the two is that messages arriving in the Mailbox land in the arrivals queue first, and when they are not matched by the predicate function f they are moved to the inboxStore. Hence the need to always check the inboxStore before the arrivals queue, otherwise previously unmatched messages would not be processed correctly. You might be asking yourself why not use a Queue<T> for both the inbox and the arrivals? It comes down to the fact that it’s not easy to use a Queue<T> for the inbox because of the way that Scan works. A match could occur at any position, so each item would have to be dequeued and processed separately; an indexed List<T> is a better fit for this situation.

scanArrivals / scanArrivalsUnsafe

Let’s look at the scanArrivals function: it’s just a lock construct around the scanArrivalsUnsafe function. This leads to an important point. The scan function operates under a lock, which effectively means that end-user code is also executed under the lock. If you hold onto the lock for any length of time there will be significant blocking of the normal receive mechanism, because it uses the same lock when receiving.

scanArrivals/scanArrivalsUnsafe
member x.scanArrivalsUnsafe(f) =
    if arrivals.Count = 0 then None
    else let msg = arrivals.Dequeue()
         match f msg with
         | None ->
             x.inbox.Add(msg);
             x.scanArrivalsUnsafe(f)
         | res -> res

// Lock the arrivals queue while we scan that
member x.scanArrivals(f) = lock syncRoot (fun () -> x.scanArrivalsUnsafe(f))

If we pause for a second and review the MailboxProcessor documentation on MSDN:

For each agent, at most one concurrent reader may be active, so no more than one concurrent call to Receive, TryReceive, Scan or TryScan may be active.

Obeying this rule should ensure that no deadlocks arise, but lock contention can still occur because messages are still being posted to the mailbox, and each post attempts to acquire the same syncRoot lock.

Let’s move on to the next function. I have saved this one for last as it’s the most interesting.

scanInbox

A quick glance at scanInbox reveals another function which, to my eye, could have heavyweight performance implications. The inbox is a List<T>, and the RemoveAt function does an internal Array.Copy for each removal. This is an O(n) operation where n is (Count - index), so as soon as the list gets to a reasonable size this is going to really start chewing into your processing time.

member x.scanInbox(f,n) =
    match inboxStore with
    | null -> None
    | inbox ->
        if n >= inbox.Count
        then None
        else
            let msg = inbox.[n]
            match f msg with
            | None -> x.scanInbox (f,n+1)
            | res -> inbox.RemoveAt(n); res
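The cost of removing from the front of a list can be seen in isolation with a small sketch like the one below. It is not taken from the Mailbox source: drainList, drainQueue and the item count of 50,000 are my own choices for illustration, and the timings will vary from machine to machine.

```fsharp
open System.Collections.Generic
open System.Diagnostics

// Empty a List<T> (ResizeArray) from the front; every RemoveAt(0) shifts the remaining items down.
let drainList (count: int) =
    let items = ResizeArray<int>(Seq.init count id)
    let sw = Stopwatch.StartNew()
    let mutable removed = 0
    while items.Count > 0 do
        items.RemoveAt(0)
        removed <- removed + 1
    removed, sw.Elapsed.TotalMilliseconds

// Empty a Queue<T>; Dequeue only moves the head index, so no items are copied.
let drainQueue (count: int) =
    let items = Queue<int>(Seq.init count id)
    let sw = Stopwatch.StartNew()
    let mutable removed = 0
    while items.Count > 0 do
        items.Dequeue() |> ignore
        removed <- removed + 1
    removed, sw.Elapsed.TotalMilliseconds

let listRemoved, listMs = drainList 50000
let queueRemoved, queueMs = drainQueue 50000
printfn "List.RemoveAt(0): %i items in %fms" listRemoved listMs
printfn "Queue.Dequeue:    %i items in %fms" queueRemoved queueMs
```

Both loops remove the same number of items, but the list version does a shrinking copy on every removal, which is the same shape as the slowdown seen in the agent.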

In order to check this theory let’s do some quick profiling of the console test that we showed earlier:

This screenshot was taken using JetBrains dotTrace 5.1. This is one of my favourite performance profilers because it captures results to line level and maps back to the F# source code relatively easily.

Yeah there it is, a whopping 44.41% of the time is spent in RemoveAt. Also notice that there were 200,000 calls which mirrors the number we placed in the queue before using the Lock/Unlock message types.

One of the things that really stands out for me is that the inbox is a simple list and completely unbounded. In a high-throughput situation where the scan function is being used, it’s perfectly feasible to get into a runaway memory or CPU condition, where the unmatched messages sit in the inbox taking longer and longer to process due to the O(n) cost of the RemoveAt function. Given a consistent throughput you will eventually either run out of memory, or the processing time will make throughput drop to dire levels, which in turn will back up the inbox even further. Effectively this is a death spiral.

Conclusion

So what conclusion can we draw from all of this?

  • Firstly, be careful with usage of Scan and TryScan. In certain situations the internal queue could back up to a size where you will be constantly struggling against the O(n) operation cost.
  • Agents are not a silver bullet solution. They cannot solve every problem. Although it’s possible to use agent-based techniques to solve various problems like blocking collections and suchlike, you have to use care and diligence in the solution to avoid introducing other problems into the mix. I have seen several implementations that I have been able to break relatively easily.
  • Do I still use agents? Absolutely! Agents are a fabulous tool to have in our toolbox and some extremely elegant solutions exist to solve very complex problems.
  • Do I use Scan or TryScan? Not in its current form in the MailboxProcessor. I chose to implement a destructive scan in my TDF agent for the reasons discussed here.
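As an aside, the Lock/Unlock behaviour of the BadAgent can be had without Scan at all. The following is only a sketch of one possible approach, not the TDF agent: the locked state keeps calling Receive and parks messages in its own Queue<T>, so nothing accumulates in the mailbox's internal list. StashMessage, startStashAgent and the Processed case (used here purely so the outcome can be observed) are names I have made up for the example.

```fsharp
open System.Collections.Generic

type StashMessage =
    | Message of string * int
    | Lock
    | Unlock
    | Processed of AsyncReplyChannel<int>   // hypothetical: asks how many messages were handled

let startStashAgent () =
    MailboxProcessor.Start(fun inbox ->
        // Messages received while locked are parked here rather than left in the mailbox
        let stash = Queue<string * int>()

        let rec working handled = async {
            let! msg = inbox.Receive()
            match msg with
            | Lock -> return! locked handled
            | Unlock -> return! working handled
            | Message _ -> return! working (handled + 1)
            | Processed reply ->
                reply.Reply handled
                return! working handled }

        and locked handled = async {
            let! msg = inbox.Receive()
            match msg with
            | Unlock ->
                // Stand-in for real processing: count everything parked, then empty the stash
                let drained = stash.Count
                stash.Clear()
                return! working (handled + drained)
            | Message (text, i) ->
                stash.Enqueue((text, i))
                return! locked handled
            | Lock -> return! locked handled
            | Processed reply ->
                reply.Reply handled
                return! locked handled }

        working 0)

// Usage: nothing is counted while locked, everything is counted once unlocked.
let stashAgent = startStashAgent ()
stashAgent.Post Lock
for i in 1 .. 3 do stashAgent.Post(Message("A message", i))
let handledWhileLocked = stashAgent.PostAndReply Processed
stashAgent.Post Unlock
let handledAfterUnlock = stashAgent.PostAndReply Processed
```

The stash here is still unbounded, so this only addresses the RemoveAt cost and not the memory growth.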

Before we finish, I’d like to briefly cover TryScan from my TDF based agent to complete the picture.

Destructive TryScan

TryScan
member x.TryScan((scanner: 'Msg -> Async<_> option), timeout): Async<_ option> =
    let ts = TimeSpan.FromMilliseconds(float timeout)
    let rec loopForMsg = async {
        let! msg = Async.AwaitTask <| incomingMessages.ReceiveAsync(ts)
                                      .ContinueWith(fun (tt:Task<_>) ->
                                          if tt.IsCanceled || tt.IsFaulted then None
                                          else Some tt.Result)
        match msg with
        | Some m ->  let res = scanner m
                     match res with
                     | None -> return! loopForMsg
                     | Some res -> return! res
        | None -> return None}
    loopForMsg

A message is dequeued on line 4 with let! msg = Async.AwaitTask .... It is then processed by the pattern matching expression on line 9, | Some m -> let res = scanner m. If the scanner function returns None then the message is discarded and the loop continues with another call to loopForMsg, otherwise the result is returned with | Some res -> return! res.

One of the areas where I have a lot of experience is pipelined operations driven by network I/O. One of the things that always causes a problem is unbounded situations, such as a queue with no absolute limit. There comes a time when you have to protect yourself from what is effectively a denial of service: you have to either destructively terminate messages or connections, or route the overflowed data for processing later.
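As a rough illustration of the "destructively terminate messages" option, here is a small sketch of a wrapper that refuses to post once an agent's backlog reaches a limit. BoundedPoster and TryPost are hypothetical names of my own. The check relies on MailboxProcessor.CurrentQueueLength, and because the check and the post are not atomic the limit is approximate when several threads post at once.

```fsharp
// Hypothetical wrapper: drops new messages once the agent's backlog reaches the limit.
type BoundedPoster<'Msg>(agent: MailboxProcessor<'Msg>, limit: int) =
    /// Returns false (and discards the message) when the backlog is at or above the limit.
    member x.TryPost(msg: 'Msg) =
        if agent.CurrentQueueLength >= limit then
            false
        else
            agent.Post msg
            true

// Usage: the agent is never started, so every accepted message stays queued.
let idleAgent = new MailboxProcessor<int>(fun _ -> async.Return ())
let poster = BoundedPoster(idleAgent, 2)
let accepted = [ for i in 1 .. 3 -> poster.TryPost i ]
```

A caller that gets false back can then decide whether to drop the message, close the connection, or route the data somewhere else for later processing.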

Until next time…

Back to the Primitive II

In the last post I discussed an asynchronous version of the ManualResetEvent, and as promised this time we will be looking at an asynchronous version of the AutoResetEvent. I’m using Stephen Toub’s post as a reference, and we will be building a version that is functional in style and maps straight into asynchronous workflows without any conversion or adaptors.

What is an AutoResetEvent?

An AutoResetEvent can be described as a turnstile mechanism, it lets a single waiting person through before re-latching waiting for the next signal. This is opposed to a ManualResetEvent which functions like an ordinary gate. Calling Set opens the gate, allowing any number of threads that are waiting to be let through. Calling Reset closes the gate.
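The turnstile and gate behaviours are easy to see with the ordinary blocking primitives from System.Threading. This is just a quick sketch using WaitOne with a zero timeout so that nothing actually blocks; the value names are my own.

```fsharp
open System.Threading

// Turnstile: a single Set lets exactly one waiter through, then it re-latches.
let turnstile = new AutoResetEvent(false)
turnstile.Set() |> ignore
let firstThrough = turnstile.WaitOne(0)    // consumes the signal
let secondThrough = turnstile.WaitOne(0)   // already re-latched, so this times out

// Gate: once Set, it stays open for everyone until Reset is called.
let gate = new ManualResetEvent(false)
gate.Set() |> ignore
let gateFirst = gate.WaitOne(0)
let gateSecond = gate.WaitOne(0)
gate.Reset() |> ignore
let gateAfterReset = gate.WaitOne(0)

printfn "turnstile: %b %b" firstThrough secondThrough
printfn "gate: %b %b %b" gateFirst gateSecond gateAfterReset
```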

AsyncAutoResetEvent

First of all here is the shape of the type that we will be building:

type AsyncAutoResetEvent =
    new : ?reusethread:bool -> AsyncAutoResetEvent
    member Set : unit -> unit
    member WaitAsync : unit -> Async<bool>

Fairly simple: implied constructor, Set and WaitAsync members.

Implied Constructor

Thinking about this logically we may need the following items:

  • A queue mechanism to store asynchronous waiters - let mutable awaits = Queue<_>().
  • A way of knowing if a signal has been made in the absence of any waiters - let mutable signalled = false.
  • A short-circuit asynchronous workflow for the situation where Set() is called before WaitAsync() - let completed = async.Return true. This will save us constructing an AsyncResultCell<_> and going through the rest of the asynchronous mechanism.

Also notice that an optional parameter called reusethread is defined; we use the ? prefix when defining it to make it optional. We then make use of the defaultArg function to give it a default value of false if one is not passed in. This will be used in the Set operation to determine whether the code will run on the same thread or on a thread in the ThreadPool.

open System
open System.Threading
open System.Collections.Generic

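type AsyncAutoResetEvent(?reusethread) =
    // Waiters that arrived before a signal was available
    let mutable awaits = Queue<_>()
    // True when Set() was called with nobody waiting
    let mutable signalled = false
    // Pre-built workflow returned when a signal is already pending
    let completed = async.Return true
    let reuseThread = defaultArg reusethread false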

WaitAsync()

The first step is to use a locking construct to control access to the mutable queue awaits. Inside this lock we check to see if signalled is true and if so we reset it to false and return our pre-built completed asynchronous workflow. If signalled is false then we create a new AsyncResultCell<_> and add it to the queue then return the AsyncResult to the caller.

    member x.WaitAsync() =
        lock awaits (fun () ->
            if signalled then
                signalled <- false
                completed
            else
                let are = AsyncResultCell<_>()
                awaits.Enqueue are
                are.AsyncResult)

Set()

We first declare a function called getWaiter(), which returns an option type that is either Some AsyncResultCell<bool> or None. We use the lock function to control access to the mutable queue: lock awaits. Once inside the lock we use pattern matching on awaits.Count and signalled:

  • The first pattern match (x,_) checks if there are any waiters (awaits.Count > 0) and then dequeues an AsyncResultCell<bool> from the queue and returns it within an option type: Some <| awaits.Dequeue().
  • The second pattern match (_,y) checks whether signalled is set to false before setting its value to true. This causes the next WaitAsync() caller to get the short-circuited value completed. This means that an AsyncResultCell<bool> does not need to be created and go through the whole async mechanism. We then return None as there is no waiter to be notified.
  • The final pattern match (_,_) is used when there are no waiting callers and signalled has already been set. There is simply nothing to do in this situation so we return None.

We use the getWaiter() function via a pattern match. If we have a result, i.e. Some AsyncResultCell, then we call RegisterResult, passing in AsyncOk(true) to indicate completion. Notice that we also pass in the reuseThread boolean that was declared as part of the constructor. If reuseThread is true then the notification to the waiter happens synchronously, so use this with care! Personally I would stick with the default of false to ensure that the operation is completed via the thread pool, unless you have a performance-critical reason and the waiting code that executes is very fast.

    member x.Set() =
        // Decide what to do under the lock, but notify the waiter outside it
        let getWaiter() =
            lock awaits (fun () ->
                match (awaits.Count, signalled) with
                | (x,_) when x > 0 -> Some <| awaits.Dequeue()
                | (_,y) when not y -> signalled <- true; None
                | (_,_) -> None)
        match getWaiter() with
        | Some a -> a.RegisterResult(AsyncOk(true), reuseThread)
        | None -> ()

The reason for using the getWaiter() function is to separate the locking function away from the notification, if RegisterResult was called within the lock and reuseThread was true then the awaiting function would be called synchronously within the lock which would not be a very good situation to be in.
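If you don't have the PowerPack's AsyncResultCell to hand, the same shape can be sketched with nothing but Async.FromContinuations from FSharp.Core. To be clear, this is an alternative sketch and not the implementation above: ContinuationAutoResetEvent is a made-up name, the waiter is registered when the workflow runs rather than when WaitAsync() is called, and the waiter is always resumed on the thread that calls Set() (comparable to passing reuseThread = true).

```fsharp
open System.Collections.Generic

type ContinuationAutoResetEvent() =
    // Success continuations of workflows that are currently waiting
    let waiters = Queue<bool -> unit>()
    let mutable signalled = false

    member x.WaitAsync() : Async<bool> =
        Async.FromContinuations(fun (cont, _, _) ->
            let alreadySignalled =
                lock waiters (fun () ->
                    if signalled then
                        signalled <- false
                        true
                    else
                        waiters.Enqueue cont
                        false)
            // Resume outside the lock when a signal was already pending
            if alreadySignalled then cont true)

    member x.Set() =
        let waiter =
            lock waiters (fun () ->
                if waiters.Count > 0 then
                    Some(waiters.Dequeue())
                else
                    signalled <- true
                    None)
        // As with getWaiter(), the notification happens outside the lock
        match waiter with
        | Some cont -> cont true
        | None -> ()

// Usage: a signal with no waiter is remembered for the next WaitAsync.
let autoEvent = ContinuationAutoResetEvent()
autoEvent.Set()
let firstWait = autoEvent.WaitAsync() |> Async.RunSynchronously

// A waiter started before (or racing with) the next Set is released by it.
let pendingWait = autoEvent.WaitAsync() |> Async.StartAsTask
autoEvent.Set()
let secondWait = pendingWait.Result
```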

So there we have it, I could take this series further and convert the other primitives that Stephen Toub describes but there should be enough information in these two posts to set you on your way. If anyone would like me to complete the series then let me know. I may well finish them off and post them on GitHub in the future, time permitting.


Musical inspiration during the creation of this post:

  • Pantera - Cowboys From Hell
  • Cacophony - Go Off

Thanks for tuning in, until next time…

Back to the Primitive

In this post we are going back to the primitive. No, it’s not about the song of the same name by Soulfly (which incidentally does contain F# notes), but a return to thread synchronisation primitives and their asynchronous counterparts.

We are going to be looking at an asynchronous version of the ManualResetEvent. This was recently covered by Stephen Toub on the PFX team blog. We will be taking a slightly different view on this, as we will be using asynchronous workflows, which will give us nice idiomatic usage within F#.

First let’s look at the shape of the type that Stephen defined:

public class AsyncManualResetEvent
{
    public Task WaitAsync();
    public void Set();
    public void Reset();
}

Now this can be used from within F# via the Async.AwaitTask function from the Async module, but this is like wrapping one asynchronous paradigm in another. Although this does work, what if you want to avoid the overhead of wrappers and stay strictly within async workflows?

type asyncManualResetEvent =
    new : unit -> asyncManualResetEvent
    member WaitAsync : unit -> Async<bool>
    member Set : unit -> unit
    member Reset : unit -> unit

That’s what we want to see! I don’t want to get into the details of how the C# version works, as Stephen does a very good job of that already. What I will explain though is how we essentially do the same thing while staying within the realm of functional programming. As we are getting into the lower-level details, no doubt we will have to start relying on some low-level locking primitives like Monitors, Semaphores, and Interlocked operations; even the F# core libraries have a cornucopia of those.

Let’s look at the first member, WaitAsync(). The first step is to create something to store the result of the operation; all we will be storing and returning asynchronously is a boolean to indicate that the wait handle has been set. To do this we use one of the types from the F# PowerPack, AsyncResultCell<'T>. I think that such a type should have been exposed from the F# core libraries but it was omitted for some reason. There is a type called ResultCell<'T> with much the same functionality in the FSharp.Core.Control namespace but it is marked internal so it’s not available for our use.

We declare a reference cell of type AsyncResultCell<'T> and then create the WaitAsync() member, all we have to do is dereference the value of the reference cell with ! and call its AsyncResult member, this gives us an Async<bool> which we can easily use in an asynchronous workflow.

type asyncManualResetEvent() =
    let aResCell = ref <| AsyncResultCell<_>()

    member x.WaitAsync() = (!aResCell).AsyncResult

The next bit is fairly simple too. All we need to do is dereference the value of the reference cell, and invoke the RegisterResult member by passing in a value of AsyncOk(true). The boolean value of true will be used by the type inference system to constrain the value of the Async<_> returned from WaitAsync.

    member x.Set() = (!aResCell).RegisterResult(AsyncOk(true))

The last part is the most complex (as usual). Here we create a recursive function called swap that will try to exchange the AsyncResultCell<'T> for a new one. We dereference the reference cell to currentValue, then we use a CAS (Compare And Swap) operation to compare aResCell with currentValue, and if they are equal newVal will replace the contents of aResCell. On the next line, if the value returned by the CAS operation is the same reference as currentValue then the swap succeeded and we are finished, otherwise we spin the current thread for 20 cycles using Thread.SpinWait 20 before retrying the operation via the recursive call swap newVal. This will be a lot less expensive than switching to user or kernel mode locking, and the period of contention between threads should be very small. Finally the swap operation is started by passing in a new AsyncResultCell<'T>.

    member x.Reset() =
        let rec swap newVal =
            let currentValue = !aResCell
            let result = Interlocked.CompareExchange<_>(aResCell, newVal, currentValue)
            if obj.ReferenceEquals(result, currentValue) then ()
            else Thread.SpinWait 20
                 swap newVal
        swap <| AsyncResultCell<_>()

There are various other methods we could have used. For instance we could have wrapped a ManualResetEvent with a call to Async.AwaitWaitHandle, although this would have meant using the kernel mode locking of the ManualResetEvent, which is a bit more expensive.

In Stephen Toub’s post he mentions Tasks being orphaned due to the Reset() method being called before the Task<'T> has been completed. That shouldn’t happen in our implementation due to the closures being stored internally for completion by the async infrastructure. Here’s a quick test harness to make sure everything works as expected anyway.
let amre = asyncManualResetEvent()
let x = async{let! x = amre.WaitAsync()
              Console.WriteLine("First signalled")}

let y = async{let! x = amre.WaitAsync()
              Console.WriteLine("Second signalled")}

let z = async{let! x = amre.WaitAsync()
              Console.WriteLine("Third signalled")}
//start async workflows x and y
Async.Start x
Async.Start y

//reset the asyncManualResetEvent, this will test whether the async workflows x and y 
// are orphaned due to the AsyncResultCell being recycled.
amre.Reset()

//now start the async z
Async.Start z

//we set a single time, this should result in the three async workflows completing
amre.Set()

Console.ReadLine() |> ignore

Here we can see everything works out as we expected:

That’s all there is to it. Next time I will be exploring an asyncAutoResetEvent in much the same vein.


Musical inspiration during the creation of this post:

  • Smashing Pumpkins - Zeitgeist
  • Soulfly - Primitive
  • FooFighters - FooFighters

Until next time…

Black-Scholes Taste Test

In this edition we are going to be doing a taste test, C# vs F#. Oh yeah, if you quickly glanced at the title you may have thought this was a recipe for black scones. As interesting and tasty as that may be, unfortunately it’s going to be finance related.

I recently presented a paper on the benefits of F#, part of this was a comparison of the famous Black-Scholes equation in both C# and F#. I was mainly going to be looking at code succinctness and the inherent suitability of the language for calculation based work, but there ended up being more to it than that.

First of all I quickly set up a test rig to run 50 million iterations of the algorithm to see if there was any difference in processing speed. I wasn’t expecting any major differences at this point, but here’s what I got:

C# results for 50 million iterations

F# results for 50 million iterations

I think you will agree that’s quite a difference. Let’s have a look at the code to see what’s going on.

C# Implementation

public class Options
{
    public enum Style
    {
        Call,
        Put
    }

    public static double BlackScholes(Style callPut, double s, double x, double t, double r, double v)
    {
        double result = 0.0;
        var d1 = (Math.Log(s / x) + (r + v * v / 2.0) * t) / (v * Math.Sqrt(t));
        var d2 = d1 - v * Math.Sqrt(t);
        switch (callPut)
        {
            case Style.Call:
                result = s * Cnd(d1) -x * Math.Exp(-r * t) * Cnd(d2);
                break;
            case Style.Put:
                result = x * Math.Exp(-r * t) * Cnd(-d2) -s * Cnd(-d1);
                break;
        }
        return result;
    }

    private static double Cnd(double x)
    {
        const double a1 = 0.31938153;
        const double a2 = -0.356563782;
        const double a3 = 1.781477937;
        const double a4 = -1.821255978;
        const double a5 = 1.330274429;
        var l = Math.Abs(x);
        var k = 1.0 / (1.0 + 0.2316419 * l);
        var w = 1.0 - 1.0 / Math.Sqrt(2 * Math.PI) *
            Math.Exp(-l * l / 2.0) * (a1 * k + a2 * k * k + a3 *
                Math.Pow(k, 3) + a4 * Math.Pow(k, 4) + a5 * Math.Pow(k, 5));
        if (x < 0)
        {
            return 1.0 - w;
        }
        return w;
    }
}

F# Implementation

module options
open System

type Style = Call | Put

let cnd x =
   let a1 = 0.31938153
   let a2 = -0.356563782
   let a3 = 1.781477937
   let a4 = -1.821255978
   let a5 = 1.330274429
   let l  = abs x
   let k  = 1.0 / (1.0 + 0.2316419 * l)
   let w  = (1.0 - 1.0 / sqrt(2.0 * Math.PI) *
                exp(-l * l / 2.0) * (a1 * k + a2 * k * k + a3 *
                    (pown k 3) + a4 * (pown k 4) + a5 * (pown k 5)))
   if x < 0.0 then 1.0 - w
   else w

let blackscholes style s x t r v =
    let d1 = (log(s / x) + (r + v * v / 2.0) * t) / (v * sqrt(t))
    let d2 = d1 - v * sqrt(t)
    match style with
    | Call -> s * cnd(d1) -x * exp(-r * t) * cnd(d2)
    | Put -> x * exp(-r * t) * cnd(-d2) -s * cnd(-d1)

Differences

The most significant differences in the compiled code come down to a few areas.

The BlackScholes function

The first thing to note is the code size and number of local variables:

F#
// Code size       122 (0x7a)
.maxstack  6
.locals init ([0] float64 d1,
         [1] float64 d2)
C#
// Code size       164 (0xa4)
.maxstack  4
.locals init ([0] float64 d1,
         [1] float64 d2,
         [2] float64 result,
         [3] valuetype CsBs.Options/Style CS$0$0000)

The initial argument loading in the F# implementation is done in fewer IL opcodes than in C#.

F#
IL_0001:  ldarg.1
IL_0002:  ldarg.2
IL_0003:  div
IL_0004:  call       float64 [mscorlib]System.Math::Log(float64)
C#
IL_0000:  ldc.r8     0.0
IL_0009:  stloc.0
IL_000a:  ldc.r8     0.0
IL_0013:  stloc.1
IL_0014:  ldc.r8     0.0
IL_001d:  stloc.2
IL_001e:  ldarg.1
IL_001f:  ldarg.2
IL_0020:  div
IL_0021:  call       float64 [mscorlib]System.Math::Log(float64)

You can see that the C# code is initialising the local variables to 0.0 by pushing the value onto the stack with ldc.r8 and then storing it with stloc.0, stloc.1 and stloc.2.

The pattern matching in the F# code results in a call to get the style options/Style::get_Tag() and then a branch if not equal opcode bne.un.s which causes a jump to IL_005d

IL_0036:  call       instance int32 options/Style::get_Tag()
IL_003b:  ldc.i4.1
IL_003c:  bne.un.s   IL_005d

The C# version stores and reloads a local variable for the Style (IL_0053: stloc.3, IL_0054: ldloc.3) and then uses the switch opcode, a jump table, to jump to either IL_0064 or IL_0083.

IL_0053:  stloc.3
IL_0054:  ldloc.3
IL_0055:  switch     ( 
                      IL_0064,
                      IL_0083)
IL_0062:  br.s       IL_00a2

These differences are negligible; I’m merely pointing out how compilation differs between the two languages.
The F# compiler is more stringent when compiling the code.

The Cnd function

The Cnd function or cumulative normal distribution is where the performance differences occur.

Again at initialization you can see that the C# version is larger, by 41 bytes.

F#
// Code size       213 (0xd5)
.maxstack  8
.locals init ([0] float64 l,
         [1] float64 k,
         [2] float64 w)
C#
// Code size       254 (0xfe)
.maxstack  6
.locals init ([0] float64 l,
         [1] float64 k,
         [2] float64 w)

The C# version initialises all the local variables to 0.0.

C#
IL_0000:  ldc.r8     0.0
IL_0009:  stloc.0
IL_000a:  ldc.r8     0.0
IL_0013:  stloc.1
IL_0014:  ldc.r8     0.0
IL_001d:  stloc.2

Interestingly, the C# compiler folds Math.PI * 2 into a single constant at compile time, but the F# compiler doesn’t.

F#
IL_003a:  ldc.r8     2.
IL_0043:  ldc.r8     3.1415926535897931
IL_004c:  mul
C#
IL_0057:  ldc.r8     6.2831853071795862

From here everything is identical until we get to the power operator section (Math.Pow in the C# version and pown in F#).

F#
IL_0089:  ldloc.1
IL_008a:  ldc.i4.3
IL_008b:  call       float64 [FSharp.Core]Microsoft.FSharp.Core.Operators/OperatorIntrinsics::PowDouble(float64, 
                                                                                                        int32)

In the F# code we are using the pown function which calculates the power to an integer. This is shown in the call to OperatorIntrinsics::PowDouble which uses the value in IL_0089: ldloc.1 and also loads the integer 3 with IL_008a: ldc.i4.3.

C#
IL_009c:  ldloc.1
IL_009d:  ldc.r8     3.
IL_00a6:  call       float64 [mscorlib]System.Math::Pow(float64,
                                                        float64)

The C# code is using the standard Math.Pow method, which operates on two float64 numbers. The value of 3 is implicitly converted into a float64 during compilation, as shown by IL_009d: ldc.r8 3.
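
At the source level the two calls differ only in the type of the exponent. Here is a minimal sketch, assuming a made-up input value k; it only checks that the two approaches agree and makes no attempt to measure performance:

```fsharp
open System

// Hypothetical input; any float would do here.
let k = 0.5

// pown takes an int exponent, matching the PowDouble(float64, int32) call in the F# IL.
let viaPown : float = pown k 3

// Math.Pow takes a float exponent, matching the Math::Pow(float64, float64) call in the C# IL.
let viaMathPow : float = Math.Pow(k, 3.0)

printfn "pown: %f, Math.Pow: %f" viaPown viaMathPow
```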

The final difference is at the end of the function.

F#
IL_00b8:  stloc.2
IL_00b9:  ldarg.0
IL_00ba:  ldc.r8     0.0
IL_00c3:  clt
IL_00c5:  brfalse.s  IL_00d3
IL_00c7:  ldc.r8     1.
IL_00d0:  ldloc.2
IL_00d1:  sub
IL_00d2:  ret
IL_00d3:  ldloc.2
IL_00d4:  ret

The F# version uses the clt opcode. This pushes 1 if the first value on the stack is less than the second, otherwise it pushes 0. There is then a brfalse.s, which jumps to location IL_00d3 if that result is 0, that is, if the first value is not less than the second.

C#
IL_00e3:  stloc.2
IL_00e4:  ldarg.0
IL_00e5:  ldc.r8     0.0
IL_00ee:  bge.un.s   IL_00fc
IL_00f0:  ldc.r8     1.
IL_00f9:  ldloc.2
IL_00fa:  sub
IL_00fb:  ret
IL_00fc:  ldloc.2
IL_00fd:  ret

The C# version uses bge.un.s to jump to location IL_00fc if the first value on the stack is greater than or equal to the second (or if the comparison is unordered). This is negligible in normal runtime but it is interesting to note the difference between the two.
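
Both instruction sequences are consistent with the same source-level conditional. The sketch below is reconstructed from the IL shown here rather than copied from the original source, so the names finish, x and w are assumptions:

```fsharp
// x stands for the function argument (ldarg.0); w stands for local 2 (ldloc.2).
let finish (x: float) (w: float) : float =
    // Negative inputs return 1.0 - w; everything else falls through to w.
    if x < 0.0 then 1.0 - w else w

printfn "%f %f" (finish (-1.0) 0.25) (finish 1.0 0.25)
```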

Conclusion

Wow, there was a lot of IL to get through, I hope you stayed with me!

Although the differences in some areas are negligible, every little counts. The implicit conversion of an integer to a float64 in the C# version hides the fact that the F# version was using an optimised integer power function, and that’s a performance increase of 168%! Implicit conversions can also lead to subtle bugs due to truncation and overflow. The other benefits are that the compiled code uses fewer instructions and the source code takes only 25 lines compared to 44 in C#.

Until next time!

FSharp Dataflow Agents III

This will be the last post on rebuilding the MailboxProcessor using TDF, here’s a quick discussion of the missing pieces…

First, let’s start with the simple ones; these don’t really require much discussion.

DefaultTimeout

let mutable defaultTimeout = Timeout.Infinite

member x.DefaultTimeout
   with get() = defaultTimeout
   and set(value) = defaultTimeout <- value

This simply provides a mutable property using Timeout.Infinite as a default setting.

CurrentQueueLength

member x.CurrentQueueLength() = incomingMessages.Count

Another simple one: this method reaches into the underlying BufferBlock to extract its current queue length using its Count property.

TryReceive

member x.TryReceive(?timeout) =
    let ts = TimeSpan.FromMilliseconds(float <| defaultArg timeout defaultTimeout)
    Async.AwaitTask <| incomingMessages.ReceiveAsync(ts)
                           .ContinueWith(fun (tt:Task<_>) ->
                                             if tt.IsCanceled || tt.IsFaulted then None
                                             else Some tt.Result)

Here we get a little help from TPL to apply a continuation on completion using ContinueWith. We use a lambda to return either None, in a time out condition, or Some tt.Result when we successfully receive an item.
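
To see the pattern in isolation, here is a minimal sketch of the same idea as a standalone function. The helper name tryReceive and the test buffer are hypothetical, and it assumes the System.Threading.Tasks.Dataflow assembly is referenced:

```fsharp
open System
open System.Threading.Tasks
open System.Threading.Tasks.Dataflow

// Hypothetical standalone helper mirroring the TryReceive member.
let tryReceive (block: BufferBlock<'T>) (timeoutMs: int) : Async<'T option> =
    let ts = TimeSpan.FromMilliseconds(float timeoutMs)
    block.ReceiveAsync(ts)
         .ContinueWith(fun (t: Task<'T>) ->
             // A timed-out receive never completes successfully, so it maps to None.
             if t.IsCanceled || t.IsFaulted then None else Some t.Result)
    |> Async.AwaitTask

let inbox = BufferBlock<string>()
inbox.Post("hello") |> ignore

// The first call finds the waiting message; the second finds an empty queue and times out.
let first = tryReceive inbox 100 |> Async.RunSynchronously
let second = tryReceive inbox 100 |> Async.RunSynchronously
printfn "%A %A" first second
```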

TryPostAndReply

type AsyncResultCell<'a>() =
    ...
  member x.TryWaitResultSynchronously(timeout:int) =
      //early completion check
      if source.Task.IsCompleted then
          Some source.Task.Result
      //now force a wait for the task to complete
      else
          if source.Task.Wait(timeout) then
              Some source.Task.Result
          else None

member x.TryPostAndReply(replyChannelMsg, ?timeout) :'Reply option =
    let timeout = defaultArg timeout defaultTimeout
    let resultCell = AsyncResultCell<_>()
    let msg = replyChannelMsg(new AsyncReplyChannel<_>(fun reply -> resultCell.RegisterResult(reply)))
    if incomingMessages.Post(msg) then
        resultCell.TryWaitResultSynchronously(timeout)
    else None

Things get a little more interesting from here on in. Firstly we need to add a new synchronisation member to the AsyncResultCell<'a> type: TryWaitResultSynchronously. We again enlist the help of the TPL primitives to check for the early completion using source.Task.IsCompleted returning the result if it is there, otherwise we use the Task property’s Wait method to check the item returns within the time out interval. In the usual manner, Some source.Task.Result is returned or None for a failure.
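
The timed wait can be tried out on its own with nothing more than a TaskCompletionSource. This is a minimal sketch; the function name tryWaitResult and the values used are hypothetical:

```fsharp
open System.Threading.Tasks

// Hypothetical standalone version of TryWaitResultSynchronously.
let tryWaitResult (source: TaskCompletionSource<'a>) (timeoutMs: int) : 'a option =
    // Task.Wait(int) returns true only if the task completed within the interval.
    if source.Task.Wait(timeoutMs) then Some source.Task.Result else None

let cell = TaskCompletionSource<int>()

// Nothing has been registered yet, so the wait times out.
let beforeReply = tryWaitResult cell 10

// Once a result is set, the same call returns it immediately.
cell.SetResult(42)
let afterReply = tryWaitResult cell 10

printfn "%A %A" beforeReply afterReply
```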

PostAndReply

member x.PostAndReply(replyChannelMsg, ?timeout) : 'Reply =
    match x.TryPostAndReply(replyChannelMsg, ?timeout = timeout) with
    | None ->  raise (TimeoutException("PostAndReply timed out"))
    | Some result -> result

This one wraps a call to TryPostAndReply with some pattern matching. In the event of a timeout, None is returned from TryPostAndReply and we raise a TimeoutException; otherwise we unwrap the result from the option using | Some result -> result.

TryScan

member x.TryScan((scanner: 'Msg -> Async<_> option), timeout): Async<_ option> =
    let ts = TimeSpan.FromMilliseconds( float timeout)
    let rec loopForMsg = async {
        let! msg = Async.AwaitTask <| incomingMessages.ReceiveAsync(ts)
                                      .ContinueWith(fun (tt:Task<_>) ->
                                          if tt.IsCanceled || tt.IsFaulted then None
                                          else Some tt.Result)
        match msg with
        | Some m ->  let res = scanner m
                     match res with
                     | None -> return! loopForMsg
                     | Some res -> return! res
        | None -> return None}
    loopForMsg

This one also uses the same ContinueWith functionality in the recursive loopForMsg function. Perhaps some of these functions could be extracted out and refactored, but I prefer to keep the code like this to better explain what’s going on. The code is available on GitHub anyway, so feel free to clean up any detritus and send me a pull request. Again we use pattern matching to keep calling the loopForMsg function until a result is returned or a timeout occurs.

Scan

member x.Scan(scanner, timeout) =
    async { let! res = x.TryScan(scanner, timeout)
            match res with
            | None -> return raise(TimeoutException("Scan TimedOut"))
            | Some res -> return res }

Finally we have Scan. Much like PostAndReply, it is just a wrapper around TryScan that uses pattern matching and throws an exception on a timeout.

That sums up the last few pieces, completing the TDF implementation of the MailboxProcessor. I think this series of posts has shown the elegance of F#’s asynchronous workflows. The use of recursive functions and the compositional nature of asynchronous workflows really helps when you are doing this type of programming. It’s also very nice on the eye, each section being clearly defined.

The more astute of you may have noticed something a little different. Scan and TryScan are destructive in this implementation, the unmatched messages are purged from the internal queue. Although I could have mirrored the same functionality of the MailboxProcessor by using an internal list to keep track of unmatched messages, this leads to performing checks during Receive and Scan and their derivatives to make sure that this list is used first when switching from Scan and Receive functionality.
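
The destructive behaviour is easy to demonstrate. Here is a minimal sketch, assuming a simplified standalone tryScan whose scanner returns a plain option rather than an Async option; the function and the test queue are hypothetical, and the System.Threading.Tasks.Dataflow assembly is assumed to be referenced:

```fsharp
open System
open System.Threading.Tasks
open System.Threading.Tasks.Dataflow

// Simplified, hypothetical scan: unmatched messages are consumed and not put back.
let rec tryScan (block: BufferBlock<'Msg>) (timeoutMs: int) (scanner: 'Msg -> 'T option) : Async<'T option> =
    async {
        let ts = TimeSpan.FromMilliseconds(float timeoutMs)
        let! msg =
            block.ReceiveAsync(ts)
                 .ContinueWith(fun (t: Task<'Msg>) ->
                     if t.IsCanceled || t.IsFaulted then None else Some t.Result)
            |> Async.AwaitTask
        match msg with
        | None -> return None
        | Some m ->
            match scanner m with
            | Some r -> return Some r
            | None -> return! tryScan block timeoutMs scanner
    }

let queue = BufferBlock<int>()
[ 1; 2; 3 ] |> List.iter (fun n -> queue.Post(n) |> ignore)

// Scan for the first even number: 1 is dequeued and dropped, 2 is returned.
let found =
    tryScan queue 100 (fun n -> if n % 2 = 0 then Some n else None)
    |> Async.RunSynchronously

// Only 3 is left in the queue; the unmatched 1 has been purged.
let remaining = queue.Count
printfn "%A %d" found remaining
```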

I think the separation of concerns is a little fuzzy in the MailboxProcessor. The Scan function seems like an afterthought: even if you don’t use Scan you still pay a price for it, as there are numerous checks between the internal queue and the unmatched messages list. You can also run into issues while using Scan and TryScan that can result in out of memory conditions due to their inherently unbounded nature. I will briefly describe and explore the conditions that can lead to that in the next post. In the implementation presented here we can get bounded checking by passing in an optional DataflowBlockOptions and setting a value for the BoundedCapacity property.
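
As a rough illustration of what bounding looks like at the TDF level, here is a minimal sketch using a bare BufferBlock rather than the agent itself. The capacity of 2 is an arbitrary choice, and the System.Threading.Tasks.Dataflow assembly is assumed to be referenced:

```fsharp
open System.Threading.Tasks.Dataflow

// A buffer that holds at most two messages (capacity chosen arbitrarily).
let options = DataflowBlockOptions(BoundedCapacity = 2)
let bounded = BufferBlock<int>(options)

// Nothing is consuming from the buffer, so the third Post is declined.
let accepted = [ for n in 1 .. 3 -> bounded.Post(n) ]
printfn "%A" accepted
```

A declined Post returns false, which is the condition the agent’s Post member turns into an exception.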

EDIT: The code for this series of articles is now available on GitHub: FSharpDataflow

Until next time…

F# Dataflow Agents Part II

Right, no messing about this time, straight to the code.

Construction

This is pretty straightforward and I don’t want to detract from the important bits of this post. The only thing of note is the cancellationToken, which is initialized to a default value using the defaultArg function if the optional parameter cancellationToken is not supplied. The TDF construct that we use to perform most of the hard work is incomingMessages, which is a BufferBlock<'Msg>.

type DataflowAgent<'Msg>(initial, ?cancellationToken) =
    let cancellationToken =
        defaultArg cancellationToken Async.DefaultCancellationToken
    let mutable started = false
    let errorEvent = new Event<System.Exception>()
    let incomingMessages = new BufferBlock<'Msg>()
    let mutable defaultTimeout = Timeout.Infinite

Error

This is the public facing part of the Error event. The [<CLIEvent>] attribute exposes the event in a friendly manner to other .Net languages by adding the add_Error and remove_Error accessor methods to allow subscription to take place. The Error event fires when an exception is thrown in the initial asynchronous workflow.

[<CLIEvent>]
member this.Error = errorEvent.Publish

Start

This is implemented the same way as in the MailboxProcessor. An exception is thrown if the agent has already started, as this is not a valid operation. We set the mutable field started to true and proceed to start the initial asynchronous workflow. This workflow is wrapped in a try ... with block so that if an exception is thrown we catch it and trigger the Error event. The computation is then started with Async.Start(...).

member this.Start() =
    if started
        then raise (new InvalidOperationException("Already Started."))
    else
        started <- true
        let comp = async { try do! initial this
                           with error -> errorEvent.Trigger error }
        Async.Start(computation = comp, cancellationToken = cancellationToken)
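
The error path can be exercised without the rest of the agent. This is a minimal sketch in which a workflow that always fails stands in for the initial workflow; the names observed and the "boom" message are hypothetical:

```fsharp
// Stand-in for the agent's errorEvent field.
let errorEvent = Event<exn>()

// Record what subscribers see; a ref cell is used because closures cannot capture 'let mutable'.
let observed = ref (None: string option)
errorEvent.Publish.Add(fun e -> observed.Value <- Some e.Message)

// Same shape as 'comp' in Start: the body fails and the handler triggers the event.
let comp = async {
    try
        failwith "boom"
    with error -> errorEvent.Trigger error }

// Run synchronously here (instead of Async.Start) so the outcome can be inspected straight away.
Async.RunSynchronously comp
printfn "%A" observed.Value
```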

Receive

The Receive member is used by the agent as a way of waiting for a message to arrive without blocking. Because the TDF functionality is all TPL Task based, we use the Async helper functions. In this instance we use Async.AwaitTask, passing in the result of the incomingMessages ReceiveAsync method, to wait for a message to arrive. Note that the optional timeout parameter is accepted but not used here. The integration between F# async and TDF is nice and succinct.

member this.Receive(?timeout) =
    Async.AwaitTask <| incomingMessages.ReceiveAsync()

Post

The Post member allows a message to be sent to the agent. It simply calls the incomingMessages Post method, passing in the item. We raise an exception if there is a problem posting (i.e. the incomingMessages internal queue is full).

member this.Post(item) =
    let posted = incomingMessages.Post(item)
    if not posted then
        raise (InvalidOperationException("Incoming message buffer full."))

PostAndTryAsyncReply / PostAndAsyncReply

I’m grouping both of these together as they are related in functionality. In the previous post I purposely left out some ancillary code as it added unnecessary complexity to the introduction. There are two types we need in order to replicate the PostAndTryAsyncReply and PostAndAsyncReply members of the MailboxProcessor.

AsyncReplyChannel

The first type we need is the AsyncReplyChannel<'Reply>. This type takes a function that accepts a generic 'Reply and returns a unit. It is used as a way of communicating back to the caller of the PostAndTryAsyncReply and PostAndAsyncReply members via its single member Reply. This should become a little clearer when we see it used in context.

An AsyncReplyChannel does actually exist in F# under the Microsoft.FSharp.Control namespace and is used by the MailboxProcessor. Unfortunately its constructor is marked as internal, so we are not able to reuse it here.

type AsyncReplyChannel<'Reply>(replyf : 'Reply -> unit) =
    member x.Reply(reply) = replyf(reply)

AsyncResultCell

The next type we need is the AsyncResultCell<'a>. We use this as a way to await the results of an asynchronous operation. We create a TaskCompletionSource (source), which is a TPL type that we use as a way of signalling to a callback / lambda expression when a message has arrived.

RegisterResult is used as a way of notifying when a message has arrived. It is used internally by our agent as a result of a reply being made to the AsyncReplyChannel.

AsyncWaitResult is a continuation wrapper, it is called when we want to wait indefinitely for the result to be returned. It wraps a successful completion with a call to task.Result which then returns the result.

GetWaitHandle is used as a mechanism to force the asynchronous result to return within a specified timeout interval. If a result is not returned within the timeout then this function will return false.

GrabResult returns the result from the TaskCompletionSource object source. This is set earlier by the RegisterResult member.

type AsyncResultCell<'a>() =
    let source = new TaskCompletionSource<'a>()
    member x.RegisterResult result = source.SetResult(result)

    member x.AsyncWaitResult =
        Async.FromContinuations(fun (cont,_,_) ->
            let apply = fun (task:Task<_>) -> cont (task.Result)
            source.Task.ContinueWith(apply) |> ignore)

    member x.GetWaitHandle(timeout:int) =
        async { let waithandle = source.Task.Wait(timeout)
                return waithandle }

    member x.GrabResult() = source.Task.Result
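
To show how the two types cooperate, here is a minimal sketch of the round trip using a bare TaskCompletionSource in place of the full AsyncResultCell. The type is renamed ReplyChannel to avoid clashing with the built-in one, and the "pong" reply is made up:

```fsharp
open System.Threading.Tasks

// Same shape as the AsyncReplyChannel type above, under a hypothetical name.
type ReplyChannel<'Reply>(replyf: 'Reply -> unit) =
    member x.Reply(reply) = replyf reply

// The caller's side: a completion source that will eventually hold the reply.
let source = TaskCompletionSource<string>()
let channel = ReplyChannel<string>(fun reply -> source.SetResult(reply))

// The agent's side: replying on the channel completes the caller's task.
channel.Reply("pong")

// Back on the caller's side, the reply can now be awaited.
let answer = source.Task |> Async.AwaitTask |> Async.RunSynchronously
printfn "%s" answer
```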

PostAndTryAsyncReply

This one is a little more tricky and I have added a few line number references to try and make it easier. On line 3 we declare a resultCell to collect the result of the asynchronous operation. This is used on line 4 when we create a msg to post to incomingMessages on line 5. The replyChannelMsg is a function that takes an AsyncReplyChannel and returns a message, so we create an AsyncReplyChannel with a lambda expression that registers the reply with the resultCell. This is the key to how this works: you have to remember that the registration will be done on the other side of the operation, within the asynchronous processing loop of the agent, when Reply is called on the AsyncReplyChannel.

Finally pattern matching is used on line 7 to call either AsyncWaitResult or GetWaitHandle on the resultCell. The AsyncWaitResult function is used to wait indefinitely and the GetWaitHandle function is used if we want to use a timeout. Both of these are asynchronous workflows that either return a result or return an option type containing the result.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
member this.PostAndTryAsyncReply(replyChannelMsg, ?timeout) =
    let timeout = defaultArg timeout defaultTimeout
    let resultCell = AsyncResultCell<_>()
    let msg = replyChannelMsg(AsyncReplyChannel<_>(fun reply -> resultCell.RegisterResult(reply)))
    let posted = incomingMessages.Post(msg)
    if posted then
        match timeout with
        |   Threading.Timeout.Infinite ->
                async { let! result = resultCell.AsyncWaitResult
                        return Some(result) }
        |   _ ->
                async { let! ok =  resultCell.GetWaitHandle(timeout)
                        let res = (if ok then Some(resultCell.GrabResult()) else None)
                        return res }
    else async{return None}
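
For comparison, this is the behaviour being replicated, shown with the built-in MailboxProcessor. It is a minimal sketch; the Query type, the reply value and the timeouts are all made up for illustration:

```fsharp
// Hypothetical message type: one case gets a reply, the other is ignored.
type Query =
    | Answer of AsyncReplyChannel<int>
    | Silent of AsyncReplyChannel<int>

let agent = MailboxProcessor.Start(fun inbox ->
    let rec loop () = async {
        let! msg = inbox.Receive()
        match msg with
        | Answer ch -> ch.Reply 42      // reply straight away
        | Silent _ -> ()                // never reply, so the caller times out
        return! loop () }
    loop ())

// A reply arrives well within the timeout.
let answered =
    agent.PostAndTryAsyncReply((fun ch -> Answer ch), timeout = 1000)
    |> Async.RunSynchronously

// No reply is ever sent, so the result is None once the timeout elapses.
let timedOut =
    agent.PostAndTryAsyncReply((fun ch -> Silent ch), timeout = 50)
    |> Async.RunSynchronously

printfn "%A %A" answered timedOut
```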

PostAndAsyncReply

This member uses the same functionality as PostAndTryAsyncReply, creating a message using the AsyncReplyChannel. The main difference is that an asynchronous workflow is created that wraps a call to PostAndTryAsyncReply if the timeout is specified.

member this.PostAndAsyncReply( replyChannelMsg, ?timeout) =
    let timeout = defaultArg timeout defaultTimeout
    match timeout with
    |   Threading.Timeout.Infinite ->
        let resCell = AsyncResultCell<_>()
        let msg = replyChannelMsg (AsyncReplyChannel<_>(fun reply -> resCell.RegisterResult(reply) ))
        let posted = incomingMessages.Post(msg)
        if posted then
            resCell.AsyncWaitResult
        else
            raise (InvalidOperationException("Incoming message buffer full."))
    |   _ ->
        let asyncReply = this.PostAndTryAsyncReply(replyChannelMsg, timeout=timeout)
        async { let! res = asyncReply
                match res with
                | None ->  return! raise (TimeoutException("PostAndAsyncReply TimedOut"))
                | Some res -> return res }

Static Start

The static Start function is used as a way to construct and start the agent in one step, rather than using the constructor and then calling the Start function. This is really just a simple shortcut for this common use case.

static member Start(initial, ?cancellationToken) =
    let dfa = DataflowAgent<'Msg>(initial, ?cancellationToken = cancellationToken)
    dfa.Start()
    dfa

Until next time…

F# Dataflow Agents Part I

This is going to be a new series on using TPL Dataflow with F#. First a little bit of history and background.

TPL Dataflow’s heritage and background

TPL Dataflow (TDF) has been around for quite a while. It first surfaced more than a year ago as the successor to the Concurrency and Coordination Runtime (CCR), and with the coming release of .Net 4.5 it will be part of the System.Threading.Tasks.Dataflow namespace. Elements of the now halted project Axum are also present within the design of TDF.

Concurrency and Coordination Runtime (CCR)

CCR is a library that deals with asynchrony, concurrency, and coordination between blocks of asynchronous code so that the programmer doesn’t have to. All of the low level details of synchronization and error propagation are taken care of in a consistent fashion. CCR is still included in Microsoft Robotics Studio, where it is used extensively to exploit parallel hardware and deal with partial failure of systems.

Axum

Axum was another interesting Microsoft research project. It also utilized the actor model, embracing the principles of isolation and message-passing. There was also extensive use of symbolic operators as a terse shorthand to indicate operations between actors. For example, <-- defined a way to pass a message to an actor. There was also a similarity to CCR, as Axum used the concepts of ports and channels in a similar way. It was a very interesting project and it was a shame it was put on hold.

TPL Dataflow (TDF)

TDF builds on CCR and Axum, consolidating and refining them to produce a more friendly fluent interface, much in the same vein as Language-Integrated Query (LINQ) and Reactive Extensions (RX).

TDF is built around a number of different blocks which can be combined or linked together. There are three different categories of blocks, as follows:

Buffering Blocks

Buffering blocks simply buffer data in various ways before passing the data on to another block.

  • BufferBlock<'T> - The BufferBlock acts as a first-in-first-out (FIFO) queue, buffering each input.
  • BroadcastBlock<'T> - The BroadcastBlock links to multiple targets, copying the data to each of the connected blocks.
  • WriteOnceBlock<'T> - The WriteOnceBlock acts like an immutable target: after the first item is passed to it, it effectively becomes read only.

Executor Blocks

The executor blocks run user supplied code in the form of a lambda expression or a Task<'T>.

  • ActionBlock<'TInput> - The ActionBlock acts like the Action<'T> delegate performing an action on each datum posted to it.
  • TransformBlock<'TInput,'TOutput> - The TransformBlock acts just like the ActionBlock except that the action performed can have an output, this output is buffered and behaves just like a BufferBlock.
  • TransformManyBlock<'TInput,'TOutput> - The TransformManyBlock is just like a TransformBlock except that it can produce more than one output for a given datum.

Joining Blocks

The joining blocks combine or join data together in different ways.

  • BatchBlock<'T> - The BatchBlock combines multiple single items together; the items are represented by arrays of elements. The items are grouped together in batches and then passed on to another block.
  • JoinBlock<'T1,'T2,…> - The JoinBlock acts as a form of Enumerable.Zip<'T1,'T2,'TResult> except the zip operation is performed on the items in the source array.
  • BatchedJoinBlock<'T1,'T2,…> - This block, as the name suggests, simply aggregates the JoinBlock and the BatchBlock together.
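
To give a feel for a couple of these blocks, here is a minimal sketch that uses one executor block and one joining block in isolation. The values are arbitrary, and the System.Threading.Tasks.Dataflow assembly is assumed to be referenced:

```fsharp
open System
open System.Threading.Tasks.Dataflow

// Executor block: doubles each input and buffers the output.
let doubler = TransformBlock<int, int>(Func<int, int>(fun n -> n * 2))
doubler.Post(21) |> ignore
let doubled = doubler.Receive()

// Joining block: groups posted items into arrays of two.
let batcher = BatchBlock<int>(2)
batcher.Post(1) |> ignore
batcher.Post(2) |> ignore
let batch = batcher.Receive()

printfn "%d %A" doubled batch
```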

That’s an ultra high level tour that only just scratches the surface. I recommend you check out the Introduction to TPL Dataflow document to read up on the details. There are a few more resources in the DevLabs area that you might find useful. Hopefully this series should also shed a bit more light on TDF as we go along…

F# Asynchronous Workflows and Agents

So where does that leave us in F#?
In F# we have Asynchronous Workflows and agents, and they help immensely with concurrency and message passing, but that doesn’t mean that we can’t take advantage of the new features and refinements, much in the same way as we can use Asynchronous Workflows to take advantage of Tasks.

This post is going to be centered around F# agents but with a twist. First of all we are going to be reimplementing a MailboxProcessor using TDF for the underlying processing. This will allow us to use all of our existing agent code and examples and also stay within the F# agent paradigm. Following this approach we could also make use of the DataflowBlockOptions type; it has some interesting properties which we will look at in future posts:

  • TaskScheduler
  • CancellationToken
  • MaxMessagesPerTask
  • BoundedCapacity

Implementation

In this post we are going to replicate the MailboxProcessor, and we will be using Tomas Petricek’s caching agent example from FSSnip. I have made a couple of modifications to Tomas’s code.
I replaced the Dictionary type with a ConcurrentDictionary so that the caching agent could be called multiple times successively without the dictionary throwing an exception due to it already containing a key from a previous cached result. I also changed the example code so that it requests cached HTML from the caching agent ten times with a 400ms interval in between each.
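
The difference between the two dictionary types can be seen in isolation. This is a minimal sketch with made-up keys and values:

```fsharp
open System
open System.Collections.Generic
open System.Collections.Concurrent

// Dictionary.Add throws if the key is already present.
let plain = Dictionary<string, string>()
plain.Add("url", "first")
let addThrew =
    try
        plain.Add("url", "second")
        false
    with :? ArgumentException -> true

// AddOrUpdate inserts or overwrites, so caching the same key repeatedly is safe.
let cache = ConcurrentDictionary<string, string>()
cache.AddOrUpdate("url", "first", fun _ _ -> "first") |> ignore
cache.AddOrUpdate("url", "second", fun _ _ -> "second") |> ignore

printfn "%b %s" addThrew cache.["url"]
```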

Caching Agent Implementation
module TplAgents
open System
open System.Collections.Generic
open System.Collections.Concurrent
open FsDataflow
open System.Net
open Microsoft.FSharp.Control.WebExtensions

type CachingMessage =
| Add of string * string
| Get of string * AsyncReplyChannel<option<string>>
| Clear

let caching = DataflowAgent.Start(fun agent -> async {
   let table = ConcurrentDictionary<string, string>()
   while true do
      let! msg = agent.Receive()
      match msg with
      | Add(url, html) ->
         // Add downloaded page to the cache
         table.AddOrUpdate(url, html, fun k v -> html) |> ignore
      | Get(url, repl) ->
         // Get a page from the cache - returns 
         // None if the value isn't in the cache
         if table.ContainsKey(url) then
            repl.Reply(Some table.[url])
         else
            repl.Reply(None)
      | Clear ->
           table.Clear() })
Caching Agent Example
/// Prints information about the specified web site using cache
let printInfo url = async {
   // Try to get the cached HTML from the caching agent
   let! htmlOpt = caching.PostAndAsyncReply(fun ch -> Get(url, ch))
   match htmlOpt with
   | None ->
       // New url - download it and add it to the cache
       use wc = new WebClient()
       let! text = wc.AsyncDownloadString(Uri(url))
       caching.Post(Add(url, text))
       Console.WriteLine( sprintf "Download: %s (%d)" url text.Length)
   | Some html ->
       // The url was downloaded earlier 
       Console.WriteLine( sprintf "Cached: %s (%d)" url html.Length) }

let printfuncpro = printInfo "http://functional-programming.net"
// Print information about a web site -
// Run this repeatedly to use cached value
for i in 1 .. 10 do
   printfuncpro |> Async.Start
   Async.RunSynchronously <| Async.Sleep 400

// Clear the cache - 'printInfo' will need to
// download data from the web site again
Console.WriteLine(sprintf "Clearing the cache")
caching.Post(Clear)
printfuncpro |> Async.Start

Console.ReadKey() |> ignore

Looking at the implementation above you can see that we need to implement the following members:

  • Start:unit -> unit
  • Receive:?int -> Async<'Msg>
  • Post:'Msg -> unit
  • PostAndTryAsyncReply:(AsyncReplyChannel<'Reply> -> 'Msg) * ?int -> Async<'Reply option>
  • PostAndAsyncReply:(AsyncReplyChannel<'Reply> -> 'Msg) * int option -> Async<'Reply>
  • static member Start:(MailboxProcessor<'Msg> -> Async<unit>) * ?CancellationToken -> MailboxProcessor<'Msg>

These are the only members we need to complete the caching agent example. I didn’t want to bamboozle everyone with an explosion of code from the outset, so the remaining members will be implemented as and when we need them. When we have implemented all the members from MailboxProcessor I’ll post the full source on my GitHub account.

The following members will be outstanding but it should be fairly trivial to implement them once we have completed the code here.

  • PostAndReply:(AsyncReplyChannel<'Reply> -> 'Msg) * int option -> 'Reply
  • Scan:('Msg -> Async<'T> option) * ?int -> Async<'T>
  • TryPostAndReply:(AsyncReplyChannel<'Reply> -> 'Msg) * ?int -> 'Reply option
  • TryReceive:?int -> Async<'Msg option>
  • TryScan:('Msg -> Async<'T> option) * ?int -> Async<'T option>
  • CurrentQueueLength:int
  • DefaultTimeout:int with get, set

So here we go, this is the Dataflow implementation of the MailboxProcessor:

Dataflow Agent
module FsDataflow
open System
open System.Threading
open System.Threading.Tasks
open System.Threading.Tasks.Dataflow
open System.Collections.Concurrent

type DataflowAgent<'Msg>(initial, ?cancellationToken) =
    let cancellationToken =
        defaultArg cancellationToken Async.DefaultCancellationToken
    let mutable started = false
    let errorEvent = new Event<System.Exception>()
    let incomingMessages = new BufferBlock<'Msg>()
    let mutable defaultTimeout = Timeout.Infinite

    [<CLIEvent>]
    member this.Error = errorEvent.Publish

    member this.Start() =
        if started
            then raise (new InvalidOperationException("Already Started."))
        else
            started <- true
            let comp = async { try do! initial this
                               with error -> errorEvent.Trigger error }
            Async.Start(computation = comp, cancellationToken = cancellationToken)

    member this.Receive(?timeout) =
        Async.AwaitTask <| incomingMessages.ReceiveAsync()

    member this.Post(item) =
        let posted = incomingMessages.Post(item)
        if not posted then
            raise (InvalidOperationException("Incoming message buffer full."))

    member this.PostAndTryAsyncReply(replyChannelMsg, ?timeout) =
        let timeout = defaultArg timeout defaultTimeout
        let resultCell = AsyncResultCell<_>()
        let msg = replyChannelMsg(AsyncReplyChannel<_>(fun reply -> resultCell.RegisterResult(reply)))
        let posted = incomingMessages.Post(msg)
        if posted then
            match timeout with
            |   Threading.Timeout.Infinite ->
                    async { let! result = resultCell.AsyncWaitResult
                            return Some(result) }
            |   _ ->
                    async { let! ok =  resultCell.GetWaitHandle(timeout)
                            let res = (if ok then Some(resultCell.GrabResult()) else None)
                            return res }
        else async{return None}

    member this.PostAndAsyncReply( replyChannelMsg, ?timeout) =
            let timeout = defaultArg timeout defaultTimeout
            match timeout with
            |   Threading.Timeout.Infinite ->
                let resCell = AsyncResultCell<_>()
                let msg = replyChannelMsg (AsyncReplyChannel<_>(fun reply -> resCell.RegisterResult(reply) ))
                let posted = incomingMessages.Post(msg)
                if posted then
                    resCell.AsyncWaitResult
                else
                    raise (InvalidOperationException("Incoming message buffer full."))
            |   _ ->
                let asyncReply = this.PostAndTryAsyncReply(replyChannelMsg, timeout=timeout)
                async { let! res = asyncReply
                        match res with
                        | None ->  return! raise (TimeoutException("PostAndAsyncReply TimedOut"))
                        | Some res -> return res }

    static member Start(initial, ?cancellationToken) =
        let dfa = DataflowAgent<'Msg>(initial, ?cancellationToken = cancellationToken)
        dfa.Start()
        dfa

The crux of the implementation from TDF’s point of view is the use of the BufferBlock.
This is one of the most fundamental blocks within TDF. It’s the equivalent of the Port<'T> type from CCR and the Mailbox type from F# which is used internally within the MailboxProcessor. As mentioned above, the BufferBlock type is a first-in-first-out (FIFO) buffer and is responsible for buffering any data that is posted to it.

OK, I’m going to leave it at that for now while you digest the code presented here.

In Part II I will be drilling into the detail of what’s going on internally and also describing more of the TDF model, so tune in soon for Part II.

Until next time…

Fixing a Hole…

Due to popular demand… well, I had a couple of requests anyway :-) Here’s a post inspired by my recent encounters profiling some of the code in Fracture-IO.

Build 2011 - a Quick Look

Well, I didn’t think I would be doing this but here are some of the sessions I’m looking forward to watching from this year’s Build conference.