OK, so I have been offline for a while now, what with starting a new financial contract in London and having no broadband access. I have been working on something, honest!
Since the last post I have been reflecting on the pipeline design. It had a distinctly object-oriented feel that I wasn't happy with, so I have amended the structure of the code and come up with the following, which simplifies in some areas and expands in others…
[Code listing: pipeline implementation]
Summary
I only want to summarise the code, as I think it's fairly straightforward to see what's going on.
Interfaces
We have two main interfaces, IPipelineInput<'a> and IPipelineConnection<'a>. As the names suggest, they deal with getting information into the pipeline and with connecting pipeline stages together. The two are merged in the IPipeline<'a, 'b> interface. This keeps a clean separation between connecting and inserting, makes implementation easier, and allows either interface to be implemented in other areas of code that need to talk to or connect to a pipeline.
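As a rough sketch of how these three interfaces might be declared: only Insert is named in the text, so the Attach and Detach member names are assumptions for illustration, and the real signatures may differ.

```fsharp
// Hypothetical declarations; Attach/Detach are assumed names.
type IPipelineInput<'a> =
    /// Offer a payload to this stage.
    abstract Insert : 'a -> unit

type IPipelineConnection<'a> =
    /// Connect a downstream stage that accepts payloads of type 'a.
    abstract Attach : IPipelineInput<'a> -> unit
    /// Disconnect a previously attached downstream stage.
    abstract Detach : IPipelineInput<'a> -> unit

/// A stage accepts 'a on its input side and feeds 'b to connected stages.
type IPipeline<'a, 'b> =
    interface
        inherit IPipelineInput<'a>
        inherit IPipelineConnection<'b>
    end
```

Because IPipeline<'a, 'b> only combines the other two, code that merely feeds a pipeline can depend on IPipelineInput<'a> alone.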
Internals
Inside the pipeline we have a bounded blocking queue, implemented by the BlockingCollection from the TPL. This is used to store the pipeline payloads that are waiting to be processed.
The consumerLoop function is recursive and continually tries to take items from the blocking collection, processing and routing each one to the next pipeline stage.
The processor is a function that transforms from type 'a to type 'b.
The router is a function that takes a sequence of IPipelineInput<'b> together with the payload 'b, and returns a sequence of IPipelineInput<'b>. Effectively this means we can route by the connected stages (e.g. round-robin or multicast routing), or by payload contents (e.g. if the payload contains a certain byte sequence we could choose a particular IPipelineInput<'b>).
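To make the router shape concrete, here is a minimal sketch of two routers with that signature. The names multicast and roundRobin are hypothetical, and the one-member interface is redeclared only so the snippet stands alone.

```fsharp
// Assumed minimal interface so the example is self-contained.
type IPipelineInput<'a> =
    abstract Insert : 'a -> unit

/// Multicast: every connected stage receives the payload.
let multicast (stages : seq<IPipelineInput<'b>>) (_payload : 'b) = stages

/// Round robin: each call picks the next connected stage in turn.
/// The counter is not thread-safe; this assumes one consumer per stage.
let roundRobin () =
    let next = ref 0
    fun (stages : seq<IPipelineInput<'b>>) (_payload : 'b) ->
        let all = Seq.toArray stages
        if all.Length = 0 then Seq.empty
        else
            let i = next.Value % all.Length
            next.Value <- i + 1
            Seq.singleton all.[i]
```

Both ignore the payload; a content-based router would inspect it instead and return whichever stages match.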
Each item taken is passed to the processor and router via the pipeline operator (|>) and Seq operations, and the loop calls itself recursively until an item can no longer be retrieved from the buffer.
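A minimal sketch of such a loop, assuming the buffer, processor, router and connected stages are all passed in as arguments. The parameter names and their order are illustrative rather than taken from the listing.

```fsharp
open System.Collections.Concurrent

// Assumed minimal interface so the example is self-contained.
type IPipelineInput<'a> =
    abstract Insert : 'a -> unit

/// Take items until the buffer is empty: transform each one, ask the
/// router which connected stages should receive it, then insert it there.
let rec consumerLoop (buffer : BlockingCollection<'a>)
                     (processor : 'a -> 'b)
                     (router : seq<IPipelineInput<'b>> -> 'b -> seq<IPipelineInput<'b>>)
                     (connected : seq<IPipelineInput<'b>>) : unit =
    let mutable item = Unchecked.defaultof<'a>
    // TryTake returns false immediately when nothing is waiting.
    if buffer.TryTake(&item) then
        let result = processor item
        router connected result |> Seq.iter (fun stage -> stage.Insert result)
        consumerLoop buffer processor router connected
```

Using TryTake rather than the blocking Take is what lets the loop exit when the buffer is empty instead of parking a thread.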
The implementation of IPipelineInput<'a>.Insert is the counterpart to the previous function. It first tries to insert the item into the bounded blocking queue; if this cannot be done, the overflow function is called if one is present. Next, the async consumer loop is started if it is not already running. The idea is that keeping the payload processing running on the thread pool while there is work to do cuts down on the number of context switches between threads. Once an item cannot be taken from the bounded blocking queue, the loop exits.
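The insert-then-start-the-loop idea could be sketched roughly as follows. BufferedInput, handle and overflow are hypothetical names, the running flag is one assumed way of tracking whether a loop is active, and the final re-check of the buffer is an addition here to cover an item arriving just as the loop stops.

```fsharp
open System.Collections.Concurrent
open System.Threading

/// Hypothetical stand-in for a stage's input side.
type BufferedInput<'a>(capacity : int, handle : 'a -> unit, overflow : ('a -> unit) option) =
    let buffer = new BlockingCollection<'a>(capacity)
    let running = ref 0   // 0 = idle, 1 = a consumer loop is active

    // Handle items until the buffer is empty.
    let rec drain () =
        let mutable item = Unchecked.defaultof<'a>
        if buffer.TryTake(&item) then
            handle item
            drain ()

    // Start a consumer loop on the thread pool unless one is already active.
    let rec ensureRunning () =
        if Interlocked.CompareExchange(running, 1, 0) = 0 then
            async {
                drain ()
                Interlocked.Exchange(running, 0) |> ignore
                // An item may have arrived after the last TryTake.
                if buffer.Count > 0 then ensureRunning ()
            } |> Async.Start

    /// Queue the item, or pass it to the overflow function when the queue is full.
    member this.Insert (item : 'a) =
        if not (buffer.TryAdd item) then
            overflow |> Option.iter (fun f -> f item)
        ensureRunning ()
```

With a small capacity and a fast producer, some payloads go to the overflow function rather than the queue, so every item is either handled or reported as overflow.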
The rest of the code is pretty standard stuff and should be pretty easy to follow.
I also define some symbolic operators to simplify constructing and using the pipeline:
- ++> Attaches the pipeline stage on the right-hand side to the one on the left.
- --> Detaches the pipeline stage on the right from the one on the left.
- <<-- Inserts a payload on the right into the pipeline stage on the left.
- -->> Inserts a payload on the left-hand side into the pipeline stage on the right.
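For illustration, operators like these could be defined as thin wrappers over the interface members. This is a sketch under assumptions: the Attach and Detach member names are hypothetical, the operators here simply return unit, and the spellings assume the double hyphens that blog software tends to render as dashes.

```fsharp
// Assumed minimal interfaces so the example is self-contained.
type IPipelineInput<'a> =
    abstract Insert : 'a -> unit

type IPipelineConnection<'a> =
    abstract Attach : IPipelineInput<'a> -> unit
    abstract Detach : IPipelineInput<'a> -> unit

/// Attach the stage on the right to the one on the left.
let (++>) (left : IPipelineConnection<'a>) (right : IPipelineInput<'a>) = left.Attach right

/// Detach the stage on the right from the one on the left.
let (-->) (left : IPipelineConnection<'a>) (right : IPipelineInput<'a>) = left.Detach right

/// Insert the payload on the right into the stage on the left.
let (<<--) (stage : IPipelineInput<'a>) (payload : 'a) = stage.Insert payload

/// Insert the payload on the left into the stage on the right.
let (-->>) (payload : 'a) (stage : IPipelineInput<'a>) = stage.Insert payload
```

Returning unit keeps the sketch simple; an attach operator that returned one of its operands would allow several stages to be chained in a single expression.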
These help to keep the description of the pipeline nice and terse. Once things get a little more complex other operators may be required; the now discontinued Axum had a whole host of these, and it's a pity Microsoft dropped the language.
Example
Here's a quick sample showing the pipeline in use:
- Stage 1 takes a string and splits it on ','.
- Stage 2 reverses each string.
- Stage 3 reverses the string back to the original.
[Code listing: sample pipeline]
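Leaving the pipeline plumbing aside, the three stage bodies described above might be written as plain functions along these lines. The names stage1, stage2 and stage3 are hypothetical, and function composition stands in for attaching the stages to one another.

```fsharp
/// Stage 1: split a string on ','.
let stage1 (line : string) : string[] = line.Split([| ',' |])

/// Reverse the characters of a single string.
let reverse (s : string) = System.String(Array.rev (s.ToCharArray()))

/// Stage 2: reverse each string.
let stage2 : string[] -> string[] = Array.map reverse

/// Stage 3: reverse each string back to the original.
let stage3 : string[] -> string[] = Array.map reverse

// Composing the three in order, as the pipeline would.
let roundTrip = stage1 >> stage2 >> stage3
```

Feeding "abc,def" through stage1 and stage2 gives "cba" and "fed"; adding stage3 restores "abc" and "def".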
As you can see, assigning the pipeline stages is pretty simple, as is composing multiple stages. This was often one of the most difficult areas when developing similar pipelines in C#: you could find yourself with a few hundred lines of setup code, which was often a nightmare to debug a few weeks later.
Hopefully I have whetted your appetite for pipelines. In a future article I will be combining socket operations with pipeline stages to produce a flexible framework for high-throughput network applications.
As always I appreciate any comments, until next time…