Download framework here.
All posts are here:
- Part I - Workers and ParallelWorkers
- Part II - Agents and control messages
- Part III - Default error management
- Part IV - Custom error management
- Part V - Timeout management
- Part VI - Hot swapping of code
- Part VII - An auction framework
- Part VIII - Implementing MapReduce (user model)
- Part IX - Counting words …
Let’s now use our mapReduce to do something more interesting, for example finding the frequency of words in several books. This time the agent that processes the output needs to be a bit more complex.
    let gathererF = fun msg (data:List<string * int>, counter, step) ->
        match msg with
        | Reduced(key, value) ->
            if counter % step = 0 then
                printfn "Processed %i words. Now processing %s" counter key
            data.Add((key, value |> Seq.hd))
            data, counter + 1, step
        | MapReduceDone ->
            data
            |> Seq.distinctBy (fun (key, _) -> key.ToLower())
            |> Seq.filter (fun (key, _) -> not(key = "" || key = "\"" ||
                                                (fst (Double.TryParse(key)))))
            |> Seq.to_array
            |> Array.sortBy snd
            |> Array.rev
            |> Seq.take 20
            |> Seq.iter (fun (key, value) -> printfn "%A\t\t%A" key value)
            printfn "All done!!"
            data, counter, step

    let gatherer = spawnAgent gathererF (new List<string * int>(), 0, 1000)
Every time a new word is reduced, the result is added to a running list, and a progress message is printed every step words (every 1000 here). When everything is done, the list is printed out after first manipulating it to reduce weirdness: entries that differ only in case, empty strings, quote characters and numbers are dropped, and only the 20 most frequent words are kept. BTW: there are at least two bugs in this code, maybe more (late night quick-and-dirty-see-if-the-algo-works kind of coding).
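To see what the cleanup in the MapReduceDone branch does, here is a minimal sketch that runs the same steps over a tiny hand-made list. The names `sample` and `topWords` are made up for illustration and are not part of the framework; the sketch also swaps in the current library names (`Seq.toArray`) and uses `Seq.truncate`, which does not throw when fewer than `n` items are left.

```fsharp
open System

// Hypothetical (word, count) pairs standing in for the gathered data.
let sample =
    [ "the", 5; "The", 2; "", 9; "\"", 4; "42", 7; "and", 3; "unto", 1 ]

// Same cleanup steps as the MapReduceDone branch, returned as a list
// instead of being printed directly.
let topWords n (data: seq<string * int>) =
    data
    |> Seq.distinctBy (fun (key, _) -> key.ToLower())   // keeps the FIRST entry per lowercased key
    |> Seq.filter (fun (key, _) ->
        not (key = "" || key = "\"" || fst (Double.TryParse(key))))
    |> Seq.toArray
    |> Array.sortBy snd                                  // ascending by count...
    |> Array.rev                                         // ...then reversed to descending
    |> Seq.truncate n
    |> Seq.toList

topWords 20 sample
|> List.iter (fun (key, value) -> printfn "%A\t\t%A" key value)
```

Note that `Seq.distinctBy` only keeps the first entry for each lowercased key, so in this sample the count for "The" is thrown away rather than added to "the". That may or may not be what you want.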
We want to use as many processors as possible, so let’s split the books into chunks that can be processed in parallel. The code below roughly does it (I say roughly because it deals the lines out to the chunks round-robin instead of keeping consecutive lines together, but for this particular case it doesn’t matter).
    let splitBook howManyBlocks fileName =
        let buffers = Array.init howManyBlocks (fun _ -> new StringBuilder())
        fileName
        |> File.ReadAllLines
        |> Array.iteri (fun i line -> buffers.[i % (howManyBlocks)].Append(line) |> ignore)
        buffers

    let blocks1 = @"C:\Users\lucabol\Desktop\Agents\Agents\kjv10.txt" |> splitBook 100
    let blocks2 = @"C:\Users\lucabol\Desktop\Agents\Agents\warandpeace.txt" |> splitBook 100
    let input =
        blocks1
        |> Array.append blocks2
        |> Array.mapi (fun i b -> i.ToString(), b.ToString())
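The round-robin behaviour is easier to see on a handful of in-memory lines. The sketch below is a variation on splitBook, not the original: `splitLines` is a made-up name, it takes an array of lines instead of a file name, and it appends a space after each line. The space is an addition of mine, on the assumption that words are later split on whitespace; since File.ReadAllLines drops the line terminators, without a separator the last word of one line runs into the first word of the next.

```fsharp
open System.Text

// Round-robin split over in-memory lines (hypothetical helper).
let splitLines howManyBlocks (lines: string[]) =
    let buffers = Array.init howManyBlocks (fun _ -> StringBuilder())
    lines
    |> Array.iteri (fun i line ->
        // Line i goes to block (i % howManyBlocks); the trailing space
        // is an assumption, not part of the original code.
        buffers.[i % howManyBlocks].Append(line).Append(' ') |> ignore)
    buffers |> Array.map (fun b -> b.ToString())

let blocks = splitLines 2 [| "one two"; "three"; "four five"; "six" |]
// blocks.[0] is "one two four five "
// blocks.[1] is "three six "
```

Lines 0 and 2 end up in the first block and lines 1 and 3 in the second, which is why the text inside each chunk is no longer in reading order. For counting words that makes no difference.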
And let’s execute!!
mapReduce input map reduce gatherer 20 20 partitionF
On my machine I get the following, which could be the right result.
    "a"        16147
    "And"      13071
    "I"        11349
    "unto"     8125
    "as"       6400
    "her"      5865
    "which"    5544
    "from"     5378
    "at"       5175
    "on"       5155
    "have"     5135
    "me"       5068
    "my"       4629
    "this"     3782
    "out"      3653
    "ye"       3399
    "when"     3312
    "an"       2841
    "upon"     2558
    "so"       2489
    All done!!