A simpler F# MailboxProcessor

I always forget the pattern to use to create a functioning MailboxProcessor in F#. I mean, which piece has to be async and how to structure the recursive loop. When I find myself in that kind of situation, my instincts scream at me: “Wrap it and make it work how your mind expects it to work”. So here is a simplification of the paradigm.

Let’s see what some standard MailboxProcessor code looks like:

let counter0 =
    MailboxProcessor.Start(fun inbox ->
        let rec loop n =
            async {
                    let! msg = inbox.Receive()
                    return! loop(n+msg) }
        loop 0)

This keeps a running sum of the messages it receives. The only part that is really unique to this guy is “n + msg”. All the rest is infrastructure.
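As written, the counter never exposes its running sum. A minimal sketch of the same loop with a way to read the state back follows; the CounterMsg type and its Add and Get cases are names I made up for illustration.

```fsharp
// Hypothetical message type: Add carries a number, Get carries a reply channel.
type CounterMsg =
    | Add of int
    | Get of AsyncReplyChannel<int>

let readableCounter =
    MailboxProcessor.Start(fun inbox ->
        let rec loop n =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Add m -> return! loop (n + m)   // the only "business logic"
                | Get reply ->
                    reply.Reply n                  // answer the caller, keep the state
                    return! loop n }
        loop 0)

readableCounter.Post(Add 1)
readableCounter.Post(Add 2)
// Messages are handled in posting order, so both Adds are applied before Get.
let total = readableCounter.PostAndReply(fun reply -> Get reply)
```

Even here, the match arms are the only part specific to this agent; the Receive call and the recursive loop are the same boilerplate as before.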

You’d probably prefer to write code like the following:

let counter1 = MailboxProcessor.SpawnAgent( (fun msg n -> msg + n), 0)

Yep, just one line of code. But is it possible? Let’s look at one way of doing it:

open System

type AfterError<'state> =
    | ContinueProcessing of 'state
    | StopProcessing
    | RestartProcessing

type MailboxProcessor<'a> with
    static member public SpawnAgent<'b>
            (messageHandler : 'a -> 'b -> 'b,
             initialState : 'b,
             ?timeout : 'b -> int,
             ?timeoutHandler : 'b -> AfterError<'b>,
             ?errorHandler : Exception -> 'a option -> 'b -> AfterError<'b>)
            : MailboxProcessor<'a> =
        // Defaults: wait forever, and carry on with the same state after a timeout or an error
        let timeout = defaultArg timeout (fun _ -> -1)
        let timeoutHandler = defaultArg timeoutHandler (fun state -> ContinueProcessing(state))
        let errorHandler = defaultArg errorHandler (fun _ _ state -> ContinueProcessing(state))
        MailboxProcessor.Start(fun inbox ->
            let rec loop(state) = async {
                let! msg = inbox.TryReceive(timeout(state))
                try
                    match msg with
                    | None ->
                        match timeoutHandler state with
                        | ContinueProcessing(newState) -> return! loop(newState)
                        | StopProcessing -> return ()
                        | RestartProcessing -> return! loop(initialState)
                    | Some(m) -> return! loop(messageHandler m state)
                with
                | ex ->
                    match errorHandler ex msg state with
                    | ContinueProcessing(newState) -> return! loop(newState)
                    | StopProcessing -> return ()
                    | RestartProcessing -> return! loop(initialState)
            }
            loop(initialState))

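The formatting is constrained by the small horizontal space of this blog. In any case, this is just a simple (?) wrapper for the MailboxProcessor pattern. The function takes two required parameters, messageHandler (a function from the incoming message and the current state to the new state) and initialState, and three optional ones: timeout (a function from the current state to the number of milliseconds to wait for a message; the default of -1 waits forever), timeoutHandler (called with the current state when a timeout occurs) and errorHandler (called with the exception, the message if there was one, and the current state when processing throws). Both handlers return ContinueProcessing(newState), StopProcessing or RestartProcessing, and by default both continue with the unchanged state.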
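The timeout support rests on TryReceive, which returns None when no message arrives in time. Here is a minimal sketch of that mechanism on a bare MailboxProcessor, without the wrapper; the idleReports queue, the watcher agent and the 100 ms timeout are assumptions made up for the example.

```fsharp
open System.Collections.Concurrent

// Hypothetical sink so the outcome can be inspected from outside the agent.
let idleReports = ConcurrentQueue<string>()

let watcher =
    MailboxProcessor<string>.Start(fun inbox ->
        let rec loop count =
            async {
                let! msg = inbox.TryReceive(100)   // timeout in milliseconds
                match msg with
                | Some _ -> return! loop (count + 1)
                | None ->
                    // No message within 100 ms: this is the point where a
                    // wrapper like SpawnAgent would call its timeoutHandler.
                    idleReports.Enqueue(sprintf "idle after %d messages" count)
                    return () }                    // stop, much like StopProcessing
        loop 0)

watcher.Post "a"
watcher.Post "b"
```

Once the mailbox has been silent for about 100 ms, the loop records how many messages it saw and ends. Returning loop count instead of () would mimic ContinueProcessing.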

An example of how to use errorHandler to implement the CountingAgent from the Expert F# book follows:

type msg = Increment of int | Fetch of AsyncReplyChannel<int> | Stop
exception StopException
type CountingAgent() =
    let counter = MailboxProcessor.SpawnAgent((fun msg n ->
                    match msg with
                    | Increment m ->  n + m
                    | Stop -> raise(StopException)
                    | Fetch replyChannel ->
                        do replyChannel.Reply(n)
                        n
                  ), 0, errorHandler = (fun _ _ _ -> StopProcessing))
    member a.Increment(n) = counter.Post(Increment(n))
    member a.Stop() = counter.Post(Stop)
    member a.Fetch() = counter.PostAndReply(fun replyChannel -> Fetch(replyChannel))
let counter2 = CountingAgent()
counter2.Increment(1)
counter2.Fetch()
counter2.Increment(2)
counter2.Fetch()
counter2.Stop()                             

Sometimes your agent doesn’t need any state; it is purely stateless. Something as simple as the following:

let echo = MailboxProcessor<_>.SpawnWorker(fun msg -> printfn "%s" msg)

You can easily make that happen by using this toned-down version of an agent, called a worker:

static member public SpawnWorker(messageHandler, ?timeout, ?timeoutHandler, ?errorHandler) =
    let timeout = defaultArg timeout (fun () -> -1)
    let timeoutHandler = defaultArg timeoutHandler (fun _ -> ContinueProcessing(()))
    let errorHandler = defaultArg errorHandler (fun _ _ -> ContinueProcessing(()))
    MailboxProcessor.SpawnAgent((fun msg _ -> messageHandler msg; ()),
                                (), timeout, timeoutHandler,
                                (fun ex msg _ -> errorHandler ex msg))

Since agents run in parallel, you might want to run a whole bunch of them at the same time. You might want something that looks like a worker but that, under the covers, executes each messageHandler in parallel. Something like:

type msg1 = Message1 | Message2 of int | Message3 of string
let a = MailboxProcessor.SpawnParallelWorker(function
                | Message1 -> printfn "Message1";
                | Message2 n -> printfn "Message2 %i" n;
                | Message3 _ -> failwith "I failed"
                , 10
                , errorHandler = (fun ex _ -> printfn "%A" ex; ContinueProcessing()))
a.Post(Message1)
a.Post(Message2(100))
a.Post(Message3("abc"))
a.Post(Message2(100))

In this example, the different messages are likely to cause things to print out of order. Notice the number 10 above, which is how many workers you want to process your messages. This is implemented by sending messages round-robin to the various workers:
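The round-robin part is just index arithmetic carried along as state. A small sketch of it in isolation, with no agents involved; roundRobin and assignments are names invented for the illustration.

```fsharp
// Hypothetical helper: given the number of workers and the current index,
// return the worker to use now and the index to use for the next message.
let roundRobin howMany index = index, (index + 1) % howMany

// Hand five messages to three workers and record which worker gets which.
let assignments =
    [ "a"; "b"; "c"; "d"; "e" ]
    |> List.mapFold (fun index msg ->
        let worker, next = roundRobin 3 index
        (worker, msg), next) 0
    |> fst
// assignments = [(0, "a"); (1, "b"); (2, "c"); (0, "d"); (1, "e")]
```

In the dispatcher agent, the same index travels as part of the agent state, next to the array of workers.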

static member public SpawnParallelWorker(messageHandler, howMany, ?timeout,
                                         ?timeoutHandler, ?errorHandler) =
    let timeout = defaultArg timeout (fun () -> -1)
    let timeoutHandler = defaultArg timeoutHandler (fun _ -> ContinueProcessing(()))
    let errorHandler = defaultArg errorHandler (fun _ _ -> ContinueProcessing(()))
    MailboxProcessor<'a>.SpawnAgent(
        (fun msg (workers : MailboxProcessor<'a> array, index) ->
            workers.[index].Post msg
            (workers, (index + 1) % howMany)),
        (Array.init howMany
            (fun _ -> MailboxProcessor<'a>.SpawnWorker(
                        messageHandler, timeout, timeoutHandler, errorHandler)),
         0))

One drawback with the current code is that it doesn’t support cancellation. It should be possible to wrap that too, but I haven’t done it (yet). If you don’t want to cut and paste the code, it is inside the AgentSystem.fs file here.
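For what it is worth, MailboxProcessor.Start already accepts an optional cancellationToken, so one possible starting point would be to thread a token through the wrappers. The following is only a sketch on a bare agent, not something AgentSystem.fs does; cts and cancellableCounter are names made up for the example.

```fsharp
open System.Threading

let cts = new CancellationTokenSource()

let cancellableCounter =
    MailboxProcessor.Start(
        (fun inbox ->
            let rec loop n =
                async {
                    let! msg = inbox.Receive()
                    return! loop (n + msg) }
            loop 0),
        cancellationToken = cts.Token)   // the agent's async loop runs under this token

cancellableCounter.Post 1

// Request cancellation: the loop should not process messages posted after this point.
cts.Cancel()
```

A wrapper could take the token as one more optional parameter and pass it straight to Start.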
