-- |
-- Module      : System.Mesos.Scheduler
-- Copyright   : (c) Ian Duncan 2014
-- License     : MIT
--
-- Maintainer  : ian@iankduncan.com
-- Stability   : unstable
-- Portability : non-portable
--
-- Mesos scheduler interface and scheduler driver. A scheduler is used
-- to interact with Mesos in order to run distributed computations.
module System.Mesos.Scheduler (
  -- * Creating a new 'Scheduler'
  ToScheduler(..),
  SchedulerDriver,
  -- * Managing framework actions with a 'SchedulerDriver'
  -- ** 'SchedulerDriver' lifecycle management
  withSchedulerDriver,
  start,
  stop,
  abort,
  await,
  run,
  -- ** Managing and interacting with tasks and resources
  requestResources,
  launchTasks,
  killTask,
  declineOffer,
  reviveOffers,
  sendFrameworkMessage,
  reconcileTasks,
  -- * Low-level lifecycle management
  createDriver,
  destroyDriver,
  Scheduler,
  createScheduler,
  destroyScheduler
) where
import           Control.Exception          (bracket)
import           Control.Monad.Managed
import           Data.ByteString            (ByteString, packCStringLen)
import           Data.ByteString.Unsafe     (unsafeUseAsCStringLen)
-- import           Foreign.C
import           Foreign.Ptr
import           System.Mesos.Internal      hiding (marshal)
import           System.Mesos.Raw
import           System.Mesos.Raw.Scheduler
import           System.Mesos.Types

-- | Callback interface to be implemented by frameworks'
-- schedulers. Note that only one callback will be invoked at a time,
-- so it is not recommended that you block within a callback because
-- it may cause a deadlock.
--
-- Every method has a default implementation that does nothing, so an
-- instance only needs to define the callbacks it cares about.
class ToScheduler a where
  -- | Invoked when the 'Scheduler' successfully registers with a Mesos
  -- master. A unique ID (generated by the master) used for
  -- distinguishing this framework from others and 'MasterInfo'
  -- with the ip and port of the current master are provided as arguments.
  registered :: a -> SchedulerDriver -> FrameworkID -> MasterInfo -> IO ()
  registered _ _ _ _ = return ()

  -- | Invoked when the 'Scheduler' re-registers with a newly elected Mesos master.
  -- This is only called when the scheduler has previously been registered.
  -- 'MasterInfo' containing the updated information about the elected master
  -- is provided as an argument.
  reRegistered :: a -> SchedulerDriver -> MasterInfo -> IO ()
  reRegistered _ _ _ = return ()

  -- | Invoked when the 'Scheduler' becomes "disconnected" from the master
  -- (e.g., the master fails and another is taking over).
  disconnected :: a -> SchedulerDriver -> IO ()
  disconnected _ _ = return ()

  -- | Invoked when resources have been offered to this framework. A
  -- single offer will only contain resources from a single slave.
  -- Resources associated with an offer will not be re-offered to
  -- _this_ framework until either (a) this framework has rejected
  -- those resources (see 'launchTasks') or (b) those
  -- resources have been rescinded (see 'offerRescinded').
  -- Note that resources may be concurrently offered to more than one
  -- framework at a time (depending on the allocator being used). In
  -- that case, the first framework to launch tasks using those
  -- resources will be able to use them while the other frameworks
  -- will have those resources rescinded (or if a framework has
  -- already launched tasks with those resources then those tasks will
  -- fail with a 'System.Mesos.Types.Lost' status and a message saying as much).
  resourceOffers :: a -> SchedulerDriver -> [Offer] -> IO ()
  resourceOffers _ _ _ = return ()

  -- | Invoked when an offer is no longer valid (e.g., the slave was
  -- lost or another framework used resources in the offer). If for
  -- whatever reason an offer is never rescinded (e.g., dropped
  -- message, failing over framework, etc.), a framework that attempts
  -- to launch tasks using an invalid offer will receive 'Lost'
  -- status updates for those tasks (see 'resourceOffers').
  offerRescinded :: a -> SchedulerDriver -> OfferID -> IO ()
  offerRescinded _ _ _ = return ()

  -- | Invoked when the status of a task has changed (e.g., a slave is
  -- lost and so the task is lost, a task finishes and an executor
  -- sends a status update saying so, etc). Note that returning from
  -- this callback _acknowledges_ receipt of this status update! If
  -- for whatever reason the scheduler aborts during this callback (or
  -- the process exits) another status update will be delivered (note,
  -- however, that this is currently not true if the slave sending the
  -- status update is lost/fails during that time).
  statusUpdate :: a -> SchedulerDriver -> TaskStatus -> IO ()
  statusUpdate _ _ _ = return ()

  -- | Invoked when an executor sends a message. These messages are best
  -- effort; do not expect a framework message to be retransmitted in
  -- any reliable fashion.
  frameworkMessage :: a -> SchedulerDriver -> ExecutorID -> SlaveID -> ByteString -> IO ()
  frameworkMessage _ _ _ _ _ = return ()

  -- | Invoked when a slave has been determined unreachable (e.g.,
  -- machine failure, network partition). Most frameworks will need to
  -- reschedule any tasks launched on this slave on a new slave.
  slaveLost :: a -> SchedulerDriver -> SlaveID -> IO ()
  slaveLost _ _ _ = return ()

  -- | Invoked when an executor has exited/terminated. Note that any
  -- tasks running will have 'Lost' status updates automagically
  -- generated.
  executorLost :: a -> SchedulerDriver -> ExecutorID -> SlaveID -> Status -> IO ()
  executorLost _ _ _ _ _ = return ()

  -- | Invoked when there is an unrecoverable error in the scheduler or
  -- scheduler driver. The driver will be aborted BEFORE invoking this
  -- callback.
  errorMessage :: a -> SchedulerDriver -> ByteString -> IO ()
  errorMessage _ _ _ = return ()

