{-# LANGUAGE CPP, ScopedTypeVariables #-}
-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Parallel.Eden.Topology
-- Copyright   :  (c) Philipps Universitaet Marburg 2009-2014
-- License     :  BSD-style (see the file LICENSE)
-- 
-- Maintainer  :  eden@mathematik.uni-marburg.de
-- Stability   :  beta
-- Portability :  not portable
--
-- This Haskell module defines topology skeletons for the parallel functional
-- language Eden. Topology skeletons are skeletons that implement a network of
-- processes interconnected by a characteristic communication topology.
--
-- Depends on GHC. Using standard GHC, you will get a threaded simulation of Eden. 
-- Use the forked GHC-Eden compiler from http:\/\/www.mathematik.uni-marburg.de/~eden 
-- for a parallel build.
--
-- Eden Group ( http:\/\/www.mathematik.uni-marburg.de/~eden )


module Control.Parallel.Eden.Topology (
  -- * Skeletons that are primarily characterized by their topology.
    
  -- ** Pipeline skeletons
  -- |
  pipe, pipeRD
  -- ** Ring skeletons
  -- |
  ,ringSimple, ring, ringFl, ringAt, ringFlAt
  -- ** Torus skeleton
  -- |  
  ,torus 
  -- ** The Hypercube skeleton
  -- |

    -- ** The All-To-All skeleton 
  -- |The allToAll skeleton allows distributed data exchange and
  -- transformation involving the data of all processes. Input and output
  -- are provided as remote data. A typical application is the
  -- distributed transposition of a distributed matrix.
  ,allToAllRDAt, allToAllRD, parTransposeRDAt, parTransposeRD, allGatherRDAt, allGatherRD     
  -- ** The All-Reduce skeleton   
  -- |The skeleton uses a butterfly topology to reduce the data of
  -- participating processes P in log(|P|) communication stages. Input
  -- and output are provided as remote data.
  ,allReduceRDAt, allReduceRD, allGatherBuFlyRDAt, allGatherBuFlyRD  

  ) where
#if defined( __PARALLEL_HASKELL__ ) || defined (NOT_PARALLEL)
import Control.Parallel.Eden
#else
import Control.Parallel.Eden.EdenConcHs
#endif
import Control.Parallel.Eden.Auxiliary
import Control.Parallel.Eden.Map
import Data.List

   
   
-- |Process pipeline on plain values: every stage function runs in its own
-- process and the stages are applied in list order (the first function sees
-- the input first). This is 'pipeRD' with the input and output converted
-- to and from remote data by 'unLiftRD'.
pipe :: forall a . Trans a => 
        [a -> a]    -- ^functions of the pipe
        -> a        -- ^input
        -> a        -- ^output
pipe = unLiftRD . pipeRD
  
-- |Process pipeline on remote data. One process is created per stage
-- function. Stage @k@ receives the remote-data handle produced by stage
-- @k-1@, and stage 0 receives the given input handle. Only the handles are
-- routed through the caller; the processes fetch the actual data from their
-- predecessors. Stages are applied in list order. An empty list of stages
-- returns the input handle unchanged.
pipeRD :: forall a . Trans a => 
          [a -> a]    -- ^functions of the pipe
          -> RD a     -- ^remote input
          -> RD a     -- ^remote output
pipeRD fs xs = last ins where
  -- Input handle of every stage, followed by the final output handle.
  -- Taking 'last' of this list rather than of @outs@ makes an empty pipe
  -- the identity instead of failing with @last []@.
  ins  = xs : outs
  outs = spawn ps $ lazy ins
  ps :: [Process (RD a) (RD a)]
  ps = map (process . liftRD) fs


-- | Simple ring skeleton (tutorial version). One process is created per
-- element of the input list (via 'parMap'). Process @k@ receives the ring
-- output of process @k-1@, and process 0 receives that of the last process.
-- Ring values are passed as remote data, so they travel directly between the
-- ring processes. There is no input distribution or output combination step.
ringSimple      :: (Trans i, Trans o, Trans r) =>
               (i -> r -> (o,r))  -- ^ ring process function
               -> [i] -> [o]      -- ^ input output mapping
ringSimple f is = outputs
  where
    node                   = toRD (uncurry f)
    -- The ring inputs are built from the outputs of the very processes being
    -- created, so the list is consumed through 'lazy'.
    ringInputs             = lazy (rightRotate ringOutputs)
    (outputs, ringOutputs) = unzip (parMap node (zip is ringInputs))

-- Adapt a ring process function so that its ring channel uses remote data.
-- The incoming handle is fetched before @f@ is applied, and the ring value
-- produced by @f@ is released as a new handle. The non-ring component
-- passes through untouched.
toRD :: (Trans i, Trans o, Trans r) =>
        ((i,r) -> (o,r))          -- ^ ring process function
        -> ((i, RD r) -> (o, RD r)) -- ^ the same function with a remote-data ring channel
toRD f (input, rdIn) =
  let (out, ringOut) = f (input, fetch rdIn)
  in  (out, release ringOut)

-- | Rotate a list one position to the right: the last element moves to the
-- front, and the empty list is returned unchanged. The tail of the result is
-- produced lazily (via 'init'); only the head needs the whole spine.
rightRotate    :: [a] -> [a]
rightRotate ys
  | null ys   = []
  | otherwise = final : front
  where
    final = last ys
    front = init ys

-- | @ringFlAt@ builds a ring of processes with explicit placement.
--
-- The input is split with @distrib@. The @k@-th part goes to a process that
-- runs the @k@-th function of @fs@. That function also receives the ring
-- stream of process @k-1@ (process 0 listens to the last process) and
-- returns the process result plus its outgoing ring stream. The process
-- results are merged with @combine@.
--
-- Because every process can run its own function, individual offline inputs
-- can be routed to individual ring processes. Inputs and functions are
-- paired up by 'spawnFAt', so presumably the ring has as many processes as
-- the shorter of the two lists.
ringFlAt :: (Trans a,Trans b,Trans r) =>
        Places                     -- ^where to put workers
        -> (i -> [a])              -- ^distribute input
        -> ([b] -> o)              -- ^combine output
        -> [(a -> [r] -> (b,[r]))] -- ^ring process fcts
        -> i                       -- ^ring input
        -> o                       -- ^ring output
ringFlAt places distrib combine fs i = combine outputs
  where
    nodes                  = [toRD (uncurry g) | g <- fs]
    -- The ring inputs are the rotated outputs of the processes being
    -- spawned here, so they are consumed through 'lazy'.
    ringInputs             = lazy (rightRotate ringOutputs)
    (outputs, ringOutputs) = unzip (spawnFAt places nodes (zip (distrib i) ringInputs))

-- | @ringFl@ builds a ring of processes in which each process may run its
-- own function. The input is split with @distrib@, and each ring process
-- turns its part and the incoming ring stream into a result and an outgoing
-- ring stream. The results are merged with @combine@. This is 'ringFlAt'
-- with the placement list @[0]@; use 'ringFlAt' if explicit placement is
-- desired.
ringFl  :: (Trans a,Trans b,Trans r) =>
        (i -> [a])                 -- ^distribute input
        -> ([b] -> o)              -- ^combine output
        -> [(a -> [r] -> (b,[r]))] -- ^ring process fcts
        -> i                       -- ^ring input
        -> o                       -- ^ring output
ringFl distrib combine fs = ringFlAt [0] distrib combine fs

-- | Skeleton @ringAt@ builds a ring topology with explicit placement. The
-- ring process function turns the initial input of a ring process and the
-- stream arriving from the ring into the outgoing ring stream and the
-- process's final result. The same function is used by every ring process,
-- so the ring has one process per part produced by @distrib@. Use
-- 'ringFlAt' if you need different functions in the processes.
ringAt :: (Trans a,Trans b,Trans r) =>
        Places                    -- ^where to put workers
        -> (i -> [a])             -- ^distribute input
        -> ([b] -> o)             -- ^combine output
        -> (a -> [r] -> (b,[r]))  -- ^ring process fct
        -> i                      -- ^ring input
        -> o                      -- ^ring output 
ringAt places distrib combine f i =
  -- 'ringFlAt' pairs functions with input parts, so it needs one copy of
  -- @f@ per process. A one-element list would build a single-process ring
  -- and silently drop all other input parts.
  ringFlAt places distrib combine (repeat f) i

-- | @ring@ builds a ring topology in which every process runs the same
-- function. The function turns a process's part of the input and the stream
-- arriving from the ring into the outgoing ring stream and the process's
-- final result. This is 'ringAt' with the placement list @[0]@. Use 'ringFl'
-- if you need different functions in the processes, and 'ringAt' if explicit
-- placement is desired.
ring :: (Trans a,Trans b,Trans r) =>
        (i -> [a])                -- ^distribute input
        -> ([b] -> o)             -- ^combine output
        -> (a -> [r] -> (b,[r]))  -- ^ring process fct
        -> i                      -- ^ring input
        -> o                      -- ^ring output
ring distrib combine f = ringAt [0] distrib combine f



-- | Parallel torus skeleton (tutorial version) with stream rotation in two
-- directions. Each element of the input matrix becomes the initial input of
-- one torus node. The node function turns this initial input and one input
-- stream per direction into the node's result and one output stream per
-- direction. Horizontal streams are rotated within each row, and vertical
-- streams are rotated between rows. The input should have the same size in
-- both dimensions; otherwise the smaller input determines the size of the
-- torus.
torus :: (Trans a, Trans b, Trans c, Trans d) =>
         (c -> [a] -> [b] -> (d,[a],[b])) -- ^ node function
         -> [[c]] -> [[d]]                -- ^ input-output mapping
torus f inss = outss
  where
    -- one 'ptorus' node per matrix position (marked "optimised" originally)
    nodeOutss                = spawnPss (repeat (repeat (ptorus f))) nodeInss
    (outss, horizss, vertss) = unzip3 (map unzip3 nodeOutss)
    horizInss                = map rightRotate horizss
    vertInss                 = rightRotate vertss
    -- The stream inputs are produced by the nodes themselves, hence 'lazy'.
    nodeInss                 = zipWith3 zipLazy3 inss (lazy horizInss) (lazy vertInss)
    zipLazy3 xs ys zs        = zip3 xs (lazy ys) (lazy zs)

-- Process of a single torus node (tutorial version). It fetches both
-- incoming streams, applies the node function, and releases both outgoing
-- streams as remote data. The value for the parent is returned directly.
ptorus :: (Trans a, Trans b, Trans c, Trans d) =>
          (c -> [a] -> [b] -> (d,[a],[b])) ->
          Process (c,RD [a],RD [b])
                  (d,RD [a],RD [b])
ptorus f = process node
  where
    node (fromParent, rdA, rdB) = (toParent, release outA, release outB)
      where
        (inA, inB)             = fetch2 rdA rdB
        (toParent, outA, outB) = f fromParent inA inB

-- | The skeleton creates as many processes as there are elements in the
-- input list (@np@), and the processes are all-to-all connected. Each
-- process turns its input into @np@ intermediate values with the first
-- function; the @i@-th value is sent to process @i@. The second function
-- combines the process's input with the @np@ values it received into the
-- final output. This is 'allToAllRDAt' with the placement list @[0]@.
allToAllRD :: forall a b i. (Trans a, Trans b, Trans i) 
                => (Int -> a -> [i]) -- ^transform before bcast (num procs, input, sync-data out)
                -> (a -> [i] ->b)    -- ^transform after bcast (input, sync-data in, output)
                -> [RD a]            -- ^remote input for each process
                -> [RD b]            -- ^remote output for each process
allToAllRD t1 t2 = allToAllRDAt [0] t1 t2

-- | The skeleton creates as many processes as there are elements in the
-- input list (@np@), placed according to @places@, and the processes are
-- all-to-all connected. Each process turns its input into @np@ intermediate
-- values with the first function; the @i@-th value is sent to process @i@.
-- The second function combines the process's input with the @np@ values it
-- received, in process order, into the final output.
allToAllRDAt :: forall a b i. (Trans a, Trans b, Trans i) 
                => Places            -- ^where to instantiate
                -> (Int -> a -> [i]) -- ^transform before bcast (num procs, input, sync-data out)
                -> (a -> [i] ->b)    -- ^transform after bcast (input, sync-data in, output)
                -> [RD a]            -- ^remote input for each process
                -> [RD b]            -- ^remote output for each process
allToAllRDAt places t1 t2 xs = outputs
  where
    -- one process per input; the count is evaluated (via 'pseq') before
    -- the process map
    np = length xs
    -- @sendss !! k@ holds the @np@ handles released by process @k@.
    -- Transposing hands every process the handles addressed to it.
    (outputs, sendss) =
      np `pseq` unzip (parMapAt places (uncurry worker) (zip xs (lazy (transpose sendss))))

    worker :: RD a -> [RD i] -> (RD b, [RD i])
    worker rdX received = (release (t2 x (fetchAll received)), releaseAll (t1 np x))
      where
        x = fetch rdX

-- NOTE(review): the original authors asked whether this also works with
-- splitIntoN and unsplit (concat); that remains unverified.
-- |Parallel transposition of a matrix whose rows are distributed round-robin
-- among the machines; the transposed result matrix is distributed row-wise
-- round-robin as well. This is 'parTransposeRDAt' with the placement list
-- @[0]@.
parTransposeRD :: Trans b 
                  => [RD [[b]]] -- ^input list of remote partial matrices
                  -> [RD [[b]]] -- ^output list of remote partial matrices
parTransposeRD rdParts = parTransposeRDAt [0] rdParts


-- NOTE(review): the original authors asked whether this also works with
-- splitIntoN and unsplit (concat); that remains unverified.
-- |Parallel transposition of a matrix whose rows are distributed round-robin
-- among the machines; the transposed result matrix is distributed row-wise
-- round-robin as well. Built on 'allToAllRDAt'. Each process transposes its
-- partial matrix and splits the columns into one chunk per process with
-- 'unshuffle'. The received chunks are transposed and merged with 'shuffle'.
parTransposeRDAt :: Trans b 
                    => Places     -- ^where to instantiate
                    -> [RD [[b]]] -- ^input list of remote partial matrices
                    -> [RD [[b]]] -- ^output list of remote partial matrices
parTransposeRDAt places = allToAllRDAt places splitCols mergeChunks
  where
    splitCols n   = unshuffle n . transpose
    mergeChunks _ = map shuffle . transpose

-- | Performs an all-gather using all-to-all communication (based on
-- 'allToAllRDAt', with the placement list @[0]@). Each process applies the
-- initial transform to its input and sends the result to every process.
-- The final combine function builds each process's output from its own
-- input and the list of gathered values.
allGatherRD :: forall a b c. (Trans a, Trans b, Trans c)
               => (a -> b)         -- ^initial transform function
               -> (a -> [b] -> c)  -- ^final combine function
               -> [RD a] -> [RD c]
allGatherRD t1 t2 = allGatherRDAt [0] t1 t2

-- | Performs an all-gather using all-to-all communication (based on
-- 'allToAllRDAt'). Each process applies the initial transform to its input
-- and sends the same result to every process. The final combine function
-- builds each process's output from its own input and the gathered values,
-- which arrive in process order.
allGatherRDAt :: forall a b c. (Trans a, Trans b, Trans c)
                      => Places           -- ^where to instantiate
                      -> (a -> b)         -- ^initial transform function
                      -> (a -> [b] -> c)  -- ^final combine function
                      -> [RD a] -> [RD c]
allGatherRDAt places t1 t2 = allToAllRDAt places broadcast t2
  where
    -- one copy of the transformed input for each of the n processes
    broadcast n = replicate n . t1


-- | Performs an all-reduce with the reduce function using a butterfly scheme
-- (based on 'allReduceRDAt', with the placement list @[0]@). The initial
-- transformation is applied in the processes to obtain the values that will
-- be reduced. The final combine function creates each process's output from
-- its initial input and the reduced value.
allReduceRD :: forall a b c. (Trans a, Trans b, Trans c)
               => (a -> b)       -- ^initial transform function
               -> (b -> b -> b)  -- ^reduce function
               -> (a -> b -> c)  -- ^final combine function
               -> [RD a] -> [RD c]
allReduceRD initF redF resF = allReduceRDAt [0] initF redF resF


-- | Performs an all-reduce with the reduce function using a butterfly scheme.
-- The initial transformation is applied in the processes to obtain the values
-- that will be reduced. The final combine function creates each process's
-- output from its initial input and the reduced value.
-- For @n@ processes the reduction uses @ceiling (log2 n)@ communication
-- stages. Partner lists are padded to a power of two, and a process without
-- a partner in some stage keeps its own value for that stage.
allReduceRDAt :: forall a b c. (Trans a, Trans b, Trans c)
               => Places         -- ^where to instantiate
               -> (a -> b)       -- ^initial transform function
               -> (b -> b -> b)  -- ^reduce function
               -> (a -> b -> c)  -- ^final combine function
               -> [RD a] -> [RD c]
allReduceRDAt places initF redF resF rdAs = rdCs where
  -- Number of butterfly stages, ceiling (log2 n), computed exactly on
  -- integers (0 for n <= 1). A floating-point 'logBase' can misround at
  -- exact powers of two and yields -Infinity for an empty input.
  steps = length (takeWhile (< length rdAs) (iterate (* 2) 1))
  (rdBss,rdCs) = steps `pseq` unzip $ parMapAt places (uncurry p) inp
  inp          = zip rdAs $ lazy $ buflyF $ transposeRt rdBss
  -- per-stage partner handles: pad with 'fillF', swap halves per stage with
  -- 'shiftFlipF', then regroup per process with 'transposeRt'
  buflyF       = transposeRt . shiftFlipF steps . fillF steps
  
  -- One process: returns the handles of its intermediate values (one per
  -- stage) and the handle of its final result.
  p :: RD a -> [Maybe (Both (RD b))] -> ([RD b], RD c)
  p rdA rdBs = (rdBs'', res) where
    res      = release $ resF a $ reduced !! steps
    rdBs''   = (releaseAll . take steps . lazy) reduced
    reduced  = scanl redF' b toReduce
    toReduce = fetchAll' rdBs'
    -- a missing partner (Nothing) is replaced by this process's own value
    rdBs'    = zipWith (flip maybe Left) (map Right rdBs'') rdBs
    b        = initF a
    a        = fetch rdA
  
  --List encoding:
  -- Right: No Partner present, use value b without reduction
  -- Left: RD value comes from partner, then inner encoding:
  --       Right: Partner is positioned at the right hand side
  --       Left: Partner is positioned at the left hand side
  -- needed such that redF does not need to be commutative
  redF' :: b -> Either (Both b) b -> b
  redF' _ (Right b) = b
  redF' b (Left (Right b')) = redF b b'
  redF' b (Left (Left b'))  = redF b' b

