Download framework here.
All posts are here:
- Part I - Workers and ParallelWorkers
- Part II - Agents and control messages
- Part III - Default error management
- Part IV - Custom error management
- Part V - Timeout management
- Part VI - Hot swapping of code
- Part VII - An auction framework
- Part VIII — Implementing MapReduce (user model)
- Part IX — Counting words …
For this post I use a newer version of the framework that I just uploaded on CodeGallery. In the process of using LAgent I grew more and more unhappy with the weakly typed way of sending messages. The code that implements that feature is nasty: full of upcasts and downcasts. I was losing faith in it. Bugs were cropping up in all sorts of scenarios (i.e. using generic union types as messages).
In the end I decided to re-architecture the framework so to make it strongly typed. In essence now each agent can just receive messages of a single type. The limitations that this design choice introduces (i.e. more limited hot swapping) are compensated by the catching of errors at compile time and the streamlining of the code. I left the old framework on the site in case you disagree with me.
In any case, today’s post is about MapReduce. It assumes that you know what it is (link to the original Google paper that served as inspiration is here: Google Research Publication- MapReduce). What would it take to implement an in-memory MapReduce using my agent framework?
Let’s start with the user model.
let mapReduce (inputs:seq<'in_key * 'in_value>) (map:'in_key -> 'in_value -> seq<'out_key * 'out_value>) (reduce:'out_key -> seq<'out_value> -> seq<'reducedValues>) outputAgent M R partitionF =
mapReduce takes seven parameters:
- inputs: a sequence of input key/value pairs.
- map: this function operates on each input key/value pair. It returns a sequence of output key/value pairs. The type of the output sequence can be different from the type of the inputs.
- reduce: this function operates on an output key and all the values associated with it. It returns a sequence of reduced values (i.e. the average of all the values for this key)
- ouputAgent: this is the agent that gets notiﬁed every time a new output key has been reduced and at the end when all the operation ends.
- M: how many mapper agents to instantiate
- R: how many reducer agents to instantiate
- partitionF: the partition function used to choose which of the reducers is associated with a key
Let’s look at how to use this function to ﬁnd how often each word is used in a set of ﬁles. First a simple partition function can be deﬁned as:
let partitionF = fun key M -> abs(key.GetHashCode()) % M
Given a key and some buckets, it picks one of the buckets. Its type is: ’a –> int –> int, so it’s fairly reusable.
Let’s also create a basic agent that just prints out the reduced values:
let printer = spawnWorker (fun msg -> match msg with | Reduced(key, value) -> printfn "%A %A" key value | MapReduceDone -> printfn "All done!!")
The agent gets notiﬁed whenever a new key is reduced or the algorithm ends. It is useful to be notiﬁed immediately instead of waiting for everything to be done. If I hadn’t written this code using agents I would have not realized that possibility. I would simply have framed the problem as a function that takes an input and returns an output. Agents force you to think explicitly about the parallelism in your app. That’s a good thing.
The mapping function simply split the content of a ﬁle into words and adds a word/1 pair to the list. I know that there are much better ways to do this (i.e. regular expressions for the parsing and summing words counts inside the function), but I wanted to test the basic framework capabilities and doing it this way does it better.
let map = fun (fileName:string) (fileContent:string) -> let l = new List<string * int>() let wordDelims = [|' ';',';';';'.';':';'?';'!';'(';')';'n';'t';'f';'r';'b'|] fileContent.Split(wordDelims) |> Seq.iter (fun word -> l.Add((word, 1))) l :> seq<string * int>
The reducer function simply sums the various word statistics sent by the mappers:
let reduce = fun key (values:seq<int>) -> [values |> Seq.sum] |> seq<int>
Now we can create some fake input to check that it works:
let testInput = ["File1", "I was going to the airport when I saw someone crossing";
"File2", "I was going home when I saw you coming toward me"]
And execute the mapReduce:
mapReduce testInput map reduce printer 2 2 partitionF
On my machine I get the following. You might get a different order because of the async/parallel processing involved. If I wanted a stable order I would need to change the printer agent to cache results on Reduced and process them on MapReduceDone (see next post).
In the next post we’ll process some real books …