{-# LANGUAGE CPP, ScopedTypeVariables #-}
-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Parallel.Eden.EdenSkel.TopoSkels
-- Copyright   :  (c) Philipps Universitaet Marburg 2009-2012
-- License     :  BSD-style (see the file LICENSE)
--
-- Maintainer  :  eden@mathematik.uni-marburg.de
-- Stability   :  beta
-- Portability :  not portable
--
-- This Haskell module defines topology skeletons for the parallel functional
-- language Eden. Topology skeletons are skeletons that implement a network of
-- processes interconnected by a characteristic communication topology.
--
-- Depends on GHC. Using standard GHC, you will get a threaded simulation of Eden.
-- Use the forked GHC-Eden compiler from http:\/\/www.mathematik.uni-marburg.de/~eden
-- for a parallel build.
--
-- Eden Group ( http:\/\/www.mathematik.uni-marburg.de/~eden )

module Control.Parallel.Eden.EdenSkel.TopoSkels (
  -- * Skeletons that are primarily characterized by their topology.

  -- ** Pipeline skeletons
  -- |
  pipe, pipeRD

  -- ** Ring skeletons
  -- |
  ,ringSimple, ring, ringFl, ringAt, ringFlAt

  -- ** Torus skeleton
  -- |
  ,torus

  -- ** The Hypercube skeleton
  -- |

  -- ** The All-To-All skeleton
  -- | The allToAll skeleton allows distributed data exchange and
  -- transformation including data of all processes. Input and output
  -- are provided as remote data. A typical application is the
  -- distributed transposition of a distributed matrix.
  ,allToAllRD, parTransposeRD

  -- ** The All-Reduce skeleton
  -- | The All-Reduce skeleton uses a butterfly topology to reduce the data of
  -- participating processes P in log(|P|) communication stages. Input
  -- and output are provided as remote data.
  --
  -- Notice: The number of processes has to be a power of 2!
  ,allReduceRD
  ) where

#if defined( __PARALLEL_HASKELL__ ) || defined (NOT_PARALLEL)
import Control.Parallel.Eden
#else
import Control.Parallel.Eden.EdenConcHs
#endif
import Control.Parallel.Eden.EdenSkel.Auxiliary
import Control.Parallel.Eden.EdenSkel.MapSkels
import Data.List

-- | Simple pipe where the parent process creates all pipe processes. The
-- processes communicate their results via the caller process.
--
-- Implemented as 'pipeRD' wrapped with @unLiftRD@, so an empty function
-- list yields the input unchanged provided @unLiftRD@ maps the identity to
-- the identity (TODO confirm against the Eden library).
pipe :: forall a . Trans a =>
        [a -> a]  -- ^functions of the pipe
        -> a      -- ^input
        -> a      -- ^output
pipe fs = unLiftRD (pipeRD fs)

-- | Process pipe where the processes communicate their Remote Data handles
-- via the caller process but fetch the actual data from their predecessor
-- processes.
--
-- An empty list of functions is an empty pipe and returns the remote input
-- unchanged.
pipeRD :: forall a . Trans a =>
          [a -> a]  -- ^functions of the pipe
          -> RD a   -- ^remote input
          -> RD a   -- ^remote output
-- Without this equation 'last' below would be applied to an empty list.
pipeRD [] xs = xs
pipeRD fs xs = last outs
  where
    -- Stage k reads the output handle of stage k-1; the first stage reads
    -- the pipe input. The list is defined in terms of itself, hence 'lazy'.
    outs = spawn ps $ lazy $ xs : outs
    ps :: [Process (RD a) (RD a)]
    ps = map (process . liftRD) fs

-- | Simple ring skeleton (tutorial version)
-- using remote data for providing direct inter-ring communication
-- without input distribution and output combination
ringSimple :: (Trans i, Trans o, Trans r) =>
              (i -> r -> (o,r))  -- ^ ring process function
              -> [i] -> [o]      -- ^ input output mapping
ringSimple f is = os
  where
    (os,ringOuts) = unzip (parMap (toRD $ uncurry f) (zip is $ lazy ringIns))
    -- every process receives the ring output of its left neighbour
    ringIns       = rightRotate ringOuts

-- | Lifts a ring process function to remote data: the ring input is
-- obtained with @fetch@ and the ring output is wrapped with @release@,
-- while the first components of input and output are passed through as is.
toRD :: (Trans i, Trans o, Trans r) =>
        ((i,r) -> (o,r))             -- ^ ring process function
        -> ((i, RD r) -> (o, RD r))  -- ^ ring process function with remote data
toRD f (i, ringIn) = (o, release ringOut)
  where (o, ringOut) = f (i, fetch ringIn)

-- | Rotates a list one position to the right: the last element becomes the
-- first one. The empty list is returned unchanged.
rightRotate :: [a] -> [a]
rightRotate [] = []
rightRotate xs = last xs : init xs

-- | The ringFlAt establishes a ring topology, the ring process functions
-- transform the initial input of a ring process and the input stream from the ring into the
-- ring output stream and the ring processes' final result. Every ring process
-- applies its individual function which e.g.
-- allows to route individual offline input into the
-- ring processes. This version uses explicit placement.
ringFlAt :: (Trans a,Trans b,Trans r) =>
            Places                      -- ^where to put workers
            -> (i -> [a])               -- ^distribute input
            -> ([b] -> o)               -- ^combine output
            -> [(a -> [r] -> (b,[r]))]  -- ^ring process fcts
            -> i                        -- ^ring input
            -> o                        -- ^ring output
ringFlAt places distrib combine fs i = combine results
  where
    -- one worker per ring function, uncurried and lifted to remote data
    workers = map (toRD . uncurry) fs
    -- pair each distributed input with the ring stream it will receive;
    -- the ring inputs are defined via the ring outputs below, so they are
    -- passed through 'lazy'
    workerInputs = zip (distrib i) (lazy ringIns)
    (results, ringOuts) = unzip (spawnFAt places workers workerInputs)
    -- each worker reads what its predecessor in the list wrote; the first
    -- worker reads from the last one
    ringIns = rightRotate ringOuts

-- | The ringFl establishes a ring topology, the ring process functions
-- transform the initial input of a ring process and the input stream from the ring into the
-- ring output stream and the ring processes' final result. Every ring process
-- applies an individual function which e.g. allows to route individual offline input into the
-- ring processes. Use ringFlAt if explicit placement is desired.
--
-- This is 'ringFlAt' called with the placement list @[0]@.
ringFl :: (Trans a,Trans b,Trans r) =>
          (i -> [a])                  -- ^distribute input
          -> ([b] -> o)               -- ^combine output
          -> [(a -> [r] -> (b,[r]))]  -- ^ring process fcts
          -> i                        -- ^ring input
          -> o                        -- ^ring output
ringFl distrib combine fs i = ringFlAt [0] distrib combine fs i

-- | Skeleton @ringAt@ establishes a ring topology, the ring process function
-- transforms the initial input of a ring process and the input stream from the ring into the
-- ring output stream and the ring processes' final result. The
-- same function is used by every ring process. Use ringFlAt
-- if you need different functions in the processes. This version uses explicit placement.
ringAt :: (Trans a,Trans b,Trans r) =>
          Places                    -- ^where to put workers
          -> (i -> [a])             -- ^distribute input
          -> ([b] -> o)             -- ^combine output
          -> (a -> [r] -> (b,[r]))  -- ^ring process fct
          -> i                      -- ^ring input
          -> o                      -- ^ring output
ringAt places distrib combine f i =
  -- 'repeat f' offers one copy of the ring function for every element
  -- produced by 'distrib'. A singleton list would leave only one function
  -- to pair with the inputs, so at most one ring process could be created
  -- (assuming ringFlAt's spawn pairs functions and inputs one-to-one).
  ringFlAt places distrib combine (repeat f) i

-- | The ring establishes a ring topology, the ring process function
-- transforms the initial input of a ring process and the input stream from the ring into the
-- ring output stream and the ring processes' final result. The
-- same function is used by every ring process. Use ringFl
-- if you need different functions in the processes. Use ringAt if
-- explicit placement is desired.
--
-- This is 'ringAt' called with the placement list @[0]@.
ring :: (Trans a,Trans b,Trans r) =>
        (i -> [a])                -- ^distribute input
        -> ([b] -> o)             -- ^combine output
        -> (a -> [r] -> (b,[r]))  -- ^ring process fct
        -> i                      -- ^ring input
        -> o                      -- ^ring output
ring = ringAt [0]

-- | Parallel torus skeleton (tutorial version) with stream rotation in 2
-- directions: initial inputs for each torus element are given. The node
-- function is used on each torus element to transform the initial input and
-- a stream of inputs from each direction to a stream of outputs to each
-- direction. Each torus input should have the same size in both dimensions,
-- otherwise the smaller input will determine the size of the torus.
torus :: (Trans a, Trans b, Trans c, Trans d) =>
         (c -> [a] -> [b] -> (d,[a],[b]))  -- ^ node function
         -> [[c]] -> [[d]]                 -- ^ input-output mapping
torus f inss = results
  where
    -- an unbounded grid of identical node processes; it is combined with
    -- the (finite) node inputs by spawnPss
    nodeGrid = repeat (repeat (ptorus f))
    nodeOuts = spawnPss nodeGrid nodeIns -- optimised
    -- split the per-node triples into three matrices of equal layout
    (results, streamsA, streamsB) = unzip3 (map unzip3 nodeOuts)
    -- direction A: rotate every inner list; direction B: rotate the outer list
    rotatedA = map rightRotate streamsA
    rotatedB = rightRotate streamsB
    -- the node inputs depend on the node outputs, so every list that closes
    -- the cycle is passed through 'lazy'
    nodeIns = zipWith3 zipRow inss (lazy rotatedA) (lazy rotatedB)
    zipRow cs as bs = zip3 cs (lazy as) (lazy bs)

-- each individual process of the torus (tutorial version):
-- fetches both incoming streams, applies the node function and releases
-- both outgoing streams; the first component travels via the parent
ptorus :: (Trans a, Trans b, Trans c, Trans d) =>
          (c -> [a] -> [b] -> (d,[a],[b]))
          -> Process (c,RD [a],RD [b]) (d,RD [a],RD [b])
ptorus f = process node
  where
    node (fromParent, inA, inB) = (toParent, release outA, release outB)
      where
        (streamA, streamB)     = fetch2 inA inB
        (toParent, outA, outB) = f fromParent streamA streamB

-- | The allToAllRD skeleton creates as many processes as elements in the input list (@np@).
-- The processes are all-to-all connected, each process' input is transformed to
-- @np@ intermediate values by the first parameter function, where the @i@-th value
-- will be sent to process @i@. The second transformation function combines the initial
-- input and the @np@ received intermediate values to the final output.
allToAllRD :: forall a b i. (Trans a, Trans b, Trans i) =>
              (Int -> a -> [i])   -- ^transform before bcast (num procs, input, sync-data out)
              -> (a -> [i] -> b)  -- ^transform after bcast (input, sync-data in, output)
              -> [RD a]           -- ^remote input for each process
              -> [RD b]           -- ^remote output for each process
allToAllRD t1 t2 xs = outputs
  where
    -- same amount of procs as #xs
    np = length xs
    -- np is evaluated (pseq) before the processes are created
    (outputs, outgoing) = np `pseq` unzip (parMap (uncurry worker) workerIns)
    -- row j of 'outgoing' is what process j sends; after transposition
    -- row j holds what process j receives
    workerIns = zip xs (lazy (transpose outgoing))
    worker :: RD a -> [RD i] -> (RD b, [RD i])
    worker xRD incoming =
        ( release (t2 x (fetchAll incoming))  -- final output of this process
        , releaseAll (t1 np x)                -- one handle per receiving process
        )
      where
        x = fetch xRD

-- works similar for splitIntoN and unsplit (concat)???
-- | Parallel transposition for matrices which are row-wise round robin
-- distributed among the machines, the transposed result matrix is also
-- row-wise round robin distributed.
parTransposeRD :: Trans b =>
                  [RD [[b]]]     -- ^input list of remote partial matrices
                  -> [RD [[b]]]  -- ^output list of remote partial matrices
parTransposeRD = allToAllRD (\ n -> unshuffle n . transpose)
                            (\ _ -> map shuffle . transpose)

-- | Performs an all-reduce with the reduce function using a butterfly scheme.
-- The input list should have length 2^i, where i is an arbitrary natural number.
-- If not, the input list will be truncated to the next smaller power of two.
-- The initial transformation is applied in the processes to obtain the values
-- that will be reduced. The final combine function is used to create a processes
-- result from the initial input and the reduced value.
--
-- An empty input list yields an empty output list.
allReduceRD :: forall a b c. (Trans a, Trans b, Trans c) =>
               (a -> b)            -- ^initial transform function
               -> (b -> b -> b)    -- ^reduce function
               -> (a -> b -> c)    -- ^final combine function
               -> [RD a]           -- ^remote input
               -> [RD c]           -- ^remote output
allReduceRD initF redF resF rdAs = rdCs
  where
    n = length rdAs
    -- floor (log2 n), computed on integers: counts the powers of two
    -- 2,4,8,... that do not exceed n. Unlike floor . logBase 2 this is well
    -- defined for n == 0 and immune to floating-point rounding.
    steps = length (takeWhile (<= n) (iterate (* 2) 2))
    rdAs' = take (2^steps) rdAs                    --cut input to power of 2
    -- topology, inputs and instantiation
    (rdBss,rdCs) = unzip $ parMap (uncurry p) inp  --steps in rows
    bufly = zipWith bitFlipF [1..steps] $ transpose rdBss
    inp   = zip rdAs' $ lazy $ transpose bufly     --steps in cols
    -- process functionality and abstraction
    p :: RD a -> [RD b] -> ([RD b], RD c)
    p rdA theirReds = (releaseAll toSend, release $ resF a $ head res)
      where
        -- Running reduction over the own value followed by the value
        -- received in each step (steps + 1 entries). Entry k (k <= steps)
        -- is sent to the partner of step k; the last entry also includes
        -- the value received in the final step and is therefore the
        -- complete reduction. 'res' is never empty because the scanned
        -- list has steps + 1 elements.
        (toSend,res) = splitAt steps $ scanl1 redF $ initF a : fetchAll theirReds'
        -- fixes the spine to 'steps' elements without forcing the handles
        theirReds'   = zipWith (curry snd) [1..steps] $ lazy theirReds
        a            = fetch rdA

-- | Permutation used for butterfly step @step@: the list is split with
-- @unshuffle@ into @2^step@ parts, the first and second half of these parts
-- are swapped, and the result is recombined with @shuffle@.
-- NOTE(review): assuming @unshuffle d@ deals elements round robin into @d@
-- lists and @shuffle@ is its inverse, this exchanges each element with the
-- one whose index differs in bit @step - 1@ -- confirm against the
-- Auxiliary module.
bitFlipF :: Int -> [a] -> [a]
bitFlipF step xs = (shuffle . flipAtHalfF . unshuffle d) xs
  where
    d = 2 ^ step
    flipAtHalfF parts = let (front, back) = splitAt (d `div` 2) parts
                        in back ++ front