-- | Builds a low-level 'Scheduler' from any 'ToScheduler' instance.
--
-- Each callback of the instance is wrapped so that the raw arguments
-- handed over by the native side are converted to their Haskell
-- representations before the user's callback runs. The resulting
-- 'Scheduler' keeps the native scheduler pointer together with every
-- wrapped callback so that 'destroyScheduler' can release them later.
createScheduler :: ToScheduler a => a -> IO Scheduler
createScheduler s = do
  registeredFun <- wrapSchedulerRegistered $ \sdp fp mp -> runManaged $ do
    let sd = SchedulerDriver sdp
    f <- peekCPP fp
    m <- peekCPP mp
    liftIO $ (registered s) sd f m
  reRegisteredFun <- wrapSchedulerReRegistered $ \sdp mip -> runManaged $ do
    let sd = SchedulerDriver sdp
    mi <- unmarshal mip
    liftIO $ (reRegistered s) sd mi
  disconnectedFun <- wrapSchedulerDisconnected $ \sdp -> runManaged $ do
    let sd = SchedulerDriver sdp
    liftIO $ (disconnected s) sd
  resourceOffersFun <- wrapSchedulerResourceOffers $ \sdp os c -> runManaged $ do
    let sd = SchedulerDriver sdp
    -- 'os' points at 'c' raw offers; read the array, then convert each entry.
    offers <- mapM unmarshal =<< peekArray' (os, fromIntegral c)
    liftIO $ (resourceOffers s) sd offers
  offerRescindedFun <- wrapSchedulerOfferRescinded $ \sdp oidp -> do
    let sd = SchedulerDriver sdp
    with (unmarshal oidp) $ \oid -> (offerRescinded s) sd oid
  statusUpdateFun <- wrapSchedulerStatusUpdate $ \sdp tsp -> runManaged $ do
    let sd = SchedulerDriver sdp
    ts <- unmarshal tsp
    liftIO $ (statusUpdate s) sd ts
  frameworkMessageFun <- wrapSchedulerFrameworkMessage $ \sdp eip sip ptr c -> runManaged $ do
    let sd = SchedulerDriver sdp
    ei <- unmarshal eip
    si <- unmarshal sip
    -- Copy the message bytes into a Haskell-owned 'ByteString'.
    bs <- liftIO $ packCStringLen (ptr, c)
    liftIO $ (frameworkMessage s) sd ei si bs
  slaveLostFun <- wrapSchedulerSlaveLost $ \sdp sip -> runManaged $ do
    let sd = SchedulerDriver sdp
    si <- unmarshal sip
    liftIO $ (slaveLost s) sd si
  executorLostFun <- wrapSchedulerExecutorLost $ \sdp eip sip st -> runManaged $ do
    let sd = SchedulerDriver sdp
    ei <- unmarshal eip
    si <- unmarshal sip
    liftIO $ (executorLost s) sd ei si (toEnum $ fromIntegral st)
  errorFun <- wrapSchedulerError $ \sdp ptr c -> do
    let sd = SchedulerDriver sdp
    bs <- packCStringLen (ptr, fromIntegral c)
    (errorMessage s) sd bs
  schedulerPtr <- c_createScheduler
    registeredFun
    reRegisteredFun
    disconnectedFun
    resourceOffersFun
    offerRescindedFun
    statusUpdateFun
    frameworkMessageFun
    slaveLostFun
    executorLostFun
    errorFun
  return $ Scheduler
    schedulerPtr
    registeredFun
    reRegisteredFun
    disconnectedFun
    resourceOffersFun
    offerRescindedFun
    statusUpdateFun
    frameworkMessageFun
    slaveLostFun
    executorLostFun
    errorFun

