Learning F#: Case study with branch and bound – PART II
Part IV: Parallelized Version
Like testing, parallel computing is a large subject. We will only cover some basic topics and implementations here, but it will be sufficient for developing a solid parallelized version of our BandB code. There are several mechanisms for implementing parallel processing in F# and .NET, including:
This dataflow model promotes actor-based programming by providing in-process message passing for coarse-grained dataflow and pipelining tasks.
Some definitions of concurrency can also be found here, and you can download many examples here as well.
Our idea is to parallelize the evaluation of the search tree nodes, that is, to evaluate each search node in the queue in parallel. The hitch is that the best value and solution are data that must be shared across all the parallel instances; in other words, the instances are not data-independent. Without getting too fussy about the terminology, let’s call each parallel instance a task: a bit of computation that may happen in parallel (at the same time) with other tasks.
We can’t simply read and write these variables from each task instance without race conditions or data hazards. Without some form of coordination, we might face a write-after-write (WAW) hazard: picture two tasks that each read the current bestUtility and each have a better solution. Let’s say Task 1’s new utility is better than Task 2’s, but Task 2 writes its value after Task 1 does. This would leave bestUtility holding a value that is not the best found, since Task 1’s better find was overwritten.
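To make that interleaving concrete, here is a small sketch that plays both tasks’ steps by hand on a single thread, so the outcome is repeatable. The values 10, 30 and 20 are made up for illustration, and a real race would of course only produce this ordering some of the time.

```fsharp
// A hand-written interleaving of two tasks (no real threads, so it is repeatable).
// Both tasks read bestUtility before either one writes it back.
let mutable bestUtility = 10

let task1Read = bestUtility   // Task 1 reads 10
let task2Read = bestUtility   // Task 2 reads 10

let task1Found = 30           // Task 1's new solution
let task2Found = 20           // Task 2's new solution, worse than Task 1's

// Task 1 writes first: 30 > 10, so it stores 30.
if task1Found > task1Read then bestUtility <- task1Found
// Task 2 writes second, comparing against its stale read of 10: 20 > 10, so it overwrites 30.
if task2Found > task2Read then bestUtility <- task2Found

printfn "bestUtility = %d" bestUtility  // Task 1's better result has been lost
```

Each comparison is individually correct; the bug is that the read and the write are separate steps that another task can slip between.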
Furthermore, since the data update includes both bestUtility and the best solution, bestNode, there is also a chance that one task may update one of these while another task is updating the other, leaving the data in an invalid state. We must make the update to all related data a single action. That is, the update to bestUtility, bestNode and anything else related must be completed as a single atomic operation or inconsistencies will arise. Generally, when implementing parallel code, it’s important to keep in mind that each parallel task instance can be at any step in its code. The operating system may suspend and restart each parallel task at any time, including while it is in the midst of writing a single F#-level data element. For example, a List.sort, a let binding or a <- assignment might be suspended in the middle of its execution, so a separate task that reads this data can’t assume that the whole sort or assignment happens as an atomic operation. You can only be sure that a particular task (possibly running in a separate thread) executes its own steps in order; you cannot make any assumption about when in its code the task might be suspended or when a different task may access shared data.
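One common way to make the compare-and-update of the pair a single atomic step is a lock. This is not the approach this article takes (the design below relies on a mailbox agent), and BestHolder is a hypothetical name; the sketch assumes a float utility that is being maximized.

```fsharp
open System.Threading.Tasks

/// Hypothetical holder for the shared best (node, utility) pair, maximizing a float utility.
type BestHolder<'node>(initNode: 'node, initUtility: float) =
    let sync = obj ()
    // node and utility live in one tuple, so they are always replaced together
    let mutable best = (initNode, initUtility)

    /// Compare and update under the lock, so no other task can slip in
    /// between the comparison and the write.
    member this.TrySave(node: 'node, util: float) =
        lock sync (fun () -> if util > snd best then best <- (node, util))

    /// Read the pair under the same lock.
    member this.Get() = lock sync (fun () -> best)

let holder = BestHolder<int>(-1, System.Double.NegativeInfinity)
// 1000 concurrent saves; utility i is stored with node i
Parallel.For(0, 1000, fun i -> holder.TrySave(i, float i)) |> ignore
let (bestNode, bestUtil) = holder.Get()
printfn "best node %d with utility %.1f" bestNode bestUtil
```

Whatever order the saves run in, the pair that survives is node 999 with utility 999.0, and a reader can never observe a node from one save paired with a utility from another.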
With these warnings in mind, and coming back to the selection of the parallel implementation, we note that the first three options, which include Array.Parallel.map and Parallel.For, are oriented toward processing a large batch of given data. We need to create parallel tasks dynamically as we explore the search tree, so these don’t cleanly fit what we need to do and we rule them out (actually they can be coerced into use, but I will comment on this later). The dataflow model does not really fit the computations we need to do either, so let’s look at the Task or async based models.
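As a reminder of what "batch-oriented" means here, a minimal sketch: Array.Parallel.map needs the whole input array before it starts and returns only when every element is done, whereas a branch and bound search keeps discovering new nodes while it runs.

```fsharp
// Batch-oriented parallelism: the complete input array exists before any work starts.
let inputs = [| 1 .. 8 |]
// Elements may be processed on different threads, but results keep their positions.
let squares = Array.Parallel.map (fun x -> x * x) inputs
printfn "%A" squares
```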
Task-based parallel implementation
We then need to decide between async and the Task Parallel Library (TPL), a .NET library. This reference and this article have nice introductory descriptions of F# task parallel coding using the async feature of F#. The asynchronous workflow model is nice, as it is integrated into the language and easy to use. Indeed, this article discusses async versus the TPL; to quote:
The choice between the two possible implementations depends on many factors. Asynchronous workflows were designed specifically for F#, so they more naturally fit with the language. They offer better performance for I/O bound tasks and provide more convenient exception handling. Moreover, the sequential syntax is quite convenient. On the other hand, tasks are optimized for CPU bound calculations and make it easier to access the result of calculation from other places of the application without explicit caching.
My processor is a 4-core Intel i7, which uses hyperthreading to present 8 logical cores to the OS.
But a single async workflow runs its steps one after another: unless we explicitly fork work, for example with Async.Parallel (or ASP.NET), async computations share threads in an interleaved manner rather than running with actual parallelism. The model allows us to write code as though there were parallelism, while the execution may only give the illusion of running in parallel. The async model works well in many circumstances and saves the overhead of starting new threads, but we want to actually run our code on multiple cores (see this for a diagram of async operation and this for more description). We could explore using Async.Parallel, but it is not difficult to use the TPL instead, which will run each of our tasks on a thread-pool thread.
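A small sketch contrasting the two routes for CPU-bound work; work is a hypothetical stand-in for evaluating search nodes, not part of the BandB code.

```fsharp
open System.Threading.Tasks

// A CPU-bound stand-in for evaluating a search node (hypothetical).
let work (n: int) = Seq.sum (seq { 1 .. n })

let sizes = [ 1000; 2000; 3000 ]

// async route: the fork is explicit; Async.Parallel runs the children concurrently
// and returns their results in input order.
let asyncResults =
    sizes
    |> List.map (fun n -> async { return work n })
    |> Async.Parallel
    |> Async.RunSynchronously

// TPL route: each Task.Run queues its computation on the .NET thread pool.
let tasks = [| for n in sizes -> Task.Run(fun () -> work n) |]
let taskResults = tasks |> Array.map (fun t -> t.Result)   // .Result blocks until that task finishes
```

Both give the same numbers; the Task.Run form is the one ParBandB builds on, because a running task can start further tasks whenever it branches.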
OK, so we have decided on the TPL, but how are we going to solve the data hazard/race condition problem? Let’s start by using the MailboxProcessor, which acts as an ‘actor’ or ‘agent’ capable of receiving and sending messages:
The agent encapsulates a message queue that supports multiple-writers and a single reader agent.
And from here (note that the async execution model can also be viewed as using a simulated thread):
The "thread" of control is actually a lightweight simulated thread implemented via asynchronous reactions to messages.
Although not explicitly stated, messages sent from a particular thread are queued to the mailbox processor in order, but messages sent from different threads have no particular ordering relative to each other. Our idea is that the agent will serialize requests to set and read the BestUtility and BestNode values that all the threads need to access. This will eliminate the data hazards associated with these data.
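The serialization idea can be seen in isolation in a minimal sketch with a hypothetical counter agent (CounterMsg and counter are illustrative names, not part of the BandB code): many tasks post to one MailboxProcessor, and because only the agent’s own loop touches the state, no update is lost.

```fsharp
open System.Threading.Tasks

type CounterMsg =
    | Add of int
    | Total of AsyncReplyChannel<int>

// Only the agent's own loop ever touches 'total', so updates cannot race.
let counter =
    MailboxProcessor<CounterMsg>.Start(fun inbox ->
        let rec loop total =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Add n -> return! loop (total + n)
                | Total reply ->
                    reply.Reply total
                    return! loop total
            }
        loop 0)

// 1000 concurrent writers; the mailbox queues their messages and handles them one at a time.
Parallel.For(0, 1000, fun _ -> counter.Post(Add 1)) |> ignore
// Posted after all the Adds have been queued, so it is answered after they are applied.
let total = counter.PostAndReply(fun reply -> Total reply)
printfn "total = %d" total
```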
First parallel implementation
In the following code, AgentMessage is a type that defines what messages are received or sent by the mailbox processor contained in the class MyMailBoxAgent. Finally, we see ParBandB, which is our parallel version of the BandB function.
```fsharp
open System.Threading
open System.Threading.Tasks

type AgentMessage<'a, 'b when 'b: comparison> =
    | Quit
    | SaveValue of 'a * 'b
    | GetValue of AsyncReplyChannel<'a * 'b>
    | IncrTaskCount
    | DecrTaskCount
    | GetTaskCount of AsyncReplyChannel<int>

type MyMailBoxAgent<'a, 'b when 'b: comparison>(compareFunc: 'b -> 'b -> bool, initSearchNode: 'a, initUtility: 'b) =
    // enclosed MailboxProcessor: the only code that touches the shared state
    let Mailbox =
        MailboxProcessor<AgentMessage<'a, 'b>>.Start(fun inbox ->
            // upon construction, these are the best we've got
            let BestUtility = ref initUtility
            let BestNode = ref initSearchNode
            let TaskCount = ref 0
            let rec loop () =
                async {
                    let! message = inbox.Receive()
                    let keepGoing =
                        match message with
                        | Quit -> false // leave the loop; the agent is done
                        | IncrTaskCount -> incr TaskCount; true
                        | DecrTaskCount -> decr TaskCount; true
                        | GetTaskCount replyChannel -> replyChannel.Reply(!TaskCount); true
                        | SaveValue (node, util) ->
                            // keep the pair only if the new utility is better
                            if compareFunc util !BestUtility then
                                BestUtility := util
                                BestNode := node
                            true
                        | GetValue replyChannel -> replyChannel.Reply(!BestNode, !BestUtility); true
                    // a `return ()` inside the match would not end the loop when a
                    // `return! loop ()` follows it, so recurse only while keepGoing
                    if keepGoing then return! loop ()
                }
            loop ())

    // methods for use
    member this.Save(node, util) = Mailbox.Post(SaveValue(node, util)) // post the message to the mailbox, asynchronously
    member this.Get() = Mailbox.PostAndReply(fun replyChannel -> GetValue replyChannel) // get the values from the mailbox, synchronously
    member this.IncrTaskCount() = Mailbox.Post IncrTaskCount
    member this.DecrTaskCount() = Mailbox.Post DecrTaskCount
    member this.TasksComplete() =
        let activeTasks = Mailbox.PostAndReply(fun replyChannel -> GetTaskCount replyChannel)
        activeTasks = 0
    member this.Stop() = Mailbox.Post Quit

(* Parallel BandB *)
let ParBandB maximize f g branch unFinished initSearchNode initValue =
    // these are only informational, not needed in the routine;
    // tasks update them concurrently, so they are changed with Interlocked
    let numberOfQueues = ref 1 // we just queued the start, so 1
    let numberOfEvals = ref 0 // number of times we evaluated the function with all vars set
    let compareFunc x y = if maximize then x > y else x < y
    (* Create my actor with initValue and unknown state *)
    let agent = new MyMailBoxAgent<_, _>(compareFunc, initSearchNode, initValue)
    agent.Save(initSearchNode, initValue) // save initial values
    let rec bandbProcess searchNode : unit =
        if unFinished searchNode then
            let util = g searchNode
            let bestUtil = snd (agent.Get()) // second tuple member is bestUtil
            if compareFunc util bestUtil then // ok to branch
                let branches: 'a list = branch searchNode
                Interlocked.Add(&numberOfQueues.contents, branches.Length) |> ignore
                for child in branches do
                    agent.IncrTaskCount() // counted before the child's task can start
                    Task.Run(fun () -> bandbProcess child) |> ignore
        else // we can evaluate
            let util = f searchNode
            agent.Save(searchNode, util)
            Interlocked.Increment(&numberOfEvals.contents) |> ignore
        agent.DecrTaskCount() // this task is finished; return is unit
    // start with the initSearchNode
    agent.IncrTaskCount()
    Task.Run(fun () -> bandbProcess initSearchNode) |> ignore
    while not (agent.TasksComplete()) do () // busy-wait until every task has finished
    // read the result before stopping: once Quit is processed, PostAndReply would never be answered
    let (bestNode, bestUtil) = agent.Get()
    agent.Stop()
    (bestUtil, bestNode, !numberOfQueues, !numberOfEvals)

let ParBandBmaximize f g branch unFinished node = ParBandB true f g branch unFinished node
let ParBandBminimize f g branch unFinished node = ParBandB false f g branch unFinished node
```
Here we have an F# class, namely MyMailBoxAgent. The let bindings in an F# class define what is essentially private data, while the members (properties and methods) are public in scope.
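A tiny illustration of that split, using a hypothetical Counter class unrelated to the BandB code:

```fsharp
// Hypothetical class: 'count' is a let binding (private to the class),
// while Increment and Value are members (public).
type Counter(start: int) =
    let mutable count = start
    member this.Increment() = count <- count + 1
    member this.Value = count

let c = Counter(5)
c.Increment()
printfn "%d" c.Value
```

Writing c.count outside the class does not compile; callers can only go through the members, which is exactly how MyMailBoxAgent hides its Mailbox.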
F# cannot fully deduce the types used here, so we need some type annotations on the methods, arguments and generic parameters. Without them, you will get various error FS0001: Type mismatch errors against the various functions in the main. To explain, right after AgentMessage we see the definition of the MyMailBoxAgent class. It is generic in the types 'a and 'b, where 'b must support comparison ('b is the type for bestUtility). The actual types for the generics 'a and 'b will vary with each problem type and are the same as in the table in Part II, page 6.

The MyMailBoxAgent class contains the mailbox processor, which accepts messages of a discriminated union with one of six message cases. For example, the SaveValue message carries a tuple of types 'a and 'b; for MaxSat, 'a will be MaxSatDiscreteVar array and 'b will be int. This message is used by each task to send a utility value and its settings (i.e. the search node) to the mailbox for storage and later retrieval. Retrieval occurs with the GetValue message, which returns the current values for these. The mailbox processor’s function loops indefinitely, reading and processing each message until a Quit message arrives. In the case of SaveValue, it uses compareFunc to determine whether the sent value is better than what was previously stored, and overwrites the stored values only if it is. Because the mailbox processor handles all incoming messages from all tasks sequentially, as long as any task finds the optimal solution we are guaranteed to have the correct solution at the end.
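Why the arrival order of SaveValue messages does not matter can be sketched as a fold: processing the saves one at a time with "keep it only if strictly better" yields the same best utility for any order. keepBetter and the sample saves are hypothetical, chosen to mirror the SaveValue branch.

```fsharp
// Hypothetical helper mirroring the SaveValue branch: keep the stored pair
// unless the incoming utility is strictly better.
let keepBetter (compareFunc: 'b -> 'b -> bool) (bestNode: 'a, bestUtil: 'b) (node: 'a, util: 'b) =
    if compareFunc util bestUtil then (node, util) else (bestNode, bestUtil)

let maximize (x: int) (y: int) = x > y
let saves = [ ("n1", 3); ("n2", 9); ("n3", 5); ("n4", 7) ]

// Two different arrival orders, as different task schedules might produce.
let inOrder = List.fold (keepBetter maximize) ("init", -1) saves
let reversed = List.fold (keepBetter maximize) ("init", -1) (List.rev saves)
printfn "%A %A" inOrder reversed
```

With ties, the first of the equally good nodes to arrive is kept, so the returned node can differ between runs even though the utility cannot.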
When a GetValue comes, the agent returns the current best solution settings. There may well be a SaveValue message later in the incoming queue with a better setting, but we don’t know about it yet. The mailbox processor has a Scan feature with which we could look ahead in the incoming queue, but it seems like extra work that may not pay off. Remember, our BestUtility is monotonically improving, either increasing or decreasing depending on whether BandB is maximizing or minimizing. If a search node reads a value that is not the best, it might cause extra queues and evaluations (a fresher value might have cut off more work), but it does not invalidate the algorithm. Indeed, we used the idea of setting … to a constant value that is better than any real solution when we did our exhaustive testing in Part III.
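The "extra work, never a wrong answer" argument can be checked on a toy example. Assuming a maximization where a node is branched only if its upper-bound estimate beats the best utility read so far, a stale (worse) best value keeps a superset of the nodes a fresh value keeps. The survivors function and the estimates are hypothetical.

```fsharp
// Maximizing: a node is branched only if its estimate beats the best utility read so far.
let survivors (bestUtil: int) (estimates: int list) =
    estimates |> List.filter (fun est -> est > bestUtil)

// Hypothetical upper-bound estimates for five search nodes.
let estimates = [ 4; 12; 7; 15; 9 ]

let withStale = survivors 8 estimates    // an older best value read from the agent
let withFresh = survivors 11 estimates   // the value after a later SaveValue

printfn "stale: %A, fresh: %A" withStale withFresh
```

Every node kept with the fresh value is also kept with the stale one, so no node that could hold the optimum is pruned; the stale read only costs the extra branching of the node with estimate 9.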
Notice that there is no queue in ParBandB. If the current search node is not completely set, and if the solution estimate for the node is better than the current best value (which is retrieved from the mailbox processor synchronously), then this search node is branched and a task applying the bandbProcess function is scheduled for each branch. Note that the exploration is kicked off by starting a task with initSearchNode, just before ParBandB starts waiting for the tasks to complete.
There is another important element missing, namely exception handling to catch a task that fails before reaching the end, but this topic is left for the next page.
So how do we know when we are done? In the non-parallel code, we knew we were done when the search node queue was empty. Here, we know we are done when there are no tasks running: a task execution takes the place of a search node waiting on the queue in the non-parallel version. To detect this, we keep a counter that is incremented for each task that runs, namely each bandbProcess call, and decremented when that task completes. This counter (TaskCount) is kept in the mailbox processor and accessed via messages: the IncrTaskCount and DecrTaskCount members send messages to the mailbox, which then uses the F# incr or decr operators to increment or decrement the counter.
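The same counting rule can be sketched without the mailbox. This is a swapped-in technique for illustration only: Interlocked counters and a ManualResetEventSlim replace the agent messages, and a hypothetical binary-tree explore function replaces bandbProcess. Blocking on the event also avoids a busy-wait.

```fsharp
open System.Threading
open System.Threading.Tasks

// Hypothetical sketch: increment the count before a task is started,
// decrement it when the task finishes, and signal when it reaches zero.
let active = ref 0
let leaves = ref 0
let finished = new ManualResetEventSlim(false)

// Explore a complete binary tree of the given depth, one task per node.
let rec explore (depth: int) : unit =
    if depth = 0 then
        Interlocked.Increment(&leaves.contents) |> ignore
    else
        for _ in 1 .. 2 do
            Interlocked.Increment(&active.contents) |> ignore   // counted before the child can run
            Task.Run(fun () -> explore (depth - 1)) |> ignore
    // The task that brings the count to zero is the last one, so it signals completion.
    if Interlocked.Decrement(&active.contents) = 0 then finished.Set()

Interlocked.Increment(&active.contents) |> ignore   // count the root before starting it
Task.Run(fun () -> explore 4) |> ignore
finished.Wait()
printfn "leaves = %d" leaves.Value
```

Because every parent counts its children before decrementing itself, the count cannot touch zero while work remains.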
One important observation: I used agent.DecrTaskCount at the end of bandbProcess to decrement this counter, so why did I not also put agent.IncrTaskCount () at the beginning of this routine, instead calling it separately before kicking off the process and before each task is started in the loop over the branches? Well, suppose I did … then the initial Task.Run would start off the computation by asking another thread to run bandbProcess, which would at some later time send the agent.IncrTaskCount () message. However, before that happened (it is very important to remember that no relative schedule can be assumed among threads), the main thread might have reached the while loop that calls agent.TasksComplete() and gotten a zero back, concluding we were done before we even started! By placing the increment call in the same thread as the task kick-off, we know that these messages are sent in the order we want.
I also added some tests. These just check that the solution found by the parallel version is the same as the one found by the serial version.
```fsharp
// test frameworks implied by [<Fact>] and `should equal`: xUnit and FsUnit
open Xunit
open FsUnit.Xunit

(* Let's add some tests for ParBandB *)
[<Fact>]
let ``ParBandB = BandB - knapsack`` () =
    (* KNAPSACK EXAMPLE 1 *)
    let vars1 =
        [| {Name = "food"; Setting = ZeroOneVarSetting.Unset; Weight = 5.0; Utility = 8.0};
           {Name = "tent"; Setting = ZeroOneVarSetting.Unset; Weight = 14.5; Utility = 5.0};
           {Name = "gps"; Setting = ZeroOneVarSetting.Unset; Weight = 1.0; Utility = 3.0};
           {Name = "map"; Setting = ZeroOneVarSetting.Unset; Weight = 0.5; Utility = 3.0} |]
    let weightLimit1 = 16.0
    let vars1Sorted = Array.sortBy (fun elem -> -elem.Utility / elem.Weight) vars1 // still have this
    let gknap1 = knapestimator weightLimit1
    let knapAnyUnset = Array.exists (fun (elem: KnapDiscreteVar) -> ZeroOneVarSetting.isUnset elem.Setting)
    let branchKnap1 = branchknap weightLimit1
    (* Invoke *)
    let (PsolutionUtility, PsolutionVars, Pnumqueues, Pnumevals) =
        ParBandBmaximize knapmultiplySumUtility gknap1 branchKnap1 knapAnyUnset vars1Sorted -1.0
    let (solutionUtility, solutionVars, numqueues, numevals) =
        BandBmaximize knapmultiplySumUtility gknap1 branchKnap1 knapAnyUnset vars1Sorted -1.0
    PsolutionUtility |> should equal solutionUtility

[<Fact>]
let ``ParBandB = BandB - MAXSAT`` () =
    let vars2 =
        [| {MaxSatDiscreteVar.Name = "v1"; Setting = Unset}; // need to give a hint for the record type
           {MaxSatDiscreteVar.Name = "v2"; Setting = Unset};
           {MaxSatDiscreteVar.Name = "v3"; Setting = Unset};
           {MaxSatDiscreteVar.Name = "v4"; Setting = Unset};
           {MaxSatDiscreteVar.Name = "v5"; Setting = Unset};
           {MaxSatDiscreteVar.Name = "v6"; Setting = Unset} |]
    let clauses2 =
        [| [| -1; 2; -4 |]; // positive = var at that index; negative means not var at index
           [| -1; 3; 4 |];
           [| -2; 3; 4 |];
           [| -2; -4; -5 |];
           [| 2; 3; 6 |];
           [| 3; 5; -6 |];
           [| -4; -5; 6 |];
           [| -2; 5; -6 |];
           [| 3; -5; 6 |] |]
    let f2 = clausefulleval clauses2 // partial func application
    let g2 = clauseNumFalseFunc clauses2
    let maxsatAnyUnset = Array.exists (fun (elem: MaxSatDiscreteVar) -> elem.Setting = ZeroOneVarSetting.Unset)
    let (PbestValue2, Psolution2, Pnumqueues2, Pnumevals2) =
        ParBandBmaximize f2 g2 maxsat_branch maxsatAnyUnset vars2 -1
    let (bestValue2, solution2, numqueues2, numevals2) =
        BandBmaximize f2 g2 maxsat_branch maxsatAnyUnset vars2 -1
    PbestValue2 |> should equal bestValue2

[<Fact>]
let ``ParBandB = BandB - TSP`` () =
    let matrix4_15_aa =
        [| [| NoEdge; ...
```

We could also use our property-based testing (FsCheck) approach from Part II, page 3, along with custom generators to create problem instances. Since the run times for these tests are a bit long, running 100 tests per property would take a while (although the number of tests per property can readily be customized). For the sake of space I will leave the testing here at unit tests, but some property-based tests would be a good idea.
All of these tests pass; I will show that on the next page.
So what are our results? I changed the main code to the following to add timing tests for the serial and parallel branch and bound functions. It uses a simple function, timeF, to time another function, and sets up and runs a knapsack, a MaxSat and a TSP instance using both our old code and the new ParBandB method. I snipped out the timing for the sorting that I already showed in Part III.
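The listing that follows calls timeF without showing its definition. For reference, a minimal sketch of such a helper, assuming (hypothetically) that it runs the function once and returns the elapsed wall-clock time in milliseconds as a float, which fits how its results are combined with min and max:

```fsharp
open System.Diagnostics

/// Hypothetical stand-in for timeF: run f once and return the elapsed wall-clock time in milliseconds.
let timeF (f: unit -> 'T) : float =
    let sw = Stopwatch.StartNew()
    f () |> ignore   // the result is discarded; only the time is reported
    sw.Stop()
    sw.Elapsed.TotalMilliseconds

let t = timeF (fun () -> List.sum [ 1 .. 100000 ])
printfn "took %.3f ms" t
```

A single wall-clock measurement like this includes JIT and garbage collection effects, which is one reason to repeat each run and keep the minimum, as the listing does.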
```fsharp
let mutable bestKSserial = System.Double.PositiveInfinity
let mutable bestKSparallel = System.Double.PositiveInfinity
let mutable maxKSparallel = 0.0
let mutable bestMSserial = System.Double.PositiveInfinity
let mutable bestMSparallel = System.Double.PositiveInfinity
let mutable maxMSparallel = 0.0
let mutable bestTSPserial = System.Double.PositiveInfinity
let mutable bestTSPparallel = System.Double.PositiveInfinity
let mutable maxTSPparallel = 0.0

for _ in 1..10 do
    (* *** SNIPPED OUT TEST SAME AS PART III *)

    (* ParBandB tests *)
    (* KNAPSACK EXAMPLE 1 *)
    let vars1 =
        [| {Name = "food"; Setting = ZeroOneVarSetting.Unset; Weight = 5.0; Utility = 8.0};
           {Name = "tent"; Setting = ZeroOneVarSetting.Unset; Weight = 14.5; Utility = 5.0};
           {Name = "gps"; Setting = ZeroOneVarSetting.Unset; Weight = 1.0; Utility = 3.0};
           {Name = "map"; Setting = ZeroOneVarSetting.Unset; Weight = 0.5; Utility = 3.0};
           {Name = "blanket"; Setting = ZeroOneVarSetting.Unset; Weight = 0.5; Utility = 5.2};
           {Name = "kit"; Setting = ZeroOneVarSetting.Unset; Weight = 0.7; Utility = 7.6};
           {Name = "matches"; Setting = ZeroOneVarSetting.Unset; Weight = 0.4; Utility = 2.7};
           {Name = "pack"; Setting = ZeroOneVarSetting.Unset; Weight = 3.4; Utility = 3.7};
           {Name = "compass"; Setting = ZeroOneVarSetting.Unset; Weight = 0.4; Utility = 1.2} |]
    let weightLimit1 = 19.2
    let vars1SortedSer = Array.sortBy (fun elem -> -elem.Utility / elem.Weight) vars1 // still have this
    let vars1SortedPar = Array.sortBy (fun elem -> -elem.Utility / elem.Weight) vars1 // still have this
    let gknap1 = knapestimator weightLimit1
    let knapAnyUnset = Array.exists (fun (elem: KnapDiscreteVar) -> ZeroOneVarSetting.isUnset elem.Setting)
    let branchKnap1 = branchknap weightLimit1
    // Arrays in F# are mutable by default, so be careful that we are initializing these the same way, if we were 'black ...
    let tKSPar = timeF (fun () -> ParBandBmaximize knapmultiplySumUtility gknap1 branchKnap1 knapAnyUnset vars1SortedPar -1.0)
    let tKSSer = timeF (fun () -> BandBmaximize knapmultiplySumUtility gknap1 branchKnap1 knapAnyUnset vars1SortedSer -1.0)
    bestKSserial <- min tKSSer bestKSserial
    bestKSparallel <- min tKSPar bestKSparallel
    maxKSparallel <- max tKSPar maxKSparallel

    (* MAXSAT times *)
    let vars2_rand, clauses2_rand = genMaxSat 15 120
    let vars2_rand_par, clauses2_rand_par = genMaxSat 15 120
    let f2_rand = clausefulleval clauses2_rand // partial func application
    let g2_rand = clauseNumFalseFunc clauses2_rand
    let maxsatAnyUnset = Array.exists (fun (elem: MaxSatDiscreteVar) -> elem.Setting = ZeroOneVarSetting.Unset)
    let tMSPar = timeF (fun () -> ParBandBmaximize f2_rand g2_rand maxsat_branch maxsatAnyUnset vars2_rand_par -1)
    let tMSSer = timeF (fun () -> BandBmaximize f2_rand g2_rand maxsat_branch maxsatAnyUnset vars2_rand -1)
    bestMSserial <- min tMSSer bestMSserial
    bestMSparallel <- min tMSPar bestMSparallel
    maxMSparallel <- max tMSPar maxMSparallel

    (* TSP times *)
    let matrix4_15_aa =
        [| [| NoEdge; ...
```

And the best times over these 10 runs are:
```
Knapsack times:
  normal BandB time = 0.334 ms
  parallel time = 3.072 ms    MAX: 66.285 ms
MAXSAT times:
  normal BandB time = 751.980 ms
  parallel time = 69.829 ms    MAX: 953.585 ms
TSP times:
  normal BandB time = 86.473 ms
  parallel time = 8617.103 ms    MAX: 146704.833 ms
```
If we have $n$ processors and the parallel version of a code is $n$ times faster than the serial version, that is considered very good parallel speedup: exactly linear.
Well, that is disappointing. The times for the non-parallel BandB are quite constant with little variation, so I don’t show their maximums as I do for the parallel version. The times for the parallel version vary greatly, as can be seen. I made our knapsack instance a bit larger than it was, but the parallel version never beats the serial one. For MaxSat, however, which is a larger problem, in one case the parallel version ran 10.7× faster than the serial one. Since I have a 4-core processor, this is super-linear speedup. Parallel branch and bound is well known to be subject to super-linear speedup [1], [2]. What happens is this: first note that the serial version always explores the space in the same order, and its time variations are only due to system effects such as varied caching, garbage collection and interference from outside processes. The parallel version, however, explores the space in an order that depends on how the system executes each task, which may vary considerably. Super-linear speedup occurs when one of these tasks finds a good solution early on, which sets BestUtility to a good value, which in turn causes more pruning of the search tree.
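Putting numbers on this with the best times reported above, and taking speedup as the serial time divided by the parallel time (values rounded):

$$
S = \frac{T_{\text{serial}}}{T_{\text{parallel}}}, \qquad
S_{\text{knapsack}} = \frac{0.334}{3.072} \approx 0.11, \qquad
S_{\text{MaxSat}} = \frac{751.980}{69.829} \approx 10.77, \qquad
S_{\text{TSP}} = \frac{86.473}{8617.103} \approx 0.010
$$

Only the MaxSat case exceeds the core count of 4, which is what makes it super-linear. These ratios use each method’s best time, so they describe the luckiest parallel run rather than a typical one.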
For MaxSat, we did beat the serial version at least once, but that great parallel case was indeed an outlier; many of the parallel runs were a bit longer than the serial version. For knapsack and TSP, we never beat the serial version, and in some cases took much longer.
I ran the profiler on the code and found that the agent’s Get call in bandbProcess, which fetches the best utility before deciding whether to branch, is the hold-up, taking 90% of the time. This might be expected, as this message is synchronous: each task needs the current best utility value to do its pruning. We will look at other ways of handling this on page 2.
Download code so far
[1] M. Quinn and N. Deo, "An upper bound for the speedup of parallel best-bound branch-and-bound algorithms," BIT Numerical Mathematics, vol. 26, no. 1, pp. 35-43, 1986.
[2] W. Zhang, "Parallel Multi-Objective Branch and Bound," thesis, Technical University of Denmark, 2008.