module Remote.Task (
TaskM, Promise, PromiseList(..),
runTask,
newPromise, newPromiseAt, newPromiseNear, newPromiseHere, newPromiseAtRole,
toPromise, toPromiseAt, toPromiseNear, toPromiseImm,
readPromise,
MapReduce(..),
mapReduce,
chunkify,
shuffle,
tsay,
tlogS,
Locality(..),
TaskException(..),
__remoteCallMetaData, serialEncodeA, serialDecodeA
) where
import Remote.Reg (putReg,getEntryByIdent,RemoteCallMetaData)
import Remote.Encoding (serialEncodePure,hGetPayload,hPutPayload,Payload,getPayloadContent,Serializable,serialDecode,serialEncode)
import Remote.Process (roundtripQuery, ServiceException(..), TransmitStatus(..),diffTime,getConfig,Config(..),matchProcessDown,terminate,nullPid,monitorProcess,TransmitException(..),MonitorAction(..),ptry,LogConfig(..),getLogConfig,setNodeLogConfig,nodeFromPid,LogLevel(..),LogTarget(..),logS,getLookup,say,LogSphere,NodeId,ProcessM,ProcessId,PayloadDisposition(..),getSelfPid,getSelfNode,matchUnknownThrow,receiveWait,receiveTimeout,roundtripResponse,roundtripResponseAsync,roundtripQueryImpl,match,makePayloadClosure,spawn,spawnLocal,spawnLocalAnd,setDaemonic,send,makeClosure)
import Remote.Closure (Closure(..))
import Remote.Peer (getPeers)
import Data.Dynamic (Dynamic, toDyn, fromDynamic)
import System.IO (withFile,IOMode(..))
import System.Directory (renameFile)
import Data.Binary (Binary,get,put,putWord8,getWord8)
import Control.Exception (SomeException,Exception,throw)
import Data.Typeable (Typeable)
import Control.Monad (liftM,when)
import Control.Monad.Trans (liftIO)
import Control.Concurrent.MVar (MVar,modifyMVar,modifyMVar_,newMVar,newEmptyMVar,takeMVar,putMVar,readMVar,withMVar)
import qualified Data.Map as Map (Map,insert,lookup,empty,insertWith',toList)
import Data.List ((\\),union,nub,groupBy,sortBy,delete)
import Data.Time (UTCTime,getCurrentTime)
import Data.Digest.Pure.MD5 (md5)
import Data.ByteString.Lazy.UTF8 (fromString)
import qualified Data.ByteString.Lazy as B (concat)
type PromiseId = Integer
type Hash = String
data PromiseList a = PlChunk a (Promise (PromiseList a))
| PlNil deriving Typeable
instance (Serializable a) => Binary (PromiseList a) where
put (PlChunk a p) = putWord8 0 >> put a >> put p
put PlNil = putWord8 1
get = do w <- getWord8
case w of
0 -> do a <- get
p <- get
return $ PlChunk a p
1 -> return PlNil
data Promise a = PromiseBasic { _psRedeemer :: ProcessId, _psId :: PromiseId }
| PromiseImmediate a deriving Typeable
instance (Serializable a) => Binary (Promise a) where
put (PromiseBasic a b) = putWord8 0 >> put a >> put b
put (PromiseImmediate a) = putWord8 1 >> put a
get = do a <- getWord8
case a of
0 -> do b <- get
c <- get
return $ PromiseBasic b c
1 -> do b <- get
return $ PromiseImmediate b
data PromiseStorage = PromiseInMemory PromiseData UTCTime (Maybe Dynamic)
| PromiseOnDisk FilePath
| PromiseException String
type PromiseData = Payload
data PromiseRecord = PromiseRecord ProcessId (Closure PromiseData) Locality
data MasterState = MasterState
{
msNextId :: PromiseId,
msNodes :: MVar [(String,NodeId,ProcessId)],
msAllocation :: Map.Map ProcessId [PromiseId],
msPromises :: Map.Map PromiseId PromiseRecord,
msDefaultLocality :: Locality
}
data MmNewPromise = MmNewPromise (Closure Payload) Locality Queueing deriving (Typeable)
instance Binary MmNewPromise where
get = do a <- get
l <- get
q <- get
return $ MmNewPromise a l q
put (MmNewPromise a l q) = put a >> put l >> put q
data MmNewPromiseResponse = MmNewPromiseResponse ProcessId PromiseId
| MmNewPromiseResponseFail deriving (Typeable)
instance Binary MmNewPromiseResponse where
put (MmNewPromiseResponse a b) =
do putWord8 0
put a
put b
put MmNewPromiseResponseFail = putWord8 1
get = do a <- getWord8
case a of
0 -> do b <- get
c <- get
return $ MmNewPromiseResponse b c
1 -> return MmNewPromiseResponseFail
data MmStatus = MmStatus deriving Typeable
instance Binary MmStatus where
get = return MmStatus
put MmStatus = put ()
data MmStatusResponse = MmStatusResponse [NodeId] (Map.Map ProcessId [PromiseId]) deriving Typeable
instance Binary MmStatusResponse where
get = do a <- get
b <- get
return $ MmStatusResponse a b
put (MmStatusResponse a b) = put a >> put b
data MmComplain = MmComplain ProcessId PromiseId deriving (Typeable)
instance Binary MmComplain where
put (MmComplain a b) = put a >> put b
get = do a <- get
b <- get
return $ MmComplain a b
data MmComplainResponse = MmComplainResponse ProcessId deriving (Typeable)
instance Binary MmComplainResponse where
put (MmComplainResponse a) = put a
get = do a <- get
return $ MmComplainResponse a
data TmNewPeer = TmNewPeer NodeId deriving (Typeable)
instance Binary TmNewPeer where
get = do a <- get
return $ TmNewPeer a
put (TmNewPeer nid) = put nid
data NmStart = NmStart PromiseId (Closure Payload) Queueing deriving (Typeable)
instance Binary NmStart where
get = do a <- get
b <- get
c <- get
return $ NmStart a b c
put (NmStart a b c) = put a >> put b >> put c
data NmStartResponse = NmStartResponse Bool deriving (Typeable)
instance Binary NmStartResponse where
get = do a <- get
return $ NmStartResponse a
put (NmStartResponse a) = put a
data NmRedeem = NmRedeem PromiseId deriving (Typeable)
instance Binary NmRedeem where
get = do a <- get
return $ NmRedeem a
put (NmRedeem prid) = put prid
data NmRedeemResponse = NmRedeemResponse Payload
| NmRedeemResponseUnknown
| NmRedeemResponseException deriving (Typeable)
instance Binary NmRedeemResponse where
get = do a <- getWord8
case a of
0 -> do b <- get
return $ NmRedeemResponse b
1 -> return NmRedeemResponseUnknown
2 -> return NmRedeemResponseException
put (NmRedeemResponse a) = putWord8 0 >> put a
put (NmRedeemResponseUnknown) = putWord8 1
put (NmRedeemResponseException) = putWord8 2
data TaskException = TaskException String deriving (Show,Typeable)
instance Exception TaskException
data Queueing = QuNone
| QuExclusive
| QuSmall deriving (Typeable,Ord,Eq)
defaultQueueing :: Queueing
defaultQueueing = QuNone
instance Binary Queueing where
put QuNone = putWord8 0
put QuExclusive = putWord8 1
put QuSmall = putWord8 2
get = do a <- getWord8
case a of
0 -> return QuNone
1 -> return QuExclusive
2 -> return QuSmall
data Locality = LcUnrestricted
| LcDefault
| LcByRole [String]
| LcByNode [NodeId]
instance Binary Locality where
put LcUnrestricted = putWord8 0
put LcDefault = putWord8 1
put (LcByRole a) = putWord8 2 >> put a
put (LcByNode a) = putWord8 3 >> put a
get = do a <- getWord8
case a of
0 -> return LcUnrestricted
1 -> return LcDefault
2 -> do r <- get
return $ LcByRole r
3 -> do r <- get
return $ LcByNode r
defaultLocality :: Locality
defaultLocality = LcByRole ["WORKER","NODE"]
taskError :: String -> a
taskError s = throw $ TaskException s
serialEncodeA :: (Serializable a) => a -> TaskM Payload
serialEncodeA = liftTask . liftIO . serialEncode
serialDecodeA :: (Serializable a) => Payload -> TaskM (Maybe a)
serialDecodeA = liftTask . liftIO . serialDecode
monitorTask :: ProcessId -> ProcessId -> ProcessM TransmitStatus
monitorTask monitor monitee
= do res <- ptry $ monitorProcess monitor monitee MaMonitor
case res of
Right _ -> return QteOK
Left (ServiceException e) -> return $ QteOther e
roundtripImpl :: (Serializable a, Serializable b) => ProcessId -> a -> ProcessM (Either TransmitStatus b)
roundtripImpl pid dat = roundtripQueryImpl 0 PldUser pid dat id []
roundtrip :: (Serializable a, Serializable b) => ProcessId -> a -> TaskM (Either TransmitStatus b)
roundtrip apid dat =
TaskM $ \ts ->
case Map.lookup apid (tsMonitoring ts) of
Nothing -> do mypid <- getSelfPid
res0 <- monitorTask mypid apid
case res0 of
QteOK ->
do res <- roundtripImpl apid dat
return (ts {tsMonitoring=Map.insert apid () (tsMonitoring ts)},res)
_ -> return (ts,Left res0)
Just _ -> do res <- roundtripImpl apid dat
return (ts,res)
spawnDaemonic :: ProcessM () -> ProcessM ProcessId
spawnDaemonic p = spawnLocalAnd p setDaemonic
runWorkerNode :: ProcessId -> NodeId -> ProcessM ProcessId
runWorkerNode masterpid nid =
do clo <- makeClosure "Remote.Task.runWorkerNode__impl" (masterpid) :: ProcessM (Closure (ProcessM ()))
spawn nid clo
runWorkerNode__impl :: Payload -> ProcessM ()
runWorkerNode__impl pl =
do setDaemonic
mpid <- liftIO $ serialDecode pl
case mpid of
Just masterpid -> handler masterpid
Nothing -> error "Failure to extract in rwn__impl"
where handler masterpid = startNodeManager masterpid
passthrough__implPl :: Payload -> TaskM Payload
passthrough__implPl pl = return pl
passthrough__closure :: (Serializable a) => a -> Closure (TaskM a)
passthrough__closure a = Closure "Remote.Task.passthrough__impl" (serialEncodePure a)
__remoteCallMetaData :: RemoteCallMetaData
__remoteCallMetaData x = putReg runWorkerNode__impl "Remote.Task.runWorkerNode__impl"
(putReg passthrough__implPl "Remote.Task.passthrough__implPl" x)
updatePromiseInMemory :: PromiseStorage -> IO PromiseStorage
updatePromiseInMemory (PromiseInMemory p _ d) = do utc <- getCurrentTime
return $ PromiseInMemory p utc d
updatePromiseInMemory other = return other
makePromiseInMemory :: PromiseData -> Maybe Dynamic -> IO PromiseStorage
makePromiseInMemory p dyn = do utc <- getCurrentTime
return $ PromiseInMemory p utc dyn
forwardLogs :: Maybe ProcessId -> ProcessM ()
forwardLogs masterpid =
do lc <- getLogConfig
selfnid <- getSelfNode
let newlc = lc {logTarget = case masterpid of
Just mp
| nodeFromPid mp /= selfnid -> LtForward $ nodeFromPid mp
_ -> LtStdout}
in setNodeLogConfig newlc
hashClosure :: Closure a -> Hash
hashClosure (Closure s pl) = show $ md5 $ B.concat [fromString s, getPayloadContent pl]
undiskify :: FilePath -> MVar PromiseStorage -> ProcessM (Maybe PromiseData)
undiskify fpIn mps =
do wrap $ liftIO $ modifyMVar mps (\val ->
case val of
PromiseOnDisk fp ->
do pl <- withFile fp ReadMode hGetPayload
inmem <- makePromiseInMemory pl Nothing
return (inmem,Just pl)
PromiseInMemory payload _ _ -> return (val,Just payload)
_ -> return (val,Nothing))
where wrap a = do res <- ptry a
case res of
Left e -> do logS "TSK" LoCritical $ "Error reading promise from file "++fpIn++": "++show (e::IOError)
return Nothing
Right r -> return r
diskify :: FilePath -> MVar PromiseStorage -> Bool -> ProcessM ()
diskify fp mps reallywrite =
do cfg <- getConfig
when (cfgPromiseFlushDelay cfg > 0)
(handler (cfgPromiseFlushDelay cfg))
where
handler delay =
do _ <- receiveTimeout delay []
again <- wrap $ liftIO $ modifyMVar mps (\val ->
case val of
PromiseInMemory payload utc _ ->
do now <- getCurrentTime
if diffTime now utc > delay
then do when reallywrite $
do liftIO $ withFile tmp WriteMode (\h -> hPutPayload h payload)
renameFile tmp fp
return (PromiseOnDisk fp,False)
else return (val,True)
_ -> return (val,False))
when again
(diskify fp mps reallywrite)
tmp = fp ++ ".tmp"
wrap a = do res <- ptry a
case res of
Left z -> do logS "TSK" LoImportant $ "Error writing promise to disk on file "++fp++": "++show (z::IOError)
return False
Right v -> return v
startNodeWorker :: ProcessId -> NodeBossState ->
MVar PromiseStorage -> Closure Payload -> ProcessM ()
startNodeWorker masterpid nbs mps clo@(Closure cloname cloarg) =
do self <- getSelfPid
_ <- spawnLocalAnd (starter self) (prefix self)
return ()
where
prefix nodeboss =
do self <- getSelfPid
monitorProcess self nodeboss MaLink
setDaemonic
starter nodeboss =
let initialState = TaskState {tsMaster=masterpid,tsNodeBoss=Just nodeboss,
tsPromiseCache=nsPromiseCache nbs, tsRedeemerForwarding=nsRedeemerForwarding nbs,
tsMonitoring=Map.empty}
tasker = do tbl <- liftTask $ getLookup
case getEntryByIdent tbl cloname of
Just funval ->
do val <- funval cloarg
p <- liftTaskIO $ makePromiseInMemory val Nothing
liftTaskIO $ putMVar mps p
cfg <- liftTask $ getConfig
let cachefile = cfgPromisePrefix cfg++hashClosure clo
liftTask $ diskify cachefile mps True
Nothing -> taskError $ "Failed looking up "++cloname++" in closure table"
in do res <- ptry $ runTaskM tasker initialState :: ProcessM (Either SomeException (TaskState,()))
case res of
Left ex -> liftIO (putMVar mps (PromiseException (show ex))) >> throw ex
Right _ -> return ()
data NodeBossState =
NodeBossState
{
nsPromiseCache :: MVar (Map.Map PromiseId (MVar PromiseStorage)),
nsRedeemerForwarding :: MVar (Map.Map PromiseId ProcessId)
}
startNodeManager :: ProcessId -> ProcessM ()
startNodeManager masterpid =
let
handler :: NodeBossState -> ProcessM a
handler state =
let promisecache = nsPromiseCache state
nmStart = roundtripResponse (\(NmStart promise clo _queueing) ->
do promisestore <- liftIO $ newEmptyMVar
ret <- liftIO $ modifyMVar promisecache
(\pc -> let newpc = Map.insert promise promisestore pc
in return (newpc,True))
when (ret)
(startNodeWorker masterpid state promisestore clo)
return (NmStartResponse ret,state))
nmTermination = matchProcessDown masterpid $
do forwardLogs Nothing
logS "TSK" LoInformation $ "Terminating nodeboss after my master "++show masterpid++" is gone"
terminate
nmRedeem = roundtripResponseAsync (\(NmRedeem promise) ans ->
let answerer = do pc <- liftIO $ readMVar promisecache
case Map.lookup promise pc of
Nothing -> ans NmRedeemResponseUnknown
Just v -> do rv <- liftIO $ readMVar v
case rv of
PromiseInMemory rrv _ _ ->
do liftIO $ modifyMVar_ v (\_ -> updatePromiseInMemory rv)
ans (NmRedeemResponse rrv)
PromiseOnDisk fp -> do mpd <- undiskify fp v
case mpd of
Nothing ->
ans (NmRedeemResponseUnknown)
Just a ->
ans (NmRedeemResponse a)
diskify fp v False
PromiseException _ -> ans NmRedeemResponseException
in do _ <- spawnLocal answerer
return state) False
in receiveWait [nmStart, nmRedeem, nmTermination, matchUnknownThrow] >>= handler
in do forwardLogs $ Just masterpid
mypid <- getSelfPid
monitorProcess mypid masterpid MaMonitor
logS "TSK" LoInformation $ "Starting a nodeboss owned by " ++ show masterpid
pc <- liftIO $ newMVar Map.empty
pf <- liftIO $ newMVar Map.empty
let initState = NodeBossState {nsPromiseCache=pc,nsRedeemerForwarding=pf}
handler initState
runTask :: TaskM a -> ProcessM a
runTask = startMaster
startMaster :: TaskM a -> ProcessM a
startMaster proc =
do mvmaster <- liftIO $ newEmptyMVar
mvdone <- liftIO $ newEmptyMVar
master <- runMaster (masterproc mvdone mvmaster)
liftIO $ putMVar mvmaster master
liftIO $ takeMVar mvdone
where masterproc mvdone mvmaster nodeboss =
do master <- liftIO $ takeMVar mvmaster
pc <- liftIO $ newMVar Map.empty
pf <- liftIO $ newMVar Map.empty
let initialState = TaskState {tsMaster=master,tsNodeBoss=Just nodeboss,
tsPromiseCache=pc, tsRedeemerForwarding=pf,
tsMonitoring=Map.empty}
res <- liftM snd $ runTaskM proc initialState
liftIO $ putMVar mvdone res
selectLocation :: MasterState -> Locality -> ProcessM (Maybe (String,NodeId,ProcessId))
selectLocation ms locality =
let nodes = msNodes ms
in liftIO $ modifyMVar nodes
(\n -> case n of
[] -> return (n,Nothing)
_ -> let dflt = (rotate n,Just $ head n)
filterify f = case filter f n of
[] -> return dflt
(a:_) -> return ((delete a n) ++ [a],Just a)
in case cond locality of
LcUnrestricted -> return dflt
LcDefault -> return dflt
LcByRole l -> filterify (\(r,_,_) -> r `elem` l)
LcByNode l -> filterify (\(_,r,_) -> r `elem` l))
where rotate [] = []
rotate (h:t) = t ++ [h]
cond l = case l of
LcDefault -> msDefaultLocality ms
_ -> l
countLocations :: MasterState -> ProcessM Int
countLocations ms = liftIO $ withMVar (msNodes ms) (\a -> return $ length a)
findPeers :: ProcessM [(String,NodeId)]
findPeers = liftM (concat . (map (\(role,v) -> [ (role,x) | x <- v] )) . Map.toList) getPeers
sendSilent :: (Serializable a) => ProcessId -> a -> ProcessM ()
sendSilent pid a = do res <- ptry $ send pid a
case res of
Left (TransmitException _) -> return ()
Right _ -> return ()
runMaster :: (ProcessId -> ProcessM ()) -> ProcessM ProcessId
runMaster masterproc =
let
probeOnce nodes seen masterpid =
do recentlist <- findPeers
let newseen = seen `union` recentlist
let topidlist = recentlist \\ seen
let cleanOut n = filter (\(_,nid,_) -> nid `elem` (map snd recentlist)) n
newlypidded <- mapM (\(role,nid) ->
do pid <- runWorkerNode masterpid nid
return (role,nid,pid)) topidlist
(_newlist,totalseen) <- liftIO $ modifyMVar nodes (\oldlist ->
return ((cleanOut oldlist) ++ newlypidded,(recentlist,newseen)))
let newlyadded = totalseen \\ seen
mapM_ (\nid -> sendSilent masterpid (TmNewPeer nid)) (map snd newlyadded)
return totalseen
proberDelay = 10000000
prober nodes seen masterpid =
do totalseen <- probeOnce nodes seen masterpid
_ <- receiveTimeout proberDelay [matchUnknownThrow]
prober nodes totalseen masterpid
master state =
let
tryAlloc clo promiseid locality queueing =
do ns <- selectLocation state locality
case ns of
Nothing -> do logS "TSK" LoCritical "Attempt to allocate a task, but no nodes found"
return Nothing
Just (_,nid,nodeboss) ->
do res <- roundtripQuery PldUser nodeboss (NmStart promiseid clo queueing)
case res of
Left e ->
do logS "TSK" LoImportant $ "Failed attempt to start "++show clo++" on " ++show nid ++": "++show e
return Nothing
Right (NmStartResponse True) -> return $ Just nodeboss
_ -> do logS "TSK" LoImportant $ "Failed attempt to start "++show clo++" on " ++show nid
return Nothing
basicAllocate clo promiseid locality queueing =
do count <- countLocations state
res1 <- tryAlloc clo promiseid locality queueing
case res1 of
Just _ -> return res1
Nothing ->
do res <- stubborn count $ tryAlloc clo promiseid LcUnrestricted queueing
case res of
Nothing -> do logS "TSK" LoCritical $ "Terminally failed to start "++show clo
return res
_ -> return res
statusMsg = roundtripResponse
(\x -> case x of
MmStatus ->
do thenodes <- liftIO $ readMVar $ msNodes state
let knownNodes = map (\(_,n,_) -> n) thenodes
proctree = msAllocation state
return (MmStatusResponse knownNodes proctree,state))
complainMsg = roundtripResponse
(\x -> case x of
MmComplain procid promid ->
case Map.lookup promid (msPromises state) of
Nothing -> return (MmComplainResponse nullPid,state)
Just (PromiseRecord curprocid curclo curlocality)
| curprocid /= procid -> return (MmComplainResponse curprocid,state)
| otherwise ->
do res <- basicAllocate curclo promid curlocality defaultQueueing
case res of
Nothing -> return (MmComplainResponse nullPid,state)
Just newprocid ->
let newpromises = Map.insert promid (PromiseRecord newprocid curclo curlocality) (msPromises state)
in return (MmComplainResponse newprocid,state {msPromises=newpromises}))
promiseMsg = roundtripResponse
(\x -> case x of
MmNewPromise clo locality queueing ->
do
let promiseid = msNextId state
res <- basicAllocate clo promiseid locality queueing
case res of
Just nodeboss ->
let newstate = state {msAllocation=newAllocation,msPromises=newPromises,msNextId=promiseid+1}
newAllocation = Map.insertWith' (\a b -> nub $ a++b) nodeboss [promiseid] (msAllocation state)
newPromises = Map.insert promiseid (PromiseRecord nodeboss clo locality) (msPromises state)
in return (MmNewPromiseResponse nodeboss promiseid,newstate)
Nothing ->
return (MmNewPromiseResponseFail,state))
simpleMsg = match
(\x -> case x of
TmNewPeer nid -> do logS "TSK" LoInformation $ "Found new peer " ++show nid
return state)
in receiveWait [simpleMsg, promiseMsg, complainMsg,statusMsg] >>= master
in do nodes <- liftIO $ newMVar []
selfnode <- getSelfNode
selfpid <- getSelfPid
let initState = MasterState {msNextId=0, msAllocation=Map.empty, msPromises=Map.empty, msNodes=nodes, msDefaultLocality = defaultLocality}
masterpid <- spawnDaemonic (master initState)
seennodes <- probeOnce nodes [] masterpid
let getByNid _ [] = Nothing
getByNid nid ((_,n,nodeboss):xs) = if nid==n then Just nodeboss else getByNid nid xs
res <- liftIO $ withMVar nodes (\n -> return $ getByNid selfnode n)
_ <- case res of
Nothing -> taskError "Can't find self: make sure cfgKnownHosts includes the master"
Just x -> spawnLocalAnd (masterproc x) (do myself <- getSelfPid
monitorProcess selfpid myself MaLinkError)
_ <- spawnDaemonic (prober nodes seennodes masterpid)
return masterpid
stubborn :: (Monad m) => Int -> m (Maybe a) -> m (Maybe a)
stubborn 0 a = a
stubborn n a | n>0
= do r <- a
case r of
Just _ -> return r
Nothing -> stubborn (n1) a
toPromise :: (Serializable a) => a -> TaskM (Promise a)
toPromise = toPromiseAt LcDefault
toPromiseAt :: (Serializable a) => Locality -> a -> TaskM (Promise a)
toPromiseAt locality a = newPromiseAt locality (passthrough__closure a)
toPromiseNear :: (Serializable a,Serializable b) => Promise b -> a -> TaskM (Promise a)
toPromiseNear (PromiseImmediate _) = toPromise
toPromiseNear (PromiseBasic prhost _prid) = toPromiseAt (LcByNode [nodeFromPid prhost])
toPromiseImm :: (Serializable a) => a -> TaskM (Promise a)
toPromiseImm = return . PromiseImmediate
newPromise :: (Serializable a) => Closure (TaskM a) -> TaskM (Promise a)
newPromise = newPromiseAt LcDefault
newPromiseHere :: (Serializable a) => Closure (TaskM a) -> TaskM (Promise a)
newPromiseHere clo =
do mynode <- liftTask $ getSelfNode
newPromiseAt (LcByNode [mynode]) clo
newPromiseNear :: (Serializable a, Serializable b) => Promise b -> Closure (TaskM a) -> TaskM (Promise a)
newPromiseNear (PromiseImmediate _) = newPromise
newPromiseNear (PromiseBasic prhost _prid) = newPromiseAt (LcByNode [nodeFromPid prhost])
newPromiseAtRole :: (Serializable a) => String -> Closure (TaskM a) -> TaskM (Promise a)
newPromiseAtRole role clo = newPromiseAt (LcByRole [role]) clo
newPromiseAt :: (Serializable a) => Locality -> Closure (TaskM a) -> TaskM (Promise a)
newPromiseAt locality clo =
let realclo = makePayloadClosure clo
in case realclo of
Just plclo -> do master <- getMaster
res <- roundtrip master (MmNewPromise plclo locality defaultQueueing)
case res of
Right (MmNewPromiseResponse pid prid) -> return $ PromiseBasic pid prid
Right (MmNewPromiseResponseFail) ->
taskError $ "Spawning of closure "++show clo++" by newPromise failed"
Left tms -> taskError $ "Spawning of closure "++show clo++" by newPromise resulted in "++show tms
Nothing -> taskError $ "The specified closure, "++show clo++", can't produce payloads"
readPromise :: (Serializable a) => Promise a -> TaskM a
readPromise (PromiseImmediate a) = return a
readPromise thepromise@(PromiseBasic prhost prid) =
do mp <- lookupCachedPromise prid
case mp of
Nothing -> do fprhost <- liftM (maybe prhost id) $ lookupForwardedRedeemer prid
res <- roundtrip fprhost (NmRedeem prid)
case res of
Left e -> do tlogS "TSK" LoInformation $ "Complaining about promise " ++ show prid ++" on " ++show fprhost++" because of "++show e
complain fprhost prid
Right NmRedeemResponseUnknown ->
do tlogS "TSK" LoInformation $ "Complaining about promise " ++ show prid ++" on " ++show fprhost++" because allegedly unknown"
complain fprhost prid
Right (NmRedeemResponse thedata) ->
do extracted <- extractFromPayload thedata
promiseinmem <- liftTaskIO $ makePromiseInMemory thedata (Just $ toDyn extracted)
putPromiseInCache prid promiseinmem
return extracted
Right NmRedeemResponseException ->
taskError "Failed promise redemption"
Just mv -> do val <- liftTaskIO $ readMVar mv
case val of
PromiseInMemory v _utc thedyn ->
case thedyn of
Just thedynvalue ->
case fromDynamic thedynvalue of
Nothing -> do liftTask $ logS "TSK" LoStandard "Insufficiently dynamic promise cache"
extractFromPayload v
Just realval -> do updated <- liftTaskIO $ makePromiseInMemory v thedyn
putPromiseInCache prid updated
return realval
Nothing -> do extracted <- extractFromPayload v
updated <- liftTaskIO $ makePromiseInMemory v (Just $ toDyn extracted)
putPromiseInCache prid updated
return extracted
PromiseException _ -> taskError $ "Redemption of promise failed"
PromiseOnDisk fp -> do mpd <- liftTask $ undiskify fp mv
_ <- liftTask $ spawnLocal $ diskify fp mv False
case mpd of
Just dat -> extractFromPayload dat
_ -> taskError "Promise extraction from disk failed"
where extractFromPayload v = do out <- liftTaskIO $ serialDecode v
case out of
Just r -> return r
Nothing -> taskError "Unexpected payload type"
complain fprhost prid =
do master <- getMaster
response <- roundtrip master (MmComplain fprhost prid)
case response of
Left a -> taskError $ "Couldn't file complaint with master about " ++ show fprhost ++ " because " ++ show a
Right (MmComplainResponse newhost)
| newhost == nullPid -> taskError $ "Couldn't file complaint with master about " ++ show fprhost
| otherwise -> do setForwardedRedeemer prid newhost
readPromise thepromise
data TaskState = TaskState
{
tsMaster :: ProcessId,
tsNodeBoss :: Maybe ProcessId,
tsPromiseCache :: MVar (Map.Map PromiseId (MVar PromiseStorage)),
tsRedeemerForwarding :: MVar (Map.Map PromiseId ProcessId),
tsMonitoring :: Map.Map ProcessId ()
}
data TaskM a = TaskM { runTaskM :: TaskState -> ProcessM (TaskState, a) } deriving (Typeable)
instance Monad TaskM where
m >>= k = TaskM $ \ts -> do
(ts',a) <- runTaskM m ts
(ts'',a') <- runTaskM (k a) (ts')
return (ts'',a')
return x = TaskM $ \ts -> return $ (ts,x)
lookupForwardedRedeemer :: PromiseId -> TaskM (Maybe ProcessId)
lookupForwardedRedeemer q =
TaskM $ \ts ->
liftIO $ withMVar (tsRedeemerForwarding ts) $ (\fwd ->
let lo = Map.lookup q fwd
in return (ts,lo))
setForwardedRedeemer :: PromiseId -> ProcessId -> TaskM ()
setForwardedRedeemer from to =
TaskM $ \ts -> liftIO $ modifyMVar (tsRedeemerForwarding ts) (\fwd ->
let newmap = Map.insert from to fwd
in return ( newmap,(ts,()) ) )
lookupCachedPromise :: PromiseId -> TaskM (Maybe (MVar PromiseStorage))
lookupCachedPromise prid = TaskM $ \ts ->
do mv <- liftIO $ withMVar (tsPromiseCache ts)
(\pc -> return $ Map.lookup prid pc)
return (ts,mv)
putPromiseInCache :: PromiseId -> PromiseStorage -> TaskM ()
putPromiseInCache prid ps = TaskM $ \ts ->
do liftIO $ modifyMVar_ (tsPromiseCache ts)
(\pc -> do mv <- newMVar ps
return $ Map.insert prid mv pc)
return (ts,())
getMaster :: TaskM ProcessId
getMaster = TaskM $ \ts -> return (ts,tsMaster ts)
liftTask :: ProcessM a -> TaskM a
liftTask a = TaskM $ \ts -> a >>= (\x -> return (ts,x))
liftTaskIO :: IO a -> TaskM a
liftTaskIO = liftTask . liftIO
tsay :: String -> TaskM ()
tsay a = liftTask $ say a
tlogS :: LogSphere -> LogLevel -> String -> TaskM ()
tlogS a b c = liftTask $ logS a b c
data MapReduce rawinput input middle1 middle2 result
= MapReduce
{
mtMapper :: input -> Closure (TaskM [middle1]),
mtReducer :: middle2 -> Closure (TaskM result),
mtChunkify :: rawinput -> [input],
mtShuffle :: [middle1] -> [middle2]
}
shuffle :: Ord a => [(a,b)] -> [(a,[b])]
shuffle q =
let semi = groupBy (\(a,_) (b,_) -> a==b) (sortBy (\(a,_) (b,_) -> compare a b) q)
in map (\x -> (fst $ head x,map snd x)) semi
chunkify :: Int -> [a] -> [[a]]
chunkify numChunks l
| numChunks <= 0 = taskError "Can't chunkify into less than one chunk"
| otherwise = splitSize (ceiling ((fromIntegral (length l) / fromIntegral numChunks) :: Double)) l
where
splitSize _ [] = []
splitSize i v = let (first,second) = splitAt i v
in first : splitSize i second
mapReduce :: (Serializable i,Serializable k,Serializable m,Serializable r) =>
MapReduce ri i k m r -> ri -> TaskM [r]
mapReduce mr inputs =
let chunks = (mtChunkify mr) inputs
in do
pmapResult <- mapM (\chunk ->
newPromise ((mtMapper mr) chunk) ) chunks
mapResult <- mapM readPromise pmapResult
let shuffled = (mtShuffle mr) (concat mapResult)
pres <- mapM (\mid2 ->
newPromise ((mtReducer mr) mid2)) shuffled
mapM readPromise pres