{-# LANGUAGE ScopedTypeVariables #-}  -- req'd for the scoped type variable 'm' in 'handleFISH'
module Control.Parallel.HdpH.Internal.Sparkpool
(
SparkM,
run,
liftCommM,
liftIO,
blockSched,
wakeupSched,
getSpark,
putSpark,
Msg(..),
dispatch,
handleFISH,
handleSCHEDULE,
handleNOWORK,
readPoolSize,
readFishSentCtr,
readSparkRcvdCtr,
readMaxSparkCtr,
readSparkGenCtr,
readSparkConvCtr
) where
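-- Spark pool of a node and the work-stealing ("fishing") protocol that
-- moves sparks between nodes via FISH, SCHEDULE and NOWORK messages.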
import Prelude hiding (error)
import Control.Concurrent (threadDelay)
import Control.DeepSeq (NFData, rnf)
import Control.Monad (unless, when, replicateM_)
import Control.Monad.Reader (ReaderT, runReaderT, ask)
import Control.Monad.Trans (lift)
import Data.Functor ((<$>))
import Data.IORef (IORef, newIORef, readIORef, writeIORef, atomicModifyIORef)
import Data.Serialize (Serialize)
import qualified Data.Serialize (put, get)
import Data.Set (Set)
import qualified Data.Set as Set (size, fromList, singleton, notMember)
import Data.Word (Word8)
import System.Random (randomRIO)
import Control.Parallel.HdpH.Conf
(RTSConf(maxHops, maxFish, minSched, minFishDly, maxFishDly))
import Control.Parallel.HdpH.Internal.Comm (CommM)
import qualified Control.Parallel.HdpH.Internal.Comm as Comm
(liftIO, send, nodes, myNode, allNodes)
import Control.Parallel.HdpH.Internal.Data.Deque
(DequeIO, emptyIO, pushFrontIO, pushBackIO, popFrontIO, popBackIO,
lengthIO, maxLengthIO)
import Control.Parallel.HdpH.Internal.Data.Sem (Sem)
import qualified Control.Parallel.HdpH.Internal.Data.Sem as Sem (wait, signal)
import Control.Parallel.HdpH.Internal.Location
(NodeId, dbgMsgSend, dbgSpark, error)
import qualified Control.Parallel.HdpH.Internal.Location as Location (debug)
import Control.Parallel.HdpH.Internal.Misc (encodeLazy, ActionServer, reqAction)
import Control.Parallel.HdpH.Internal.Type.Par (Spark)
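-- | 'SparkM' is a reader monad on top of the 'CommM' communication monad;
-- the mutable parts of the spark pool state ('State') live in IORefs: the
-- deque of sparks, the origin of the most recently received spark, the
-- fishing flag, and the statistics counters.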
type SparkM m = ReaderT (State m) CommM
data State m =
State {
s_conf :: RTSConf,
s_pool :: DequeIO (Spark m),
s_sparkOrig :: IORef (Maybe NodeId),
s_fishing :: IORef Bool,
s_noWork :: ActionServer,
s_idleScheds :: Sem,
s_fishSent :: IORef Int,
s_sparkRcvd :: IORef Int,
s_sparkGen :: IORef Int,
s_sparkConv :: IORef Int }
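-- | Run the given 'SparkM' action on top of 'CommM' with a fresh, empty
-- spark pool, given the RTS configuration, the no-work action server and
-- the semaphore blocking idle schedulers.
-- A minimal usage sketch (hypothetical caller, not part of this module):
--
-- > poolSizeAtStartup :: RTSConf -> ActionServer -> Sem -> CommM Int
-- > poolSizeAtStartup conf server sem = run conf server sem readPoolSize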
run :: RTSConf -> ActionServer -> Sem -> SparkM m a -> CommM a
run conf noWorkServer idleSem action = do
pool <- Comm.liftIO $ emptyIO
sparkOrig <- Comm.liftIO $ newIORef Nothing
fishing <- Comm.liftIO $ newIORef False
fishSent <- Comm.liftIO $ newIORef 0
sparkRcvd <- Comm.liftIO $ newIORef 0
sparkGen <- Comm.liftIO $ newIORef 0
sparkConv <- Comm.liftIO $ newIORef 0
let s0 = State { s_conf = conf,
s_pool = pool,
s_sparkOrig = sparkOrig,
s_fishing = fishing,
s_noWork = noWorkServer,
s_idleScheds = idleSem,
s_fishSent = fishSent,
s_sparkRcvd = sparkRcvd,
s_sparkGen = sparkGen,
s_sparkConv = sparkConv }
runReaderT action s0
liftCommM :: CommM a -> SparkM m a
liftCommM = lift
liftIO :: IO a -> SparkM m a
liftIO = liftCommM . Comm.liftIO
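-- Internal accessors for the components of 'State'; the exported 'read*'
-- variants snapshot the pool size and the statistics counters.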
getPool :: SparkM m (DequeIO (Spark m))
getPool = s_pool <$> ask
readPoolSize :: SparkM m Int
readPoolSize = getPool >>= liftIO . lengthIO
getSparkOrigHist :: SparkM m (IORef (Maybe NodeId))
getSparkOrigHist = s_sparkOrig <$> ask
readSparkOrigHist :: SparkM m (Maybe NodeId)
readSparkOrigHist = getSparkOrigHist >>= liftIO . readIORef
updateSparkOrigHist :: NodeId -> SparkM m ()
updateSparkOrigHist mostRecentOrigin = do
sparkOrigHistRef <- getSparkOrigHist
liftIO $ writeIORef sparkOrigHistRef (Just mostRecentOrigin)
getFishingFlag :: SparkM m (IORef Bool)
getFishingFlag = s_fishing <$> ask
getNoWorkServer :: SparkM m ActionServer
getNoWorkServer = s_noWork <$> ask
getIdleSchedsSem :: SparkM m Sem
getIdleSchedsSem = s_idleScheds <$> ask
getFishSentCtr :: SparkM m (IORef Int)
getFishSentCtr = s_fishSent <$> ask
readFishSentCtr :: SparkM m Int
readFishSentCtr = getFishSentCtr >>= readCtr
getSparkRcvdCtr :: SparkM m (IORef Int)
getSparkRcvdCtr = s_sparkRcvd <$> ask
readSparkRcvdCtr :: SparkM m Int
readSparkRcvdCtr = getSparkRcvdCtr >>= readCtr
getSparkGenCtr :: SparkM m (IORef Int)
getSparkGenCtr = s_sparkGen <$> ask
readSparkGenCtr :: SparkM m Int
readSparkGenCtr = getSparkGenCtr >>= readCtr
getSparkConvCtr :: SparkM m (IORef Int)
getSparkConvCtr = s_sparkConv <$> ask
readSparkConvCtr :: SparkM m Int
readSparkConvCtr = getSparkConvCtr >>= readCtr
readMaxSparkCtr :: SparkM m Int
readMaxSparkCtr = getPool >>= liftIO . maxLengthIO
getMaxHops :: SparkM m Int
getMaxHops = maxHops <$> s_conf <$> ask
getMaxFish :: SparkM m Int
getMaxFish = maxFish <$> s_conf <$> ask
getMinSched :: SparkM m Int
getMinSched = minSched <$> s_conf <$> ask
getMinFishDly :: SparkM m Int
getMinFishDly = minFishDly <$> s_conf <$> ask
getMaxFishDly :: SparkM m Int
getMaxFishDly = maxFishDly <$> s_conf <$> ask
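-- | 'blockSched' blocks the calling scheduler on the idle-scheduler
-- semaphore; 'wakeupSched' n wakes up to n sleeping schedulers by
-- signalling that semaphore n times.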
blockSched :: SparkM m ()
blockSched = getIdleSchedsSem >>= liftIO . Sem.wait
wakeupSched :: Int -> SparkM m ()
wakeupSched n = getIdleSchedsSem >>= liftIO . replicateM_ n . Sem.signal
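-- | Try to take a spark from the front of the pool on behalf of scheduler
-- 'schedID'; always re-evaluates whether to go fishing (see 'sendFISH') and
-- bumps the conversion counter when a spark is handed out.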
getSpark :: Int -> SparkM m (Maybe (Spark m))
getSpark schedID = do
pool <- getPool
maybe_spark <- liftIO $ popFrontIO pool
sendFISH
case maybe_spark of
Just _ -> do getSparkConvCtr >>= incCtr
sparks <- liftIO $ lengthIO pool
debug dbgSpark $
"#sparks=" ++ show sparks ++ " (spark converted)"
return maybe_spark
Nothing -> do return maybe_spark
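-- | Put a new spark at the back of the pool, wake up one scheduler and bump
-- the spark generation counter. For illustration (hypothetical spark
-- 'sp :: Spark m' and scheduler ID 1):
--
-- > putSpark 1 sp >> readPoolSize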
putSpark :: Int -> Spark m -> SparkM m ()
putSpark schedID spark = do
pool <- getPool
liftIO $ pushBackIO pool spark
wakeupSched 1
getSparkGenCtr >>= incCtr
sparks <- liftIO $ lengthIO pool
debug dbgSpark $
"#sparks=" ++ show sparks ++ " (spark created)"
data Msg m = FISH
!NodeId
!NodeId
!Int
| NOWORK
| SCHEDULE
(Spark m)
!NodeId
| PUSH
(Spark m)
instance Show (Msg m) where
showsPrec _ (FISH fisher target hops) = showString "FISH(" . shows fisher .
showString "," . shows target .
showString "," . shows hops .
showString ")"
showsPrec _ (NOWORK) = showString "NOWORK"
showsPrec _ (SCHEDULE _spark sender) = showString "SCHEDULE(_," .
shows sender . showString ")"
showsPrec _ (PUSH _spark) = showString "PUSH(_)"
instance NFData (Msg m) where
rnf (FISH fisher target hops) = rnf fisher `seq` rnf target `seq` rnf hops
rnf (NOWORK) = ()
rnf (SCHEDULE spark sender) = rnf spark `seq` rnf sender
rnf (PUSH spark) = rnf spark
instance Serialize (Msg m) where
put (FISH fisher target hops) = Data.Serialize.put (0 :: Word8) >>
Data.Serialize.put fisher >>
Data.Serialize.put target >>
Data.Serialize.put hops
put (NOWORK) = Data.Serialize.put (1 :: Word8)
put (SCHEDULE spark sender) = Data.Serialize.put (2 :: Word8) >>
Data.Serialize.put spark >>
Data.Serialize.put sender
put (PUSH spark) = Data.Serialize.put (3 :: Word8) >>
Data.Serialize.put spark
get = do tag <- Data.Serialize.get
case tag :: Word8 of
0 -> do fisher <- Data.Serialize.get
target <- Data.Serialize.get
hops <- Data.Serialize.get
return $ FISH fisher target hops
1 -> do return $ NOWORK
2 -> do spark <- Data.Serialize.get
sender <- Data.Serialize.get
return $ SCHEDULE spark sender
3 -> do spark <- Data.Serialize.get
return $ PUSH spark
sendFISH :: SparkM m ()
sendFISH = do
pool <- getPool
fishingFlag <- getFishingFlag
isFishing <- readFlag fishingFlag
unless isFishing $ do
nodes <- liftCommM $ Comm.nodes
maxFish <- getMaxFish
sparks <- liftIO $ lengthIO pool
when (nodes > 1 && sparks <= maxFish) $ do
ok <- setFlag fishingFlag
when ok $ do
fisher <- liftCommM $ Comm.myNode
hops <- getMaxHops
maybe_target <- readSparkOrigHist
target <- case maybe_target of
Just node -> return node
Nothing -> do allNodes <- liftCommM $ Comm.allNodes
let avoidNodes = Set.singleton fisher
randomOtherElem avoidNodes allNodes nodes
let msg = FISH fisher target hops :: Msg m
debug dbgMsgSend $
show msg ++ " ->> " ++ show target
liftCommM $ Comm.send target $ encodeLazy msg
getFishSentCtr >>= incCtr
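-- | Dispatch a received message to its handler; PUSH is not expected here.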
dispatch :: Msg m -> SparkM m ()
dispatch msg@(FISH _ _ _) = handleFISH msg
dispatch msg@(SCHEDULE _ _) = handleSCHEDULE msg
dispatch msg@(NOWORK) = handleNOWORK msg
dispatch msg = error $ "HdpH.Internal.Sparkpool.dispatch: " ++
show msg ++ " unexpected"
handleFISH :: forall m . Msg m -> SparkM m ()
handleFISH msg@(FISH fisher target hops) = do
  here <- liftCommM $ Comm.myNode
  sparks <- readPoolSize
  minSched <- getMinSched
  -- reply with a SCHEDULE iff the pool holds at least 'minSched' sparks
  done <- if sparks < minSched
            then return False
            else do
              pool <- getPool
              maybe_spark <- liftIO $ popBackIO pool
              case maybe_spark of
                Just spark -> do
                  let msg = SCHEDULE spark here :: Msg m
                  debug dbgMsgSend $
                    show msg ++ " ->> " ++ show fisher
                  liftCommM $ Comm.send fisher $ encodeLazy msg
                  return True
                Nothing -> return False
  unless done $ do
    nodes <- liftCommM $ Comm.nodes
    let avoidNodes = Set.fromList [fisher, target, here]
    if hops > 0 && nodes > Set.size avoidNodes
      then do
        -- forward the FISH, with one hop less, to a random other node
        allNodes <- liftCommM $ Comm.allNodes
        node <- randomOtherElem avoidNodes allNodes nodes
        let msg = FISH fisher target (hops - 1) :: Msg m
        debug dbgMsgSend $
          show msg ++ " ->> " ++ show node
        liftCommM $ Comm.send node $ encodeLazy msg
      else do
        -- no hops left or no suitable node; tell the fisher there is no work
        let msg = NOWORK :: Msg m
        debug dbgMsgSend $
          show msg ++ " ->> " ++ show fisher
        liftCommM $ Comm.send fisher $ encodeLazy msg
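-- | Handle a SCHEDULE: push the received spark to the front of the pool,
-- record the sender as the most recent spark origin, bump the received
-- counter and clear the fishing flag.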
handleSCHEDULE :: Msg m -> SparkM m ()
handleSCHEDULE msg@(SCHEDULE spark sender) = do
pool <- getPool
liftIO $ pushFrontIO pool spark
updateSparkOrigHist sender
getSparkRcvdCtr >>= incCtr
getFishingFlag >>= clearFlag
return ()
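-- | Handle a NOWORK: ask the no-work server to wait a random delay between
-- 'minFishDly' and 'maxFishDly', then clear the fishing flag and signal the
-- idle-scheduler semaphore so that fishing may be retried.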
handleNOWORK :: Msg m -> SparkM m ()
handleNOWORK msg@(NOWORK) = do
fishingFlag <- getFishingFlag
noWorkServer <- getNoWorkServer
idleSchedsSem <- getIdleSchedsSem
minDelay <- getMinFishDly
maxDelay <- getMaxFishDly
let action = do
delay <- randomRIO (minDelay, max minDelay maxDelay)
threadDelay delay
atomicModifyIORef fishingFlag $ const (False, ())
Sem.signal idleSchedsSem
liftIO $ reqAction noWorkServer action
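-- Boolean flag helpers; 'setFlag' returns True iff the flag was previously
-- clear, 'clearFlag' returns the previous value.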
readFlag :: IORef Bool -> SparkM m Bool
readFlag = liftIO . readIORef
setFlag :: IORef Bool -> SparkM m Bool
setFlag flag = liftIO $ atomicModifyIORef flag $ \ v -> (True, not v)
clearFlag :: IORef Bool -> SparkM m Bool
clearFlag flag = liftIO $ atomicModifyIORef flag $ \ v -> (False, v)
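-- Counter helpers; 'incCtr' bumps a counter atomically and strictly.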
readCtr :: IORef Int -> SparkM m Int
readCtr = liftIO . readIORef
incCtr :: IORef Int -> SparkM m ()
incCtr ctr = liftIO $ atomicModifyIORef ctr $ \ v ->
let v' = v + 1 in v' `seq` (v', ())
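-- | Return a random element of 'xs' (assumed to be of length 'n') that is
-- not in 'avoid'; assumes every element of 'avoid' occurs in 'xs' and that
-- 'n' exceeds the size of 'avoid'.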
randomOtherElem :: (Ord a) => Set a -> [a] -> Int -> SparkM m a
randomOtherElem avoid xs n = do
  let candidates = filter (`Set.notMember` avoid) xs
  -- 'candidates' has length n - Set.size avoid, provided all of 'avoid' occurs in 'xs'
  i <- liftIO $ randomRIO (0, n - Set.size avoid - 1)
  return (candidates !! i)
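-- Emit a debug message at the given debug level via 'Location.debug'.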
debug :: Int -> String -> SparkM m ()
debug level message = liftIO $ Location.debug level message