LAgent: an agent framework in F# – Part X – ActiveObject

-

Download frame­work here.

All posts are here:

If you stare long enough at agents, you start to re­al­ize that they are just glorified locks’. They are a con­ve­nient pro­gram­ming model to pro­tect a re­source from con­cur­rent ac­cess. The pro­gram­ming model is con­ve­nient be­cause both the client and the server can write their code with­out wor­ry­ing about con­cur­rency prob­lems, and yet the pro­gram runs in par­al­lel. Protecting a re­source sounds a lot like state en­cap­su­la­tion and the con­cept of state en­cap­su­la­tion is what ob­ject ori­en­ta­tion is all about.

So you start think­ing if there is a way to en­hance vanilla ob­jects to make them agents. You want to reuse all the con­cepts that you are fa­mil­iar with (i.e. in­her­i­tance, vis­i­bil­ity rules, etc…) and you want your clients to call agents as if they were call­ing nor­mal ob­jects. Obviously, un­der the cover, the method calls won’t ex­e­cute im­me­di­ately, but they would be queued. Let’s look at an ex­am­ple.

This is our sim­ple counter agent:

type CounterMessage =
| Add of int
| Print
let counterF = fun msg count ->
    match msg with
    | Add(i)    -> count + i
    | Print     -> printfn "The value is %i" count; count
let c1 = spawnAgent counterF 0
c1 <-- Add(3)
c1 <—Print

As nice as this looks, there are un­fa­mil­iar things in this model:

  1. The communication is through messages. This requires packing and unpacking which, albeit easy in F#, is unfamiliar and feels like machinery that we’d like to get rid off.
  2. The management of state is bizarre, it gets passed into the lambda and returned from it instead of being represented as fields and properties on the agent

My best at­tempt at cre­at­ing an ob­ject-like syn­tax fol­lows:

type Counter() =
    let w = new WorkQueue()
    let mutable count = 0
    member c.Add x = w.Queue (fun () ->
        count <- count + x
        )
    member c.Print () = w.Queue (fun () ->
        printfn "The value is %i" count
        )
let c = new Counter()
c.Add 3
c.Print

With this syn­tax, you write your agents like you write your vanilla classes ex­cept:

  1. You need a private field of type WorkQueue
  2. You need to write your methods as lambdas passed to the WorkQueue.Queue function
  3. Your methods cannot return values

The most wor­ri­some of these con­straints is 2. be­cause you can eas­ily for­get about it. If you do for­get, then every­thing com­piles just fine, but it does­n’t do what you ex­pect. That’s pure bad­ness. I haven’t found a way to en­force it. This is a place where the lan­guage could help me. Other than that, the whole model works rather nicely.

Regarding the third point, you can con­coct a pro­gram­ming model that al­lows you to re­turn val­ues from your meth­ods. Here it is:

member c.CountTask = w.QueueWithTask(fun () ->
    count
    )
member c.CountAsync = w.QueueWithAsync(fun () ->
    count
    )
printfn "The count using Task is %i" (c.CountTask.Result)

The first method re­turns a Task; the sec­ond method re­turns an AsyncResultCell. Both are ways to rep­re­sent a promise. The lat­ter al­lows a nat­ural in­te­gra­tion with the async block in F# as in the fol­low­ing code:

Async.RunSynchronously (
            async {
                let! count = c.CountAsync
                printfn "The countusing Async is %i" count
            })

As for my­self, I don’t like meth­ods re­turn­ing val­ues. Every time I use them, I end up go­ing back and think­ing about my prob­lem in a tra­di­tional way, aka as method calls that re­turn re­sults, in­stead of think­ing about it in a more ac­tor ori­ented fash­ion. I end up wait­ing for these promises to be ma­te­ri­al­ized and, by do­ing so, I limit the amount of par­al­lelism that I un­leash. As a mat­ter of fact, the whole busi­ness of hid­ing the mes­sage pass­ing na­ture of the pro­gram­ming model is du­bi­ous. It makes for a nicer syn­tax, but you need to make an ex­tra ef­fort in your mind to trans­late it to what it re­ally is: just mes­sage pass­ing with a nice syn­tac­ti­cal ve­neer. I haven’t de­cided yet which model I like the most.

You should have a sense of what WorkQueue is. In essence, it is a Mailbox of lamb­das (look at the red bold code be­low).

