{-# LANGUAGE CPP, ScopedTypeVariables #-}
-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Parallel.Eden.Workpool
-- Copyright   :  (c) Philipps Universitaet Marburg 2009-2014
-- License     :  BSD-style (see the file LICENSE)
--
-- Maintainer  :  eden@mathematik.uni-marburg.de
-- Stability   :  beta
-- Portability :  not portable
--
-- This Haskell module defines workpool skeletons for dynamic task
-- distribution for the parallel functional language Eden.
--
-- Depends on GHC. Using standard GHC, you will get a threaded simulation of Eden.
-- Use the forked GHC-Eden compiler from http:\/\/www.mathematik.uni-marburg.de/~eden
-- for a parallel build.
--
-- Eden Group ( http:\/\/www.mathematik.uni-marburg.de/~eden )

module Control.Parallel.Eden.Workpool (
         -- * Workpool skeletons with a single master
         -- ** Simple workpool skeletons
         -- | The workpool skeletons use the non-deterministic merge function to achieve dynamic load balancing.
         workpool, workpoolSorted, workpoolSortedNonBlock, workpoolReduce,
         -- ** Simple workpool skeletons - versions using explicit placement
         -- | The workpool skeletons use the non-deterministic merge function to achieve dynamic load balancing.
         workpoolAt, workpoolSortedAt, workpoolSortedNonBlockAt, workpoolReduceAt,
         workpoolAuxAt,
         -- * Hierarchical workpool skeleton
         -- |These skeletons can be nested with an arbitrary number of submaster
         -- levels to unload the top master.
         wpNested,
         -- ** Hierarchical workpool skeleton with dynamic task creation
         -- |The worker function is extended such that dynamic creation of
         -- new tasks is possible. New tasks are added to the end of the
         -- task list, thus tasks are traversed breadth first (not strictly
         -- because of the skeletons' nondeterminism). Furthermore, these
         -- skeletons can be nested with an arbitrary number of submaster
         -- levels to unload the top master.
         wpDynNested, wpDNI,
         -- * Distributed workpool skeletons with state and dynamic task creation
         distribWPAt,
         -- * Deprecated skeletons
         masterWorker, mwNested, mwDynNested, mwDNI,
         ) where
#if defined( __PARALLEL_HASKELL__ ) || defined (NOT_PARALLEL)
import Control.Parallel.Eden
#else
import Control.Parallel.Eden.EdenConcHs
#endif
import Control.Parallel.Eden.Auxiliary
import Control.Parallel.Eden.Map
import Control.Parallel.Eden.Topology
import Data.List
import Data.Maybe(maybeToList,mapMaybe)
import System.IO.Unsafe
import Control.Monad

import Control.Concurrent

-- | Simple workpool with explicit placement of the worker processes.
--
-- Every worker initially receives @prefetch@ tasks; afterwards each
-- result that arrives at the master is answered with one further task
-- for the worker that produced it.
--
-- Notice: Result list in non-deterministic order.
workpoolAt :: forall t r . (Trans t, Trans r) =>
              Places
              -> Int        -- ^number of child processes (workers)
              -> Int        -- ^prefetch of tasks (for workers)
              -> (t -> r)   -- ^worker function (mapped to tasks)
              -> [t] -> [r] -- ^tasks to process, and the results obtained
workpoolAt pos np prefetch wf tasks = map snd taggedResults
  where
    -- all results, each paired with the index of the producing worker
    taggedResults :: [(Int, r)]
    taggedResults = merge (tagWithPids workerOutputs)

    workerOutputs :: [[r]]
    workerOutputs = parMapAt pos (map wf) workerInputs

    -- one task stream per worker, driven by the request stream
    workerInputs :: [[t]]
    workerInputs = distribute np requests tasks

    -- the prefetch round first, then one request per received result
    requests :: [Int]
    requests = prefetchRequests ++ map fst taggedResults

    prefetchRequests :: [Int]
    prefetchRequests = concat (replicate prefetch [0 .. np - 1])

-- | Pair every element of the i-th inner list with the index i
-- (counting from 0).
tagWithPids :: [[r]]          -- ^untagged input
               -> [[(Int,r)]] -- ^tagged output
tagWithPids rss = zipWith (\i rs -> map ((,) i) rs) [0 ..] rss

-- | Simple workpool using the default placement (places list @[0]@).
--
-- Notice: Result list in non-deterministic order.
workpool :: (Trans t, Trans r) =>
            Int           -- ^number of child processes (workers)
            -> Int        -- ^prefetch of tasks (for workers)
            -> (t -> r)   -- ^worker function (mapped to tasks)
            -> [t] -> [r] -- ^tasks to process, and the results obtained
workpool np prefetch wf tasks = workpoolAt [0] np prefetch wf tasks

-- | Workpool version with one result stream for each worker and meta
-- information about the task distribution.
-- This meta-skeleton can be used to define workpool-skeletons which
-- can reestablish the result list order.
--
-- Notice: The result streams are per worker; which worker processes
-- which task is non-deterministic.
workpoolAuxAt :: (Trans t, Trans r) =>
                 Places
                 -> Int      -- ^number of child processes
                             -- (workers)
                 -> Int      -- ^prefetch of tasks (for
                             -- workers)
                 -> (t -> r) -- ^worker function (tasks to
                             -- results mapping)
                 -> [t]      -- ^tasks
                 -> ([Int],[[Int]],[[r]]) -- ^(input distribution (input i
                                          -- is in sub-list distribs!i),
                                          -- task positions (element i of
                                          -- result-sub-list j was in the
                                          -- input list at (poss!j)!i ),
                                          -- result streams of workers)
workpoolAuxAt pos np prefetch wf tasks = (requests, positions, resultStreams)
  where
    resultStreams = parMapAt pos (map wf) workerInputs
    (workerInputs, positions) = distributeWithPos np requests tasks
    -- cut the request stream to the length of the task list
    requests = zipWith (\_ worker -> worker) tasks (prefetchRequests ++ followUps)
    prefetchRequests = concat (replicate prefetch [0 .. np - 1])
    -- every result of worker w contributes one new request for w
    followUps =
      merge (zipWith (\worker results -> map (const worker) results)
                     [0 ..] resultStreams)

-- | Task distribution according to worker requests. In addition to what
-- @distribute@ returns, a nested list of positions is produced that tells
-- where each distributed task was located in the original task list.
distributeWithPos ::  Int       -- ^number of workers
                     -> [Int]   -- ^request stream (worker IDs ranging from 0 to n-1)
                     -> [t]     -- ^task list
                     -> ([[t]],[[Int]]) -- ^(task distribution, task positions in
                                        -- original list), each inner list for one worker.
distributeWithPos np reqs tasks = unzip (map (unzip . shareOf) [0 .. np - 1])
  where
    -- tasks (with their positions) whose request came from the given worker
    shareOf worker =
      [ (task, position)
      | (requester, task, position) <- zip3 reqs tasks [0 ..]
      , worker == requester
      ]

-- | Sorted workpool (results in the order of the tasks).
-- This version takes places for instantiation.
workpoolSortedAt :: (Trans t, Trans r) =>
                    Places
                    -> Int      -- ^number of child processes
                                -- (workers)
                    -> Int      -- ^prefetch of tasks (for workers)
                    -> (t -> r) -- ^worker function
                    -> [t]      -- ^tasks
                    -> [r]      -- ^results
workpoolSortedAt pos np prefetch f tasks = map snd (mergeByPos taggedStreams)
  where
    (_, positions, resultStreams) = workpoolAuxAt pos np prefetch f tasks
    -- pair every result with the position of its task in the input list
    taggedStreams = zipWith zip positions resultStreams

-- | Join sorted lists into one sorted list.
-- The sublists are combined pairwise in a balanced binary scheme.
mergeByPos :: [[(Int,r)]] -- ^ tagged input
              -> [(Int,r)] -- ^ output sorted by tags
mergeByPos streams = case streams of
  []       -> []
  [only]   -> only
  [a, b]   -> merge2ByTag a b
  _ : rest -> merge2ByTag (everyOther streams) (everyOther rest)
  where
    -- merge the streams at positions 0,2,4,.. of the given list
    everyOther = mergeByPos . takeEach 2

-- | Merge two lists that are sorted by their first tuple component.
-- Equal tags in both lists are treated as an error.
merge2ByTag :: Ord i => [(i,r)] -> [(i,r)] -> [(i,r)]
merge2ByTag [] ys = ys
merge2ByTag xs [] = xs
merge2ByTag xs@(x@(i, _) : xs') ys@(y@(j, _) : ys') =
  case compare i j of
    LT -> x : merge2ByTag xs' ys
    GT -> y : merge2ByTag xs ys'
    EQ -> error "found tags i == j"

-- | Sorted workpool: Efficient implementation using the
-- distribution lookup list. Uses the places list @[0]@.
--
-- Notice: Results in the order of the tasks.
workpoolSorted :: (Trans t, Trans r) =>
                  Int         -- ^number of child processes (workers)
                  -> Int      -- ^prefetch of tasks (for workers)
                  -> (t -> r) -- ^worker function (mapped to tasks)
                  -> [t]      -- ^tasks
                  -> [r]      -- ^results
workpoolSorted np prefetch f tasks = workpoolSortedAt [0] np prefetch f tasks

-- | Non-blocking sorted workpool. Result list is structurally defined
-- up to the position where tasks are distributed, independent of the
-- received worker results. This version needs still performance
-- testing. This version takes places for instantiation.
--
-- Notice: Results in the order of the tasks.
workpoolSortedNonBlockAt :: (Trans t, Trans r) =>
            Places
            -> Int        -- ^number of child processes (workers)
            -> Int        -- ^prefetch of tasks (for workers)
            -> (t -> r)   -- ^worker function (mapped to tasks)
            -> [t] -> [r] -- ^tasks to process, and the results in task order
workpoolSortedNonBlockAt pos np prefetch f tasks
  = orderBy resultStreams distribution
  where
    (distribution, _, resultStreams) = workpoolAuxAt pos np prefetch f tasks

-- | Orders a nested list (whose sublists are ordered) by a given
-- distribution. The spine of the output follows the distribution list;
-- elements are taken with @head@ / @tail@ so that producing the spine
-- does not wait for elements of the sublists.
orderBy :: forall idx r . Integral idx =>
           [[r]]    -- ^ nested input list (@inputss@)
           -> [idx] -- ^ distribution (result i is in sublist @inputss!(idxs!i)@)
           -> [r]   -- ^ ordered result list
orderBy _ [] = []
orderBy [single] _ = single
orderBy (first : others) dist = fst (foldl absorb (first, 1) others)
  where
    -- fold step: weave sublist number k into the list merged so far
    absorb :: ([r], idx) -> [r] -> ([r], idx)
    absorb (merged, k) next = (weave merged next dist, k + 1)
      where
        weave :: [r] -> [r] -> [idx] -> [r]
        weave _ _ [] = []
        weave old new (i : is)
          | i < k     = head old : weave (tail old) new is
          | i == k    = head new : weave old (tail new) is
          | otherwise = weave old new is   -- belongs to a later sublist

-- | orders a nested list by a given distribution (alternative code)
orderBy' :: Integral idx =>
            [[r]]    -- ^ nested input list (@inputss@)
            -> [idx] -- ^ distribution (result i is in sublist @inputss!(idxs!i)@)
            -> [r]   -- ^ ordered result list
orderBy' rss [] = []
orderBy' rss (r:reqs) = head chosen : orderBy' (before ++ (tail chosen : after)) reqs
  where
    (before, chosen : after) = splitAt (fromIntegral r) rss

-- | Non-blocking sorted workpool (results in the order of the
-- tasks). Result list is structurally defined up to the position
-- where tasks are distributed, independent of the received worker
-- results. This version needs still performance testing. Uses the
-- places list @[0]@.
--
-- Notice: Results in the order of the tasks.
workpoolSortedNonBlock :: (Trans t, Trans r) =>
            Int           -- ^number of child processes (workers)
            -> Int        -- ^prefetch of tasks (for workers)
            -> (t -> r)   -- ^worker function (mapped to tasks)
            -> [t] -> [r] -- ^tasks to process, and the results in task order
workpoolSortedNonBlock np prefetch f tasks
  = workpoolSortedNonBlockAt [0] np prefetch f tasks

-- | Simple workpool with additional reduce function for worker outputs.
-- This version takes places for instantiation.
--
-- Each worker folds its own results with the reduce function, so the
-- output holds one value per worker.
--
-- Notice: Which tasks contribute to which worker result is
-- non-deterministic.
workpoolReduceAt :: forall t r r' . (Trans t, Trans r, Trans r') =>
                    Places
                    -> Int             -- ^number of child processes (workers)
                    -> Int             -- ^prefetch of tasks (for workers)
                    -> (r' -> r -> r)  -- ^reduce function
                    -> r               -- ^neutral for reduce function
                    -> (t -> r')       -- ^worker function (mapped to tasks)
                    -> [t]             -- ^tasks
                    -> [r]             -- ^results (one from each worker)
workpoolReduceAt pos np prefetch rf e wf tasks = map snd workerOutputs
  where
    -- per worker: its request stream and its reduced result
    workerOutputs :: [([Int], r)]
    workerOutputs =
      spawnFAt pos [ reducer w | w <- [0 .. np - 1] ] workerInputs
    workerInputs = distribute np (prefetchRequests ++ followUps) tasks
    prefetchRequests = concat (replicate prefetch [0 .. np - 1])
    followUps = merge (map fst workerOutputs)
    -- worker number w: emits its own id once a result is in normal form
    -- and folds all its results with the reduce function
    reducer w ts = ([ rnf x `seq` w | x <- xs ], foldr rf e xs)
      where
        xs = map wf ts

-- | Simple workpool with additional reduce function for worker outputs.
-- Uses the places list @[0]@.
--
-- Notice: Which tasks contribute to which worker result is
-- non-deterministic.
workpoolReduce :: forall t r r' . (Trans t, Trans r, Trans r') =>
                  Int                -- ^number of child processes (workers)
                  -> Int             -- ^prefetch of tasks (for workers)
                  -> (r' -> r -> r)  -- ^reduce function
                  -> r               -- ^neutral for reduce function
                  -> (t -> r')       -- ^worker function (mapped to tasks)
                  -> [t]             -- ^tasks
                  -> [r]             -- ^results (one from each worker)
workpoolReduce np prefetch rf e wf tasks
  = workpoolReduceAt [0] np prefetch rf e wf tasks

-- |Hierarchical WP-Skeleton. The worker
-- function is mapped to the worker input stream (list type). A worker
-- produces a result.
-- The workers are located on the leaves of a
-- WP-hierarchy, in the intermediate levels are submasters which unload
-- the master by streaming 'result' streams of their child
-- processes into a single result stream.
--
-- Notice: Result list in non-deterministic order.
wpNested :: forall t r . (Trans t, Trans r) =>
            [Int]       -- ^branching degrees: the i-th
                        -- element defines the branching
                        -- degree of for the i-th level of
                        -- the WP-hierarchy. Use a singleton
                        -- list for a flat MW-Skeleton.
            -> [Int]    -- ^Prefetches for the
                        -- sub-master/worker levels
            -> (t -> r) -- ^worker function
            -> [t]      -- ^initial tasks
            -> [r]      -- ^results
wpNested ns pfs wf = wpDynNested ns pfs withoutNewTasks
  where
    -- static variant: a processed task never creates new tasks
    withoutNewTasks task = (wf task, [])

-- |Simple interface for 'wpDynNested'. Parameters are the number of child
-- processes, the nesting depth (use 1 for a single master), the first
-- level branching degree, and the task prefetch amount for the worker level.
-- All processes that are not needed for the submasters are
-- used for the workers. If the number of submasters in the last level
-- and the number of remaining child processes are prime to each
-- other, then the next larger divisor is chosen for the number of
-- workers.
--
-- Notice: Result list in non-deterministic order.
wpDNI :: (Trans t, Trans r) =>
         Int                -- ^number of processes (submasters and workers)
         -> Int             -- ^nesting depth
         -> Int             -- ^branching degree of the first submaster
                            -- level (further submaster levels are
                            -- branched binary)
         -> Int             -- ^task prefetch for the workers
         -> (t -> (r,[t]))  -- ^worker function - produces a tuple of
                            -- result and tasks for the processed task
         -> [t]             -- ^initial tasks
         -> [r]             -- ^results
wpDNI np levels l_1 pf f tasks = wpDynNested nesting prefetches f tasks
  where
    nesting    = mkNesting np levels l_1
    prefetches = mkPFs pf nesting

-- |Hierarchical WP-Skeleton with dynamic task creation. The worker
-- function is mapped to the worker input stream (list type).
-- A worker
-- produces a tuple of result and dynamically created tasks for each
-- processed task. The workers are located on the leaves of a
-- WP-hierarchy, in the intermediate levels are submasters which unload
-- the master by streamlining 'result/newtask' streams of their child
-- processes into a single result/newtask stream. Furthermore, the
-- submasters retain locally half of the tasks which are
-- dynamically created by the workers in their subtree.
--
-- Both the branching list and the prefetch list must be non-empty
-- (their heads describe the top level). An empty list of initial tasks
-- yields an empty result list.
--
-- Notice: Result list in non-deterministic order.
wpDynNested :: forall t r . (Trans t, Trans r) =>
             [Int]                -- ^branching degrees: the i-th
                                  -- element defines the branching
                                  -- degree of for the i-th level of
                                  -- the MW-hierarchy. Use a singleton
                                  -- list for a flat MW-Skeleton.
             -> [Int]             -- ^Prefetches for the
                                  -- sub-master/worker levels
             -> (t -> (r,[t]))    -- ^worker function - produces a
                                  -- tuple of result and new tasks for
                                  -- the processed task
             -> [t]               -- ^initial tasks
             -> [r]               -- ^results
wpDynNested ns pfs wf initTasks
 = topMasterStride strideTM (head ns) (head pfs) subWF initTasks
    where -- stack one submaster layer per remaining level on top of the
          -- plain worker function
          subWF = foldr fld wf' (zip3 stds (tail ns) (tail pfs))
          wf' :: [Maybe t] -> [(r,[t],Int)]
          -- explicit Nothing-Request after each result
          -- (the third component 0 says: this worker holds no tasks)
          wf' xs = map(\ (r,ts) -> (r,ts,0)) (map (wf)(inp xs))
          -- tasks up to (excluding) the first Nothing
          inp ((Just x):rest) = x:inp rest
          inp (Nothing:_)     = [] -- STOP!!!
          -- placement: strides are accumulated from the lowest level
          -- upwards, starting with 1
          (strideTM:stds) = scanr (\x y -> ((y*x)+1)) 1 (tail ns)
          fld :: (Int, Int, Int) -> ([Maybe t] -> [(r,[t],Int)])
                 -> [Maybe t] -> [(r,[t],Int)]
          fld (stride,n,pf) wf = mwDynSubStride stride n pf wf

--------------------------mwDynNested auxiliary---------------------------------
-- | Top-level master of the hierarchy. Spawns @branch@ child processes
-- at the places given by 'nextPes', distributes tasks on request and
-- collects the results; termination is detected by 'tdetectTop'.
--
-- An empty initial task list is answered with an empty result list
-- right away. Without this check the termination detection would start
-- with a task counter of 0, no task would ever be handed out, and the
-- mutually dependent task, request and result streams would block
-- forever.
topMasterStride :: forall t r . (Trans t, Trans r) =>
             Int -> Int -> Int -> ([Maybe t] -> [(r,[t],Int)]) -> [t] -> [r]
topMasterStride stride branch prefetch wf initTasks
  | null initTasks = []            -- nothing to do, do not start the cycle
  | otherwise      = finalResults
  where
   -- identical to static task pool except for the type of workers
   ress         =  merge (spawnAt places workers inputs)
   places       =  nextPes branch stride
   -- every child tags its outputs with its own index i
   workers      :: [Process [Maybe t] [(Int,(r,[t],Int))]]
   workers      =  [process (zip [i,i..] . wf) | i <- [0..branch-1]]
   inputs       =  distribute branch (initReqs ++ reqs) tasks
   initReqs     =  concat (replicate prefetch [0..branch-1])
   -- additions for task queue management and
   -- termination detection
   tasks        =  (map Just initTasks) ++ newTasks
   -----------------------------
   initNumTasks =  length initTasks
   -- => might lead to deadlock!
   -----------------------------
   (finalResults, reqs, newTasks)
                =  tdetectTop ress initNumTasks

-- | Submaster for the intermediate levels. Towards its parent it behaves
-- like a worker function on a @[Maybe t]@ stream; towards its @branch@
-- children it acts as a master. The bookkeeping is done by 'tcontrol'.
mwDynSubStride :: forall t r . (Trans t, Trans r) =>
             Int -> Int -> Int -> ([Maybe t] -> [(r,[t],Int)])
             -> [Maybe t] -> [(r,[t],Int)]
mwDynSubStride stride branch prefetch wf initTasks = finalResults where
   fromWorkers  =  map flatten (spawnAt places workers inputs)
   places       =  nextPes branch stride
   workers      :: [Process [Maybe t] [(Int,(r,[t],Int))]]
   workers      =  [process (zip [i,i..] . wf) | i <- [0..branch-1]]
   inputs       =  distribute branch (initReqs ++ reqs) tasks
   initReqs     =  concat (replicate prefetch [0..branch-1])
   -- task queue management: tasks from the parent arrive as Right,
   -- messages from the children as Left
   controlInput =  merge (map Right initTasks: map (map Left) fromWorkers)
   (finalResults, tasks, reqs)
                =  tcontrol controlInput 0 0 (branch * (prefetch+1)) False

-- | Task queue control for termination detection in the top master.
--
-- Consumes the tagged child messages together with a counter of
-- outstanding tasks and yields (results, requests, task stream).
-- The last result is recognised by: counter is 1, no new tasks came
-- with it, and the child reports that it holds no tasks. From then on
-- the task stream consists of @Nothing@ only.
tdetectTop :: [(Int,(r,[t],Int))] -> Int -> ([r], [Int], [Maybe t])
tdetectTop ((req,(r,ts,subHoldsTs)):ress) numTs
  | numTs == 1 && null ts && subHoldsTs == 0
      = ([r], [], repeat Nothing) -- final result
  | subHoldsTs == 1
      -- child still holds tasks: do not forward its request
      = (r:moreRes,     moreReqs, (map Just ts) ++ moreTs)
  | otherwise
      = (r:moreRes, req:moreReqs, (map Just ts) ++ moreTs)
  where --localNumTaks is 0 or 1, if it's 1 -> no Request
        -- -> numTs will not be decreased
    (moreRes, moreReqs, moreTs)
      = tdetectTop ress (numTs-1+length ts+subHoldsTs)

-- | Task queue control of a submaster.
--
-- Input elements are either a task from the parent (@Right@, where
-- @Nothing@ is the stop signal) or a message @(child, result, new tasks,
-- hold flag)@ from a child (@Left@). The result is
-- (messages for the parent, task stream for the children, requests).
tcontrol :: [Either (Int,r,[t],Int) (Maybe t)] -> -- controlInput
            Int -> Int ->                         -- task / hold counter
            Int -> Bool ->                        -- prefetch, split mode
            ([(r,[t],Int)],[Maybe t],[Int])       -- (results,tasks,requests)
tcontrol ((Right Nothing):_) 0 _ _ _
  = ([],repeat Nothing,[])           -- Final termination
tcontrol ((Right (Just t)):ress) numTs hldTs pf even -- task from above
  = let (moreRes, moreTs, reqs) = tcontrol ress (numTs+1) hldTs pf even
    in (moreRes, (Just t):moreTs, reqs)
tcontrol ((Left (i,r,ts,subHoldsTs)):ress) numTs hldTs pf even
  = let (moreRes, moreTs, reqs)
          = tcontrol ress (numTs + differ) (hldTs') pf evenAct
        -- change of the local task count caused by this message
        differ = length localTs + subHoldsTs - 1
        hldTs' = max (hldTs + differ) 0
        -- flag reported upwards: 1 while tasks are held in this subtree
        holdInf | (hldTs+differ+1 > 0) = 1
                | otherwise            = 0
        (localTs,fatherTs,evenAct)
          = split numTs pf ts even   -- part of tasks to parent
        newreqs | (subHoldsTs == 0) = i:reqs
                | otherwise         = reqs -- no tasks kept below?
    in ((r,fatherTs,holdInf):moreRes,
        (map Just localTs) ++ moreTs, newreqs)
-- error case, not shown in paper
tcontrol ((Right Nothing):_) n _ _ _
  = error "Received Stop signal, although not finished!"
-- | Turn nested messages @(i,(r,ts,n))@ into flat 4-tuples @(i,r,ts,n)@.
flatten [] = []
flatten ((i,(r,ts,n)):ps) = (i,r,ts,n) : flatten ps

-- | Divide a list of new tasks into a part that stays local and a part
-- for the parent ("father"). The result is
-- @(local tasks, father tasks, flag)@; 'tcontrol' passes the returned
-- flag to its recursive call, where it becomes the @even@ argument of
-- the next 'split'.
split :: Int -> Int -> [t] -> Bool ->([t],[t],Bool)
split num pf ts even -- former definition: = splitAt (2*pf - num) ts
  | num >= 2*pf                = ([],ts,False)   -- everything goes to the father
  --no tasks or odd -> keep first
  | ((not even)||(num == 1))   = oddEven ts
  | otherwise                  = evenOdd ts
-- disabled guard:  | num < pf `div` 2 = (ts,[])

-- | Alternating split that keeps the first element locally and hands
-- the rest to 'evenOdd'. On the empty list the flag is False.
oddEven :: [t] -> ([t],[t],Bool)
oddEven []     = ([],[],False)
oddEven (x:xs) = (x:localT ,fatherT, even)
    where (localT,fatherT,even) = evenOdd xs

-- | Alternating split that gives the first element to the father and
-- hands the rest to 'oddEven'. On the empty list the flag is True.
evenOdd :: [t] -> ([t],[t],Bool)
evenOdd []     = ([],[],True)
evenOdd (x:xs) = (localT, x:fatherT, even)
    where (localT,fatherT,even) = oddEven xs

-- | Places for @n@ child processes, stepping by @stride@ and starting
-- one after @selfPe@.
-- (@noPe@ and @selfPe@ are not defined in this file; presumably the
-- number of PEs and the own PE number from the Eden import - confirm.)
--
-- NOTE(review): @ps@ is built with @cycle@, so in the second branch
-- @concat (replicate n ps)@ never gets past the first copy of @ps@ for
-- n >= 1 and is the empty list for n <= 0.
nextPes :: Int -> Int -> [Int]
nextPes n stride | start > noPe = replicate n noPe
                 | otherwise    = concat (replicate n ps)
    where ps    = cycle (takeWhile (<= noPe) [start,start+stride..])
          start = selfPe + 1

-- | Branching degrees per level for 'wpDNI': a depth of 1 gives the
-- single level @[np]@; otherwise the first level has @level1@ branches,
-- the following @depth - 2@ levels are binary and the last entry is the
-- number of workers per lowest submaster.
mkNesting :: Int -> Int -> Int -> [Int]
mkNesting np 1 _ = [np]
mkNesting np depth level1 = level1:(replicate (depth - 2) 2) ++ [numWs]
    where -- leaves = np - #submasters
          leaves   = np - level1 * ( 2^(depth-1) - 1 )
          -- num of lowest submasters
          numSubMs = level1 * 2^(depth - 2)
          -- workers per branch (rounded up)
          numWs    = (leaves + numSubMs - 1) `div` numSubMs

-- | Prefetch values per level for 'wpDNI': for every level above the
-- workers the product of all branching degrees below it times @pf+1@,
-- followed by @pf@ itself for the worker level.
-- Uses @tail@, so the nesting list must not be empty.
mkPFs :: Int ->    -- prefetch value for worker processes
         [Int] ->  -- branching per level top-down
         [Int]     -- list of prefetches
mkPFs pf nesting
    = [ factor * (pf+1) | factor <- scanr1 (*) (tail nesting) ] ++ [pf]

---------------------------distributed workpool skeletons-------------------------------------------

-- | A distributed workpool skeleton that uses task generation and a global state (s) with a total order.
-- Split and Detatch policy must give tasks away (may not produce empty lists), unless all tasks are pruned!
distribWPAt :: (Trans onT, Trans t, Trans r, Trans s, NFData r') =>
    Places                              -- ^ where to put workers
    -> ((t,s) -> (Maybe (r',s),[t]))    -- ^ worker function
    -> (Maybe ofT -> Maybe onT -> [t])  -- ^ input transformation (e.g. (fetch . fromJust . snd) for online input of type [RD[t]])
    -> ([Maybe (r',s)] -> s -> r)       -- ^ result transformation (prior to release results in the workers)
    -> ([t]->[t]->s->[t])               -- ^ taskpool transform attach function
    -> ([t]->s->([t],Maybe (t,s)))      -- ^ taskpool transform detach function (local request)
    -> ([t]->s->([t],[t]))              -- ^ taskpool transform split function (remote request)
    -> (s->s->Bool)                     -- ^ state comparison (checks if new state is better than old state)
    -> s                                -- ^ initial state (offline input)
    -> [ofT]                            -- ^ offline input (if non empty, outer list defines the number of workers, else the shorter list does)
    -> [onT]                            -- ^ dynamic input (if non empty, outer list defines the number of workers, else the shorter list does)
    -> [r]                              -- ^ results of workers
distribWPAt places wf inputT resT ttA ttD ttSplit sUpdate st ofTasks onTasks = res where
    -- the workers are connected via ringFlAt (not defined in this file);
    -- only the first worker gets isFirst = True
    res = ringFlAt places id id workers onTasks'
    workers = zipWith (worker) (True:(repeat False)) ofTasks'
    -- length of ontasks and oftasks is adjusted such that the desired number of workers will be instantiated
    (ofTasks',onTasks') | null ofTasks = (repeat Nothing  , map Just onTasks)
                        | null onTasks = (map Just ofTasks, repeat Nothing  )
                        | otherwise    = (map Just ofTasks, map Just onTasks)

    -- worker functionality
    -- returns the transformed results and the requests passed on to the
    -- ring; the last element of ringOut is used only for the final state
    worker isFirst ofTasks onTasks ringIn = (resT ress sFinal,init ringOut) where
      ts = (inputT ofTasks onTasks)
      (ress,ntssNtoMe) = unzip $ genResNReqs $ map wf ts' --separate
      reqList = Me : mergeS [ringIn,                      --merge external Requests
                             ntssNtoMe] rnf               --and nf reduced local Requests
      -- manage Taskpool & Requests
      (ts',ringOut) = control ttA ttSplit ttD sUpdate reqList ts st isFirst
      sFinal = getState $ last ringOut

-- | Task pool and request management of one worker of 'distribWPAt'.
-- Starts 'distribWork' on the request stream with the initial tasks, no
-- reply channel and the counter pair @(0,0)@.
-- (The counter pair is incremented in its first component when tasks
-- are sent to another worker and in its second when a task list is
-- received; see the equations below.)
control:: (Trans t, Trans s) =>
          ([t]->[t]->s->[t]) ->           --Taskpool Transform Attach
          ([t]->s->([t],[t])) ->          --Split Policy (remote Req)
          ([t]->s->([t],Maybe (t,s))) ->  --tt Detach (local Req)
          (s->s->Bool)->                  --Checks if newState is better than oldState
          [Req [t] s]->[t]->s->Bool->     --reqs,tasks,state,isFirstInRing
          ([(t,s)],[Req [t] s])           --localTasks,RequestsToRing
control ttA ttSplit ttD sUpdate requests initTasks st isFirst
  = distribWork requests initTasks Nothing st (0,0)
  where
    --until no tasks left: react on own and Others requests & add new Tasks
    --distribWork :: Trans t => [Req [t] s] -> [t] -> $
    --  Maybe (ChanName (Req [t] s))->(Int,Int)-> ([t],[ReqS [t] s],s)
    distribWork (TasksNMe nts:rs) tS replyCh st sNr      --selfmade Tasks arrive
      = distribWork (Me:rs) (ttA tS nts st) replyCh st sNr --add Tasks and MeReq

    distribWork (NewState st':rs) tS replyCh st sNr      --Updated State arrives
      | sUpdate st' st = let (tS',wReqs') =distribWork rs tS replyCh st' sNr
                         in (tS',(NewState st':wReqs'))  --then check and send
      | otherwise      = distribWork rs tS replyCh st sNr --or discard

    distribWork (req@(Others tag tCh):rs) [] replyCh st sNr  --Others Request &
      | tag==None = (tS',req:wReqs')               --no tasks left --> pass Request
      | otherwise = (tS', Others Black tCh : wReqs') --I'm still working -> Black
      where(tS',wReqs') = distribWork rs [] replyCh st sNr

    distribWork (Me:rs) [] replyCh st sNr =     --last own Task solved and no new ones
      new (\reqChan (newTS,replyChan) ->        --gen new reqChan to get newTS & replyC
        let (tS',wReqs) = passWhileReceive (mergeS [rs,    --wait for fst newTask
                               newTS `pseq` [TasksNMe newTS]] r0) replyChan st sNr --to merge
            tag | isFirst   = Black  --First Worker starts Black (For Termination)
                | otherwise = None   --normal Workers with None Tag
        in(case replyCh of           --First own Request into Ring- others dynamic
              Nothing       -> (tS', Others tag reqChan : wReqs)
              Just replyCh' -> parfill replyCh' (Others tag reqChan) (tS',wReqs)))

    distribWork (Me:rs) tS replyCh st sNr       --local Request and Tasks present
      = let (tS',tDetatch) = ttD tS st          --TaskpoolTransform Detatch
            (tsDetatch,wReqs) = case tDetatch of
                 -- nothing detached: continue as if the pool were empty
                 Nothing -> distribWork (Me:rs) [] replyCh st sNr
                 Just t  -> distribWork rs tS' replyCh st sNr
        in ((maybeToList tDetatch)++tsDetatch,wReqs) --add Maybe one Task to wf

    distribWork reqs@(Others _ tCh :rs) tS replyCh st (s,r)  --foreign Req & have Ts
      = let (holdTs,sendTs) = ttSplit tS st     --split ts to send and ts to hold
            ((tS',wReqs'),replyReq)
              = new (\replyChan replyReq' ->    --gen ReplyC and send Tasks & new
                  parfill tCh (sendTs,Just replyChan)  --ReplyC in Chan of the Req
                  ((distribWork rs holdTs replyCh st (s+1,r)),replyReq'))
        in case sendTs of                       --ReplyReqToOutp
            []    -> distribWork reqs [] replyCh st (s,r)  --No tasks left
            (_:_) -> (tS',mergeS [[replyReq],wReqs'] rnf)

    -- Pass all until foreign Tasks arrive or Termination starts
    -- passWhileRecive :: Trans t => [ReqS [t] s] -> Maybe(ChanName (ReqS [t] s))
    --  -> (Int,Int) -> ([t],[ReqS [t] s])
    passWhileReceive (NewState st':rs) replyCh st sNr    --Updated State arrives
      | sUpdate st' st =let (tS',wReqs')=passWhileReceive rs replyCh st' sNr
                        in (tS',(NewState st':wReqs'))   --then check and send
      | otherwise      = passWhileReceive rs replyCh st sNr --or discard

    passWhileReceive (req@(Others None tCh):rs) replyCh st sNr  --Req of normal
      = let (tS',wReqs) = passWhileReceive rs replyCh st sNr    --Worker -> pass it
        in (tS',req:wReqs)

    passWhileReceive (req@(Others Black tCh ):rs) replyCh st (s,r) --Black Req
      | (not isFirst) = (tS',req :wReqs)  --normal Workers: pass it
      | otherwise     = (tS',req':wReqs)  --First Worker: new Round starts White
      where (tS',wReqs) = passWhileReceive rs replyCh st (s,r)
            req'= Others (White s r 0 0) tCh  --Start with own Counters

    passWhileReceive (Others (White s1 r1 s2 r2) tCh :rs) replyCh st (s,r)
      | (not isFirst) = (tS',req':wReqs) --Normal Workers: add Counter and pass
      --4 counters equal -> pass Black as end of reqs Symbol, start Termination
      | otherwise     = if terminate
                        then ([],Others Black tCh : (termRing rs ++ [NewState st]))
                        else (tS',req'':wReqs)  --no Termination
      where (tS',wReqs) = passWhileReceive rs replyCh st (s,r)
            req'      = Others (White (s1+s) (r1+r) s2 r2) tCh  --add Counters
            req''     = Others (White s r s1 r1) tCh     --Move Counters->NewTurn
            terminate = (s1==r1)&&(r1==s2)&&(s2==r2)     --Check Termination

    passWhileReceive (TasksNMe newTS:rs) replyCh st (s,r)  --Task List arrives
      | null newTS = ([],(termRing rs)   --got empty newTs -> begin Termination
                         ++ [NewState st]) --attach final State at the End
      | otherwise  = (distribWork (Me:rs) newTS replyCh st (s,r+1)) --have newTs

-- | Messages handled by 'control'.
data Req t s =
   Me |  --Work Request of local Worker - Function
   --Request of other Workers with Tag, and Channel to Send Tasks and
   Others { getTag :: Tag,
            getReplyChan :: (ChanName (t,Maybe (ChanName(Req t s))))
          } | --Reply Channel
   TasksNMe { getTask :: t} | -- New Tasks and additional Me Request to add
   NewState { getState :: s }

-- Normal-form evaluation of a request: evaluates the payload of each
-- constructor; Me carries nothing.
instance (NFData t, NFData s) => NFData (Req t s)
    where
      rnf Me = ()
      rnf (Others t c) = rnf t `pseq` rnf c
      rnf (TasksNMe t) = rnf t
      rnf (NewState s) = rnf s

-- No methods are given here, so the class defaults of Trans apply.
instance (Trans t,Trans s) => Trans (Req t s)

-- | Tag attached to requests of other workers ('Others').
data Tag = Black |  --no Termination Situation / Term Mode: Last Request in Ring
           White Int Int Int Int | --check Termination Situation:(send&recv)
           None     --Request of normal Worker

instance NFData Tag
    where rnf Black = ()
          rnf None  = ()
          rnf (White a b c d) = rnf (a,b,c,d)

-- Structural equality; tags built from different constructors are unequal.
instance Eq Tag where
    Black   == Black   = True
    None    == None    = True
    White a b c d == White a' b' c' d' = (a,b,c,d)==(a',b',c',d')
    a       == b       = False

---------------------auxiliary-----------------
-- | Used once termination has started: answers pending requests of other
-- workers with an empty task list and no reply channel. Stops at the
-- first Black request or at the end of the stream; always yields [].
termRing :: (Trans t,Trans s) => [Req [t] s] -> [Req [t] s]
termRing []                      = []     -- Predecessors tells no more reqs
termRing (Others Black tCh : _ ) = parfill tCh ([],Nothing) []  --reply last req
-- here parfill is given termRing itself as third argument and the
-- resulting function is then applied to rs
termRing (Others None  tCh : rs) = parfill tCh ([],Nothing) termRing rs --reply
termRing (_                : rs) = termRing rs          --ignore isFirsts reply

-- | Pair every worker result with the request it causes: a result
-- without new state yields a 'TasksNMe' request carrying the new tasks;
-- a result with a new state first yields 'NewState' and is then treated
-- again as a result without state. Each result is evaluated with rnf
-- before its pair is produced.
genResNReqs :: (NFData t,NFData r',NFData s)=>
               [(Maybe (r',s),[t])] -> [(Maybe (r',s),Req [t] s)]
genResNReqs [] = []                      --No more tasks
genResNReqs ((res@(Nothing,nts)):ress)   --No new State -> Attach new Tasks
  = rnf res `pseq` (Nothing,TasksNMe nts): genResNReqs ress
genResNReqs ((res@(Just (r,st),nts)):ress) --New State found -> Attach
  = rnf res `pseq` (Just (r,st),NewState st): genResNReqs ((Nothing,nts):ress)
-----------------------------DEPRECATED---------------------------------
-- | Deprecated alias of 'workpoolSortedNonBlock'.
{-# DEPRECATED masterWorker "better use workpoolSortedNonBlock instead" #-}
masterWorker :: (Trans a, Trans b) =>
                Int -> Int -> (a -> b) -> [a] -> [b]
masterWorker np prefetch f tasks = workpoolSortedNonBlock np prefetch f tasks

-- | Deprecated alias of 'wpNested'.
{-# DEPRECATED mwNested "better use wpNested instead" #-}
mwNested :: forall t r . (Trans t, Trans r) =>
            [Int] -> [Int] -> (t -> r) -> [t] -> [r]
mwNested ns pfs wf initTasks = wpNested ns pfs wf initTasks

-- | Deprecated alias of 'wpDNI'.
{-# DEPRECATED mwDNI "better use wpDNI instead" #-}
mwDNI :: (Trans t, Trans r) =>
         Int -> Int -> Int -> Int -> (t -> (r,[t])) -> [t] -> [r]
mwDNI np levels l_1 pf f tasks = wpDNI np levels l_1 pf f tasks

-- | Deprecated alias of 'wpDynNested'.
{-# DEPRECATED mwDynNested "better use wpDynNested instead" #-}
mwDynNested :: forall t r . (Trans t, Trans r) =>
               [Int] -> [Int] -> (t -> (r,[t])) -> [t] -> [r]
mwDynNested ns pfs wf initTasks = wpDynNested ns pfs wf initTasks