{-# LANGUAGE CPP, ScopedTypeVariables #-}
-----------------------------------------------------------------------------
-- |
-- Module : Control.Parallel.Eden.Workpool
-- Copyright : (c) Philipps Universitaet Marburg 2009-2014
-- License : BSD-style (see the file LICENSE)
--
-- Maintainer : eden@mathematik.uni-marburg.de
-- Stability : beta
-- Portability : not portable
--
-- This Haskell module defines workpool skeletons for dynamic task
-- distribution for the parallel functional language Eden.
--
-- Depends on GHC. Using standard GHC, you will get a threaded simulation of Eden.
-- Use the forked GHC-Eden compiler from http:\/\/www.mathematik.uni-marburg.de/~eden
-- for a parallel build.
--
-- Eden Group ( http:\/\/www.mathematik.uni-marburg.de/~eden )
module Control.Parallel.Eden.Workpool (
-- * Workpool skeletons with a single master
-- ** Simple workpool skeletons
-- | The workpool skeletons use the non-deterministic merge function to achieve dynamic load balancing.
workpool, workpoolSorted, workpoolSortedNonBlock, workpoolReduce,
-- ** Simple workpool skeletons - versions using explicit placement
-- | The workpool skeletons use the non-deterministic merge function to achieve dynamic load balancing.
workpoolAt, workpoolSortedAt, workpoolSortedNonBlockAt, workpoolReduceAt,
workpoolAuxAt,
-- * Hierarchical workpool skeleton
-- |These skeletons can be nested with an arbitrary number of submaster
-- levels to unload the top master.
wpNested,
-- ** Hierarchical workpool skeleton with dynamic task creation
-- |The worker function is extended such that dynamic creation of
-- new tasks is possible. New tasks are added to the end of the
-- task list, thus tasks are traversed breadth first (not strictly
-- because of the skeletons' nondeterminism). Furthermore, these
-- skeletons can be nested with an arbitrary number of submaster
-- levels to unload the top master.
wpDynNested, wpDNI,
-- * Distributed workpool skeletons with state and dynamic task creation
distribWPAt,
-- * Deprecated skeletons
masterWorker, mwNested, mwDynNested, mwDNI,
) where
#if defined( __PARALLEL_HASKELL__ ) || defined (NOT_PARALLEL)
import Control.Parallel.Eden
#else
import Control.Parallel.Eden.EdenConcHs
#endif
import Control.Parallel.Eden.Auxiliary
import Control.Parallel.Eden.Map
import Control.Parallel.Eden.Topology
import Data.List
import Data.Maybe(maybeToList,mapMaybe)
import System.IO.Unsafe
import Control.Monad
import Control.Concurrent
-- | Simple workpool with explicit placement. Tasks are handed out on
-- demand: every worker starts with @prefetch@ requests, and each result
-- that arrives from a worker counts as that worker's next request.
--
-- Notice: Result list in non-deterministic order.
workpoolAt :: forall t r . (Trans t, Trans r) =>
              Places        -- ^places handed to 'parMapAt' for the workers
              -> Int        -- ^number of child processes (workers)
              -> Int        -- ^prefetch of tasks (for workers)
              -> (t -> r)   -- ^worker function (mapped to tasks)
              -> [t] -> [r] -- ^what to do
workpoolAt pos np prefetch wf tasks
  = map snd taggedResults
  where
    -- initial demand: @prefetch@ rounds over all worker ids
    initialReqs :: [Int]
    initialReqs = concat (replicate prefetch [0..np-1])
    -- the tag of every merged result is the id of the worker that
    -- produced it, i.e. the worker that can take the next task
    newReqs :: [Int]
    newReqs = map fst taggedResults
    -- one task list per worker, filled according to the request stream
    taskss :: [[t]]
    taskss = distribute np (initialReqs ++ newReqs) tasks
    -- results of all workers, tagged with the worker id and merged
    taggedResults :: [(Int,r)]
    taggedResults = merge (tagWithPids (parMapAt pos (map wf) taskss))
-- | Pair every element with the (zero-based) index of the inner list
-- it belongs to.
tagWithPids :: [[r]]          -- ^untagged input
               -> [[(Int,r)]] -- ^tagged output
tagWithPids = zipWith tagAll [0..]
  where
    -- attach the same id to every element of one inner list
    tagAll pid = zip (repeat pid)
-- | Simple workpool: 'workpoolAt' with the fixed place list @[0]@.
--
-- Notice: Result list in non-deterministic order.
workpool :: (Trans t, Trans r) =>
            Int           -- ^number of child processes (workers)
            -> Int        -- ^prefetch of tasks (for workers)
            -> (t -> r)   -- ^worker function (mapped to tasks)
            -> [t] -> [r] -- ^what to do
workpool np prefetch wf tasks = workpoolAt [0] np prefetch wf tasks
-- | Workpool meta-skeleton: keeps one result stream per worker and
-- additionally returns how the tasks were distributed. Skeletons that
-- need to restore the original task order can be built on top of it.
--
-- Notice: Result list in non-deterministic order.
workpoolAuxAt :: (Trans t, Trans r) =>
                 Places
                 -> Int       -- ^number of child processes
                              -- (workers)
                 -> Int       -- ^prefetch of tasks (for
                              -- workers)
                 -> (t -> r)  -- ^worker function (tasks to
                              -- results mapping)
                 -> [t]       -- ^tasks
                 -> ([Int],[[Int]],[[r]]) -- ^(input distribution (input i
                                          -- is in sub-list distribs!i),
                                          -- task positions (element i of
                                          -- result-sub-list j was in the
                                          -- input list at (poss!j)!i ),
                                          -- result streams of workers)
workpoolAuxAt pos np prefetch wf tasks
  = (reqs, poss, fromWorkers)
  where
    -- every worker initially asks for @prefetch@ tasks
    initialReqs = concat (replicate prefetch [0..np-1])
    -- each result element is turned into a request of its worker
    newReqs = merge (zipWith (\i rs -> map (const i) rs) [0..] fromWorkers)
    -- generate only as many reqs as there are tasks
    reqs = zipWith (\_ req -> req) tasks (initialReqs ++ newReqs)
    (taskss, poss) = distributeWithPos np reqs tasks
    fromWorkers = parMapAt pos (map wf) taskss
-- | Task distribution according to worker requests. In addition to the
-- per-worker task lists it returns, for every worker, the positions its
-- tasks had in the original task list.
--
-- The i-th request decides which worker receives the i-th task; the
-- distribution stops as soon as requests or tasks run out.
distributeWithPos :: Int       -- ^number of workers
                     -> [Int]  -- ^ request stream (worker IDs ranging from 0 to n-1)
                     -> [t]    -- ^ task list
                     -> ([[t]],[[Int]]) -- ^(task distribution, task positions in original list), each inner list for one worker.
distributeWithPos np reqs tasks
  = lazyUnzip (map perWorker [0 .. np-1])
  where
    -- tasks and positions selected for a single worker
    perWorker w = unzip (select w reqs tasks [0..])
    -- walk requests, tasks and positions in lock step and keep the
    -- entries requested by worker @w@
    select w (r:rs) (t:ts) (p:ps)
      | w == r    = (t,p) : select w rs ts ps
      | otherwise = select w rs ts ps
    select _ _ _ _ = []
    -- unzip with irrefutable patterns: the outer structure is available
    -- without forcing the individual pairs
    lazyUnzip = foldr (\(~(a,b)) ~(as,bs) -> (a:as,b:bs)) ([],[])
-- | Sorted workpool (results in the order of the tasks).
-- This version takes places for instantiation.
--
-- Every worker's results are paired with the original positions of its
-- tasks; the position-tagged streams are then merged by position.
workpoolSortedAt :: (Trans t, Trans r) =>
                    Places
                    -> Int      -- ^number of child processes
                                -- (workers)
                    -> Int      -- ^prefetch of tasks (for workers)
                    -> (t -> r) -- ^worker function
                    -> [t]      -- ^tasks
                    -> [r]      -- ^results
workpoolSortedAt pos np prefetch f tasks
  = map snd (mergeByPos taggedStreams)
  where
    (_, poss, ress) = workpoolAuxAt pos np prefetch f tasks
    -- tag each result with the position of its task in the input list
    taggedStreams = zipWith zip poss ress
-- | Join sorted lists into one sorted list.
-- Three or more lists are split into two halves with @takeEach 2@
-- (applied to the list and to its tail), each half is merged
-- recursively, and the two results are merged with 'merge2ByTag'.
mergeByPos :: [[(Int,r)]]   -- ^ tagged input
              -> [(Int,r)]  -- ^ output sorted by tags
mergeByPos streams = case streams of
    []       -> []
    [s]      -> s
    [s1,s2]  -> merge2ByTag s1 s2
    (_:rest) -> merge2ByTag (half streams) (half rest)
  where
    half = mergeByPos . takeEach 2
-- | Merge two lists that are sorted by their tag (first component) into
-- one sorted list. Tags must be unique across both lists; meeting the
-- same tag on both sides raises an error.
merge2ByTag :: Ord i => [(i,r)] -> [(i,r)] -> [(i,r)]
merge2ByTag left right = case (left, right) of
    ([], _) -> right
    (_, []) -> left
    (l@(i,_) : ls, r@(j,_) : rs)
      | i < j     -> l : merge2ByTag ls right
      | i > j     -> r : merge2ByTag left rs
      | otherwise -> error "found tags i == j"
-- | Sorted workpool: efficient implementation using the distribution
-- lookup list. This is 'workpoolSortedAt' with the fixed place list @[0]@.
--
-- Notice: Results in the order of the tasks.
workpoolSorted :: (Trans t, Trans r) =>
                  Int         -- ^number of child processes (workers)
                  -> Int      -- ^prefetch of tasks (for workers)
                  -> (t -> r) -- ^worker function (mapped to tasks)
                  -> [t]      -- ^tasks
                  -> [r]      -- ^results
workpoolSorted np prefetch f tasks = workpoolSortedAt [0] np prefetch f tasks
-- | Non-blocking sorted workpool with explicit placement. The result
-- list is structurally defined up to the position where tasks are
-- distributed, independent of the received worker results. This
-- version still needs performance testing.
--
-- Notice: Results in the order of the tasks.
workpoolSortedNonBlockAt :: (Trans t, Trans r) =>
                            Places
                            -> Int        -- ^number of child processes (workers)
                            -> Int        -- ^prefetch of tasks (for workers)
                            -> (t -> r)   -- ^worker function (mapped to tasks)
                            -> [t] -> [r] -- ^what to do
workpoolSortedNonBlockAt pos np prefetch f tasks
  = let (reqs, _, fromWorkers) = workpoolAuxAt pos np prefetch f tasks
    in  -- rebuild the task order from the request (distribution) stream
        orderBy fromWorkers reqs
-- | Orders a nested list (sublists are ordered) by a given distribution
-- (non-blocking on result elements of other sublists).
--
-- The sublists are folded in one after the other: in step @a@ the
-- already ordered elements of sublists @0..a-1@ are interleaved with
-- sublist @a@ following the distribution; entries that refer to later
-- sublists are skipped in that step.
--
-- An empty distribution or an empty nested list yields the empty list.
-- A single sublist is returned unchanged.
orderBy :: Integral idx =>
           [[r]]     -- ^ nested input list (@inputss@)
           -> [idx]  -- ^ distribution (result i is in sublist @inputss!(idxs!i)@)
           -> [r]    -- ^ ordered result list
orderBy _ [] = []
orderBy [] _ = []            -- no sublists: nothing to order
orderBy [rs] _ = rs
orderBy (firstRs:restRss) dist = fst (foldl' step (firstRs, 1) restRss)
  where
    -- step :: ([r], idx) -> [r] -> ([r], idx)
    -- fold in sublist number @a@; the accumulator carries the merged
    -- prefix and the index of the sublist handled next
    step (merged, a) ys = (weave merged ys dist, a + 1)
      where
        -- weave :: [r] -> [r] -> [idx] -> [r]
        weave _ _ [] = []
        weave xs zs (i:is)
          | i < a     = head xs : weave (tail xs) zs is
          | i == a    = head zs : weave xs (tail zs) is
          | otherwise = weave xs zs is   -- belongs to a later sublist
-- | Orders a nested list by a given distribution (alternative code).
-- For every index in the distribution the head of the addressed
-- sublist is emitted and removed from that sublist.
orderBy' :: Integral idx =>
            [[r]]     -- ^ nested input list (@inputss@)
            -> [idx]  -- ^ distribution (result i is in sublist @inputss!(idxs!i)@)
            -> [r]    -- ^ ordered result list
orderBy' _ [] = []
orderBy' rss (r:reqs)
  = head current : orderBy' (before ++ (tail current : after)) reqs
  where
    -- lazy pattern binding: the addressed sublist is only inspected
    -- when the corresponding result element is demanded
    (before, current : after) = splitAt (fromIntegral r) rss
-- | Non-blocking sorted workpool (results in the order of the
-- tasks). Result list is structurally defined up to the position
-- where tasks are distributed, independent of the received worker
-- results. This version still needs performance testing. It is
-- 'workpoolSortedNonBlockAt' with the fixed place list @[0]@.
--
-- Notice: Results in the order of the tasks.
workpoolSortedNonBlock :: (Trans t, Trans r) =>
                          Int           -- ^number of child processes (workers)
                          -> Int        -- ^prefetch of tasks (for workers)
                          -> (t -> r)   -- ^worker function (mapped to tasks)
                          -> [t] -> [r] -- ^what to do
workpoolSortedNonBlock np prefetch f tasks
  = workpoolSortedNonBlockAt [0] np prefetch f tasks
-- | Simple workpool with additional reduce function for worker outputs.
-- This version takes places for instantiation.
--
-- Every worker folds its own results with the reduce function and
-- returns a single value; in parallel it emits its id once per
-- evaluated result, which serves as the request stream.
--
-- Notice: Task distribution is non-deterministic.
workpoolReduceAt :: forall t r r' . (Trans t, Trans r, Trans r') =>
                    Places
                    -> Int             -- ^number of child processes (workers)
                    -> Int             -- ^prefetch of tasks (for workers)
                    -> (r' -> r -> r)  -- ^reduce function
                    -> r               -- ^neutral for reduce function
                    -> (t -> r')       -- ^worker function (mapped to tasks)
                    -> [t]             -- ^tasks
                    -> [r]             -- ^results (one from each worker)
workpoolReduceAt pos np prefetch rf e wf tasks
  = map snd fromWorkers
  where
    initialReqs = concat (replicate prefetch [0..np-1])
    newReqs     = merge (map fst fromWorkers)
    taskss      = distribute np (initialReqs ++ newReqs) tasks
    -- (request stream, reduced result) of every worker
    fromWorkers :: [([Int],r)]
    fromWorkers = spawnFAt pos [ worker i | i <- [0..np-1] ] taskss
    worker i ts = (reqStream, foldr rf e results)
      where
        results   = map wf ts
        -- a request is only emitted once the result is fully evaluated
        reqStream = [ rnf res `seq` i | res <- results ]
-- | Simple workpool with additional reduce function for worker outputs.
-- This is 'workpoolReduceAt' with the fixed place list @[0]@.
--
-- Notice: Task distribution is non-deterministic.
workpoolReduce :: forall t r r' . (Trans t, Trans r, Trans r') =>
                  Int                -- ^number of child processes (workers)
                  -> Int             -- ^prefetch of tasks (for workers)
                  -> (r' -> r -> r)  -- ^reduce function
                  -> r               -- ^neutral for reduce function
                  -> (t -> r')       -- ^worker function (mapped to tasks)
                  -> [t]             -- ^tasks
                  -> [r]             -- ^results (one from each worker)
workpoolReduce np prefetch rf e wf tasks
  = workpoolReduceAt [0] np prefetch rf e wf tasks
-- |Hierarchical WP-Skeleton. The worker
-- function is mapped to the worker input stream (list type). A worker
-- produces a result. The workers are located on the leaves of a
-- WP-hierarchy, in the intermediate levels are submasters which unload
-- the master by streaming 'result' streams of their child
-- processes into a single result stream.
--
-- Implemented via 'wpDynNested' with a worker function that never
-- creates new tasks.
--
-- Notice: Result list in non-deterministic order.
wpNested :: forall t r . (Trans t, Trans r) =>
            [Int]        -- ^branching degrees: the i-th
                         -- element defines the branching
                         -- degree for the i-th level of
                         -- the WP-hierarchy. Use a singleton
                         -- list for a flat MW-Skeleton.
            -> [Int]     -- ^Prefetches for the
                         -- sub-master/worker levels
            -> (t -> r)  -- ^worker function
            -> [t]       -- ^initial tasks
            -> [r]       -- ^results
wpNested ns pfs wf initTasks = wpDynNested ns pfs staticWF initTasks
  where
    -- static variant: result only, no dynamically created tasks
    staticWF t = (wf t, [])
-- |Simple interface for 'wpDynNested'. Parameters are the number of
-- child processes, the nesting depth (use 1 for a single master), the
-- first level branching degree, and the task prefetch amount for the
-- worker level.
-- All processes that are not needed for the submasters are
-- used for the workers. If the number of submasters in the last level
-- and the number of remaining child processes are prime to each
-- other, then the next larger divisor is chosen for the number of
-- workers.
--
-- Notice: Result list in non-deterministic order.
wpDNI :: (Trans t, Trans r) =>
         Int                -- ^number of processes (submasters and workers)
         -> Int             -- ^nesting depth
         -> Int             -- ^branching degree of the first submaster
                            -- level (further submaster levels are
                            -- branched binary)
         -> Int             -- ^task prefetch for the workers
         -> (t -> (r,[t]))  -- ^worker function - produces a tuple of
                            -- result and tasks for the processed task
         -> [t]             -- ^initial tasks
         -> [r]             -- ^results
wpDNI np levels l_1 pf f tasks = wpDynNested nesting prefetches f tasks
  where
    -- branching degree per level, derived from the process budget
    nesting    = mkNesting np levels l_1
    -- prefetch per level, derived from the worker prefetch
    prefetches = mkPFs pf nesting
-- |Hierarchical WP-Skeleton with dynamic task creation. The worker
-- function is mapped to the worker input stream (list type). A worker
-- produces a tuple of result and dynamically created tasks for each
-- processed task. The workers are located on the leaves of a
-- WP-hierarchy, in the intermediate levels are submasters which unload
-- the master by streamlining 'result/newtask' streams of their child
-- processes into a single result/newtask stream. Furthermore, the
-- submasters retain locally half of the tasks which are
-- dynamically created by the workers in their subtree.
--
-- Both the branching list and the prefetch list must be non-empty
-- (their heads configure the top master).
--
-- Notice: Result list in non-deterministic order.
wpDynNested :: forall t r . (Trans t, Trans r) =>
               [Int]              -- ^branching degrees: the i-th
                                  -- element defines the branching
                                  -- degree for the i-th level of
                                  -- the MW-hierarchy. Use a singleton
                                  -- list for a flat MW-Skeleton.
               -> [Int]           -- ^Prefetches for the
                                  -- sub-master/worker levels
               -> (t -> (r,[t]))  -- ^worker function - produces a
                                  -- tuple of result and new tasks for
                                  -- the processed task
               -> [t]             -- ^initial tasks
               -> [r]             -- ^results
wpDynNested ns pfs wf initTasks
  = topMasterStride strideTM (head ns) (head pfs) subWF initTasks
  where
    -- wrap the leaf worker in one sub-master per remaining level
    subWF = foldr fld wf' (zip3 stds (tail ns) (tail pfs))
    -- leaf worker on the stream protocol; the third component is
    -- always 0 here
    wf' :: [Maybe t] -> [(r,[t],Int)]
    wf' xs = map (\ (r,ts) -> (r,ts,0)) (map wf (inp xs))
    -- tasks up to the first stop signal
    inp :: [Maybe t] -> [t]
    inp ((Just x):rest) = x : inp rest
    inp (Nothing:_)     = []   -- STOP!!!
    inp []              = []   -- input stream closed without stop signal
    -- placement: one stride per level, computed bottom-up
    -- (presumably the number of PEs occupied by one subtree)
    (strideTM:stds) = scanr (\x y -> ((y*x)+1)) 1 (tail ns)
    fld :: (Int, Int, Int) -> ([Maybe t] -> [(r,[t],Int)])
           -> [Maybe t] -> [(r,[t],Int)]
    fld (stride,n,pf) subWorker = mwDynSubStride stride n pf subWorker
--------------------------mwDynNested auxiliary---------------------------------
-- | Top master of the hierarchical workpool. Spawns @branch@ child
-- processes (each running @wf@ and tagging its output with its index)
-- at the places computed by @nextPes branch stride@, distributes the
-- tasks according to the request stream, and uses 'tdetectTop' for task
-- queue management and termination detection.
--
-- Arguments: stride, branching degree, prefetch, child function and
-- initial tasks. Returns the results.
topMasterStride :: forall t r . (Trans t, Trans r) =>
       Int -> Int -> Int -> ([Maybe t] -> [(r,[t],Int)]) -> [t] -> [r]
topMasterStride stride branch prefetch wf initTasks = finalResults
  where
    -- identical to static task pool except for the type of workers
    ress = merge (spawnAt places workers inputs)
    places = nextPes branch stride
    workers :: [Process [Maybe t] [(Int,(r,[t],Int))]]
    workers = [process (zip [i,i..] . wf) | i <- [0..branch-1]]
    inputs = distribute branch (initReqs ++ reqs) tasks
    -- every child starts with @prefetch@ requests
    initReqs = concat (replicate prefetch [0..branch-1])
    -- additions for task queue management and
    -- termination detection
    tasks = (map Just initTasks) ++ newTasks
    -----------------------------
    -- 'length' traverses the complete initial task list
    initNumTasks = length initTasks
    -- => might lead to deadlock!
    -----------------------------
    -- results, follow-up requests and the stream of newly created tasks
    (finalResults, reqs, newTasks)
      = tdetectTop ress initNumTasks
-- | Sub-master of the hierarchical workpool. Towards its parent it has
-- the same stream interface as a worker (@[Maybe t] -> [(r,[t],Int)]@);
-- towards its @branch@ children it acts as a master. Tasks from the
-- parent and messages from the children are merged into one control
-- stream that is processed by 'tcontrol'.
--
-- Arguments: stride, branching degree, prefetch, child function and
-- the task stream received from the parent.
mwDynSubStride :: forall t r . (Trans t, Trans r) =>
       Int -> Int -> Int -> ([Maybe t] -> [(r,[t],Int)])
       -> [Maybe t] -> [(r,[t],Int)]
mwDynSubStride stride branch prefetch wf initTasks = finalResults where
  fromWorkers = map flatten (spawnAt places workers inputs)
  places = nextPes branch stride
  workers :: [Process [Maybe t] [(Int,(r,[t],Int))]]
  workers = [process (zip [i,i..] . wf) | i <- [0..branch-1]]
  inputs = distribute branch (initReqs ++ reqs) tasks
  -- every child starts with @prefetch@ requests
  initReqs = concat (replicate prefetch [0..branch-1])
  -- task queue management
  -- Right: task (or stop signal) from the parent, Left: message from a child
  controlInput = merge (map Right initTasks: map (map Left) fromWorkers)
  -- counters start at 0, the prefetch bound is branch * (prefetch+1),
  -- the split mode starts as False
  (finalResults, tasks, reqs)
    = tcontrol controlInput 0 0 (branch * (prefetch+1)) False
-- task queue control for termination detection
--
-- Consumes the merged child messages @(child id, (result, new tasks,
-- hold flag))@ together with a counter of outstanding tasks and returns
-- @(results, requests, task stream)@.
--
-- Termination: when the last outstanding task comes back without new
-- tasks and with hold flag 0, the request stream ends and the task
-- stream continues with an infinite supply of @Nothing@ (stop signals).
--
-- NOTE(review): there is no equation for an empty message stream.
tdetectTop :: [(Int,(r,[t],Int))] -> Int -> ([r], [Int], [Maybe t])
tdetectTop ((req,(r,ts,subHoldsTs)):ress) numTs
  | numTs == 1 && null ts && subHoldsTs == 0
      = ([r], [], repeat Nothing) -- final result
  | subHoldsTs == 1
      -- the child reports held tasks: no new request is issued for it
      = (r:moreRes, moreReqs, (map Just ts) ++ moreTs)
  | otherwise
      = (r:moreRes, req:moreReqs, (map Just ts) ++ moreTs)
  where --subHoldsTs is 0 or 1, if it's 1 -> no Request
        -- -> numTs will not be decreased
        (moreRes, moreReqs, moreTs)
          = tdetectTop ress (numTs-1+length ts+subHoldsTs)
-- | Task queue control of a sub-master. Processes the merged control
-- stream (@Right@: task or stop signal from the parent, @Left@: message
-- @(child id, result, new tasks, hold flag)@ from a child) and produces
-- the messages for the parent, the task stream for the children and
-- the request stream.
--
-- NOTE(review): there is no equation for an empty control stream.
tcontrol :: [Either (Int,r,[t],Int) (Maybe t)] -> -- controlInput
            Int -> Int ->                         -- task / hold counter
            Int -> Bool ->                        -- prefetch, split mode
            ([(r,[t],Int)],[Maybe t],[Int])       -- (results,tasks,requests)
-- stop signal from the parent while the task counter is 0
tcontrol ((Right Nothing):_) 0 _ _ _
  = ([],repeat Nothing,[]) -- Final termination
tcontrol ((Right (Just t)):ress) numTs hldTs pf even -- task from above
  = let (moreRes, moreTs, reqs) = tcontrol ress (numTs+1) hldTs pf even
    in (moreRes, (Just t):moreTs, reqs)
-- message from a child: forward the result, split the new tasks between
-- the local queue and the parent, and update the counters
tcontrol ((Left (i,r,ts,subHoldsTs)):ress) numTs hldTs pf even
  = let (moreRes, moreTs, reqs)
          = tcontrol ress (numTs + differ) (hldTs') pf evenAct
        -- net change of the counters: tasks kept locally plus the
        -- child's hold flag, minus the task that was just finished
        differ = length localTs + subHoldsTs - 1
        hldTs' = max (hldTs + differ) 0
        -- hold flag reported to the parent (0 or 1)
        holdInf | (hldTs+differ+1 > 0) = 1
                | otherwise = 0
        (localTs,fatherTs,evenAct)
          = split numTs pf ts even -- part of tasks to parent
        -- a new request for child i is only issued if its hold flag is 0
        newreqs | (subHoldsTs == 0) = i:reqs
                | otherwise = reqs -- no tasks kept below?
    in ((r,fatherTs,holdInf):moreRes,
        (map Just localTs) ++ moreTs, newreqs)
-- error case, not shown in paper
tcontrol ((Right Nothing):_) n _ _ _
  = error "Received Stop signal, although not finished!"
-- | Turn the nested message format @(id,(result,tasks,n))@ into the
-- flat quadruple @(id,result,tasks,n)@, element by element.
flatten :: [(a,(b,c,d))] -> [(a,b,c,d)]
flatten msgs = [ (i,r,ts,n) | (i,(r,ts,n)) <- msgs ]
-- | Split a list of new tasks into the part kept locally and the part
-- passed on to the parent. Returns @(local tasks, tasks for the
-- parent, next split mode)@.
--
-- With at least @2*pf@ tasks all new tasks go to the parent. Otherwise
-- the tasks are dealt out alternately, starting locally unless the
-- split mode flag is set and the task count differs from 1.
split :: Int -> Int -> [t] -> Bool -> ([t],[t],Bool)
split num pf ts fatherFirst
  | num >= 2*pf               = ([], ts, False)
  | fatherFirst && num /= 1   = evenOdd ts
  -- no tasks or odd -> keep first
  | otherwise                 = oddEven ts
-- | Deal the elements out alternately, the first one to the local
-- list. Returns @(local, parent, flag)@; the flag is 'False' if the
-- last element went to the parent (or the list was empty) and 'True'
-- if it was kept locally.
oddEven :: [t] -> ([t],[t],Bool)
oddEven ts = case ts of
  []     -> ([], [], False)
  (x:xs) -> let (keep, up, flag) = evenOdd xs
            in  (x : keep, up, flag)

-- | Deal the elements out alternately, the first one to the parent
-- list. Counterpart of 'oddEven'; the flag is 'True' for the empty
-- list.
evenOdd :: [t] -> ([t],[t],Bool)
evenOdd ts = case ts of
  []     -> ([], [], True)
  (x:xs) -> let (keep, up, flag) = oddEven xs
            in  (keep, x : up, flag)
-- | Places for @n@ child processes. If the PE following the own one
-- (@selfPe + 1@) lies beyond @noPe@, all @n@ children are placed on
-- @noPe@. Otherwise the places are taken from the cyclic sequence
-- @selfPe+1, selfPe+1+stride, ...@ bounded by @noPe@.
--
-- NOTE(review): that cyclic sequence is infinite, so for @n > 0@ the
-- concatenation of @n@ copies is the sequence itself; callers presumably
-- consume only as many places as they spawn processes.
nextPes :: Int -> Int -> [Int]
nextPes n stride
  | firstPe > noPe = replicate n noPe
  | otherwise      = concat (replicate n strided)
  where
    firstPe = selfPe + 1
    strided = cycle (takeWhile (<= noPe) [firstPe, firstPe + stride ..])
-- | Branching degrees per level for a hierarchy built from @np@
-- processes with the given depth and first-level branching degree.
-- Depth 1 is a flat pool of @np@ workers. Otherwise the first level
-- uses @level1@, all further submaster levels branch binary, and the
-- last entry is the number of workers per lowest submaster (processes
-- left over after the submasters, divided evenly, rounded up).
mkNesting :: Int -> Int -> Int -> [Int]
mkNesting np 1 _ = [np]
mkNesting np depth level1 = level1 : binaryLevels ++ [workersPerBranch]
  where
    binaryLevels     = replicate (depth - 2) 2
    -- total number of submasters over all levels
    submasters       = level1 * ( 2^(depth-1) - 1 )
    -- processes left for the workers
    leaves           = np - submasters
    -- number of submasters on the lowest submaster level
    lowestSubMs      = level1 * 2^(depth - 2)
    -- workers per branch (rounded up)
    workersPerBranch = (leaves + lowestSubMs - 1) `div` lowestSubMs
-- | Prefetch values per level, top-down. The worker level gets @pf@;
-- every level above gets @(pf+1)@ times the product of the branching
-- degrees below it. The nesting list must be non-empty.
mkPFs :: Int ->   -- prefetch value for worker processes
         [Int] -> -- branching per level top-down
         [Int]    -- list of prefetches
mkPFs pf nesting = map (* (pf+1)) subtreeSizes ++ [pf]
  where
    -- running products of the branching degrees, from the bottom up
    subtreeSizes = scanr1 (*) (tail nesting)
---------------------------distributed workpool skeletons-------------------------------------------
-- | A distributed workpool skeleton that uses task generation and a global state (s) with a total order.
-- Split and Detach policy must give tasks away (may not produce empty lists), unless all tasks are pruned!
--
-- The workers are connected in a ring ('ringFlAt'); each worker runs
-- 'control' on its merged stream of local and ring requests. The first
-- worker is marked (@isFirst@) and gets a special role in 'control'.
distribWPAt :: (Trans onT, Trans t, Trans r, Trans s, NFData r') =>
               Places                              -- ^ where to put workers
               -> ((t,s) -> (Maybe (r',s),[t]))    -- ^ worker function
               -> (Maybe ofT -> Maybe onT -> [t])  -- ^ input transformation (e.g. (fetch . fromJust . snd) for online input of type [RD[t]])
               -> ([Maybe (r',s)] -> s -> r)       -- ^ result transformation (prior to release results in the workers)
               -> ([t]->[t]->s->[t])               -- ^ taskpool transform attach function
               -> ([t]->s->([t],Maybe (t,s)))      -- ^ taskpool transform detach function (local request)
               -> ([t]->s->([t],[t]))              -- ^ taskpool transform split function (remote request)
               -> (s->s->Bool)                     -- ^ state comparison (checks if new state is better than old state)
               -> s                                -- ^ initial state (offline input)
               -> [ofT]                            -- ^ offline input (if non empty, outer list defines the number of workers, else the shorter list does)
               -> [onT]                            -- ^ dynamic input (if non empty, outer list defines the number of workers, else the shorter list does)
               -> [r]                              -- ^ results of workers
distribWPAt places wf inputT resT ttA ttD ttSplit sUpdate st ofTasks onTasks = res where
  res = ringFlAt places id id workers onTasks'
  -- only the first worker is created with isFirst = True
  workers = zipWith (worker) (True:(repeat False)) ofTasks'
  -- length of ontasks and oftasks is adjusted such that the desired number of workers will be instantiated
  (ofTasks',onTasks') | null ofTasks = (repeat Nothing , map Just onTasks)
                      | null onTasks = (map Just ofTasks, repeat Nothing )
                      | otherwise = (map Just ofTasks, map Just onTasks)
  -- worker functionality
  -- returns (transformed results, ring output without its last element);
  -- the last ring output element supplies the final state
  worker isFirst ofTasks onTasks ringIn = (resT ress sFinal,init ringOut) where
    ts = (inputT ofTasks onTasks)
    (ress,ntssNtoMe) = unzip $ genResNReqs $ map wf ts' --separate
    reqList = Me : mergeS [ringIn, --merge external Requests
                           ntssNtoMe] rnf --and nf reduced local Requests
    -- manage Taskpool & Requests
    (ts',ringOut) = control ttA ttSplit ttD sUpdate reqList ts st isFirst
    sFinal = getState $ last ringOut
-- | Request handling of one worker of the distributed workpool.
-- Consumes the merged request stream (own requests, requests of other
-- workers, new local tasks, state updates) and produces the stream of
-- tasks (paired with a state) for the local worker function and the
-- requests passed on into the ring.
--
-- Two local modes exist: @distribWork@ reacts to requests in general,
-- @passWhileReceive@ is entered after an own request found the local
-- task pool empty and forwards ring traffic until new tasks arrive or
-- termination starts. The pair @sNr@ / @(s,r)@ holds two counters: the
-- first is incremented when tasks are sent to another worker, the
-- second when a non-empty task list is received.
control:: (Trans t, Trans s) =>
          ([t]->[t]->s->[t]) ->              --Taskpool Transform Attach
          ([t]->s->([t],[t])) ->             --Split Policy (remote Req)
          ([t]->s->([t],Maybe (t,s))) ->     --tt Detach (local Req)
          (s->s->Bool)->                     --Checks if newState is better than oldState
          [Req [t] s]->[t]->s->Bool->        --reqs,tasks,state,isFirstInRing
          ([(t,s)],[Req [t] s])              --localTasks,RequestsToRing
control ttA ttSplit ttD sUpdate requests initTasks st isFirst
  = distribWork requests initTasks Nothing st (0,0)
  where
    --until no tasks left: react on own and Others requests & add new Tasks
    --distribWork :: Trans t => [Req [t] s] -> [t] -> $
    -- Maybe (ChanName (Req [t] s))->(Int,Int)-> ([t],[ReqS [t] s],s)
    distribWork (TasksNMe nts:rs) tS replyCh st sNr --selfmade Tasks arrive
      = distribWork (Me:rs) (ttA tS nts st) replyCh st sNr --add Tasks and MeReq
    distribWork (NewState st':rs) tS replyCh st sNr --Updated State arrives
      | sUpdate st' st = let (tS',wReqs') =distribWork rs tS replyCh st' sNr
                         in (tS',(NewState st':wReqs')) --then check and send
      | otherwise = distribWork rs tS replyCh st sNr --or discard
    distribWork (req@(Others tag tCh):rs) [] replyCh st sNr --Others Request &
      | tag==None = (tS',req:wReqs') --no tasks left --> pass Request
      | otherwise = (tS', Others Black tCh : wReqs') --I'm still working -> Black
      where(tS',wReqs') = distribWork rs [] replyCh st sNr
    distribWork (Me:rs) [] replyCh st sNr = --last own Task solved and no new ones
      new (\reqChan (newTS,replyChan) -> --gen new reqChan to get newTS & replyC
        let (tS',wReqs) = passWhileReceive (mergeS [rs, --wait for fst newTask
                 newTS `pseq` [TasksNMe newTS]] r0) replyChan st sNr --to merge
            tag | isFirst = Black --First Worker starts Black (For Termination)
                | otherwise = None --normal Workers with None Tag
        in(case replyCh of --First own Request into Ring- others dynamic
              Nothing -> (tS', Others tag reqChan : wReqs)
              Just replyCh' -> parfill replyCh' (Others tag reqChan)
                                 (tS',wReqs)))
    distribWork (Me:rs) tS replyCh st sNr --local Request and Tasks present
      = let (tS',tDetatch) = ttD tS st --TaskpoolTransform Detach
            (tsDetatch,wReqs) = case tDetatch of
                                  Nothing -> distribWork (Me:rs) [] replyCh st sNr
                                  Just t -> distribWork rs tS' replyCh st sNr
        in ((maybeToList tDetatch)++tsDetatch,wReqs) --add Maybe one Task to wf
    distribWork reqs@(Others _ tCh :rs) tS replyCh st (s,r) --foreign Req & have Ts
      = let (holdTs,sendTs) = ttSplit tS st --split ts to send and ts to hold
            ((tS',wReqs'),replyReq)
              = new (\replyChan replyReq' -> --gen ReplyC and send Tasks & new
                  parfill tCh (sendTs,Just replyChan) --ReplyC in Chan of the Req
                    ((distribWork rs holdTs replyCh st (s+1,r)),replyReq'))
        in case sendTs of --ReplyReqToOutp
             [] -> distribWork reqs [] replyCh st (s,r) --No tasks left
             (_:_) -> (tS',mergeS [[replyReq],wReqs'] rnf)
    -- Pass all until foreign Tasks arrive or Termination starts
    -- passWhileReceive :: Trans t => [ReqS [t] s] -> Maybe(ChanName (ReqS [t] s))
    -- -> (Int,Int) -> ([t],[ReqS [t] s])
    passWhileReceive (NewState st':rs) replyCh st sNr --Updated State arrives
      | sUpdate st' st =let (tS',wReqs')=passWhileReceive rs replyCh st' sNr
                        in (tS',(NewState st':wReqs')) --then check and send
      | otherwise = passWhileReceive rs replyCh st sNr --or discard
    passWhileReceive (req@(Others None tCh):rs) replyCh st sNr --Req of normal
      = let (tS',wReqs) = passWhileReceive rs replyCh st sNr --Worker -> pass it
        in (tS',req:wReqs)
    passWhileReceive (req@(Others Black tCh ):rs) replyCh st (s,r) --Black Req
      | (not isFirst) = (tS',req :wReqs) --normal Workers: pass it
      | otherwise = (tS',req':wReqs) --First Worker: new Round starts White
      where (tS',wReqs) = passWhileReceive rs replyCh st (s,r)
            req'= Others (White s r 0 0) tCh --Start with own Counters
    passWhileReceive (Others (White s1 r1 s2 r2) tCh :rs) replyCh st (s,r)
      | (not isFirst) = (tS',req':wReqs) --Normal Workers: add Counter and pass
      --4 counters equal -> pass Black as end of reqs Symbol, start Termination
      | otherwise = if terminate
                    then ([],Others Black tCh : (termRing rs ++ [NewState st]))
                    else (tS',req'':wReqs) --no Termination
      where (tS',wReqs) = passWhileReceive rs replyCh st (s,r)
            req' = Others (White (s1+s) (r1+r) s2 r2) tCh --add Counters
            req'' = Others (White s r s1 r1) tCh --Move Counters->NewTurn
            terminate = (s1==r1)&&(r1==s2)&&(s2==r2) --Check Termination
    passWhileReceive (TasksNMe newTS:rs) replyCh st (s,r) --Task List arrives
      | null newTS = ([],(termRing rs) --got empty newTs -> begin Termination
                        ++ [NewState st]) --attach final State at the End
      | otherwise = (distribWork (Me:rs) newTS replyCh st (s,r+1)) --have newTs
-- | Messages handled by a worker of the distributed workpool: own work
-- requests, requests of other workers (with a tag and a channel for
-- the reply), new local tasks, and state updates.
data Req t s =
   Me | --Work Request of local Worker - Function
   --Request of other Workers with Tag, and Channel to Send Tasks and
   Others { getTag :: Tag,
            getReplyChan :: (ChanName (t,Maybe (ChanName(Req t s))))
          } | --Reply Channel
   TasksNMe { getTask :: t} | -- New Tasks and additional Me Request to add
   NewState { getState :: s }
-- | Normal-form evaluation of a request: evaluates the payload of
-- whichever constructor is present ('Me' carries none).
instance (NFData t, NFData s) => NFData (Req t s) where
  rnf req = case req of
    Me              -> ()
    Others tag chan -> rnf tag `pseq` rnf chan
    TasksNMe ts     -> rnf ts
    NewState st     -> rnf st
-- | Requests are transmissible; the instance has no method definitions
-- of its own, so the class defaults apply.
instance (Trans t,Trans s) => Trans (Req t s)
-- | Tag attached to requests of other workers ('Others'); it drives the
-- termination detection in the ring. The four counters of 'White' are
-- two pairs of send/receive counters.
data Tag = Black | --no Termination Situation / Term Mode: Last Request in Ring
           White Int Int Int Int | --check Termination Situation:(send&recv)
           None --Request of normal Worker
-- | Normal-form evaluation of a tag: only 'White' carries data (its
-- four counters).
instance NFData Tag where
  rnf tag = case tag of
    Black         -> ()
    None          -> ()
    White a b c d -> rnf (a,b,c,d)
-- | Structural equality on tags: equal constructors, and for 'White'
-- additionally equal counters.
instance Eq Tag where
  t1 == t2 = case (t1, t2) of
    (Black, Black)                       -> True
    (None, None)                         -> True
    (White a b c d, White a' b' c' d')   -> (a,b,c,d) == (a',b',c',d')
    _                                    -> False
---------------------auxiliary-----------------
-- | Termination phase: answers pending requests of other workers with
-- an empty task list and no reply channel. A 'Black' request is
-- answered and ends the processing; 'None' requests are answered and
-- processing continues; everything else is skipped. The result is
-- always the empty list, the replies happen as side effects.
termRing :: (Trans t,Trans s) => [Req [t] s] -> [Req [t] s]
termRing reqs = case reqs of
  []                       -> [] -- predecessor sends no more requests
  Others Black tCh : _     -> parfill tCh ([],Nothing) [] -- reply last req
  Others None tCh : rest   -> parfill tCh ([],Nothing) (termRing rest) -- reply
  _ : rest                 -> termRing rest -- ignore isFirst's reply
-- | Turn the outputs of the worker function into pairs of a result and
-- the request it triggers. An output without result yields a
-- 'TasksNMe' request carrying the new tasks. An output with a result
-- first yields a 'NewState' request with the new state and is then
-- processed again as an output without result, so that its tasks are
-- attached as well. Every output is fully evaluated before its pair is
-- produced.
genResNReqs :: (NFData t,NFData r',NFData s)=>
               [(Maybe (r',s),[t])] -> [(Maybe (r',s),Req [t] s)]
genResNReqs outputs = case outputs of
  [] -> [] --No more tasks
  --No new State -> Attach new Tasks
  res@(Nothing,nts) : rest ->
    rnf res `pseq` (Nothing,TasksNMe nts) : genResNReqs rest
  --New State found -> Attach
  res@(Just (r,st),nts) : rest ->
    rnf res `pseq` (Just (r,st),NewState st) : genResNReqs ((Nothing,nts):rest)
-----------------------------DEPRECATED---------------------------------
-- | Deprecated alias of 'workpoolSortedNonBlock'; all arguments are
-- passed through unchanged.
{-# DEPRECATED masterWorker "better use workpoolSortedNonBlock instead" #-}
masterWorker :: (Trans a, Trans b) =>
                Int -> Int -> (a -> b) -> [a] -> [b]
masterWorker np prefetch wf tasks = workpoolSortedNonBlock np prefetch wf tasks
-- | Deprecated alias of 'wpNested'; all arguments are passed through
-- unchanged.
{-# DEPRECATED mwNested "better use wpNested instead" #-}
mwNested :: forall t r . (Trans t, Trans r) =>
            [Int] -> [Int] -> (t -> r) -> [t] -> [r]
mwNested ns pfs wf initTasks = wpNested ns pfs wf initTasks
-- | Deprecated alias of 'wpDNI'; all arguments are passed through
-- unchanged.
{-# DEPRECATED mwDNI "better use wpDNI instead" #-}
mwDNI :: (Trans t, Trans r) =>
         Int
         -> Int
         -> Int
         -> Int
         -> (t -> (r,[t]))
         -> [t]
         -> [r]
mwDNI np levels l_1 pf f tasks = wpDNI np levels l_1 pf f tasks
-- | Deprecated alias of 'wpDynNested'; all arguments are passed
-- through unchanged.
{-# DEPRECATED mwDynNested "better use wpDynNested instead" #-}
mwDynNested :: forall t r . (Trans t, Trans r) =>
               [Int]
               -> [Int]
               -> (t -> (r,[t]))
               -> [t]
               -> [r]
mwDynNested ns pfs wf initTasks = wpDynNested ns pfs wf initTasks