Download the framework here.
All posts are here:
- Part I - Workers and ParallelWorkers
- Part II - Agents and control messages
- Part III - Default error management
- Part IV - Custom error management
- Part V - Timeout management
- Part VI - Hot swapping of code
- Part VII - An auction framework
- Part VIII – Implementing MapReduce (user model)
- Part IX – Counting words …
If you stare long enough at agents, you start to realize that they are just ‘glorified locks’. They are a convenient programming model to protect a resource from concurrent access. The programming model is convenient because both the client and the server can write their code without worrying about concurrency problems, and yet the program runs in parallel. Protecting a resource sounds a lot like state encapsulation, and state encapsulation is what object orientation is all about.
So you start wondering whether there is a way to enhance vanilla objects to make them agents. You want to reuse all the concepts that you are familiar with (e.g., inheritance, visibility rules) and you want your clients to call agents as if they were calling normal objects. Obviously, under the covers, the method calls won’t execute immediately; they will be queued. Let’s look at an example.
This is our simple counter agent:
```fsharp
type CounterMessage =
    | Add of int
    | Print

let counterF = fun msg count ->
    match msg with
    | Add(i) -> count + i
    | Print -> printfn "The value is %i" count; count

let c1 = spawnAgent counterF 0
c1 <-- Add(3)
c1 <-- Print
```
As nice as this looks, some things in this model are unfamiliar:
- Communication happens through messages. This requires packing and unpacking which, albeit easy in F#, is unfamiliar and feels like machinery that we’d like to get rid of.
- The management of state is bizarre: the state gets passed into the lambda and returned from it, instead of being represented as fields and properties on the agent.
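To make the "state in, state out" point concrete, here is a minimal sketch of how a function like `spawnAgent` could be built on F#'s `MailboxProcessor`. `spawnAgentSketch` and this definition of `<--` are hypothetical stand-ins, not LAgent's actual implementation. The agent folds each incoming message into the next state, and that state never appears as a field anywhere.

```fsharp
// Hypothetical stand-ins for LAgent's spawnAgent and <--, built on MailboxProcessor.
type CounterMessage =
    | Add of int
    | Print

// The handler receives the current state and returns the next one.
let spawnAgentSketch (handler: 'msg -> 'state -> 'state) (initialState: 'state) =
    MailboxProcessor<'msg>.Start(fun inbox ->
        let rec loop state =
            async {
                let! msg = inbox.Receive()
                return! loop (handler msg state)
            }
        loop initialState)

// Fire-and-forget send, mirroring the c1 <-- msg syntax.
let (<--) (agent: MailboxProcessor<'msg>) (msg: 'msg) = agent.Post msg

let counterF msg count =
    match msg with
    | Add i -> count + i
    | Print -> printfn "The value is %i" count; count

let c1 = spawnAgentSketch counterF 0
c1 <-- Add 3
c1 <-- Print // handled after Add 3, so it eventually prints "The value is 3"
```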
My best attempt at creating an object-like syntax follows:
```fsharp
type Counter() =
    let w = new WorkQueue()
    let mutable count = 0
    member c.Add x = w.Queue (fun () ->
        count <- count + x)
    member c.Print () = w.Queue (fun () ->
        printfn "The value is %i" count)

let c = new Counter()
c.Add 3
c.Print ()
```
With this syntax, you write your agents like you write your vanilla classes, except that:
- You need a private field of type WorkQueue
- You need to write the body of each method as a lambda passed to the WorkQueue.Queue method
- Your methods cannot return values
The most worrisome of these constraints is the second one, because it is easy to forget. If you do forget, everything still compiles, but the method body runs immediately on the caller’s thread instead of being queued, so it doesn’t do what you expect. That’s pure badness. I haven’t found a way to enforce it; this is a place where the language could help me. Other than that, the whole model works rather nicely.
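The self-contained sketch below shows the danger of forgetting the lambda. `MiniWorkQueue` is a hypothetical, stripped-down stand-in for WorkQueue: a `MailboxProcessor` of lambdas. `Reset` is a hypothetical method that skips `w.Queue`. It compiles without complaint, but it runs on the caller's thread and races with the work that is already queued.

```fsharp
// Hypothetical, stripped-down stand-in for WorkQueue: a mailbox of lambdas
// that the agent runs one at a time, in the order they were queued.
type MiniWorkQueue() =
    let agent =
        MailboxProcessor<unit -> unit>.Start(fun inbox ->
            let rec loop () =
                async {
                    let! work = inbox.Receive()
                    work ()
                    return! loop ()
                }
            loop ())
    member w.Queue(work: unit -> unit) = agent.Post work

type Counter() =
    let w = MiniWorkQueue()
    let mutable count = 0
    // Correct: the mutation is queued and runs on the agent.
    member c.Add x = w.Queue(fun () -> count <- count + x)
    member c.Print() = w.Queue(fun () -> printfn "The value is %i" count)
    // Bug (hypothetical method): the lambda was forgotten, so this runs right
    // away on the caller's thread and races with queued work. It compiles fine.
    member c.Reset() = count <- 0

let c = Counter()
c.Add 3
c.Reset()
c.Print() // may print 0 or 3, depending on whether Add ran before Reset
```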
Regarding the third point, you can concoct a programming model that allows you to return values from your methods. Here it is:
```fsharp
// Inside the Counter class
member c.CountTask = w.QueueWithTask(fun () ->
    count)
member c.CountAsync = w.QueueWithAsync(fun () ->
    count)

// Client code
printfn "The count using Task is %i" (c.CountTask.Result)
```
The first member returns a Task; the second returns an Async<'T> backed by an AsyncResultCell. Both are ways to represent a promise. The latter integrates naturally with F# async workflows, as in the following code:
```fsharp
Async.RunSynchronously (
    async {
        let! count = c.CountAsync
        printfn "The count using Async is %i" count
    })
```
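F#'s own `MailboxProcessor` offers a built-in way to get a value back from an agent, shown here for comparison. The caller sends a message that carries an `AsyncReplyChannel`, then awaits the answer with `PostAndAsyncReply`. This technique is separate from the framework's WorkQueue promises and uses neither Task nor AsyncResultCell. The `CounterMsg` type is hypothetical.

```fsharp
// Hypothetical message type: Count carries the channel the agent replies on.
type CounterMsg =
    | Add of int
    | Count of AsyncReplyChannel<int>

let counter =
    MailboxProcessor<CounterMsg>.Start(fun inbox ->
        let rec loop count =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Add x -> return! loop (count + x)
                | Count reply ->
                    reply.Reply count
                    return! loop count
            }
        loop 0)

counter.Post(Add 3)

async {
    // Suspends without blocking a thread until the agent has handled Count.
    let! count = counter.PostAndAsyncReply(fun reply -> Count reply)
    printfn "The count using PostAndAsyncReply is %i" count // prints 3
}
|> Async.RunSynchronously
```

The message type now spells out the request/reply shape, which is exactly the packing and unpacking the object-like syntax tries to hide.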
As for myself, I don’t like methods returning values. Every time I use them, I end up thinking about my problem in a traditional way, that is, as method calls that return results, instead of thinking about it in a more actor-oriented fashion. I end up waiting for these promises to be materialized and, by doing so, I limit the amount of parallelism that I unleash. As a matter of fact, the whole business of hiding the message-passing nature of the programming model is dubious. It makes for a nicer syntax, but you need to make an extra mental effort to translate it into what it really is: just message passing with a nice syntactic veneer. I haven’t decided yet which model I like more.
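Several agents make the cost of waiting on promises easy to see. This hypothetical sketch uses plain `MailboxProcessor`s that answer through `TaskCompletionSource` promises, not the framework's WorkQueue. The first version blocks on each promise before asking the next agent. The second sends every request first and only then waits.

```fsharp
open System.Threading.Tasks

// Hypothetical slow agent: each request takes about 100 ms and is answered
// by completing a TaskCompletionSource, i.e. a promise.
let makeSlowAgent (agentId: int) =
    MailboxProcessor<TaskCompletionSource<int>>.Start(fun inbox ->
        let rec loop () =
            async {
                let! promise = inbox.Receive()
                do! Async.Sleep 100 // simulate some work
                promise.SetResult agentId
                return! loop ()
            }
        loop ())

// Send a request and hand back the promise without waiting on it.
let ask (agent: MailboxProcessor<TaskCompletionSource<int>>) : Task<int> =
    let promise = TaskCompletionSource<int>()
    agent.Post promise
    promise.Task

let agents = [ for i in 1 .. 4 -> makeSlowAgent i ]

// Blocking on each promise before asking the next agent serializes the work
// (roughly 4 x 100 ms).
let oneByOne = [ for a in agents -> (ask a).Result ]

// Asking every agent first and waiting afterwards lets them work in parallel
// (roughly 100 ms in total).
let allAtOnce = (Task.WhenAll(agents |> List.map ask |> Array.ofList)).Result
```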
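By now you should have a sense of what WorkQueue is. In essence, it is a mailbox of lambdas: its worker, created with spawnWorker (fun f -> f()) on the first line of the class below, simply invokes every function it receives.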
```fsharp
type WorkQueue() =
    let workQueue = spawnWorker (fun f -> f())
    member w.Queue (f) = workQueue <-- f
    member w.QueueWithTask f : Task<'T> =
        let source = new TaskCompletionSource<_>()
        workQueue <-- (fun () -> f() |> source.SetResult)
        source.Task
    member w.QueueWithAsync (f:unit -> 'T) : Async<'T> =
        let result = new AsyncResultCell<'T>()
        workQueue <-- (fun () -> f() |> result.RegisterResult)
        result.AsyncWaitResult
    member w.Restart () = workQueue <-! Restart
    member w.Stop () = workQueue <-! Stop
    member w.SetErrorHandler(h) =
        let managerF = fun (_, name:string, ex:Exception, _, _, _) -> h name ex
        let manager = spawnWorker managerF
        workQueue <-! SetManager manager
    member w.SetName(name) = workQueue <-! SetName(name)
    member w.SetQueueHandler(g) = workQueue <-! SetWorkerHandler(g)
    member w.SetTimeoutHandler(timeout, f) = workQueue <-! SetTimeoutHandler(timeout, f)
```
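The core of WorkQueue (Queue, QueueWithTask, QueueWithAsync) can be approximated with only FSharp.Core and the TPL. Here is a sketch under these assumptions:
- It uses a bare `MailboxProcessor` instead of LAgent's `spawnWorker`.
- It omits the error, timeout, naming and restart services.
- AsyncResultCell may not be available in current FSharp.Core, so the sketch derives the `Async<'T>` from a `TaskCompletionSource` with `Async.AwaitTask`.

`WorkQueueSketch` is a hypothetical name. Unlike the code above, it also forwards exceptions from the queued function into the promise.

```fsharp
open System.Threading.Tasks

// Hypothetical approximation of WorkQueue: a MailboxProcessor whose messages are lambdas.
type WorkQueueSketch() =
    let agent =
        MailboxProcessor<unit -> unit>.Start(fun inbox ->
            let rec loop () =
                async {
                    let! work = inbox.Receive()
                    work () // one lambda at a time, in FIFO order
                    return! loop ()
                }
            loop ())

    // Fire-and-forget: f runs later, on the agent.
    member w.Queue(f: unit -> unit) = agent.Post f

    // Promise as a Task: completed with f's result (or its exception) when the agent runs f.
    member w.QueueWithTask(f: unit -> 'T) : Task<'T> =
        let source = TaskCompletionSource<'T>()
        agent.Post(fun () ->
            try
                source.SetResult(f ())
            with ex ->
                source.SetException ex)
        source.Task

    // Promise as an Async, derived from the Task version instead of an AsyncResultCell.
    member w.QueueWithAsync(f: unit -> 'T) : Async<'T> =
        w.QueueWithTask f |> Async.AwaitTask

// Usage: the Counter from this post, rebuilt on the sketch.
type Counter() =
    let w = WorkQueueSketch()
    let mutable count = 0
    member c.Add x = w.Queue(fun () -> count <- count + x)
    member c.CountTask = w.QueueWithTask(fun () -> count)
    member c.CountAsync = w.QueueWithAsync(fun () -> count)

let c = Counter()
c.Add 3
printfn "The count using Task is %i" (c.CountTask.Result)
printfn "The count using Async is %i" (c.CountAsync |> Async.RunSynchronously)
```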
I implemented all the services that are available in the message-passing model; the two are equivalent as far as expressive power goes. In case you wonder what a real piece of code looks like in this model, here is an ActiveObject version of the MapReduce algorithm. One of these days, I will gather the strength to go through this code and explain what it does, but not today 🙂
```fsharp
#load "AgentSystem.fs"
open AgentSystem.LAgent
open System
open System.Collections
open System.Collections.Generic
open System.Threading

type IOutput<'out_key, 'out_value> =
    abstract Reduced: 'out_key -> seq<'out_value> -> unit
    abstract MapReduceDone: unit -> unit
type Mapper<'in_key, 'in_value, 'my_out_key, 'out_value when 'my_out_key : comparison>(
        map: 'in_key -> 'in_value -> seq<'my_out_key * 'out_value>, i, partitionF) =
    let w = new WorkQueue()
    let mutable reducerTracker: BitArray = null
    let mutable controller = Unchecked.defaultof<Controller<'in_key, 'in_value, 'my_out_key, 'out_value>>
    let mutable reducers = Unchecked.defaultof<Reducer<'in_key, 'in_value, 'my_out_key, 'out_value> array>
    member m.Init c reds =
        w.Queue (fun () ->
            controller <- c
            reducers <- reds
            reducerTracker <- new BitArray(reducers.Length, false))
    member m.Process inKey inValue =
        w.Queue (fun () ->
            let outKeyValues = map inKey inValue
            outKeyValues |> Seq.iter (fun (outKey, outValue) ->
                let reducerUsed = partitionF outKey (reducers.Length)
                reducerTracker.Set(reducerUsed, true)
                reducers.[reducerUsed].Add(outKey, outValue)))
    member m.Done () =
        w.Queue (fun () ->
            controller.MapDone i reducerTracker)
    member m.Stop () = w.Stop ()
and Reducer<'in_key, 'in_value, 'out_key, 'out_value when 'out_key : comparison>(
        reduce: 'out_key -> seq<'out_value> -> seq<'out_value>, i, output: IOutput<'out_key, 'out_value>) =
    let w = new WorkQueue()
    let mutable workItems = new List<'out_key * 'out_value>()
    let mutable controller = Unchecked.defaultof<Controller<'in_key, 'in_value, 'out_key, 'out_value>>
    member r.Init c =
        w.Queue (fun () ->
            controller <- c)
    member r.StartReduction () =
        w.Queue (fun () ->
            workItems
            |> Seq.groupBy fst
            |> Seq.sortBy fst
            |> Seq.map (fun (key, values) -> (key, reduce key (values |> Seq.map snd)))
            |> Seq.iter (fun (key, value) -> output.Reduced key value)
            controller.ReductionDone i)
    member r.Add (outKey: 'out_key, outValue: 'out_value) : unit =
        w.Queue (fun () ->
            workItems.Add((outKey, outValue)))
    member m.Stop () = w.Stop ()
and Controller<'in_key, 'in_value, 'out_key, 'out_value when 'out_key : comparison>(output: IOutput<'out_key, 'out_value>) =
    let w = new WorkQueue()
    let mutable mapperTracker: BitArray = null
    let mutable reducerUsedByMappers: BitArray = null
    let mutable reducerDone: BitArray = null
    let mutable mappers = Unchecked.defaultof<Mapper<'in_key, 'in_value, 'out_key, 'out_value> array>
    let mutable reducers = Unchecked.defaultof<Reducer<'in_key, 'in_value, 'out_key, 'out_value> array>
    let BAtoSeq (b: BitArray) = [for x in b do yield x]
    member c.Init maps reds =
        w.Queue (fun () ->
            mappers <- maps
            reducers <- reds
            mapperTracker <- new BitArray(mappers.Length, false)
            reducerUsedByMappers <- new BitArray(reducers.Length, false)
            reducerDone <- new BitArray(reducers.Length, false))
    member c.MapDone (i: int) (reducerTracker: BitArray) : unit =
        w.Queue (fun () ->
            mapperTracker.Set(i, true)
            // BitArray.Or updates reducerUsedByMappers in place
            reducerUsedByMappers.Or(reducerTracker) |> ignore
            if not (BAtoSeq mapperTracker |> Seq.exists (fun bit -> bit = false)) then
                BAtoSeq reducerUsedByMappers |> Seq.iteri (fun i r -> if r = true then reducers.[i].StartReduction ())
                mappers |> Seq.iter (fun m -> m.Stop ()))
    member c.ReductionDone (i: int) : unit =
        w.Queue (fun () ->
            reducerDone.Set(i, true)
            if BAtoSeq reducerDone |> Seq.forall2 (fun x y -> x = y) (BAtoSeq reducerUsedByMappers) then
                output.MapReduceDone ()
                reducers |> Seq.iter (fun r -> r.Stop ())
                c.Stop ())
    member c.Stop () = w.Stop ()

let mapReduce (inputs: seq<'in_key * 'in_value>)
              (map: 'in_key -> 'in_value -> seq<'out_key * 'out_value>)
              (reduce: 'out_key -> seq<'out_value> -> seq<'out_value>)
              (output: IOutput<'out_key, 'out_value>)
              M R partitionF =
    let len = inputs |> Seq.length
    let M = if len < M then len else M
    let mappers = Array.init M (fun i -> new Mapper<'in_key, 'in_value, 'out_key, 'out_value>(map, i, partitionF))
    let reducers = Array.init R (fun i -> new Reducer<'in_key, 'in_value, 'out_key, 'out_value>(reduce, i, output))
    let controller = new Controller<'in_key, 'in_value, 'out_key, 'out_value>(output)
    mappers |> Array.iter (fun m -> m.Init controller reducers)
    reducers |> Array.iter (fun r -> r.Init controller)
    controller.Init mappers reducers
    inputs |> Seq.iteri (fun i (inKey, inValue) -> mappers.[i % M].Process inKey inValue)
    mappers |> Seq.iter (fun m -> m.Done ())

// Masking the sign bit keeps the index non-negative (abs would overflow on Int32.MinValue)
let partitionF = fun key M -> (hash key &&& 0x7fffffff) % M
let map = fun (fileName:string) (fileContent:string) ->
    let l = new List
```