-- | A value tagged with a side. In 'allReduceRDAt' the tag records whether a
-- butterfly partner sits on the left ('Left') or on the right ('Right'), so
-- that the reduce function is applied in the correct argument order.
type Both a = Either a a

-- Fetch every remote value inside the two-level 'Either' encoding used by
-- 'allReduceRDAt', keeping each value's tags. All fetches are sequenced in
-- the 'PA' monad and run with a single 'runPA'.
fetchAll' :: Trans a => [Either (Both (RD a)) (RD a)] -> [Either (Both a) a]
fetchAll' = runPA . mapM fetchTagged
  where
    fetchTagged (Left (Left rd))  = fetchPA rd >>= return . Left . Left
    fetchTagged (Left (Right rd)) = fetchPA rd >>= return . Left . Right
    fetchTagged (Right rd)        = fetchPA rd >>= return . Right

-- Bring every row to exactly 2^ldn entries. Existing entries are wrapped in
-- 'Just' (longer rows are truncated), and missing positions become 'Nothing'.
fillF :: Int -> [[a]] -> [[Maybe a]]
fillF ldn rows = [ take width (map Just row ++ repeat Nothing) | row <- rows ]
  where
    width = 2 ^ ldn

-- Build the partner pattern of the butterfly stages. Row @s-1@ is handled by
-- stage @s@ (for s = 1..ldn): it is split into 2^s groups with 'unshuffle',
-- and the lower and upper halves of the groups are swapped. Entries from the
-- upper half are tagged 'Right' and entries from the lower half 'Left'. The
-- groups are then merged back with 'shuffle'. Rows beyond @ldn@ are dropped.
shiftFlipF :: Int -> [[Maybe a]] -> [[Maybe (Both a)]]
shiftFlipF ldn = zipWith flipStage [1 .. ldn]
  where
    flipStage stage row = shuffle (swapHalves (unshuffle groups row))
      where
        groups = 2 ^ stage
        swapHalves gs =
          let (lower, upper) = splitAt (groups `div` 2) gs
          in  map (map (fmap Right)) upper ++ map (map (fmap Left)) lower


