{-# LANGUAGE DeriveDataTypeable #-}

-- | This module provides data dependency resolution and
-- fault tolerance via /promises/ (known elsewhere as /futures/).
-- It's implemented in terms of the "Remote.Process" module.
module Remote.Task (
                   -- * Tasks and promises
                   TaskM, Promise, PromiseList(..),
                   runTask,
                   newPromise, newPromiseAt, newPromiseNear, newPromiseHere, newPromiseAtRole,
                   toPromise, toPromiseAt, toPromiseNear, toPromiseImm,
                   readPromise,

                   -- * MapReduce
                   MapReduce(..),
                   mapReduce,

                   -- * Useful auxiliaries
                   chunkify,
                   shuffle,
                   tsay,
                   tlogS,
                   Locality(..),
                   TaskException(..),

                   -- * Internals, not for general use
                   __remoteCallMetaData, serialEncodeA, serialDecodeA
                   ) where

import Remote.Reg (putReg,getEntryByIdent,RemoteCallMetaData)
import Remote.Encoding (serialEncodePure,hGetPayload,hPutPayload,Payload,getPayloadContent,Serializable,serialDecode,serialEncode)
import Remote.Process (roundtripQuery, ServiceException(..), TransmitStatus(..),diffTime,getConfig,Config(..),matchProcessDown,terminate,nullPid,monitorProcess,TransmitException(..),MonitorAction(..),ptry,LogConfig(..),getLogConfig,setNodeLogConfig,nodeFromPid,LogLevel(..),LogTarget(..),logS,getLookup,say,LogSphere,NodeId,ProcessM,ProcessId,PayloadDisposition(..),getSelfPid,getSelfNode,matchUnknownThrow,receiveWait,receiveTimeout,roundtripResponse,roundtripResponseAsync,roundtripQueryImpl,match,makePayloadClosure,spawn,spawnLocal,spawnLocalAnd,setDaemonic,send,makeClosure)
import Remote.Closure (Closure(..))
import Remote.Peer (getPeers)

import Control.Applicative (Applicative(..))
import Data.Dynamic (Dynamic, toDyn, fromDynamic)
import System.IO (withFile,IOMode(..))
import System.Directory (renameFile)
import Data.Binary (Binary,get,put,putWord8,getWord8)
import Control.Exception (SomeException,Exception,throw)
import Data.Typeable (Typeable)
import Control.Monad (ap,liftM,when)
import Control.Monad.Trans (liftIO)
import Control.Concurrent.MVar (MVar,modifyMVar,modifyMVar_,newMVar,newEmptyMVar,takeMVar,putMVar,readMVar,withMVar)
import qualified Data.Map as Map (Map,insert,lookup,empty,insertWith',toList)
import Data.List ((\\),union,nub,groupBy,sortBy,delete)
import Data.Time (UTCTime,getCurrentTime)

-- imports required for hashClosure; is there a lighter-weight way of doing this?
import Data.Digest.Pure.MD5 (md5)
import Data.ByteString.Lazy.UTF8 (fromString)
import qualified Data.ByteString.Lazy as B (concat)

----------------------------------------------
-- * Promises and tasks
----------------------------------------------

-- | Identifier of a promise; allocated serially by the master.
type PromiseId = Integer

-- | Textual digest used to name promise cache files.
type Hash = String

-- Functor and Applicative instances for 'TaskM' (the data type and its
-- Monad instance are declared further down in this module). GHC 7.10 and
-- later require every Monad to also be an Applicative; both instances are
-- built from the Monad operations so the three always agree.
-- Control.Applicative is imported explicitly above so this also compiles
-- with compilers whose Prelude does not export Applicative.
instance Functor TaskM where
   fmap = liftM

instance Applicative TaskM where
   pure x = TaskM $ \ts -> return (ts,x)
   (<*>) = ap

-- | A list whose tail is held behind a 'Promise': each 'PlChunk'
-- carries one element plus a promise for the rest of the list, and
-- 'PlNil' ends it.
data PromiseList a = PlChunk a (Promise (PromiseList a))
                   | PlNil deriving Typeable

-- Wire format: a tag byte (0 = 'PlChunk', 1 = 'PlNil') followed by the
-- constructor's fields. An unknown tag is reported as a decoding failure
-- instead of an incomplete-pattern crash.
instance (Serializable a) => Binary (PromiseList a) where
   put (PlChunk a p) = putWord8 0 >> put a >> put p
   put PlNil = putWord8 1
   get = do
     w <- getWord8
     case w of
       0 -> do
         a <- get
         p <- get
         return $ PlChunk a p
       1 -> return PlNil
       _ -> fail $ "Remote.Task: unknown PromiseList tag " ++ show w

-- | The basic data type for expressing data dependence
-- in the 'TaskM' monad. A Promise represents a value that
-- may or may not have been computed yet; thus, it's like
-- a distributed thunk (in the sense of a non-strict unit
-- of evaluation). These are created by 'newPromise' and friends,
-- and the underlying value can be gotten with 'readPromise'.
data Promise a = PromiseBasic
                   { _psRedeemer :: ProcessId
                     -- ^ the nodeboss that 'readPromise' asks for the value
                   , _psId :: PromiseId
                     -- ^ the promise's identifier, allocated by the master
                   }
               | PromiseImmediate a deriving Typeable
-- psRedeemer should maybe be wrapped in an IORef so that it can be updated in case of node failure

-- Wire format: tag byte 0 followed by redeemer and ID, or tag byte 1
-- followed by the immediate value. Unknown tags fail the decoder.
instance (Serializable a) => Binary (Promise a) where
   put (PromiseBasic a b) = putWord8 0 >> put a >> put b
   put (PromiseImmediate a) = putWord8 1 >> put a
   get = do
     a <- getWord8
     case a of
       0 -> do
         b <- get
         c <- get
         return $ PromiseBasic b c
       1 -> do
         b <- get
         return $ PromiseImmediate b
       _ -> fail $ "Remote.Task: unknown Promise tag " ++ show a

-- | Stores the data produced by a promise, in one of its
-- various forms.
-- If it's currently in memory, we keep it
-- as a payload, to be decoded by its ultimate user (who
-- of course has the static type information), the time
-- it was last touched (so we know when to flush it),
-- and perhaps also a decoded version, so that it doesn't
-- need to be decoded repeatedly: this makes this go a lot
-- faster. If it's been flushed to disk, we keep track of
-- where, and if the promise didn't complete, but threw
-- an exception during its execution, we mark that here
-- as well, so that readers of the promise fail too.
data PromiseStorage = PromiseInMemory PromiseData UTCTime (Maybe Dynamic)
                    | PromiseOnDisk FilePath
                    | PromiseException String

-- | The serialized result of a promise.
type PromiseData = Payload
{- UNUSED
type TimeStamp = UTCTime
-}

-- | Keeps track of what we know about currently running promises.
-- The closure and locality are provided by the initial call to
-- 'newPromise'; the nodeboss is where it is currently running.
-- We need this info to deal with complaints.
data PromiseRecord = PromiseRecord ProcessId (Closure PromiseData) Locality

-- | State threaded through the master's message loop.
data MasterState = MasterState
     {
         -- | Promise IDs are allocated serially from here
         msNextId :: PromiseId,

         -- | All currently known nodes, with the role, node ID, and node boss. Updated asynchronously by the prober thread
         msNodes :: MVar [(String,NodeId,ProcessId)],

         -- | Given a nodeboss, which promises belong to it. Written when a promise is allocated; only read when answering 'MmStatus'
         msAllocation :: Map.Map ProcessId [PromiseId],

         -- | Given a promise, what do we know about it.
         -- Includes its nodeboss, its closure, and its locality preference
         msPromises :: Map.Map PromiseId PromiseRecord,

         -- | The locality preference of new worker tasks, if not specified otherwise
         msDefaultLocality :: Locality
     }

----------------------------------------------
-- Messages exchanged with the master (Mm*), sent by the prober (Tm*),
-- and exchanged with nodebosses (Nm*). Each 'get' reads exactly what
-- the matching 'put' writes; sum types start with a tag byte, and an
-- unknown tag is reported through 'fail' rather than a pattern crash.
----------------------------------------------

-- | Request to the master: allocate a promise that evaluates the closure.
data MmNewPromise = MmNewPromise (Closure Payload) Locality Queueing deriving (Typeable)
instance Binary MmNewPromise where
  get = do
    a <- get
    l <- get
    q <- get
    return $ MmNewPromise a l q
  put (MmNewPromise a l q) = put a >> put l >> put q

-- | The master's answer to 'MmNewPromise': the owning nodeboss and the
-- new promise ID, or failure.
data MmNewPromiseResponse = MmNewPromiseResponse ProcessId PromiseId
                          | MmNewPromiseResponseFail deriving (Typeable)
instance Binary MmNewPromiseResponse where
  put (MmNewPromiseResponse a b) = do
    putWord8 0
    put a
    put b
  put MmNewPromiseResponseFail = putWord8 1
  get = do
    a <- getWord8
    case a of
      0 -> do
        b <- get
        c <- get
        return $ MmNewPromiseResponse b c
      1 -> return MmNewPromiseResponseFail
      _ -> fail $ "Remote.Task: unknown MmNewPromiseResponse tag " ++ show a

-- | Request to the master for its known nodes and promise allocation.
data MmStatus = MmStatus deriving Typeable
instance Binary MmStatus where
   get = return MmStatus
   put MmStatus = put ()

-- | The master's answer to 'MmStatus'.
data MmStatusResponse = MmStatusResponse [NodeId] (Map.Map ProcessId [PromiseId]) deriving Typeable
instance Binary MmStatusResponse where
   get = do
     a <- get
     b <- get
     return $ MmStatusResponse a b
   put (MmStatusResponse a b) = put a >> put b

-- | Complaint to the master that the given process could not deliver the
-- given promise; the master may restart the promise elsewhere.
data MmComplain = MmComplain ProcessId PromiseId deriving (Typeable)
instance Binary MmComplain where
   put (MmComplain a b) = put a >> put b
   get = do
     a <- get
     b <- get
     return $ MmComplain a b

-- | The master's answer to 'MmComplain': the process now responsible for
-- the promise, or 'nullPid' on failure.
data MmComplainResponse = MmComplainResponse ProcessId deriving (Typeable)
instance Binary MmComplainResponse where
   put (MmComplainResponse a) = put a
   get = do
     a <- get
     return $ MmComplainResponse a

-- | Sent by the prober to the master when a new peer node is found.
data TmNewPeer = TmNewPeer NodeId deriving (Typeable)
instance Binary TmNewPeer where
   get = do
     a <- get
     return $ TmNewPeer a
   put (TmNewPeer nid) = put nid

-- | Request to a nodeboss: start evaluating a closure for a promise ID.
data NmStart = NmStart PromiseId (Closure Payload) Queueing deriving (Typeable)
instance Binary NmStart where
   get = do
     a <- get
     b <- get
     c <- get
     return $ NmStart a b c
   put (NmStart a b c) = put a >> put b >> put c

-- | A nodeboss's answer to 'NmStart'.
data NmStartResponse = NmStartResponse Bool deriving (Typeable)
instance Binary NmStartResponse where
   get = do
     a <- get
     return $ NmStartResponse a
   put (NmStartResponse a) = put a

-- | Request to a nodeboss for the value of a promise it holds.
data NmRedeem = NmRedeem PromiseId deriving (Typeable)
instance Binary NmRedeem where
   get = do
     a <- get
     return $ NmRedeem a
   put (NmRedeem prid) = put prid

-- | A nodeboss's answer to 'NmRedeem': the payload, "promise unknown",
-- or "the promise's computation failed".
data NmRedeemResponse = NmRedeemResponse Payload
                      | NmRedeemResponseUnknown
                      | NmRedeemResponseException deriving (Typeable)
instance Binary NmRedeemResponse where
   get = do
     a <- getWord8
     case a of
       0 -> do
         b <- get
         return $ NmRedeemResponse b
       1 -> return NmRedeemResponseUnknown
       2 -> return NmRedeemResponseException
       _ -> fail $ "Remote.Task: unknown NmRedeemResponse tag " ++ show a
   put (NmRedeemResponse a) = putWord8 0 >> put a
   put (NmRedeemResponseUnknown) = putWord8 1
   put (NmRedeemResponseException) = putWord8 2

-- | Thrown (via 'taskError') when a task-layer operation fails.
data TaskException = TaskException String deriving (Show,Typeable)
instance Exception TaskException

-- | (Currently ignored.)
data Queueing = QuNone
              | QuExclusive
              | QuSmall deriving (Typeable,Ord,Eq)

-- | The queueing mode sent with every request ('QuNone').
defaultQueueing :: Queueing
defaultQueueing = QuNone

instance Binary Queueing where
   put QuNone = putWord8 0
   put QuExclusive = putWord8 1
   put QuSmall = putWord8 2
   get = do
     a <- getWord8
     case a of
       0 -> return QuNone
       1 -> return QuExclusive
       2 -> return QuSmall
       _ -> fail $ "Remote.Task: unknown Queueing tag " ++ show a

-- | A specification of preference
-- of where a promise should be allocated,
-- among the nodes visible to the master.
data Locality = LcUnrestricted -- ^ The promise can be placed anywhere.
              | LcDefault -- ^ The default preference is applied, which is for nodes having a role of NODE or WORKER
              | LcByRole [String] -- ^ Nodes having the given roles will be preferred
              | LcByNode [NodeId] -- ^ The given nodes will be preferred

instance Binary Locality where
   put LcUnrestricted = putWord8 0
   put LcDefault = putWord8 1
   put (LcByRole a) = putWord8 2 >> put a
   put (LcByNode a) = putWord8 3 >> put a
   get = do
     a <- getWord8
     case a of
       0 -> return LcUnrestricted
       1 -> return LcDefault
       2 -> do
         r <- get
         return $ LcByRole r
       3 -> do
         r <- get
         return $ LcByNode r
       _ -> fail $ "Remote.Task: unknown Locality tag " ++ show a

-- | The locality the master starts with for 'LcDefault' requests:
-- nodes whose role is WORKER or NODE.
defaultLocality :: Locality
defaultLocality = LcByRole ["WORKER","NODE"]

-- | Throws a 'TaskException' with the given message. Because it is
-- built on 'throw', it can be used at any type.
taskError :: String -> a
taskError s = throw $ TaskException s

-- | 'serialEncode' lifted into 'TaskM'.
serialEncodeA :: (Serializable a) => a -> TaskM Payload
serialEncodeA = liftTask . liftIO . serialEncode

-- | 'serialDecode' lifted into 'TaskM'.
serialDecodeA :: (Serializable a) => Payload -> TaskM (Maybe a)
serialDecodeA = liftTask . liftIO . serialDecode

-- | Sets up an 'MaMonitor' monitor of @monitee@ on behalf of @monitor@.
-- A 'ServiceException' is turned into a 'QteOther' status instead of
-- being propagated.
monitorTask :: ProcessId -> ProcessId -> ProcessM TransmitStatus
monitorTask monitor monitee = do
    res <- ptry $ monitorProcess monitor monitee MaMonitor
    case res of
      Right _ -> return QteOK
      Left (ServiceException e) -> return $ QteOther e

-- | Sends a user-level query to @pid@ and waits for its typed reply.
roundtripImpl :: (Serializable a, Serializable b) => ProcessId -> a -> ProcessM (Either TransmitStatus b)
roundtripImpl pid dat = roundtripQueryImpl 0 PldUser pid dat id []

-- | Like 'roundtripImpl', but the first time a process is queried from
-- this task a monitor is set up on it and remembered in 'tsMonitoring'.
-- If that monitor cannot be established, no query is sent and the
-- failure status is returned.
roundtrip :: (Serializable a, Serializable b) => ProcessId -> a -> TaskM (Either TransmitStatus b)
roundtrip apid dat =
   TaskM $ \ts ->
     case Map.lookup apid (tsMonitoring ts) of
       Nothing -> do
         mypid <- getSelfPid
         res0 <- monitorTask mypid apid
         case res0 of
           QteOK -> do
             res <- roundtripImpl apid dat
             return (ts {tsMonitoring=Map.insert apid () (tsMonitoring ts)},res)
           _ -> return (ts,Left res0)
       Just _ -> do
         res <- roundtripImpl apid dat
         return (ts,res)

-- roundtrip a b = liftTask $ roundtripQueryUnsafe PldUser a b

-- | Spawns a local process running @p@, with 'setDaemonic' passed to
-- 'spawnLocalAnd' as its setup action.
spawnDaemonic :: ProcessM () -> ProcessM ProcessId
spawnDaemonic p = spawnLocalAnd p setDaemonic

-- | Starts a nodeboss for @masterpid@ on node @nid@ by spawning the
-- registered closure "Remote.Task.runWorkerNode__impl" there.
runWorkerNode :: ProcessId -> NodeId -> ProcessM ProcessId
runWorkerNode masterpid nid = do
    clo <- makeClosure "Remote.Task.runWorkerNode__impl" (masterpid) :: ProcessM (Closure (ProcessM ()))
    spawn nid clo

-- | Remote entry point of a nodeboss: decodes the master's process ID
-- from the payload and runs 'startNodeManager' for it.
runWorkerNode__impl :: Payload -> ProcessM ()
runWorkerNode__impl pl = do
    setDaemonic -- maybe it's good to have the node manager be daemonic, but prolly not. If so, the MASTERPID must be terminated when user-provided MASTERPROC ends
    mpid <- liftIO $ serialDecode pl
    case mpid of
      Just masterpid -> handler masterpid
      Nothing -> error "Failure to extract in rwn__impl"
  where handler masterpid = startNodeManager masterpid

-- | Payload-level body of the passthrough closure: returns its
-- argument unchanged.
passthrough__implPl :: Payload -> TaskM Payload
passthrough__implPl pl = return pl

-- | A closure whose evaluation simply yields @a@; used by 'toPromiseAt'
-- to put an already-computed value into a promise.
passthrough__closure :: (Serializable a) => a -> Closure (TaskM a)
passthrough__closure a = Closure "Remote.Task.passthrough__impl" (serialEncodePure a)

-- | Registers 'runWorkerNode__impl' and 'passthrough__implPl' in the
-- closure table under their qualified names.
__remoteCallMetaData :: RemoteCallMetaData
__remoteCallMetaData x = putReg runWorkerNode__impl "Remote.Task.runWorkerNode__impl"
                         (putReg passthrough__implPl "Remote.Task.passthrough__implPl" x)

-- | Resets the last-touched time of an in-memory promise, which
-- postpones its flush by 'diskify'; other states are returned unchanged.
updatePromiseInMemory :: PromiseStorage -> IO PromiseStorage
updatePromiseInMemory (PromiseInMemory p _ d) = do
    utc <- getCurrentTime
    return $ PromiseInMemory p utc d
updatePromiseInMemory other = return other

-- | Wraps a payload (and optionally its decoded form) as an in-memory
-- promise touched now.
makePromiseInMemory :: PromiseData -> Maybe Dynamic -> IO PromiseStorage
makePromiseInMemory p dyn = do
    utc <- getCurrentTime
    return $ PromiseInMemory p utc dyn

-- | Sends this node's log output to the master's node when the master
-- lives on a different node; otherwise (including when no master is
-- given) logs to stdout.
forwardLogs :: Maybe ProcessId -> ProcessM ()
forwardLogs masterpid = do
    lc <- getLogConfig
    selfnid <- getSelfNode
    let target = case masterpid of
                   Just mp
                     | nodeFromPid mp /= selfnid -> LtForward $ nodeFromPid mp
                   _ -> LtStdout
    setNodeLogConfig lc {logTarget = target}

-- | MD5 digest (shown as a string) of a closure's name and payload
-- bytes; used to derive a per-closure cache file name.
hashClosure :: Closure a -> Hash
hashClosure (Closure s pl) = show $ md5 $ B.concat [fromString s, getPayloadContent pl]

-- | Makes sure the promise in @mps@ is in memory and returns its payload.
-- An on-disk promise is read back from its file and switched to
-- 'PromiseInMemory'; an in-memory promise is returned as is; a failed
-- promise yields 'Nothing'. IO errors while reading are logged
-- (mentioning @fpIn@) and also yield 'Nothing'.
undiskify :: FilePath -> MVar PromiseStorage -> ProcessM (Maybe PromiseData)
undiskify fpIn mps =
    wrap $ liftIO $ modifyMVar mps (\val ->
      case val of
        PromiseOnDisk fp -> do
          pl <- withFile fp ReadMode hGetPayload
          inmem <- makePromiseInMemory pl Nothing
          return (inmem,Just pl)
        PromiseInMemory payload _ _ -> return (val,Just payload)
        _ -> return (val,Nothing))
  where
    wrap a = do
      res <- ptry a
      case res of
        Left e -> do
          logS "TSK" LoCritical $ "Error reading promise from file "++fpIn++": "++show (e::IOError)
          return Nothing
        Right r -> return r

-- | Flushes an in-memory promise to @fp@ once it has gone untouched for
-- 'cfgPromiseFlushDelay' (a delay of 0 or less disables flushing). After
-- each delay it checks again; if the promise was touched meanwhile it
-- waits another round. With @reallywrite@ 'False' the file is not
-- rewritten and only the in-memory copy is dropped. Data is written to
-- @fp ++ ".tmp"@ and then renamed into place. An IO error is logged and
-- ends the loop.
diskify :: FilePath -> MVar PromiseStorage -> Bool -> ProcessM ()
diskify fp mps reallywrite = do
    cfg <- getConfig
    when (cfgPromiseFlushDelay cfg > 0)
      (handler (cfgPromiseFlushDelay cfg))
  where
    handler delay = do
      _ <- receiveTimeout delay []
      again <- wrap $ liftIO $ modifyMVar mps (\val ->
        case val of
          PromiseInMemory payload utc _ -> do
            now <- getCurrentTime
            if diffTime now utc > delay
              then do
                when reallywrite $ do
                  withFile tmp WriteMode (\h -> hPutPayload h payload)
                  renameFile tmp fp
                return (PromiseOnDisk fp,False)
              else return (val,True)
          _ -> return (val,False))
      when again
        (diskify fp mps reallywrite)
    tmp = fp ++ ".tmp"
    wrap a = do
      res <- ptry a
      case res of
        Left z -> do
          logS "TSK" LoImportant $ "Error writing promise to disk on file "++fp++": "++show (z::IOError)
          return False
        Right v -> return v

-- | Spawns a local worker that evaluates @clo@ and stores the result, or
-- the exception it raised, in @mps@. The worker is linked to the calling
-- process (the nodeboss) and made daemonic before it starts. After a
-- result is stored, the worker keeps running 'diskify' on it so the value
-- is flushed to the promise cache file.
startNodeWorker :: ProcessId -> NodeBossState ->
                   MVar PromiseStorage -> Closure Payload -> ProcessM ()
startNodeWorker masterpid nbs mps clo@(Closure cloname cloarg) = do
    self <- getSelfPid
    _ <- spawnLocalAnd (starter self) (prefix self)
    return ()
  where
    -- Setup run in the new worker: link it to the nodeboss and make it daemonic.
    prefix nodeboss = do
      self <- getSelfPid
      monitorProcess self nodeboss MaLink
      setDaemonic
    -- TODO try to do an undiskify here, if the promise is left over from a previous, failed run
    starter nodeboss =
      let initialState = TaskState {tsMaster=masterpid,tsNodeBoss=Just nodeboss,
                                    tsPromiseCache=nsPromiseCache nbs, tsRedeemerForwarding=nsRedeemerForwarding nbs,
                                    tsMonitoring=Map.empty}
          -- Only the user computation runs under 'ptry'. The result is
          -- stored and flushed outside the guarded region. Otherwise an
          -- exception raised after the 'putMVar' (e.g. while 'diskify'
          -- waits) would make the handler below 'putMVar' into an
          -- already-full MVar and block forever.
          tasker :: TaskM PromiseData
          tasker = do
            tbl <- liftTask $ getLookup
            case getEntryByIdent tbl cloname of
              Just funval -> funval cloarg
              Nothing -> taskError $ "Failed looking up "++cloname++" in closure table"
       in do
            res <- ptry $ runTaskM tasker initialState :: ProcessM (Either SomeException (TaskState,PromiseData))
            case res of
              Left ex -> liftIO (putMVar mps (PromiseException (show ex))) >> throw ex
              Right (_,val) -> do
                p <- liftIO $ makePromiseInMemory val Nothing
                liftIO $ putMVar mps p
                cfg <- getConfig
                let cachefile = cfgPromisePrefix cfg++hashClosure clo
                diskify cachefile mps True

-- | Shared state of a nodeboss: the storage cells of the promises it
-- owns, and the redeemer-forwarding table. Both are handed to the
-- workers it starts as their task state.
data NodeBossState =
        NodeBossState
        {
           nsPromiseCache :: MVar (Map.Map PromiseId (MVar PromiseStorage)),
           nsRedeemerForwarding :: MVar (Map.Map PromiseId ProcessId)
        }

-- | Main loop of a nodeboss owned by @masterpid@. It does three things:
--
-- * answers 'NmStart' by registering an empty storage cell for the
--   promise and starting a worker to fill it;
--
-- * answers 'NmRedeem' asynchronously from that cache, reading flushed
--   promises back from disk;
--
-- * terminates when the master process goes down.
startNodeManager :: ProcessId -> ProcessM ()
startNodeManager masterpid =
  let
      handler :: NodeBossState -> ProcessM a
      handler state =
        let promisecache = nsPromiseCache state
            nmStart = roundtripResponse (\(NmStart promise clo _queueing) -> do
                promisestore <- liftIO $ newEmptyMVar
                ret <- liftIO $ modifyMVar promisecache (\pc ->
                         let newpc = Map.insert promise promisestore pc
                          in return (newpc,True))
                when ret
                  (startNodeWorker masterpid state promisestore clo)
                return (NmStartResponse ret,state))
            nmTermination = matchProcessDown masterpid $ do
                forwardLogs Nothing
                logS "TSK" LoInformation $ "Terminating nodeboss after my master "++show masterpid++" is gone"
                terminate
            -- Redemption may block until the worker has produced the
            -- value, so each request is answered from its own process.
            nmRedeem = roundtripResponseAsync (\(NmRedeem promise) ans ->
                let answerer = do
                      pc <- liftIO $ readMVar promisecache
                      case Map.lookup promise pc of
                        Nothing -> ans NmRedeemResponseUnknown
                        Just v -> do
                          rv <- liftIO $ readMVar v -- possibly long wait
                          case rv of
                            PromiseInMemory rrv _ _ -> do
                              liftIO $ modifyMVar_ v (\_ -> updatePromiseInMemory rv)
                              ans (NmRedeemResponse rrv)
                            PromiseOnDisk fp -> do
                              mpd <- undiskify fp v
                              case mpd of
                                Nothing -> ans (NmRedeemResponseUnknown)
                                Just a -> ans (NmRedeemResponse a)
                              diskify fp v False
                            PromiseException _ -> ans NmRedeemResponseException
                 in do
                      _ <- spawnLocal answerer
                      return state) False
         in receiveWait [nmStart, nmRedeem, nmTermination, matchUnknownThrow] >>= handler
   in do
        forwardLogs $ Just masterpid
        mypid <- getSelfPid
        monitorProcess mypid masterpid MaMonitor
        logS "TSK" LoInformation $ "Starting a nodeboss owned by " ++ show masterpid
        pc <- liftIO $ newMVar Map.empty
        pf <- liftIO $ newMVar Map.empty
        let initState = NodeBossState {nsPromiseCache=pc,nsRedeemerForwarding=pf}
        handler initState

-- | Starts a new context for executing a 'TaskM' environment.
-- The node on which this function is run becomes a new master
-- in a Task application; as a result, the application should
-- only call this function once. The master will attempt to
-- control all nodes that it can find; if you are going to be
-- running more than one CH application on a single network,
-- be sure to give each application a different network
-- magic (via cfgNetworkMagic). The master TaskM environment
-- created by this function can then spawn other threads,
-- locally or remotely, using 'newPromise' and friends.
runTask :: TaskM a -> ProcessM a
runTask = startMaster

-- | Starts the master machinery (via 'runMaster') and runs @proc@ in a
-- separate local process, attached to this node's nodeboss. Blocks until
-- @proc@ finishes and returns its result.
startMaster :: TaskM a -> ProcessM a
startMaster proc = do
    mvmaster <- liftIO $ newEmptyMVar
    mvdone <- liftIO $ newEmptyMVar
    master <- runMaster (masterproc mvdone mvmaster)
    liftIO $ putMVar mvmaster master
    liftIO $ takeMVar mvdone
  where
    masterproc mvdone mvmaster nodeboss = do
      master <- liftIO $ takeMVar mvmaster
      pc <- liftIO $ newMVar Map.empty
      pf <- liftIO $ newMVar Map.empty
      let initialState = TaskState {tsMaster=master,tsNodeBoss=Just nodeboss,
                                    tsPromiseCache=pc, tsRedeemerForwarding=pf,
                                    tsMonitoring=Map.empty}
      res <- liftM snd $ runTaskM proc initialState
      liftIO $ putMVar mvdone res

{- UNUSED
type LocationSelector = MasterState -> ProcessM (NodeId,ProcessId)
-}

-- | Picks a node for a new task according to @locality@ ('LcDefault'
-- means the master's 'msDefaultLocality'), rotating the shared node list
-- so that repeated calls spread the load:
--
-- * without a restriction, the head is chosen and moved to the back;
--
-- * otherwise, the first matching node is chosen and moved to the back,
--   falling back to the unrestricted choice when nothing matches.
--
-- Returns 'Nothing' when no nodes are known.
selectLocation :: MasterState -> Locality -> ProcessM (Maybe (String,NodeId,ProcessId))
selectLocation ms locality =
   let nodes = msNodes ms
    in liftIO $ modifyMVar nodes
          (\n -> case n of
                   [] -> return (n,Nothing)
                   (firstnode:_) ->
                     let dflt = (rotate n,Just firstnode)
                         filterify f = case filter f n of
                                         [] -> return dflt
                                         (a:_) -> return ((delete a n) ++ [a],Just a)
                      in case cond locality of
                           LcUnrestricted -> return dflt
                           LcDefault -> return dflt
                           LcByRole l -> filterify (\(r,_,_) -> r `elem` l)
                           LcByNode l -> filterify (\(_,r,_) -> r `elem` l))
   where rotate [] = []
         rotate (h:t) = t ++ [h]
         cond l = case l of
                    LcDefault -> msDefaultLocality ms
                    _ -> l

-- | Number of nodes the master currently knows about.
countLocations :: MasterState -> ProcessM Int
countLocations ms = liftIO $ withMVar (msNodes ms) (return . length)

-- | Flattens the peer table from 'getPeers' into (role, node) pairs.
findPeers :: ProcessM [(String,NodeId)]
findPeers = liftM (concatMap (\(role,v) -> [ (role,x) | x <- v ]) . Map.toList) getPeers

-- | Sends a message, silently ignoring a 'TransmitException'.
sendSilent :: (Serializable a) => ProcessId -> a -> ProcessM ()
sendSilent pid a = do
    res <- ptry $ send pid a
    case res of
      Left (TransmitException _) -> return ()
      Right _ -> return ()

{- UNUSED
getStatus :: TaskM ()
getStatus =
   do master <- getMaster
      res <- roundtrip master MmStatus
      case res of
        Left _ -> return ()
        Right (MmStatusResponse nodes promises) ->
          let verboseNodes = intercalate ", " (map show nodes)
              verbosePromises = intercalate "\n" $ map (\(nb,l) -> (show nb)++" -- "++intercalate "," (map show l)) (Map.toList promises)
           in tsay $ "\nKnown nodes: " ++ verboseNodes ++ "\n\nNodebosses: " ++ verbosePromises
-}

-- | Starts the master machinery and returns the master's process ID:
--
-- * a daemonic master process that allocates promises to nodebosses
--   ('MmNewPromise'), reallocates them on complaints ('MmComplain') and
--   answers 'MmStatus';
--
-- * an initial probe that starts a nodeboss on every peer found;
--
-- * @masterproc@, run in a local process and given this node's nodeboss
--   (a 'TaskException' is raised if this node was not found), linked to
--   the caller with 'MaLinkError';
--
-- * a daemonic prober that repeats the probe periodically.
runMaster :: (ProcessId -> ProcessM ()) -> ProcessM ProcessId
runMaster masterproc =
  let
     -- One discovery round: start a nodeboss on every peer not in @seen@,
     -- keep only still-reported nodes in the shared list, tell the master
     -- about new peers, and return the updated set of seen peers.
     probeOnce nodes seen masterpid = do
        recentlist <- findPeers -- TODO if a node fails to respond to a probe even once, it's gone forever; be more flexible
        let newseen = seen `union` recentlist
        let topidlist = recentlist \\ seen
        let cleanOut n = filter (\(_,nid,_) -> nid `elem` (map snd recentlist)) n
        newlypidded <- mapM (\(role,nid) -> do
                               pid <- runWorkerNode masterpid nid
                               return (role,nid,pid)) topidlist
        (_newlist,totalseen) <- liftIO $ modifyMVar nodes (\oldlist ->
                                  return ((cleanOut oldlist) ++ newlypidded,(recentlist,newseen)))
        let newlyadded = totalseen \\ seen
        mapM_ (\nid -> sendSilent masterpid (TmNewPeer nid)) (map snd newlyadded)
        return totalseen
     proberDelay = 10000000 -- how often do we check the network to see what nodes are available?
     prober nodes seen masterpid = do
        totalseen <- probeOnce nodes seen masterpid
        _ <- receiveTimeout proberDelay [matchUnknownThrow]
        prober nodes totalseen masterpid
     master state =
        let
            tryAlloc clo promiseid locality queueing = do
                ns <- selectLocation state locality
                case ns of
                  Nothing -> do
                    logS "TSK" LoCritical "Attempt to allocate a task, but no nodes found"
                    return Nothing
                  Just (_,nid,nodeboss) -> do
                    -- roundtripQuery monitors and then unmonitors, which generates a lot of traffic; we probably don't need to do this
                    res <- roundtripQuery PldUser nodeboss (NmStart promiseid clo queueing)
                    case res of
                      Left e -> do
                        logS "TSK" LoImportant $ "Failed attempt to start "++show clo++" on " ++show nid ++": "++show e
                        return Nothing
                      Right (NmStartResponse True) -> return $ Just nodeboss
                      _ -> do
                        logS "TSK" LoImportant $ "Failed attempt to start "++show clo++" on " ++show nid
                        return Nothing
            -- Try the requested locality once, then retry anywhere up to
            -- once per known node (plus one).
            basicAllocate clo promiseid locality queueing = do
                count <- countLocations state
                res1 <- tryAlloc clo promiseid locality queueing
                case res1 of
                  Just _ -> return res1
                  -- TODO we should try all matching locations before moving on to Unrestricted
                  Nothing -> do
                    res <- stubborn count $ tryAlloc clo promiseid LcUnrestricted queueing
                    case res of
                      Nothing -> do
                        logS "TSK" LoCritical $ "Terminally failed to start "++show clo
                        return res
                      _ -> return res
            statusMsg = roundtripResponse (\x -> case x of
                MmStatus -> do
                    thenodes <- liftIO $ readMVar $ msNodes state
                    let knownNodes = map (\(_,n,_) -> n) thenodes
                        proctree = msAllocation state
                    return (MmStatusResponse knownNodes proctree,state))
            complainMsg = roundtripResponse (\x -> case x of
                MmComplain procid promid ->
                  case Map.lookup promid (msPromises state) of
                    Nothing -> return (MmComplainResponse nullPid,state) -- failure
                    Just (PromiseRecord curprocid curclo curlocality)
                      | curprocid /= procid -> return (MmComplainResponse curprocid,state)
                      | otherwise -> do
                          res <- basicAllocate curclo promid curlocality defaultQueueing
                          case res of
                            Nothing -> return (MmComplainResponse nullPid,state) -- failure
                            Just newprocid ->
                              let newpromises = Map.insert promid (PromiseRecord newprocid curclo curlocality) (msPromises state)
                               in return (MmComplainResponse newprocid,state {msPromises=newpromises}))
            promiseMsg = roundtripResponse (\x -> case x of
                MmNewPromise clo locality queueing -> do
                    let promiseid = msNextId state
                    res <- basicAllocate clo promiseid locality queueing
                    case res of
                      Just nodeboss ->
                        let newstate = state {msAllocation=newAllocation,msPromises=newPromises,msNextId=promiseid+1}
                            newAllocation = Map.insertWith' (\a b -> nub $ a++b) nodeboss [promiseid] (msAllocation state)
                            newPromises = Map.insert promiseid (PromiseRecord nodeboss clo locality) (msPromises state)
                         in return (MmNewPromiseResponse nodeboss promiseid,newstate)
                      Nothing ->
                        return (MmNewPromiseResponseFail,state))
            simpleMsg = match (\x -> case x of
                TmNewPeer nid -> do
                    logS "TSK" LoInformation $ "Found new peer " ++show nid
                    return state)
         in receiveWait [simpleMsg, promiseMsg, complainMsg,statusMsg] >>= master -- TODO matchUnknownThrow
   in do
        nodes <- liftIO $ newMVar []
        selfnode <- getSelfNode
        selfpid <- getSelfPid
        let initState = MasterState {msNextId=0, msAllocation=Map.empty, msPromises=Map.empty, msNodes=nodes, msDefaultLocality = defaultLocality}
        masterpid <- spawnDaemonic (master initState)
        seennodes <- probeOnce nodes [] masterpid
        let getByNid _ [] = Nothing
            getByNid nid ((_,n,nodeboss):xs) = if nid==n then Just nodeboss else getByNid nid xs
        res <- liftIO $ withMVar nodes (\n -> return $ getByNid selfnode n)
        _ <- case res of
               Nothing -> taskError "Can't find self: make sure cfgKnownHosts includes the master"
               Just x -> spawnLocalAnd (masterproc x)
                           (getSelfPid >>= \myself -> monitorProcess selfpid myself MaLinkError)
        _ <- spawnDaemonic (prober nodes seennodes masterpid)
        return masterpid

-- | Runs the action until it yields 'Just', making at most @n+1@
-- attempts (a single attempt when @n@ is zero or negative), and returns
-- the last result.
stubborn :: (Monad m) => Int -> m (Maybe a) -> m (Maybe a)
stubborn n a
  | n <= 0 = a
  | otherwise = do
      r <- a
      case r of
        Just _ -> return r
        Nothing -> stubborn (n-1) a

-- TODO: provide a way to change the default locality from within TaskM:
--   setDefaultLocality :: Locality -> TaskM ()

-- | Like 'newPromise', but for a value that is already known: the
-- given value is put into a promise as it is. Conceptually (though not
-- syntactically, since 'newPromise' takes a closure) it amounts to
--
-- > toPromise a = newPromise (return a)
--
-- The new promise is placed using 'LcDefault'.
toPromise :: (Serializable a) => a -> TaskM (Promise a)
toPromise value = toPromiseAt LcDefault value

-- | Like 'toPromise', with a 'Locality' hint saying which node should
-- become the owner of the new promise. The hint is only a preference
-- and may not be honoured.
toPromiseAt :: (Serializable a) => Locality -> a -> TaskM (Promise a)
toPromiseAt locality value =
    let wrapped = passthrough__closure value
     in newPromiseAt locality wrapped

-- | Like 'toPromiseAt', preferring the node that owns another promise
-- (compare 'newPromiseNear'). An immediate promise has no owner node,
-- in which case the default placement of 'toPromise' is used.
toPromiseNear :: (Serializable a,Serializable b) => Promise b -> a -> TaskM (Promise a)
toPromiseNear neighbour = case neighbour of
    PromiseImmediate _ -> toPromise
    -- TODO should I consult tsRedeemerForwarding here?
    PromiseBasic prhost _ -> toPromiseAt (LcByNode [nodeFromPid prhost])

-- | Creates an /immediate promise/: a promise in name only, which holds
-- its value directly instead of referring to a node (unlike the promises
-- made by 'toPromise'). Redeeming it needs no network communication, but
-- the value is copied along with the promise wherever it goes, so this
-- only suits small data.
toPromiseImm :: (Serializable a) => a -> TaskM (Promise a)
toPromiseImm value = return (PromiseImmediate value)

-- | Starts computing a value somewhere among the nodes visible to the
-- current master and returns a token for it: a 'Promise'. The function
-- is given as a closure (see "Remote.Call"). The token can be passed
-- around before the value exists; 'readPromise' later contacts the node
-- doing the work to obtain it. Placement prefers nodes matching
-- 'defaultLocality'.
newPromise :: (Serializable a) => Closure (TaskM a) -> TaskM (Promise a)
newPromise clo = newPromiseAt LcDefault clo

-- | Like 'newPromise', but prefers to run the function on the caller's
-- own node; useful when the result will be consumed locally.
newPromiseHere :: (Serializable a) => Closure (TaskM a) -> TaskM (Promise a)
newPromiseHere clo =
    liftTask getSelfNode >>= \here -> newPromiseAt (LcByNode [here]) clo

-- | Like 'newPromise', but prefers the node that owns another promise.
-- The other promise is not evaluated. For an immediate promise (which
-- has no owner node) this is plain 'newPromise'.
newPromiseNear :: (Serializable a, Serializable b) => Promise b -> Closure (TaskM a) -> TaskM (Promise a)
newPromiseNear neighbour = case neighbour of
    PromiseImmediate _ -> newPromise
    PromiseBasic prhost _ -> newPromiseAt (LcByNode [nodeFromPid prhost])

-- | Like 'newPromise', but prefers nodes whose role (set by the cfgRole
-- configuration option) is the given one.
newPromiseAtRole :: (Serializable a) => String -> Closure (TaskM a) -> TaskM (Promise a)
newPromiseAtRole role = newPromiseAt (LcByRole [role])

-- | The general form of 'newPromise', taking an explicit 'Locality'.
-- 'newPromise', 'newPromiseHere', 'newPromiseNear' and
-- 'newPromiseAtRole' are all shorthands for calls to this function.
newPromiseAt :: (Serializable a) => Locality -> Closure (TaskM a) -> TaskM (Promise a)
newPromiseAt locality clo =
    maybe unusable requestFromMaster (makePayloadClosure clo)
  where
    -- The closure has no payload-producing form, so it cannot be shipped.
    unusable = taskError $ "The specified closure, "++show clo++", can't produce payloads"
    -- Ask the master to allocate the promise on some node.
    requestFromMaster plclo =
      getMaster >>= \master ->
        roundtrip master (MmNewPromise plclo locality defaultQueueing) >>= fromReply
    -- Turn the master's reply into a promise, or fail with a TaskException.
    fromReply (Right (MmNewPromiseResponse pid prid)) = return (PromiseBasic pid prid)
    fromReply (Right MmNewPromiseResponseFail) =
      taskError $ "Spawning of closure "++show clo++" by newPromise failed"
    fromReply (Left tms) =
      taskError $ "Spawning of closure "++show clo++" by newPromise resulted in "++show tms

-- | Obtains the value behind a promise. An immediate promise yields its
-- value directly. Otherwise the value is taken from this node's promise
-- cache when present; if not, it is fetched from the owning node and
-- cached. If the computation has not finished yet, this blocks until it
-- has. If the computation failed by throwing an exception (e.g. divide
-- by zero), this throws a 'TaskException' as well. If the owning node
-- cannot deliver the value, the master is asked to restart the
-- computation elsewhere, and the value is then read from there.
readPromise :: (Serializable a) => Promise a -> TaskM a
readPromise (PromiseImmediate a) = return a
readPromise thepromise@(PromiseBasic prhost prid) = do
    mp <- lookupCachedPromise prid
    case mp of
      Nothing -> do
        -- Not cached: ask the owner, or its replacement if the master
        -- reassigned this promise after an earlier complaint.
        fprhost <- liftM (maybe prhost id) $ lookupForwardedRedeemer prid
        res <- roundtrip fprhost (NmRedeem prid)
        case res of
          Left e -> do
            tlogS "TSK" LoInformation $ "Complaining about promise " ++ show prid ++" on " ++show fprhost++" because of "++show e
            complain fprhost prid
          Right NmRedeemResponseUnknown -> do
            tlogS "TSK" LoInformation $ "Complaining about promise " ++ show prid ++" on " ++show fprhost++" because allegedly unknown"
            complain fprhost prid
          Right (NmRedeemResponse thedata) -> do
            extracted <- extractFromPayload thedata
            promiseinmem <- liftTaskIO $ makePromiseInMemory thedata (Just $ toDyn extracted)
            putPromiseInCache prid promiseinmem
            return extracted
          Right NmRedeemResponseException ->
            taskError "Failed promise redemption" -- don't redeem, this is a terminal failure
      Just mv -> do
        val <- liftTaskIO $ readMVar mv -- possible long wait here
        case val of
          PromiseInMemory v _utc thedyn ->
            case thedyn of
              Just thedynvalue ->
                case fromDynamic thedynvalue of
                  Nothing -> do
                    liftTask $ logS "TSK" LoStandard "Insufficiently dynamic promise cache"
                    extractFromPayload v
                  Just realval -> do
                    touchCached mv thedyn
                    return realval
              Nothing -> do
                extracted <- extractFromPayload v
                touchCached mv (Just $ toDyn extracted)
                return extracted
          PromiseException _ -> taskError $ "Redemption of promise failed"
          PromiseOnDisk fp -> do
            mpd <- liftTask $ undiskify fp mv
            _ <- liftTask $ spawnLocal $ diskify fp mv False
            case mpd of
              Just dat -> extractFromPayload dat
              _ -> taskError "Promise extraction from disk failed"
  where
    -- Decode a payload at the promise's type, failing with a TaskException.
    extractFromPayload v = do
      out <- liftTaskIO $ serialDecode v
      case out of
        Just r -> return r
        Nothing -> taskError "Unexpected payload type"
    -- Refresh a cached entry in place: reset its last-touched time and
    -- record the decoded value. The existing MVar is updated rather than
    -- replaced, for two reasons. On worker nodes this cache is the
    -- nodeboss's own promise cache, and the worker that computed a promise
    -- keeps flushing the MVar it was started with (via 'diskify'), so a
    -- replacement would never be flushed. Also, an entry that was flushed
    -- to disk after our read is left alone instead of being resurrected.
    touchCached cell dyn =
      liftTaskIO $ modifyMVar_ cell $ \current ->
        case current of
          PromiseInMemory pl _ _ -> makePromiseInMemory pl dyn
          other -> return other
    -- Ask the master to reassign the promise, remember the new owner and retry.
    complain fprhost promid = do
      master <- getMaster
      response <- roundtrip master (MmComplain fprhost promid)
      case response of
        Left a -> taskError $ "Couldn't file complaint with master about " ++ show fprhost ++ " because " ++ show a
        Right (MmComplainResponse newhost)
          | newhost == nullPid -> taskError $ "Couldn't file complaint with master about " ++ show fprhost
          | otherwise -> do
              setForwardedRedeemer promid newhost
              readPromise thepromise

-- | Per-task state threaded through 'TaskM'.
data TaskState = TaskState
   {
      -- | The master process
      tsMaster :: ProcessId,
      -- | The nodeboss this task belongs to, if any
      tsNodeBoss :: Maybe ProcessId,
      -- | Storage cells of promises held or already fetched on this node, by promise ID
      tsPromiseCache :: MVar (Map.Map PromiseId (MVar PromiseStorage)),
      -- | Replacement owners assigned by the master after complaints
      tsRedeemerForwarding :: MVar (Map.Map PromiseId ProcessId),
      -- | Processes this task already monitors (see 'roundtrip')
      tsMonitoring :: Map.Map ProcessId ()
   }

-- | A computation in the task layer: a state-passing wrapper around
-- 'ProcessM' that threads a 'TaskState'.
data TaskM a = TaskM { runTaskM :: TaskState -> ProcessM (TaskState, a) } deriving (Typeable)

instance Monad TaskM where
   m >>= k = TaskM $ \ts -> do
                (ts',a) <- runTaskM m ts
                (ts'',a') <- runTaskM (k a) (ts')
                return (ts'',a')
   return x = TaskM $ \ts -> return $ (ts,x)

-- | The replacement owner recorded for a promise, if any.
lookupForwardedRedeemer :: PromiseId -> TaskM (Maybe ProcessId)
lookupForwardedRedeemer q =
   TaskM $ \ts ->
     liftIO $ withMVar (tsRedeemerForwarding ts) $ (\fwd ->
       let lo = Map.lookup q fwd
        in return (ts,lo))

-- | Records @to@ as the process to ask for promise @from@ from now on.
setForwardedRedeemer :: PromiseId -> ProcessId -> TaskM ()
setForwardedRedeemer from to =
   TaskM $ \ts -> liftIO $ modifyMVar (tsRedeemerForwarding ts) (\fwd ->
       let newmap = Map.insert from to fwd
        in return ( newmap,(ts,()) ) )

-- | The storage cell of a promise in this node's cache, if present.
lookupCachedPromise :: PromiseId -> TaskM (Maybe (MVar PromiseStorage))
lookupCachedPromise prid = TaskM $ \ts -> do
    mv <- liftIO $ withMVar (tsPromiseCache ts) (\pc -> return $ Map.lookup prid pc)
    return (ts,mv)

-- | Stores a promise in this node's cache under a fresh storage cell,
-- replacing any existing cell for the same ID.
putPromiseInCache :: PromiseId -> PromiseStorage -> TaskM ()
putPromiseInCache prid ps = TaskM $ \ts -> do
    liftIO $ modifyMVar_ (tsPromiseCache ts) (\pc -> do
                mv <- newMVar ps
                return $ Map.insert prid mv pc)
    return (ts,())

-- | The master process of the current task.
getMaster :: TaskM ProcessId
getMaster = TaskM $ \ts -> return (ts,tsMaster ts)

-- | Runs a 'ProcessM' action inside 'TaskM', leaving the task state untouched.
liftTask :: ProcessM a -> TaskM a
liftTask a = TaskM $ \ts -> a >>= (\x -> return (ts,x))

-- | Runs an 'IO' action inside 'TaskM'.
liftTaskIO :: IO a -> TaskM a
liftTaskIO = liftTask . liftIO

-- | A Task-monadic version of 'Remote.Process.say'.
-- Puts text messages in the log.
tsay :: String -> TaskM ()
tsay a = liftTask $ say a

-- | Writes various kinds of messages to the
-- "Remote.Process" log.
tlogS :: LogSphere -> LogLevel -> String -> TaskM ()
tlogS a b c = liftTask $ logS a b c

----------------------------------------------
-- * MapReduce
----------------------------------------------

-- | A data structure that stores the important
-- user-provided functions that are the namesakes
-- of the MapReduce algorithm.
-- The number of mapper processes can be controlled
-- by the user by controlling the length of the list
-- returned by mtChunkify. The number of reducer
-- promises is controlled by the number of values
-- returned by the shuffler.
-- The user must provide their own mapper and reducer.
-- For many cases, the default chunkifier ('chunkify')
-- and shuffler ('shuffle') are adequate.
data MapReduce rawinput input middle1 middle2 result
    = MapReduce
      {
        -- | Builds the closure run (as its own promise) for one chunk of input.
        mtMapper :: input -> Closure (TaskM [middle1]),
        -- | Builds the closure run (as its own promise) for one shuffled group.
        mtReducer :: middle2 -> Closure (TaskM result),
        -- | Splits the raw input into chunks; one mapper is started per chunk.
        mtChunkify :: rawinput -> [input],
        -- | Regroups the concatenated mapper output; one reducer is started
        -- per element of the resulting list.
        mtShuffle :: [middle1] -> [middle2]
      }

-- | A convenient way to provide the 'mtShuffle' function
-- as part of 'mapReduce'.
--
-- Sorts the pairs by key and collects all values sharing a key. Within a
-- group, values keep their original relative order because 'sortBy' is stable.
shuffle :: Ord a => [(a,b)] -> [(a,[b])]
shuffle q =
  -- 'groupBy' never yields an empty group, so the pattern always matches;
  -- matching it here avoids the partial 'head'.
  [ (key, val : map snd rest)
  | ((key, val) : rest) <- groupBy (\(a,_) (b,_) -> a == b)
                                   (sortBy (\(a,_) (b,_) -> compare a b) q) ]

-- | A convenient way to provide the 'mtChunkify' function
-- as part of 'mapReduce'.
--
-- @chunkify n xs@ splits @xs@ into @min n (length xs)@ contiguous, non-empty
-- chunks whose lengths differ by at most one, so @concat (chunkify n xs) == xs@.
-- In particular, asking for @n@ chunks of a list with at least @n@ elements
-- yields exactly @n@ chunks, and therefore @n@ mappers.
--
-- Fails via 'taskError' when @n <= 0@.
chunkify :: Int -> [a] -> [[a]]
chunkify numChunks l
  | numChunks <= 0 = taskError "Can't chunkify into less than one chunk"
  | otherwise = go numChunks (length l) l
  where
    -- go k n xs: split the n remaining elements xs into at most k chunks.
    -- Taking ceiling (n / k) each step hands the extra (n mod k) elements
    -- to the leading chunks. When k reaches 1 the chunk takes every
    -- remaining element, so the [] case is always hit before k could be 0.
    go :: Int -> Int -> [b] -> [[b]]
    go _ _ [] = []
    go k n xs =
      let size = (n + k - 1) `div` k
          (first, rest) = splitAt size xs
      in first : go (k - 1) (n - size) rest

-- | The MapReduce algorithm, implemented in a very
-- simple form on top of the Task layer. Its
-- use depends on five user-determined data types:
--
-- * rawinput -- The data type provided as the input to the algorithm as a whole. 'mtChunkify' splits it into a list of inputs.
--
-- * input -- The data type given to each mapper.
--
-- * middle1 -- The output of the mapper. This may include some /key/ which is used by the shuffler to allocate data to reducers.
-- If you use the default shuffler, 'shuffle', this type must have the form @Ord a => (a,b)@.
--
-- * middle2 -- The output of the shuffler. The default shuffler emits a type in the form @Ord a => (a,[b])@. Each middle2 output
-- by shuffler is given to a separate reducer.
--
-- * result -- The output of a reducer, upon being given a single middle2 value.
mapReduce :: (Serializable i,Serializable k,Serializable m,Serializable r) => MapReduce ri i k m r -> ri -> TaskM [r]
mapReduce mr inputs =
  do -- Map phase: start one promise per chunk of the raw input, then wait
     -- for all of them.
     mapperPromises <- mapM (newPromise . mtMapper mr) (mtChunkify mr inputs)
     mapperOutputs <- mapM readPromise mapperPromises
     -- Reduce phase: shuffle the combined mapper output and start one
     -- promise per resulting group.
     reducerPromises <- mapM (newPromise . mtReducer mr) (mtShuffle mr (concat mapperOutputs))
     -- Collect the reducer results in the order the shuffler produced them.
     mapM readPromise reducerPromises