{-# LANGUAGE CPP, ScopedTypeVariables #-}
-----------------------------------------------------------------------------
-- |
-- Module : Control.Parallel.Eden.EdenSkel.TopoSkels
-- Copyright : (c) Philipps Universitaet Marburg 2009-2012
-- License : BSD-style (see the file LICENSE)
--
-- Maintainer : eden@mathematik.uni-marburg.de
-- Stability : beta
-- Portability : not portable
--
-- This Haskell module defines topology skeletons for the parallel functional
-- language Eden. Topology skeletons are skeletons that implement a network of
-- processes interconnected by a characteristic communication topology.
--
-- Depends on GHC. Using standard GHC, you will get a threaded simulation of Eden.
-- Use the forked GHC-Eden compiler from http:\/\/www.mathematik.uni-marburg.de/~eden
-- for a parallel build.
--
-- Eden Group ( http:\/\/www.mathematik.uni-marburg.de/~eden )
module Control.Parallel.Eden.EdenSkel.TopoSkels (
-- * Skeletons that are primarily characterized by their topology.
-- ** Pipeline skeletons
-- |
pipe, pipeRD
-- ** Ring skeletons
-- |
,ringSimple, ring, ringFl, ringAt, ringFlAt
-- ** Torus skeleton
-- |
,torus
-- ** The Hypercube skeleton
-- |
-- ** The All-To-All skeleton
-- |The allToAll skeleton allows distributed data exchange and
-- transformation including data of all processes. Input and output
-- are provided as remote data. A typical application is the
-- distributed transposition of a distributed matrix.
,allToAllRDAt, allToAllRD, parTransposeRDAt, parTransposeRD, allGatherRDAt, allGatherRD
-- ** The All-Reduce skeleton
-- |The skeleton uses a butterfly topology to reduce the data of
-- participating processes P in log(|P|) communication stages. Input
-- and output are provided as remote data.
,allReduceRDAt, allReduceRD, allGatherBuFlyRDAt, allGatherBuFlyRD
) where
#if defined( __PARALLEL_HASKELL__ ) || defined (NOT_PARALLEL)
import Control.Parallel.Eden
#else
import Control.Parallel.Eden.EdenConcHs
#endif
import Control.Parallel.Eden.EdenSkel.Auxiliary
import Control.Parallel.Eden.EdenSkel.MapSkels
import Data.List
-- | Simple pipe where the parent process creates all pipe processes.
-- The processes communicate their results via the caller process.
-- Implemented as 'pipeRD' applied through 'unLiftRD', so that the caller
-- works with plain values instead of remote data handles.
pipe :: forall a . Trans a =>
        [a -> a]  -- ^functions of the pipe
        -> a      -- ^input
        -> a      -- ^output
pipe stages input = unLiftRD (pipeRD stages) input
-- | Process pipe where the processes communicate their Remote Data handles
-- via the caller process but fetch the actual data from their predecessor
-- processes.
--
-- One process is created per function in the list. The first process is fed
-- the remote input, every further process is fed the remote output of its
-- predecessor, and the remote output of the last process is the result.
-- For an empty function list the remote input is returned unchanged.
pipeRD :: forall a . Trans a =>
          [a -> a]  -- ^functions of the pipe
          -> RD a   -- ^remote input
          -> RD a   -- ^remote output
pipeRD fs xs = last (xs : outs)
  -- Taking the last element of @xs : outs@ (rather than of @outs@) makes the
  -- empty pipe the identity instead of failing on an empty list; for a
  -- non-empty pipe it is still the output of the final stage.
  where
    -- Cyclic definition: the inputs of the stages are the pipe input followed
    -- by the stage outputs. 'lazy' keeps the input list from being demanded
    -- before the processes exist.
    outs = spawn ps $ lazy $ xs : outs
    ps :: [Process (RD a) (RD a)]
    ps = map (process . liftRD) fs
-- | Simple ring skeleton (tutorial version).
-- Uses remote data to provide direct communication between ring neighbours;
-- there is no input distribution and no output combination step.
-- Each ring input is paired with the ring output of its predecessor
-- (the output list rotated right by one position).
ringSimple :: (Trans i, Trans o, Trans r) =>
              (i -> r -> (o,r))  -- ^ ring process function
              -> [i] -> [o]      -- ^ input output mapping
ringSimple f is = results
  where
    -- ring process function working on remote ring data
    node = toRD (uncurry f)
    -- every process reads what its left neighbour wrote
    ringIns = rightRotate ringOuts
    (results, ringOuts) = unzip (parMap node (zip is (lazy ringIns)))
-- Lifts a ring process function so that its ring input and ring output are
-- remote data handles: the incoming handle is fetched before the function is
-- applied and the outgoing ring value is released afterwards.
toRD :: (Trans i, Trans o, Trans r) =>
        ((i,r) -> (o,r))               -- ^ ring process function
        -> ((i, RD r) -> (o, RD r))    -- ^ -- with remote data
toRD f (i, ringIn) =
  -- The let pattern is lazy, so the result pair is available before @f@
  -- is evaluated.
  let (o, ringOut) = f (i, fetch ringIn)
  in (o, release ringOut)
-- Rotates a list one position to the right: the last element becomes the
-- first one. The empty list is returned unchanged.
rightRotate :: [a] -> [a]
rightRotate xs
  | null xs   = []
  | otherwise = last xs : init xs
-- | Skeleton @ringFlAt@ establishes a ring topology. Each ring process
-- function transforms the initial input of its ring process and the input
-- stream from the ring into the final result of that process and its ring
-- output stream. Every ring process applies its own function from the list,
-- which e.g. allows routing individual offline input into the ring
-- processes. This version uses explicit placement.
ringFlAt :: (Trans a,Trans b,Trans r) =>
            Places                         -- ^where to put workers
            -> (i -> [a])                  -- ^distribute input
            -> ([b] -> o)                  -- ^combine output
            -> [(a -> [r] -> (b,[r]))]     -- ^ring process fcts
            -> i                           -- ^ring input
            -> o                           -- ^ring output
ringFlAt places distrib combine fs i = combine os
  where
    -- ring process functions lifted to remote ring streams
    workers = map (toRD . uncurry) fs
    -- each process reads the ring output of its left neighbour
    ringIns = rightRotate ringOuts
    (os, ringOuts) =
      unzip (spawnFAt places workers (zip (distrib i) (lazy ringIns)))
-- | Skeleton @ringFl@ establishes a ring topology. Each ring process
-- function transforms the initial input of its ring process and the input
-- stream from the ring into the final result of that process and its ring
-- output stream. Every ring process applies an individual function, which
-- e.g. allows routing individual offline input into the ring processes.
-- This is 'ringFlAt' with the placement list @[0]@; use 'ringFlAt' if
-- explicit placement is desired.
ringFl :: (Trans a,Trans b,Trans r) =>
          (i -> [a])                     -- ^distribute input
          -> ([b] -> o)                  -- ^combine output
          -> [(a -> [r] -> (b,[r]))]     -- ^ring process fcts
          -> i                           -- ^ring input
          -> o                           -- ^ring output
ringFl distrib combine fs i = ringFlAt [0] distrib combine fs i
-- | Skeleton @ringAt@ establishes a ring topology. The ring process function
-- transforms the initial input of a ring process and the input stream from
-- the ring into the final result of that process and its ring output stream.
-- The same function is used by every ring process. Use 'ringFlAt' if you
-- need different functions in the processes. This version uses explicit
-- placement.
ringAt :: (Trans a,Trans b,Trans r) =>
          Places                       -- ^where to put workers
          -> (i -> [a])                -- ^distribute input
          -> ([b] -> o)                -- ^combine output
          -> (a -> [r] -> (b,[r]))     -- ^ring process fct
          -> i                         -- ^ring input
          -> o                         -- ^ring output
ringAt places distrib combine f i =
  -- 'ringFlAt' pairs the function list with the distributed inputs, so the
  -- function must be repeated to cover every input. Passing the singleton
  -- list @[f]@ would leave only the first distributed input with a process.
  ringFlAt places distrib combine (repeat f) i
-- | Skeleton @ring@ establishes a ring topology. The ring process function
-- transforms the initial input of a ring process and the input stream from
-- the ring into the final result of that process and its ring output stream.
-- The same function is used by every ring process. Use 'ringFl' if you need
-- different functions in the processes. This is 'ringAt' with the placement
-- list @[0]@; use 'ringAt' if explicit placement is desired.
ring :: (Trans a,Trans b,Trans r) =>
        (i -> [a])                   -- ^distribute input
        -> ([b] -> o)                -- ^combine output
        -> (a -> [r] -> (b,[r]))     -- ^ring process fct
        -> i                         -- ^ring input
        -> o                         -- ^ring output
ring distrib combine f i = ringAt [0] distrib combine f i
-- | Parallel torus skeleton (tutorial version) with stream rotation in two
-- directions. Initial inputs for each torus element are given. The node
-- function is used on each torus element to transform the initial input and
-- a stream of inputs from each direction into a result and a stream of
-- outputs to each direction. Each torus input should have the same size in
-- both dimensions, otherwise the smaller input will determine the size of
-- the torus.
torus :: (Trans a, Trans b, Trans c, Trans d) =>
         (c -> [a] -> [b] -> (d,[a],[b]))  -- ^ node function
         -> [[c]] -> [[d]]                 -- ^ input-output mapping
torus f inss = outss
  where
    -- one torus process per matrix element
    nodeOutss = spawnPss (repeat (repeat (ptorus f))) nodeInss -- optimised
    (outss, outssA, outssB) = unzip3 (map unzip3 nodeOutss)
    -- direction A: rotate the elements within every row
    inssA = map rightRotate outssA
    -- direction B: rotate the rows themselves
    inssB = rightRotate outssB
    nodeInss = zipWith3 lazyzip3 inss (lazy inssA) (lazy inssB)
    -- zip3 that does not demand the two ring input lists early
    lazyzip3 as bs cs = zip3 as (lazy bs) (lazy cs)
-- Each individual process of the torus (tutorial version).
-- Wraps the node function into a process abstraction whose two ring inputs
-- and two ring outputs are remote data handles: both inputs are fetched
-- with 'fetch2', both outputs are released after the node function ran.
ptorus :: (Trans a, Trans b, Trans c, Trans d) =>
          (c -> [a] -> [b] -> (d,[a],[b])) ->
          Process (c,RD [a],RD [b])
                  (d,RD [a],RD [b])
ptorus f = process node
  where
    node (fromParent, inA, inB) =
      -- lazy let patterns: the result triple exists before @f@ is evaluated
      let (streamA, streamB) = fetch2 inA inB
          (toParent, outA, outB) = f fromParent streamA streamB
      in (toParent, release outA, release outB)
-- | The skeleton creates as many processes as there are elements in the
-- input list (@np@). The processes get all-to-all connected: each process
-- input is transformed to @np@ intermediate values by the first parameter
-- function, where the @i@-th value will be sent to process @i@. The second
-- transformation function combines the initial input and the @np@ received
-- intermediate values to the final output.
-- This is 'allToAllRDAt' with the placement list @[0]@.
allToAllRD :: forall a b i. (Trans a, Trans b, Trans i)
              => (Int -> a -> [i])   -- ^transform before bcast (num procs, input, sync-data out)
              -> (a -> [i] -> b)     -- ^transform after bcast (input, sync-data in, output)
              -> [RD a]              -- ^remote input for each process
              -> [RD b]              -- ^remote output for each process
allToAllRD t1 t2 xs = allToAllRDAt [0] t1 t2 xs
-- | The skeleton creates as many processes as there are elements in the
-- input list (@np@). The processes get all-to-all connected: each process
-- input is transformed to @np@ intermediate values by the first parameter
-- function, where the @i@-th value will be sent to process @i@. The second
-- transformation function combines the initial input and the @np@ received
-- intermediate values to the final output. This version uses explicit
-- placement.
allToAllRDAt :: forall a b i. (Trans a, Trans b, Trans i)
                => Places              -- ^where to instantiate
                -> (Int -> a -> [i])   -- ^transform before bcast (num procs, input, sync-data out)
                -> (a -> [i] -> b)     -- ^transform after bcast (input, sync-data in, output)
                -> [RD a]              -- ^remote input for each process
                -> [RD b]              -- ^remote output for each process
allToAllRDAt places t1 t2 xs = outputs
  where
    -- same number of processes as remote inputs
    np = length xs
    -- np is evaluated before the processes are started
    (outputs, sentRows) =
      np `pseq` unzip (parMapAt places (uncurry worker) workerInputs)
    -- row j of the transposed matrix holds what every process sent to j
    workerInputs = zip xs (lazy (transpose sentRows))

    worker :: RD a -> [RD i] -> (RD b, [RD i])
    worker inputRD received = (finish received, scatter input)
      where
        input   = fetch inputRD
        -- intermediate values for all processes, released as remote data
        scatter = releaseAll . t1 np
        -- fetch everything that was sent to this process and combine
        finish  = release . t2 input . fetchAll
-- works similar for splitIntoN and unsplit (concat)???
-- | Parallel transposition for matrices which are row-wise round robin
-- distributed among the machines; the transposed result matrix is also
-- row-wise round robin distributed.
-- This is 'parTransposeRDAt' with the placement list @[0]@.
parTransposeRD :: Trans b
                  => [RD [[b]]]  -- ^input list of remote partial matrices
                  -> [RD [[b]]]  -- ^output list of remote partial matrices
parTransposeRD parts = parTransposeRDAt [0] parts
-- works similar for splitIntoN and unsplit (concat)???
-- | Parallel transposition for matrices which are row-wise round robin
-- distributed among the machines; the transposed result matrix is also
-- row-wise round robin distributed. Built on 'allToAllRDAt'; this version
-- uses explicit placement.
parTransposeRDAt :: Trans b
                    => Places
                    -> [RD [[b]]]  -- ^input list of remote partial matrices
                    -> [RD [[b]]]  -- ^output list of remote partial matrices
parTransposeRDAt places = allToAllRDAt places scatter gather
  where
    -- before the exchange: transpose the local part and deal its rows out
    -- to the @np@ processes
    scatter np = unshuffle np . transpose
    -- after the exchange: the original input is not needed any more
    gather _ = map shuffle . transpose
-- | Performs an all-gather using all-to-all communication (based on
-- 'allToAllRDAt'). The initial transformation is applied in the processes to
-- obtain the values that will be gathered. The final combine function is
-- used to create a process's output from the initial input and the gathered
-- values. This is 'allGatherRDAt' with the placement list @[0]@.
allGatherRD :: forall a b c. (Trans a, Trans b, Trans c)
               => (a -> b)          -- ^initial transform function
               -> (a -> [b] -> c)   -- ^final combine function
               -> [RD a] -> [RD c]
allGatherRD t1 t2 rdAs = allGatherRDAt [0] t1 t2 rdAs
-- | Performs an all-gather using all-to-all communication (based on
-- 'allToAllRDAt'). The initial transformation is applied in the processes to
-- obtain the values that will be gathered. The final combine function is
-- used to create a process's output from the initial input and the gathered
-- values. This version uses explicit placement.
allGatherRDAt :: forall a b c. (Trans a, Trans b, Trans c)
                 => Places            -- ^where to instantiate
                 -> (a -> b)          -- ^initial transform function
                 -> (a -> [b] -> c)   -- ^final combine function
                 -> [RD a] -> [RD c]
allGatherRDAt places t1 t2 = allToAllRDAt places broadcast t2
  where
    -- every one of the @np@ processes gets the same transformed value
    broadcast :: Int -> a -> [b]
    broadcast np = replicate np . t1
-- | Performs an all-reduce with the reduce function using a butterfly
-- scheme. The initial transformation is applied in the processes to obtain
-- the values that will be reduced. The final combine function is used to
-- create a process's result from the initial input and the reduced value.
-- This is 'allReduceRDAt' with the placement list @[0]@.
allReduceRD :: forall a b c. (Trans a, Trans b, Trans c)
               => (a -> b)         -- ^initial transform function
               -> (b -> b -> b)    -- ^reduce function
               -> (a -> b -> c)    -- ^final combine function
               -> [RD a] -> [RD c]
allReduceRD initF redF resF rdAs = allReduceRDAt [0] initF redF resF rdAs
-- | Performs an all-reduce with the reduce function using a butterfly
-- scheme. The initial transformation is applied in the processes to obtain
-- the values that will be reduced. The final combine function is used to
-- create a process's result from the initial input and the reduced value.
-- This version uses explicit placement.
allReduceRDAt :: forall a b c. (Trans a, Trans b, Trans c)
                 => Places           -- ^where to instantiate
                 -> (a -> b)         -- ^initial transform function
                 -> (b -> b -> b)    -- ^reduce function
                 -> (a -> b -> c)    -- ^final combine function
                 -> [RD a] -> [RD c]
allReduceRDAt places initF redF resF rdAs = rdCs
  where
    -- number of communication stages: log2 of the process count, rounded up
    steps = ceiling (logBase 2 (fromIntegral (length rdAs)))
    -- steps is evaluated before the processes are started
    (rdBss, rdCs) =
      steps `pseq` unzip (parMapAt places (uncurry worker) workerInputs)
    workerInputs = zip rdAs (lazy (buflyF (transposeRt rdBss)))

    -- rearranges the per-stage outputs of all processes into the per-stage
    -- partner inputs of all processes
    buflyF = transposeRt . shiftFlipF steps . fillF steps

    worker :: RD a -> [Maybe (Both (RD b))] -> ([RD b], RD c)
    worker rdA partnerRDs = (sentRDs, result)
      where
        input    = fetch rdA
        seed     = initF input
        -- partial results after stage 0, 1, ..; element @steps@ is final
        partials = scanl redF' seed (fetchAll' tagged)
        -- the first @steps@ partial results are handed to the partners
        sentRDs  = (releaseAll . take steps . lazy) partials
        -- where a stage has no partner, fall back to the own value
        tagged   = zipWith (\own partner -> maybe own Left' partner)
                           (map Right' sentRDs) partnerRDs
        result   = release (resF input (partials !! steps))

    --List encoding:
    -- Right': No partner present, use value b without reduction
    -- Left':  RD value comes from partner, then inner encoding:
    --   Right': Partner is positioned at the right hand side
    --   Left':  Partner is positioned at the left hand side
    -- needed such that redF does not need to be commutative
    redF' :: b -> Either' (Both b) b -> b
    redF' acc (Left' (Left' fromLeft))   = redF fromLeft acc
    redF' acc (Left' (Right' fromRight)) = redF acc fromRight
    redF' _   (Right' own)               = own
-- An Either' whose two alternatives carry the same type, so the constructor
-- acts purely as a tag (Left' / Right') on a value of type @a@.
type Both a = Either' a a
-- Custom fetchAll for remote data wrapped in nested Either' tags: every
-- handle in the list is fetched with 'fetchPA' inside a single 'runPA'
-- action, and each fetched value is put back under the same constructors.
fetchAll' :: Trans a => [Either' (Both (RD a)) (RD a)] -> [Either' (Both a) a]
fetchAll' = runPA . mapM fetchTagged
  where
    fetchTagged (Left' (Left' rd))  = fetchPA rd >>= return . Left' . Left'
    fetchTagged (Left' (Right' rd)) = fetchPA rd >>= return . Left' . Right'
    fetchTagged (Right' rd)         = fetchPA rd >>= return . Right'
-- Brings every row to length @2 ^ ldn@: the elements are wrapped in Just,
-- rows that are too short are padded with Nothing, and rows that are too
-- long are cut off.
fillF :: Int -> [[a]] -> [[Maybe a]]
fillF ldn = map pad
  where
    width = 2 ^ ldn
    pad row = take width (map Just row ++ repeat Nothing)
-- Processes one row per stage, for stages @1 .. ldn@. The row of stage @k@
-- is split into @2 ^ k@ parts with 'unshuffle', the first and the second
-- half of these parts swap places, and the parts are merged again with
-- 'shuffle'. Values coming from the second half are tagged Right', values
-- coming from the first half are tagged Left'.
shiftFlipF :: Int -> [[Maybe a]] -> [[Maybe (Both a)]]
shiftFlipF ldn = zipWith flipStage [1 .. ldn]
  where
    flipStage stage row = shuffle (swapHalves (unshuffle parts row))
      where
        parts = 2 ^ stage
        swapHalves chunks =
          let (lower, upper) = splitAt (parts `div` 2) chunks
          in map (map (fmap Right')) upper ++ map (map (fmap Left')) lower
-- | Performs an all-gather using a butterfly scheme (based on
-- 'allReduceRDAt'). The initial transformation is applied in the processes
-- to obtain the values that will be gathered. The final combine function is
-- used to create a process's output from the initial input and the gathered
-- values. This is 'allGatherBuFlyRDAt' with the placement list @[0]@.
allGatherBuFlyRD :: forall a b c. (Trans a, Trans b, Trans c)
                    => (a -> b)          -- ^initial transform function
                    -> (a -> [b] -> c)   -- ^final combine function
                    -> [RD a] -> [RD c]
allGatherBuFlyRD t1 t2 rdAs = allGatherBuFlyRDAt [0] t1 t2 rdAs
-- | Performs an all-gather using a butterfly scheme (based on
-- 'allReduceRDAt'). The initial transformation is applied in the processes
-- to obtain the values that will be gathered. The final combine function is
-- used to create a process's output from the initial input and the gathered
-- values. This version uses explicit placement.
allGatherBuFlyRDAt :: forall a b c. (Trans a, Trans b, Trans c)
                      => Places            -- ^where to instantiate
                      -> (a -> b)          -- ^initial transform function
                      -> (a -> [b] -> c)   -- ^final combine function
                      -> [RD a] -> [RD c]
allGatherBuFlyRDAt places t1 t2 = allReduceRDAt places single (++) t2
  where
    -- gathering is reducing singleton lists with list concatenation
    single :: a -> [b]
    single x = t1 x : []
-- Module-local sum type with the same shape as the standard Either. The
-- butterfly skeletons use it to tag transmitted values.
data Either' a b
  = Left' a
  | Right' b
  deriving (Eq)
-- Normal-form evaluation of an Either' evaluates whichever payload is
-- present.
instance (NFData a, NFData b) => NFData (Either' a b) where
  rnf tagged = case tagged of
    Left' l  -> rnf l
    Right' r -> rnf r
-- Makes Either' usable as a Trans type. No methods are defined here, so the
-- instance relies entirely on the class's default method definitions.
instance (Trans a,Trans b) => Trans (Either' a b)