type WorkQueue() =
    let workQueue = spawnWorker (fun f -> f())
    member w.Queue (f) = workQueue <-- f
    member w.QueueWithTask f : Task<'T> =
        let source = new TaskCompletionSource<_>()
        workQueue <-- (fun () -> f() |> source.SetResult)
        source.Task
    member w.QueueWithAsync (f:unit -> 'T) : Async<'T> =
        let result = new AsyncResultCell<'T>()
        workQueue <-- (fun () -> f() |> result.RegisterResult )
        result.AsyncWaitResult
    member w.Restart () = workQueue <-! Restart
    member w.Stop () = workQueue <-! Stop
    member w.SetErrorHandler(h) =
        let managerF = fun (_, name:string, ex:Exception, _, _, _) -> h name ex
        let manager = spawnWorker managerF
        workQueue <-! SetManager manager
    member w.SetName(name) = workQueue <-! SetName(name)
    member w.SetQueueHandler(g) = workQueue <-! SetWorkerHandler(g)
    member w.SetTimeoutHandler(timeout, f) = workQueue <-! SetTimeoutHandler(timeout, f)

I im­ple­mented all the ser­vices that are in the mes­sage pass­ing model. The two are equiv­a­lent as ex­press­ing power goes. In case you won­der how a real piece of code looks like us­ing this model, here is an ActiveObject ver­sion of the map re­duce al­go­rithm. One of these days, I will gather the strength to go trough this code and ex­plain what it does, but not to­day 🙂

#load "AgentSystem.fs"
open AgentSystem.LAgent
open System
open System.Collections
open System.Collections.Generic
open System.Threading
type IOutput<'out_key, 'out_value> =
    abstract Reduced: 'out_key -> seq<'out_value> -> unit
    abstract MapReduceDone: unit -> unit
type Mapper<'in_key, 'in_value, 'my_out_key, 'out_value when 'my_out_key : comparison>
(map:'in_key -> 'in_value -> seq<'my_out_key * 'out_value>, i, partitionF) = let w = new WorkQueue() let mutable reducerTracker: BitArray = null let mutable controller = Unchecked.defaultof<Controller<'in_key, 'in_value, 'my_out_key, 'out_value>> let mutable reducers = Unchecked.defaultof<Reducer<'in_key, 'in_value, 'my_out_key, 'out_value> array> member m.Init c reds = w.Queue (fun () -> controller <- c reducers <- reds reducerTracker <- new BitArray(reducers.Length, false)) member m.Process inKey inValue = w.Queue (fun () -> let outKeyValues = map inKey inValue outKeyValues |> Seq.iter (fun (outKey, outValue) -> let reducerUsed = partitionF outKey (reducers.Length) reducerTracker.Set(reducerUsed, true) reducers.[reducerUsed].Add(outKey, outValue))) member m.Done () = w.Queue (fun () -> controller.MapDone i reducerTracker) member m.Stop () = w.Stop () and Reducer<'in_key, 'in_value, 'out_key, 'out_value when 'out_key :
comparison>(reduce:'out_key -> seq<'out_value> -> seq<'out_value>, i, output:IOutput<'out_key, 'out_value>) = let w = new WorkQueue() let mutable workItems = new List<'out_key * 'out_value>() let mutable controller = Unchecked.defaultof<Controller<'in_key, 'in_value, 'out_key, 'out_value>> member r.Init c = w.Queue (fun () -> controller <- c) member r.StartReduction () = w.Queue (fun () -> workItems |> Seq.groupBy fst |> Seq.sortBy fst |> Seq.map (fun (key, values) -> (key, reduce key (values |> Seq.map snd))) |> Seq.iter (fun (key, value) -> output.Reduced key value) controller.ReductionDone i) member r.Add (outKey:'out_key, outValue:'out_value) : unit = w.Queue (fun () -> workItems.Add((outKey, outValue))) member m.Stop () = w.Stop () and Controller<'in_key, 'in_value, 'out_key, 'out_value when 'out_key : comparison>(output:IOutput<'out_key, 'out_value>) = let w = new WorkQueue() let mutable mapperTracker: BitArray = null let mutable reducerUsedByMappers: BitArray = null let mutable reducerDone: BitArray = null let mutable mappers = Unchecked.defaultof<Mapper<'in_key, 'in_value, 'out_key, 'out_value> array> let mutable reducers = Unchecked.defaultof<Reducer<'in_key, 'in_value, 'out_key, 'out_value> array> let BAtoSeq (b:BitArray) = [for x in b do yield x] member c.Init maps reds = w.Queue (fun () -> mappers <- maps reducers <- reds mapperTracker <- new BitArray(mappers.Length, false) reducerUsedByMappers <- new BitArray(reducers.Length, false) reducerDone <- new BitArray(reducers.Length, false)) member c.MapDone (i : int) (reducerTracker : BitArray) : unit = w.Queue (fun () -> mapperTracker.Set(i, true) let reducerUsedByMappers = reducerUsedByMappers.Or(reducerTracker) if not( BAtoSeq mapperTracker |> Seq.exists(fun bit -> bit = false)) then BAtoSeq reducerUsedByMappers |> Seq.iteri (fun i r -> if r = true then reducers.[i].StartReduction ()) mappers |> Seq.iter (fun m -> m.Stop ()) ) member c.ReductionDone (i: int) : unit = w.Queue (fun () -> reducerDone.Set(i, true) if BAtoSeq reducerDone |> Seq.forall2 (fun x y -> x = y) (BAtoSeq reducerUsedByMappers) then output.MapReduceDone () reducers |> Seq.iter (fun r -> r.Stop ()) c.Stop() ) member m.Stop () = w.Stop () let mapReduce (inputs:seq<'in_key * 'in_value>) (map:'in_key -> 'in_value -> seq<'out_key * 'out_value>) (reduce:'out_key -> seq<'out_value> -> seq<'out_value>) (output:IOutput<'out_key, 'out_value>) M R partitionF = let len = inputs |> Seq.length let M = if len < M then len else M let mappers = Array.init M (fun i -> new Mapper<'in_key, 'in_value, 'out_key, 'out_value>(map, i, partitionF)) let reducers = Array.init R (fun i -> new Reducer<'in_key, 'in_value, 'out_key, 'out_value>(reduce, i, output)) let controller = new Controller<'in_key, 'in_value, 'out_key, 'out_value>(output) mappers |> Array.iter (fun m -> m.Init controller reducers) reducers |> Array.iter (fun r -> r. Init controller ) controller.Init mappers reducers inputs |> Seq.iteri (fun i (inKey, inValue) -> mappers.[i % M].Process inKey inValue) mappers |> Seq.iter (fun m -> m.Done ()) let partitionF = fun key M -> abs(key.GetHashCode()) % M let map = fun (fileName:string) (fileContent:string) -> let l = new List<string * int>() let wordDelims = [|' ';',';';';'.';':';'?';'!';'(';')';'n';'t';'f';'r';'b'|] fileContent.Split(wordDelims) |> Seq.iter (fun word -> l.Add((word, 1))) l :> seq<string * int> let reduce = fun key (values:seq<int>) -> [values |> Seq.sum] |> seq<int> let printer () = { new IOutput<string, int> with member o.Reduced key values = printfn "%A %A" key values member o.MapReduceDone () = printfn "All done!!"} let testInput =
["File1", "I was going to the airport when I saw someone crossing"; "File2", "I was going home when I saw you coming toward me"] mapReduce testInput map reduce (printer ()) 2 2 partitionF open System.IO open System.Text let gatherer(step) = let w = new WorkQueue() let data = new List<string * int>() let counter = ref 0 { new IOutput<string, int> with member o.Reduced key values = w.Queue (fun () -> if !counter % step = 0 then printfn "Processed %i words. Now processing %s" !counter key data.Add((key, values |> Seq.hd)) counter := !counter + 1) member o.MapReduceDone () = w.Queue (fun () -> data |> Seq.distinctBy (fun (key, _) -> key.ToLower()) |> Seq.filter (fun (key, _) -> not(key = "" || key = """ || (fst (Double.TryParse(key))))) |> Seq.to_array |> Array.sortBy snd |> Array.rev |> Seq.take 20 |> Seq.iter (fun (key, value) -> printfn "%Att%A" key value) printfn "All done!!") } let splitBook howManyBlocks fileName = let buffers = Array.init howManyBlocks (fun _ -> new StringBuilder()) fileName |> File.ReadAllLines |> Array.iteri (fun i line -> buffers.[i % (howManyBlocks)].Append(line) |> ignore) buffers let blocks1 = __SOURCE_DIRECTORY__ + "kjv10.txt" |> splitBook 100 let blocks2 = __SOURCE_DIRECTORY__ + "warandpeace.txt" |> splitBook 100 let input = blocks1 |> Array.append blocks2 |> Array.mapi (fun i b -> i.ToString(), b.ToString()) //mapReduce input map reduce (gatherer(1000)) 20 20 partitionF type BookSplitter () = let blocks = new List<string * string>() member b.Split howManyBlocks fileName = let b = fileName |> splitBook howManyBlocks |> Array.mapi (fun i b -> i.ToString(), b.ToString()) blocks.AddRange(b) member b.Blocks () = blocks.ToArray() :> seq<string * string> type WordCounter () = let w = new WorkQueue() let words = new Dictionary<string,int>() let worker(wordCounter:WordCounter, ev:EventWaitHandle) = let w1 = new WorkQueue() { new IOutput<string, int> with member o.Reduced key values = w1.Queue (fun() -> wordCounter.AddWord key (values |> Seq.hd)) member o.MapReduceDone () = w1.Queue(fun () -> ev.Set() |> ignore) } member c.AddWord word count = let exist, value = words.TryGetValue(word) if exist then words.[word] <- value + count else words.Add(word, count) member c.Add fileName = w.Queue (fun () -> let s = new BookSplitter() fileName |> s.Split 100 let ev = new EventWaitHandle(false, EventResetMode.AutoReset) let blocks = s.Blocks () mapReduce blocks map reduce (worker(c, ev)) 20 20 partitionF ev.WaitOne() |> ignore ) member c.Words = w.QueueWithAsync (fun () -> words |> Seq.to_array |> Array.map (fun kv -> kv.Key, kv.Value) ) let wc = new WordCounter() wc.Add (__SOURCE_DIRECTORY__ + "kjv10.txt") wc.Add (__SOURCE_DIRECTORY__ + "warandpeace.txt") let wordsToPrint = async { let! words = wc.Words return words |> Seq.distinctBy (fun (key, _) -> key.ToLower()) |> Seq.filter (fun (key, _) -> not(key = "" || key = """ || (fst (Double.TryParse(key))))) |> Seq.to_array |> Array.sortBy snd |> Array.rev |> Seq.take 20 |> Seq.iter (fun (key, value) -> printfn "%Att%A" key value)} Async.RunSynchronously wordsToPrint Thread.Sleep(15000) printfn "Closed session"

Tags