-- | Performs an all-gather using a butterfly scheme (based on
-- 'allReduceRDAt', with the placement list @[0]@). Each process applies the
-- initial transform to its input. The transformed values of all processes
-- are collected by concatenation in butterfly stages. The final combine
-- function builds each process's output from its own input and the gathered
-- values.
allGatherBuFlyRD :: forall a b c. (Trans a, Trans b, Trans c)
                    => (a -> b)         -- ^initial transform function
                    -> (a -> [b] -> c)  -- ^final combine function
                    -> [RD a] -> [RD c]
allGatherBuFlyRD t1 t2 = allGatherBuFlyRDAt [0] t1 t2

-- | Performs an all-gather using a butterfly scheme (based on
-- 'allReduceRDAt'). Each process contributes a one-element list holding its
-- transformed input, and the lists are combined with @(++)@. The left/right
-- partner encoding of 'allReduceRDAt' means the non-commutative @(++)@ is
-- safe to use. The final combine function builds each process's output from
-- its own input and the gathered values.
allGatherBuFlyRDAt :: forall a b c. (Trans a, Trans b, Trans c)
                      => Places           -- ^where to instantiate
                      -> (a -> b)         -- ^initial transform function
                      -> (a -> [b] -> c)  -- ^final combine function
                      -> [RD a] -> [RD c]
allGatherBuFlyRDAt places t1 t2 = allReduceRDAt places wrap (++) t2
  where
    wrap x = [t1 x]