Let’s now use our mapReduce to do something more interesting, for example finding the frequency of words in several books. This time the agent that gathers the output needs to be a bit more complex.

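```fsharp
open System
open System.Collections.Generic

// spawnAgent and the Reduced / MapReduceDone messages come from the framework.
// State: (word/count pairs collected so far, words processed, progress interval).
let gathererF = fun msg (data: List<string * int>, counter, step) ->
    match msg with
    | Reduced(key, value) ->
        // Print a progress message every `step` words.
        if counter % step = 0 then
            printfn "Processed %i words. Now processing %s" counter key
        data.Add((key, value |> Seq.head))
        data, counter + 1, step
    | MapReduceDone ->
        data
        |> Seq.distinctBy (fun (key, _) -> key.ToLower())
        // Drop empty keys, a lone double quote and anything that parses as a number.
        |> Seq.filter (fun (key, _) -> not (key = "" || key = "\"" ||
                                             fst (Double.TryParse(key))))
        |> Seq.toArray
        |> Array.sortBy snd
        |> Array.rev
        |> Seq.take 20
        |> Seq.iter (fun (key, value) -> printfn "%A\t%A" key value)
        printfn "All done!!"
        data, counter, step

let gatherer = spawnAgent gathererF (new List<string * int>(), 0, 1000)
```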

Every time a word is reduced, its count is added to a running list, and every 1000 words (the `step` value) a progress message is printed. When everything is done, the list is cleaned up (entries differing only in case, empty strings, stray quotes and numbers are dropped), sorted by frequency, and the top 20 words are printed. BTW: there are at least two bugs in this code, maybe more (late-night, quick-and-dirty, see-if-the-algo-works kind of coding).
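The final cleanup step is easier to reason about (and to test) when pulled out of the agent. Below is a minimal sketch of it as a pure function; `topWords` is a hypothetical name, not part of the framework. It deliberately differs from the gatherer in two places, both assumptions about what the output should mean: case variants such as "And" and "and" are merged by summing their counts rather than keeping just one of them, and `Seq.truncate` replaces `Seq.take`, which throws when fewer than n items are left.

```fsharp
open System

// Hypothetical pure version of the gatherer's final step: takes the
// (word, count) pairs produced by the reducers and returns the top n words.
let topWords (n: int) (pairs: seq<string * int>) =
    pairs
    // Group case-insensitively and add up the counts within each group
    // (instead of Seq.distinctBy, which keeps only one variant's count).
    |> Seq.groupBy (fun (word, _) -> word.ToLowerInvariant())
    |> Seq.map (fun (word, group) -> word, group |> Seq.sumBy snd)
    // Drop empty keys, a lone double quote and anything that parses as a number.
    |> Seq.filter (fun (word, _) ->
        not (word = "" || word = "\"" || fst (Double.TryParse(word))))
    // Highest counts first; truncate does not throw when fewer than n remain.
    |> Seq.sortByDescending snd
    |> Seq.truncate n
    |> Seq.toList
```

For example, `topWords 20 data` could be called from the `MapReduceDone` branch in place of the inline pipeline, followed by the same `printfn` loop.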

We want to use as many processors as possible, so let’s split the books into chunks that can be processed in parallel. The code below roughly does that (I say roughly because it doesn’t keep the lines in their original order, but word counts don’t depend on order, so for this particular case it doesn’t matter).

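```fsharp
open System.IO
open System.Text

// Splits a book into howManyBlocks chunks, assigning line i to chunk i % howManyBlocks.
let splitBook howManyBlocks (fileName: string) =
    let buffers = Array.init howManyBlocks (fun _ -> new StringBuilder())
    fileName
    |> File.ReadAllLines
    // AppendLine (rather than Append) keeps the last word of one line
    // from being glued to the first word of the next.
    |> Array.iteri (fun i line -> buffers.[i % howManyBlocks].AppendLine(line) |> ignore)
    buffers

let blocks1 = @"C:\Users\lucabol\Desktop\Agents\Agents\kjv10.txt" |> splitBook 100
let blocks2 = @"C:\Users\lucabol\Desktop\Agents\Agents\warandpeace.txt" |> splitBook 100

// Pair every chunk with a string key ("0", "1", ...) to form the mapReduce input.
let input =
    blocks1
    |> Array.append blocks2
    |> Array.mapi (fun i b -> i.ToString(), b.ToString())
```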
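To see why the line order gets scrambled, here is a minimal sketch of the same round-robin split working on an in-memory array instead of a file. `splitLines` is a hypothetical helper, not part of the framework: line i goes to block i % howManyBlocks, so consecutive lines always land in different blocks.

```fsharp
open System.Text

// Hypothetical in-memory variant of splitBook: distributes lines round-robin,
// so line i lands in block (i % howManyBlocks) and consecutive lines are
// separated. Word counts do not depend on order, so this is harmless here.
let splitLines (howManyBlocks: int) (lines: string[]) =
    let buffers = Array.init howManyBlocks (fun _ -> StringBuilder())
    lines
    // AppendLine keeps a separator so words at line boundaries do not merge.
    |> Array.iteri (fun i line -> buffers.[i % howManyBlocks].AppendLine(line) |> ignore)
    buffers |> Array.map (fun b -> b.ToString())
```

With two blocks and the lines "a", "b", "c", the first block holds "a" and "c" and the second holds "b". A contiguous split (first third of the lines to block 0, and so on) would preserve order, at the cost of slightly more index arithmetic.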

And let’s execute!!

```fsharp
mapReduce input map reduce gatherer 20 20 partitionF
```

On my machine I get the following, which could be the right result.

```
"a"	16147
"And"	13071
"I"	11349
"unto"	8125
"as"	6400
"her"	5865
"which"	5544
"from"	5378
"at"	5175
"on"	5155
"have"	5135
"me"	5068
"my"	4629
"this"	3782
"out"	3653
"ye"	3399
"when"	3312
"an"	2841
"upon"	2558
"so"	2489
All done!!
```
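One way to check whether this really is the right result is to compare it against a plain sequential count. The sketch below is hypothetical: `countWords` and its separator list are assumptions, and the `map` function used above is not shown here, so its tokenization may differ and the numbers need not match exactly. Like the output above, it is case-sensitive ("And" and "and" are different words).

```fsharp
open System

// Hypothetical sequential reference count used to sanity-check the parallel
// result. Splitting on whitespace and common punctuation is an assumption and
// may not match the tokenization of the map function.
let separators = [| ' '; '\t'; '\r'; '\n'; '.'; ','; ';'; ':'; '!'; '?'; '('; ')' |]

let countWords (text: string) =
    text.Split(separators, StringSplitOptions.RemoveEmptyEntries)
    |> Seq.countBy id              // case-sensitive (word, count) pairs
    |> Seq.sortByDescending snd    // most frequent first
    |> Seq.toList
```

For the two books, something like `[ path1; path2 ] |> List.map File.ReadAllText |> String.concat "\n" |> countWords |> List.truncate 20` (with `open System.IO`, and `path1`/`path2` being the two file paths) gives a list to compare with the one above. It runs on a single core, which is exactly what the parallel version is meant to improve on.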