Download framework here.
All posts are here:
- Part I - Workers and ParallelWorkers
- Part II - Agents and control messages
- Part III - Default error management
- Part IV - Custom error management
- Part V - Timeout management
- Part VI - Hot swapping of code
- Part VII - An auction framework
- Part VIII – Implementing MapReduce (user model)
- Part IX – Counting words …
Here is an application that uses the framework we have been creating. It is an auction application and it is described in more detail here.
Let’s go through it.
type AuctionMessage = | Offer of int * AsyncAgent // Make a bid | Inquire of AsyncAgent // Check the status and AuctionReply = | StartBidding | Status of int * DateTime // Asked sum and expiration | BestOffer // Ours is the best offer | BeatenOffer of int // Yours is beaten by another offer | AuctionConcluded of // Auction concluded AsyncAgent * AsyncAgent | AuctionFailed // Failed without any bids | AuctionOver // Bidding is closed let timeToShutdown = 3000 let bidIncrement = 10
This is the format of the messages that the clients can send and the action agent can reply to. F# is really good at this sort of thing. First, we need an auction agent:
let auctionAgent seller minBid closing = let agent = spawnAgent (fun msg (isConcluded, maxBid, maxBidder) -> match msg with | Offer (_, client) when isConcluded -> client <-- AuctionOver (isConcluded, maxBid, maxBidder) | Offer(bid, client) when not(isConcluded) -> if bid >= maxBid + bidIncrement then if maxBid >= minBid then maxBidder <-- BeatenOffer bid client <-- BestOffer (isConcluded, bid, client) else client <-- BeatenOffer maxBid (isConcluded, maxBid, maxBidder) | Inquire client -> client <-- Status(maxBid, closing) (isConcluded, maxBid, maxBidder)) (false, (minBid - bidIncrement), spawnWorker (fun _ -> ()))
Notice that, if the action is concluded, the agent replies to offers by sending an AuctionOver message. If the auction is still open, then, in case the bid is higher than the max, it sets a new max and notify the two parties involved; otherwise it notifies the bidder that the offer wasn’t successful. Also you can ask for the status of the auction.
This is what the code above says. Maybe the code is simpler than words. Anyhow, we need to treat the case where no message is received for some amount of time.
agent <-- SetTimeoutHandler (closing - DateTime.Now).Milliseconds (fun (isConcluded: bool, maxBid, maxBidder) -> if maxBid >= minBid then let reply = AuctionConcluded(seller, maxBidder) maxBidder <-- reply seller <-- reply else seller <-- AuctionFailed agent <-- SetTimeoutHandler timeToShutdown (fun (_:bool, _:int,_:AsyncAgent) -> StopProcessing) ContinueProcessing (true, maxBid, maxBidder)) agent
We start by waiting for the amount of time to the closing of the auction. If we get no messages, then two things might happen: we have an offer that is more than the minimum or we don’t. If we do, we tell everyone that it’s finished. Otherwise, we tell the seller that its item wasn’t successful. In any case, we prepare the agent to shutdown by setting its next timeout to be timeoutToShutdown.
It is interesting that we set the timeout handler inside the timeout handler. This is not a problem because of the nature of message processing (aka it processes one message at the time).
We then need a bunch of of symbols …
module Auction = let random = new Random() let minBid = 100 let closing = DateTime.Now.AddMilliseconds 10000. let seller = spawnWorker (fun (msg:AuctionReply) -> ()) let auction = auctionAgent seller minBid closing
Not a very smart seller we have here … Next up is our definition of a client.
let rec c = spawnAgent ( fun msg (max, current) -> let processBid (aMax, aCurrent) = if aMax >= top then log "too high for me" (aMax, aCurrent) elif aCurrent < aMax then let aCurrent = aMax + increment Thread.Sleep (1 + random.Next 1000) auction <-- Offer(aCurrent, c) (aMax, aCurrent) else (aMax, aCurrent) match msg with | StartBidding -> auction <-- Inquire c (max, current) | Status(maxBid,_) -> log <| sprintf "status(%d)" maxBid let s = processBid (maxBid, current) c <-- SetTimeoutHandler timeToShutdown (fun _ -> StopProcessing) s | BestOffer -> log <| sprintf "bestOffer(%d)" current processBid(max, current) | BeatenOffer maxBid -> log <| sprintf "beatenOffer(%d)" maxBid processBid(maxBid, current) | AuctionConcluded(seller, maxBidder) -> log "auctionConcluded" c <-- Stop (max, current) | AuctionOver -> log "auctionOver" c <-- Stop (max, current)) (0,0) c
Something that I like about agents is the fact that you need to understand just small snippets of code at the time. For example, you can read the processing for BestOffer and figure out if it makes sense. I have an easy time personalizing them as in : “Ok, the guy just got a notification that there has been a better offer, what is he going to do next?”.
The code should be self explanatory for the most part. In essence, if you can offer more, do it otherwise wait for the auction to end. I’m not even sure the processing is completely right. I confess I’m just trying to do the same as Matthews code from the link above.
We can then start up the whole thing and enjoy the cool output.
open Auction (client 1 20 200) <-- StartBidding (client 2 10 300) <-- StartBidding (client 3 30 150) <-- StartBidding Console.ReadLine() |> ignore
Now for the nasty part. Implementing the framework.