{-# LANGUAGE CPP, ScopedTypeVariables #-}
-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Parallel.Eden.EdenSkel.Auxiliary
-- Copyright   :  (c) Philipps Universitaet Marburg 2009-2012
-- License     :  BSD-style (see the file LICENSE)
--
-- Maintainer  :  eden@mathematik.uni-marburg.de
-- Stability   :  beta
-- Portability :  not portable
--
-- This Haskell module defines auxiliary functions for
-- programming with the parallel functional language Eden.
--
-- Depends on GHC. Using standard GHC, you will get a threaded simulation of Eden.
-- Use the forked GHC-Eden compiler from http:\/\/www.mathematik.uni-marburg.de/~eden
-- for a parallel build.
--
-- Eden Group ( http:\/\/www.mathematik.uni-marburg.de/~eden )

#if defined(NOT_PARALLEL)
#warning !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\
 Eden BUILD WITH CONCURRENT HASKELL SIMULATION OF PARALLEL PRIMITIVES, \
 DON'T EXPECT BIG SPEEDUPS! USE THE EDEN VERSION OF GHC FROM \
 http://www.mathematik.uni-marburg.de/~eden \
 FOR A PARALLEL BUILD \
 !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!'
#endif
module Control.Parallel.Eden.EdenSkel.Auxiliary (
    -- * Distribution and combine functions
    -- | ... of form: @ Int -> [a] -> [[a]] @ / @ [[a]] -> [a] @
    unshuffle,
    shuffle,
    splitIntoN,
    unSplit,
    chunk,
    unchunk,
    -- * Distribution function for workpools
    distribute,
    -- * Lazy functions
    lazy,
    lazy1ZipWith,
    lazy2ZipWith,
    lazy1Zip,
    lazy2Zip,
    lazyTranspose,
    -- * other useful functions
    takeEach,
    transposeRt,
    -- * unLiftRDs
    unLiftRD,
    unLiftRD2,
    unLiftRD3,
    unLiftRD4,
    -- * More predefined Parallel Actions
    spawnPss,
    fetch2,
    fetchRDss,
    mergeS
  ) where

import Data.List
import Data.Maybe(maybeToList,mapMaybe)
import Control.Concurrent
import System.IO.Unsafe(unsafePerformIO,unsafeInterleaveIO)
#if defined( __PARALLEL_HASKELL__ ) || defined (NOT_PARALLEL)
import Control.Parallel.Eden
#else
import Control.Parallel.Eden.EdenConcHs
#endif

-- | Round robin distribution - inverse to shuffle.
--
-- Sublist number @i@ starts at element @i@ of the input and then takes
-- every @n@-th element from there on.
unshuffle :: Int   -- ^ number of sublists
          -> [a]   -- ^ input list
          -> [[a]] -- ^ distributed output
unshuffle n xs = map (\offset -> takeEach n (drop offset xs)) [0 .. n - 1]

-- | Keeps the head of a list and, after each kept element, skips the next
-- @n-1@ elements before keeping another one.
takeEach :: Int -> [a] -> [a]
takeEach _ []           = []
takeEach n (keep : etc) = keep : takeEach n (drop (n - 1) etc)

-- | Simple shuffling - inverse to round robin distribution.
shuffle :: [[a]] -- ^ sublists
        -> [a]   -- ^ shuffled sublists
shuffle sublists = concat (transpose sublists)

-- | Transpose for matrices of rectangular shape (rows of equal length).
-- The top level list of the resulting matrix is defined as soon as the
-- first row of the original matrix is closed.
transposeRt :: [[a]] -> [[a]]
transposeRt [] = []
-- The first row is exhausted: stop here. (Originally this case continued
-- with the remaining rows, which keeps the top level list open until all
-- rows are done.)
transposeRt ([] : _) = []
transposeRt ((x : xs) : rows) = firstColumn : transposeRt remaining
  where
    -- Rows that are already empty are silently dropped by the patterns.
    firstColumn = x : [h | (h : _) <- rows]
    remaining   = xs : [t | (_ : t) <- rows]

-- | Block distribution, @ splitIntoN @ distributes one list on n lists with
-- equal distribution ((+-1) without precondition on length).
splitIntoN :: Int   -- ^ number of blocks
           -> [a]   -- ^ list to be split
           -> [[a]] -- ^ list of blocks
splitIntoN n xs = chunkBalance blockLen bigger xs
  where
    -- @bigger@ blocks receive one element more than @blockLen@.
    (blockLen, bigger) = length xs `divMod` n

-- | Inverse function to @ splitIntoN @ - alias for concat.
unSplit :: [[a]] -- ^ list of blocks
        -> [a]   -- ^ restored list
unSplit blocks = concat blocks

-- | Creates a list of chunks of length @ d @. The first @ m @ chunks will
-- contain an extra element.
--
-- Result: list of chunks (blocks)
chunkBalance :: Int   -- ^ @ d @: chunk-length
             -> Int   -- ^ @ m @: number of bigger blocks
             -> [a]   -- ^ list to be split
             -> [[a]] -- ^ list of chunks (blocks)
chunkBalance d = go
  where
    -- The empty-list case comes first so that the counter is never
    -- inspected once the input is used up.
    go _ [] = []
    go m xs
      | m == 0 =
          let (block, rest) = splitAt d xs
          in block : go 0 rest
      | otherwise =
          let (block, rest) = splitAt (d + 1) xs
          in block : go (m - 1) rest

-- | Creates a list of chunks of length @ d @.
--
-- Result: list of chunks (blocks)
chunk :: Int   -- ^ @ d @: chunk-length
      -> [a]   -- ^ list to be split
      -> [[a]] -- ^ list of chunks (blocks)
chunk size = chunkBalance size 0

-- | Inverse function to @ chunk @ - alias for concat.
unchunk :: [[a]] -- ^ list of chunks
        -> [a]   -- ^ restored list
unchunk chunks = concat chunks

-- | Task distribution according to worker requests.
--
-- Requests and tasks are paired up position by position; worker @pe@
-- receives exactly those tasks whose paired request equals @pe@.
distribute :: Int   -- ^ number of workers
           -> [Int] -- ^ request stream (worker IDs ranging from 0 to n-1)
           -> [t]   -- ^ task list
           -> [[t]] -- ^ task distribution, each inner list for one worker
distribute np reqs tasks = map tasksFor [0 .. np - 1]
  where
    tasksFor pe = [t | (r, t) <- zip reqs tasks, pe == r]

----------------------------Lazy Functions----------------------------

-- | A lazy list is an infinite stream. The irrefutable pattern delivers
-- every cons cell of the result before the argument is inspected.
lazy :: [a] -> [a]
lazy ~(h : t) = h : lazy t

-- | lazy in first argument
lazy1ZipWith :: (a -> b -> c) -> [a] -> [b] -> [c]
lazy1ZipWith f xs = zipWith f lazyXs
  where
    lazyXs = lazy xs

-- | lazy in second argument
lazy2ZipWith :: (a -> b -> c) -> [a] -> [b] -> [c]
lazy2ZipWith f xs = zipWith f xs . lazy

-- | lazy in first argument
lazy1Zip :: [a] -> [b] -> [(a, b)]
lazy1Zip xs = zip (lazy xs)

-- | lazy in second argument
lazy2Zip :: [a] -> [b] -> [(a, b)]
lazy2Zip xs = zip xs . lazy

-- | lazy in tail lists
lazyTranspose :: [[a]] -> [[a]]
lazyTranspose rows = foldr (lazy2ZipWith (:)) (repeat []) rows

---------------------------unLiftRDs------------------------------

-- | Wraps the input with @release@, applies the given function on remote
-- data and unwraps the result with @fetch@ (see @liftRD@).
unLiftRD :: (Trans a, Trans b)
         => (RD a -> RD b) -- ^ Function to be unlifted
         -> a              -- ^ input
         -> b              -- ^ output
unLiftRD f input = fetch (f (release input))

-- | see @liftRD@
unLiftRD2 :: (Trans a, Trans b, Trans c)
          => (RD a -> RD b -> RD c) -- ^ Function to be unlifted
          -> a                      -- ^ First input
          -> b                      -- ^ Second input
          -> c                      -- ^ output
unLiftRD2 f x = unLiftRD (f remote)
  where
    remote = release x

-- | see @liftRD@
unLiftRD3 :: (Trans a, Trans b, Trans c, Trans d)
          => (RD a -> RD b -> RD c -> RD d)
          -> a -> b -> c -> d
unLiftRD3 f x = unLiftRD2 (f remote)
  where
    remote = release x

-- | see @liftRD@
unLiftRD4 :: (Trans a, Trans b, Trans c, Trans d, Trans e)
          => (RD a -> RD b -> RD c -> RD d -> RD e)
          -> a -> b -> c -> d -> e
unLiftRD4 f x = unLiftRD3 (f remote)
  where
    remote = release x

---------------------------Parallel Actions---------------------------------

-- | Spawn a matrix of processes.
--
-- Row @i@ of the process matrix is instantiated on the places
-- @selfPe+1+i@, @selfPe+1+i+rows@, ... where @rows@ is the number of rows
-- present in both the process matrix and the input matrix.
spawnPss :: (Trans a, Trans b) => [[Process a b]] -> [[a]] -> [[b]]
spawnPss pss xss = runPA (sequence (zipWith3 spawnRow placess pss xss))
  where
    spawnRow places ps xs = sequence (zipWith3 instantiateAt places ps xs)
    rowCount = length (zip pss xss)
    placess  = unshuffle rowCount [selfPe + 1 ..]

-- | Fetch two Remote Data values
fetch2 :: (Trans a, Trans b) => RD a -> RD b -> (a, b)
fetch2 a b =
  runPA (fetchPA a >>= \x -> fetchPA b >>= \y -> return (x, y))

-- | Fetch a matrix of Remote Data
fetchRDss :: Trans a => [[RD a]] -> [[a]]
fetchRDss = runPA . mapM (mapM fetchPA)

--------------------merge variant mergeS-----------------
-- | A variant of non-deterministic list merging, which applies a strategy to list elements prior to merging them and stops the additional merge thread (the suckIO_S thread) when only one input stream is left.
mergeS :: [[a]] -> Strategy a -> [a]
-- The merge runs as an 'unsafePerformIO' action that forks threads; NOINLINE
-- keeps the optimiser from duplicating that action at call sites.
{-# NOINLINE mergeS #-}
mergeS l st = unsafePerformIO (nmergeIO_S l st)

-- | IO worker behind 'mergeS': forks one producer thread per input list and
-- returns the lazily produced merged list.
--
-- An empty list of inputs yields the empty list immediately. Without that
-- case no producer would be forked, nobody would ever fill the first list
-- node, and the 'takeMVar' below would block forever.
nmergeIO_S :: [[a]] -> Strategy a -> IO [a]
nmergeIO_S [] _ = return []
nmergeIO_S lss st = do
  -- The (still empty) first node of the result list ...
  tail_node <- newEmptyMVar
  -- ... and the shared reference to the node that gets filled next.
  tail_list <- newMVar tail_node
  -- Semaphore limiting how far the producers may run ahead.
  e <- newQSem max_buff_size
  -- Number of producers that have not yet run out of input.
  branches_running <- newMVar (length lss)
  let buff = (tail_list, e)
  mapM_ (\xs -> forkIO (suckIO_S branches_running buff xs st)) lss
  val <- takeMVar tail_node
  signalQSem e
  return val

-- | Producer thread body: feeds the elements of one input list into the
-- shared buffer.
--
-- When this is the only producer still running, the rest of its input is
-- linked in as the tail of the merged list unchanged (the strategy is not
-- applied to it) and the thread terminates.
suckIO_S :: MVar Int -> Buffer a -> [a] -> Strategy a -> IO ()
suckIO_S branches_running buff@(tail_list, e) vs st = do
  count <- takeMVar branches_running
  if count == 1
    then do
      -- Last producer: hand over the remaining input in one piece. The
      -- counter is deliberately left empty, nobody reads it any more.
      node <- takeMVar tail_list
      putMVar node vs
      putMVar tail_list node
    else do
      putMVar branches_running count
      case vs of
        [] -> do
          -- This input is exhausted: close the result list if this was
          -- the last producer, otherwise just decrement the counter.
          val <- takeMVar branches_running
          if val == 1
            then do
              node <- takeMVar tail_list
              putMVar node []
              putMVar tail_list node
            else putMVar branches_running (val - 1)
        (x : xs) -> do
          -- Apply the strategy to the element before claiming a buffer slot.
          st x `seq` waitQSem e
          node <- takeMVar tail_list
          next_node <- newEmptyMVar
          -- The successor is read only when the consumer demands it; doing
          -- so frees the buffer slot again.
          next_node_val <- unsafeInterleaveIO $ do
            y <- takeMVar next_node
            signalQSem e
            return y
          putMVar node (x : next_node_val)
          putMVar tail_list next_node
          suckIO_S branches_running buff xs st

-- | Shared merge state: the reference to the list node to be filled next,
-- and the semaphore that throttles the producers.
type Buffer a = (MVar (MVar [a]), QSem)

-- | Initial quantity of the throttling semaphore.
max_buff_size :: Int
max_buff_size = 1