-- | Releases a 'Scheduler' created by 'createScheduler': first the
-- native scheduler object, then each of the wrapped callback function
-- pointers it holds.
destroyScheduler :: Scheduler -> IO ()
destroyScheduler s = do
  c_destroyScheduler $ schedulerImpl s
  freeHaskellFunPtr $ rawSchedulerRegistered s
  freeHaskellFunPtr $ rawSchedulerReRegistered s
  freeHaskellFunPtr $ rawSchedulerDisconnected s
  freeHaskellFunPtr $ rawSchedulerResourceOffers s
  freeHaskellFunPtr $ rawSchedulerOfferRescinded s
  freeHaskellFunPtr $ rawSchedulerStatusUpdate s
  freeHaskellFunPtr $ rawSchedulerFrameworkMessage s
  freeHaskellFunPtr $ rawSchedulerSlaveLost s
  freeHaskellFunPtr $ rawSchedulerExecutorLost s
  freeHaskellFunPtr $ rawSchedulerError s

-- | Runs a raw driver call against the pointer wrapped by a
-- 'SchedulerDriver' and converts the returned code into a 'Status'.
withDriver :: (SchedulerDriverPtr -> IO CInt) -> SchedulerDriver -> IO Status
withDriver f (SchedulerDriver p) = fmap (toEnum . fromIntegral) $ f p

-- | Creates a 'Scheduler' and a 'SchedulerDriver', runs the supplied
-- action with the driver, and destroys both afterwards (driver first,
-- then scheduler).
--
-- Cleanup is guaranteed via 'bracket': the driver and scheduler are
-- released even if the action throws, and the scheduler is released
-- if creating the driver fails.
withSchedulerDriver :: ToScheduler a => a
                    -> FrameworkInfo
                    -> ByteString -- ^ @ip:port@ of the master to connect to. For example: @"127.0.0.1:5050"@
                    -> Maybe Credential
                    -> (SchedulerDriver -> IO b)
                    -> IO b
withSchedulerDriver s i h c f =
  bracket (createScheduler s) destroyScheduler $ \scheduler ->
    bracket (createDriver scheduler i h c) destroyDriver f

-- | Creates a 'SchedulerDriver' for the given 'Scheduler', framework
-- description and master address. When a 'Credential' is supplied the
-- credential-taking constructor is used instead of the plain one.
--
-- The caller is responsible for calling 'destroyDriver'; prefer
-- 'withSchedulerDriver' where possible.
createDriver :: Scheduler -> FrameworkInfo -> ByteString -> Maybe Credential -> IO SchedulerDriver
createDriver s i h mc = with (cppValue i) $ \fiP ->
  with (cstring h) $ \(hp, hLen) ->
    fmap SchedulerDriver $ case mc of
      Nothing -> c_createSchedulerDriver (schedulerImpl s) fiP hp (fromIntegral hLen)
      Just c -> with (cppValue c) $ \cp -> do
        c_createSchedulerDriverWithCredentials (schedulerImpl s) fiP hp (fromIntegral hLen) cp

-- | Destroys the native driver behind a 'SchedulerDriver'.
destroyDriver :: SchedulerDriver -> IO ()
destroyDriver = c_destroySchedulerDriver . fromSchedulerDriver

-- | Starts the scheduler driver. This needs to be called before any
-- other driver calls are made.
start :: SchedulerDriver -> IO Status
start driver = withDriver c_startSchedulerDriver driver

-- | Stops the scheduler driver. If the 'failover' flag is set to
-- false then it is expected that this framework will never
-- reconnect to Mesos and all of its executors and tasks can be
-- terminated. Otherwise, all executors and tasks will remain
-- running (for some framework specific failover timeout) allowing the
-- scheduler to reconnect (possibly in the same process, or from a
-- different process, for example, on a different machine).
stop :: SchedulerDriver
     -> Bool -- ^ should failover?
     -> IO Status
stop driver failover = withDriver stopRaw driver
  where
    stopRaw ptr = c_stopSchedulerDriver ptr flag
    -- The raw call takes the boolean as a numeric flag: 1 for true, 0 for false.
    flag
      | failover  = 1
      | otherwise = 0

-- | Aborts the driver so that no more callbacks can be made to the
-- scheduler. The semantics of abort and stop have deliberately been
-- separated so that code can detect an aborted driver (i.e., via
-- the return status of 'await', see below), and
-- instantiate and start another driver if desired (from within the
-- same process). Note that 'stop' is not automatically called
-- inside 'abort'.
abort :: SchedulerDriver -> IO Status
abort driver = withDriver c_abortSchedulerDriver driver

-- | Waits for the driver to be stopped or aborted, possibly
-- *blocking the current thread indefinitely*. The return status of
-- this function can be used to determine if the driver was aborted
-- (see 'Status' for more information).
await :: SchedulerDriver -> IO Status
await driver = withDriver c_joinSchedulerDriver driver

-- | Starts and immediately 'await's (blocks on) the driver.
run :: SchedulerDriver -> IO Status
run driver = withDriver c_runSchedulerDriver driver

-- | Requests resources from Mesos. Any resources available are
-- asynchronously offered to the framework via the 'resourceOffers'
-- callback.
requestResources :: SchedulerDriver -> [Request] -> IO Status
requestResources (SchedulerDriver p) rs =
  with (arrayLen =<< mapM cppValue rs) $ \(reqs, n) -> do
    code <- c_requestResources p reqs (fromIntegral n)
    return (toEnum (fromIntegral code))

-- | Launches the given set of tasks. Any resources remaining (i.e.,
-- not used by the tasks or their executors) will be considered
-- declined. The specified filters are applied on all unused
-- resources (see 'Filters' for more information).
--
-- Available resources are aggregated when multiple offers are
-- provided. Note that all offers must belong to the same slave.
-- Invoking this function with an empty collection of tasks declines
-- offers in their entirety (see 'declineOffer').
launchTasks :: SchedulerDriver -> [OfferID] -> [TaskInfo] -> Filters -> IO Status
launchTasks (SchedulerDriver p) os ts f = with acquire submit
  where
    -- Marshal the filters first, then the offer IDs, then the tasks.
    acquire = do
      fp <- cppValue f
      offers <- mapM cppValue os >>= arrayLen
      tasks <- mapM cppValue ts >>= arrayLen
      return (fp, offers, tasks)
    submit (fp, (op, ol), (tp, tl)) =
      fmap (toEnum . fromIntegral) $
        c_launchTasks p op (fromIntegral ol) tp (fromIntegral tl) fp

-- | Kills the specified task. Note that attempting to kill a task is
-- currently not reliable. If, for example, a scheduler fails over
-- while it was attempting to kill a task it will need to retry in
-- the future. Likewise, if unregistered / disconnected, the request
-- will be dropped (these semantics may be changed in the future).
killTask :: SchedulerDriver -> TaskID -> IO Status
killTask (SchedulerDriver p) t =
  fmap (toEnum . fromIntegral) $ with (cppValue t) (c_killTask p)

-- | Declines an offer in its entirety and applies the specified
-- filters on the resources (see 'Filters'). Note that this can be
-- done at any time, it is not necessary to do this within the
-- 'resourceOffers' callback.
declineOffer :: SchedulerDriver -> OfferID -> Filters -> IO Status
declineOffer (SchedulerDriver p) o f = with acquire $ \(oid, fp) ->
  fmap (toEnum . fromIntegral) (c_declineOffer p oid fp)
  where
    -- Marshal the offer ID before the filters.
    acquire = do
      oid <- cppValue o
      fp <- cppValue f
      return (oid, fp)

-- | Removes all filters previously set by the framework (via
-- 'launchTasks'). This enables the framework to receive offers from
-- those filtered slaves.
reviveOffers :: SchedulerDriver -> IO Status
reviveOffers driver = withDriver c_reviveOffers driver

-- | Sends a message from the framework to one of its executors. These
-- messages are best effort; do not expect a framework message to be
-- retransmitted in any reliable fashion.
sendFrameworkMessage :: SchedulerDriver -> ExecutorID -> SlaveID -> ByteString -> IO Status
sendFrameworkMessage (SchedulerDriver p) e s bs =
  fmap (toEnum . fromIntegral) $
    with (cppValue e) $ \ep ->
      with (cppValue s) $ \sp ->
        with (cstring bs) $ \(strp, l) ->
          c_sendFrameworkMessage p ep sp strp (fromIntegral l)

-- | Reconciliation of tasks causes the master to send status updates for tasks
-- whose status differs from the status sent here.
reconcileTasks :: SchedulerDriver -> [TaskStatus] -> IO Status
reconcileTasks (SchedulerDriver p) ts =
  fmap (toEnum . fromIntegral) $
    with (arrayLen =<< mapM cppValue ts) $ \(tp, l) ->
      c_reconcileTasks p tp (fromIntegral l)