-- |
-- Module      : System.Mesos.Scheduler
-- Copyright   : (c) Ian Duncan 2014
-- License     : MIT
--
-- Maintainer  : ian@iankduncan.com
-- Stability   : unstable
-- Portability : non-portable
--
-- Mesos scheduler interface and scheduler driver. A scheduler is used
-- to interact with Mesos in order to run distributed computations.

module System.Mesos.Scheduler (
  -- * Creating a new 'Scheduler'
  ToScheduler(..),
  SchedulerDriver,
  -- * Managing framework actions with a 'SchedulerDriver'
  -- ** 'SchedulerDriver' lifecycle management
  withSchedulerDriver,
  start,
  stop,
  abort,
  await,
  run,
  -- ** Managing and interacting with tasks and resources
  requestResources,
  launchTasks,
  killTask,
  declineOffer,
  reviveOffers,
  sendFrameworkMessage,
  reconcileTasks,
  -- * Low-level lifecycle management
  createDriver,
  destroyDriver,
  Scheduler,
  createScheduler,
  destroyScheduler
) where
import qualified Control.Exception          as E
import           Control.Monad.Managed
import           Data.ByteString            (ByteString, packCStringLen)
import           Data.ByteString.Unsafe     (unsafeUseAsCStringLen)
-- import           Foreign.C
import           Foreign.Ptr
import           System.Mesos.Internal      hiding (marshal, requestResources)
import           System.Mesos.Raw
import           System.Mesos.Raw.Scheduler
import           System.Mesos.Types         hiding (requestResources)

-- | Callback interface to be implemented by frameworks'
-- schedulers. Note that only one callback will be invoked at a time,
-- so it is not recommended that you block within a callback because
-- it may cause a deadlock.
--
-- Every method has a default implementation that ignores its arguments
-- and does nothing, so an instance only needs to define the callbacks
-- it is interested in.
class ToScheduler a where
  -- | Invoked when the 'Scheduler' successfully registers with a Mesos
  -- master. A unique ID (generated by the master) used for
  -- distinguishing this framework from others and 'MasterInfo'
  -- with the IP and port of the current master are provided as arguments.
  registered :: a -> SchedulerDriver -> FrameworkID -> MasterInfo -> IO ()
  registered _ _ _ _ = return ()

  -- | Invoked when the 'Scheduler' re-registers with a newly elected Mesos master.
  -- This is only called when the scheduler has previously been registered.
  -- 'MasterInfo' containing the updated information about the elected master
  -- is provided as an argument.
  reRegistered :: a -> SchedulerDriver -> MasterInfo -> IO ()
  reRegistered _ _ _ = return ()

  -- | Invoked when the 'Scheduler' becomes "disconnected" from the master
  -- (e.g., the master fails and another is taking over).
  disconnected :: a -> SchedulerDriver -> IO ()
  disconnected _ _ = return ()

  -- | Invoked when resources have been offered to this framework. A
  -- single offer will only contain resources from a single slave.
  -- Resources associated with an offer will not be re-offered to
  -- /this/ framework until either (a) this framework has rejected
  -- those resources (see 'launchTasks') or (b) those
  -- resources have been rescinded (see 'offerRescinded').
  -- Note that resources may be concurrently offered to more than one
  -- framework at a time (depending on the allocator being used). In
  -- that case, the first framework to launch tasks using those
  -- resources will be able to use them while the other frameworks
  -- will have those resources rescinded (or if a framework has
  -- already launched tasks with those resources then those tasks will
  -- fail with a 'System.Mesos.Types.Lost' status and a message saying as much).
  resourceOffers :: a -> SchedulerDriver -> [Offer] -> IO ()
  resourceOffers _ _ _ = return ()

  -- | Invoked when an offer is no longer valid (e.g., the slave was
  -- lost or another framework used resources in the offer). If for
  -- whatever reason an offer is never rescinded (e.g., dropped
  -- message, failing over framework, etc.), a framework that attempts
  -- to launch tasks using an invalid offer will receive 'Lost'
  -- status updates for those tasks (see 'resourceOffers').
  offerRescinded :: a -> SchedulerDriver -> OfferID -> IO ()
  offerRescinded _ _ _ = return ()

  -- | Invoked when the status of a task has changed (e.g., a slave is
  -- lost and so the task is lost, a task finishes and an executor
  -- sends a status update saying so, etc). Note that returning from
  -- this callback /acknowledges/ receipt of this status update! If
  -- for whatever reason the scheduler aborts during this callback (or
  -- the process exits) another status update will be delivered (note,
  -- however, that this is currently not true if the slave sending the
  -- status update is lost/fails during that time).
  statusUpdate :: a -> SchedulerDriver -> TaskStatus -> IO ()
  statusUpdate _ _ _ = return ()

  -- | Invoked when an executor sends a message. These messages are best
  -- effort; do not expect a framework message to be retransmitted in
  -- any reliable fashion.
  frameworkMessage :: a -> SchedulerDriver -> ExecutorID -> SlaveID -> ByteString -> IO ()
  frameworkMessage  _ _ _ _ _ = return ()

  -- | Invoked when a slave has been determined unreachable (e.g.,
  -- machine failure, network partition). Most frameworks will need to
  -- reschedule any tasks launched on this slave on a new slave.
  slaveLost :: a -> SchedulerDriver -> SlaveID -> IO ()
  slaveLost _ _ _ = return ()

  -- | Invoked when an executor has exited/terminated. Note that any
  -- tasks running will have 'Lost' status updates automatically
  -- generated.
  executorLost :: a -> SchedulerDriver -> ExecutorID -> SlaveID -> Status -> IO ()
  executorLost _ _ _ _ _ = return ()

  -- | Invoked when there is an unrecoverable error in the scheduler or
  -- scheduler driver. The driver will be aborted BEFORE invoking this
  -- callback.
  errorMessage :: a -> SchedulerDriver -> ByteString -> IO ()
  errorMessage _ _ _ = return ()

-- | Builds a low-level 'Scheduler' from any 'ToScheduler' instance.
--
-- Each callback of @s@ is wrapped into a function pointer that decodes its
-- raw arguments and then invokes the corresponding class method. The
-- pointers are handed to @c_createScheduler@ and are also stored in the
-- returned 'Scheduler', so that 'destroyScheduler' can free them later.
createScheduler :: ToScheduler a => a -> IO Scheduler
createScheduler s = do
  onRegistered <- wrapSchedulerRegistered $ \driverPtr frameworkPtr masterPtr -> runManaged $ do
    frameworkId <- peekCPP frameworkPtr
    masterInfo <- peekCPP masterPtr
    liftIO $ registered s (SchedulerDriver driverPtr) frameworkId masterInfo
  onReRegistered <- wrapSchedulerReRegistered $ \driverPtr masterPtr -> runManaged $ do
    masterInfo <- unmarshal masterPtr
    liftIO $ reRegistered s (SchedulerDriver driverPtr) masterInfo
  onDisconnected <- wrapSchedulerDisconnected $ \driverPtr -> runManaged $
    liftIO $ disconnected s (SchedulerDriver driverPtr)
  onResourceOffers <- wrapSchedulerResourceOffers $ \driverPtr offersPtr count -> runManaged $ do
    -- Read the raw array of offer pointers first, then decode each element.
    offerPtrs <- peekArray' (offersPtr, fromIntegral count)
    offers <- mapM unmarshal offerPtrs
    liftIO $ resourceOffers s (SchedulerDriver driverPtr) offers
  onOfferRescinded <- wrapSchedulerOfferRescinded $ \driverPtr offerIdPtr -> runManaged $ do
    offerId <- unmarshal offerIdPtr
    liftIO $ offerRescinded s (SchedulerDriver driverPtr) offerId
  onStatusUpdate <- wrapSchedulerStatusUpdate $ \driverPtr statusPtr -> runManaged $ do
    taskStatus <- unmarshal statusPtr
    liftIO $ statusUpdate s (SchedulerDriver driverPtr) taskStatus
  onFrameworkMessage <- wrapSchedulerFrameworkMessage $ \driverPtr executorPtr slavePtr dataPtr dataLen -> runManaged $ do
    executorId <- unmarshal executorPtr
    slaveId <- unmarshal slavePtr
    -- The payload is copied into a fresh ByteString before the callback runs.
    payload <- liftIO $ packCStringLen (dataPtr, dataLen)
    liftIO $ frameworkMessage s (SchedulerDriver driverPtr) executorId slaveId payload
  onSlaveLost <- wrapSchedulerSlaveLost $ \driverPtr slavePtr -> runManaged $ do
    slaveId <- unmarshal slavePtr
    liftIO $ slaveLost s (SchedulerDriver driverPtr) slaveId
  onExecutorLost <- wrapSchedulerExecutorLost $ \driverPtr executorPtr slavePtr rawStatus -> runManaged $ do
    executorId <- unmarshal executorPtr
    slaveId <- unmarshal slavePtr
    liftIO $ executorLost s (SchedulerDriver driverPtr) executorId slaveId (toEnum (fromIntegral rawStatus))
  onError <- wrapSchedulerError $ \driverPtr msgPtr msgLen ->
    packCStringLen (msgPtr, fromIntegral msgLen) >>= errorMessage s (SchedulerDriver driverPtr)
  impl <- c_createScheduler onRegistered onReRegistered onDisconnected onResourceOffers onOfferRescinded onStatusUpdate onFrameworkMessage onSlaveLost onExecutorLost onError
  return $ Scheduler impl onRegistered onReRegistered onDisconnected onResourceOffers onOfferRescinded onStatusUpdate onFrameworkMessage onSlaveLost onExecutorLost onError

-- | Tears down a 'Scheduler'. The underlying scheduler object is destroyed
-- first; afterwards every callback function pointer stored in the
-- 'Scheduler' is released with 'freeHaskellFunPtr'.
destroyScheduler :: Scheduler -> IO ()
destroyScheduler s = do
  c_destroyScheduler (schedulerImpl s)
  -- Same order as the fields were created in 'createScheduler'.
  sequence_
    [ freeHaskellFunPtr (rawSchedulerRegistered s)
    , freeHaskellFunPtr (rawSchedulerReRegistered s)
    , freeHaskellFunPtr (rawSchedulerDisconnected s)
    , freeHaskellFunPtr (rawSchedulerResourceOffers s)
    , freeHaskellFunPtr (rawSchedulerOfferRescinded s)
    , freeHaskellFunPtr (rawSchedulerStatusUpdate s)
    , freeHaskellFunPtr (rawSchedulerFrameworkMessage s)
    , freeHaskellFunPtr (rawSchedulerSlaveLost s)
    , freeHaskellFunPtr (rawSchedulerExecutorLost s)
    , freeHaskellFunPtr (rawSchedulerError s)
    ]

-- | Runs a raw driver action on the pointer inside a 'SchedulerDriver' and
-- converts the integer it returns into a 'Status' via 'toEnum'.
withDriver :: (SchedulerDriverPtr -> IO CInt) -> SchedulerDriver -> IO Status
withDriver f (SchedulerDriver p) = do
  code <- f p
  return (toEnum (fromIntegral code))

-- | Creates a 'Scheduler' and a 'SchedulerDriver' for it, runs the given
-- action with the driver, and then destroys the driver followed by the
-- scheduler.
--
-- Cleanup is exception-safe: both resources are released (driver first,
-- then scheduler) even if the action or driver creation throws, and the
-- exception is re-raised afterwards.
withSchedulerDriver :: ToScheduler a => a
                    -> FrameworkInfo
                    -> ByteString -- ^ @ip:port@ of the master to connect to. For example: @"127.0.0.1:5050"@
                    -> Maybe Credential
                    -> (SchedulerDriver -> IO b)
                    -> IO b
withSchedulerDriver s i h c f =
  -- The outer bracket owns the scheduler, the inner one the driver, so the
  -- driver is always torn down before the scheduler it refers to.
  E.bracket (createScheduler s) destroyScheduler $ \scheduler ->
    E.bracket (createDriver scheduler i h c) destroyDriver f

-- | Creates a 'SchedulerDriver' for the given 'Scheduler', framework
-- description and master address. When a 'Credential' is supplied the
-- credential-aware constructor is used; otherwise the plain one is.
--
-- The marshalled framework info and master string are only held for the
-- duration of the underlying constructor call.
createDriver :: Scheduler -> FrameworkInfo -> ByteString -> Maybe Credential -> IO SchedulerDriver
createDriver s i h mc =
  with (do frameworkPtr <- cppValue i
           master <- cstring h
           return (frameworkPtr, master)) $ \(frameworkPtr, (masterPtr, masterLen)) -> do
    let impl = schedulerImpl s
    rawDriver <- case mc of
      Nothing ->
        c_createSchedulerDriver impl frameworkPtr masterPtr (fromIntegral masterLen)
      Just cred -> with (cppValue cred) $ \credPtr ->
        c_createSchedulerDriverWithCredentials impl frameworkPtr masterPtr (fromIntegral masterLen) credPtr
    return (SchedulerDriver rawDriver)

-- | Destroys the underlying driver object of a 'SchedulerDriver'.
destroyDriver :: SchedulerDriver -> IO ()
destroyDriver (SchedulerDriver p) = c_destroySchedulerDriver p

-- | Starts the scheduler driver. This must be called before any other
-- driver call is made.
start :: SchedulerDriver -> IO Status
start driver = withDriver c_startSchedulerDriver driver

-- | Stops the scheduler driver. When the failover flag is 'False' it is
-- expected that this framework will never reconnect to Mesos, so all of
-- its executors and tasks can be terminated. When it is 'True', all
-- executors and tasks remain running (for some framework-specific
-- failover timeout), allowing the scheduler to reconnect (possibly in the
-- same process, or from a different process, for example on a different
-- machine).
stop :: SchedulerDriver
     -> Bool -- ^ should failover?
     -> IO Status
stop d f = withDriver stopWith d
  where
    stopWith p = c_stopSchedulerDriver p failoverFlag
    -- The raw call takes the flag as a number: 1 for failover, 0 otherwise.
    failoverFlag
      | f = 1
      | otherwise = 0

-- | Aborts the driver so that no more callbacks can be made to the
-- scheduler. Abort and stop are deliberately kept separate so that code
-- can detect an aborted driver (via the return status of 'await') and, if
-- desired, instantiate and start another driver from within the same
-- process. Note that 'abort' does not automatically call 'stop'.
abort :: SchedulerDriver -> IO Status
abort driver = withDriver c_abortSchedulerDriver driver

-- | Waits for the driver to be stopped or aborted, possibly
-- /blocking the current thread indefinitely/. The returned 'Status' can be
-- used to determine whether the driver was aborted (see 'Status' for more
-- information).
await :: SchedulerDriver -> IO Status
await driver = withDriver c_joinSchedulerDriver driver

-- | Starts the driver and immediately blocks on it, like 'start' followed
-- by 'await'.
run :: SchedulerDriver -> IO Status
run driver = withDriver c_runSchedulerDriver driver

-- | Requests resources from Mesos. Any resources that are available are
-- offered to the framework asynchronously through the 'resourceOffers'
-- callback.
requestResources :: SchedulerDriver -> [Request] -> IO Status
requestResources (SchedulerDriver p) rs =
  with (do requestPtrs <- mapM cppValue rs
           arrayLen requestPtrs) $ \(requestArr, requestCount) -> do
    code <- c_requestResources p requestArr (fromIntegral requestCount)
    return (toEnum (fromIntegral code))

-- | Launches the given set of tasks. Any remaining resources (i.e., those
-- not used by the tasks or their executors) are considered declined. The
-- given filters are applied to all unused resources (see 'Filters' for
-- more information).
--
-- Available resources are aggregated when multiple offers are provided.
-- Note that all offers must belong to the same slave. Calling this
-- function with an empty list of tasks declines the offers in their
-- entirety (see 'declineOffer').
launchTasks :: SchedulerDriver -> [OfferID] -> [TaskInfo] -> Filters -> IO Status
launchTasks (SchedulerDriver p) os ts f =
  -- Marshal in the same order as before: filters, then offers, then tasks.
  with (do filtersPtr <- cppValue f
           offerArr <- mapM cppValue os >>= arrayLen
           taskArr <- mapM cppValue ts >>= arrayLen
           return (filtersPtr, offerArr, taskArr)) $
    \(filtersPtr, (offersPtr, offerCount), (tasksPtr, taskCount)) ->
      fmap (toEnum . fromIntegral) $
        c_launchTasks p offersPtr (fromIntegral offerCount) tasksPtr (fromIntegral taskCount) filtersPtr

-- | Kills the specified task. Note that attempting to kill a task is
-- currently not reliable: if, for example, a scheduler fails over while it
-- is attempting to kill a task, it will need to retry in the future.
-- Likewise, if the scheduler is unregistered or disconnected, the request
-- will be dropped (these semantics may change in the future).
killTask :: SchedulerDriver -> TaskID -> IO Status
killTask (SchedulerDriver p) t =
  with (cppValue t) $ \taskIdPtr ->
    fmap (toEnum . fromIntegral) (c_killTask p taskIdPtr)

-- | Declines an offer in its entirety and applies the given filters to
-- its resources (see 'Filters'). This can be done at any time; it does not
-- have to happen inside the 'resourceOffers' callback.
declineOffer :: SchedulerDriver -> OfferID -> Filters -> IO Status
declineOffer (SchedulerDriver p) o f =
  -- Marshal the offer ID before the filters, as the nested form did.
  with (do offerIdPtr <- cppValue o
           filtersPtr <- cppValue f
           return (offerIdPtr, filtersPtr)) $ \(offerIdPtr, filtersPtr) ->
    fmap (toEnum . fromIntegral) (c_declineOffer p offerIdPtr filtersPtr)

-- | Removes all filters previously set by the framework (via
-- 'launchTasks'), so that the framework can receive offers from the
-- slaves that were filtered out.
reviveOffers :: SchedulerDriver -> IO Status
reviveOffers driver = withDriver c_reviveOffers driver

-- | Sends a message from the framework to one of its executors. These
-- messages are best effort; do not expect a framework message to be
-- retransmitted in any reliable fashion.
sendFrameworkMessage :: SchedulerDriver -> ExecutorID -> SlaveID -> ByteString -> IO Status
sendFrameworkMessage (SchedulerDriver p) e s bs =
  -- Marshal in the original order: executor ID, slave ID, then payload.
  with (do executorPtr <- cppValue e
           slavePtr <- cppValue s
           payload <- cstring bs
           return (executorPtr, slavePtr, payload)) $
    \(executorPtr, slavePtr, (payloadPtr, payloadLen)) ->
      fmap (toEnum . fromIntegral) $
        c_sendFrameworkMessage p executorPtr slavePtr payloadPtr (fromIntegral payloadLen)

-- | Reconciles task state with the master: the master sends status
-- updates for those tasks whose status differs from the status given here.
reconcileTasks :: SchedulerDriver -> [TaskStatus] -> IO Status
reconcileTasks (SchedulerDriver p) ts =
  with (do statusPtrs <- mapM cppValue ts
           arrayLen statusPtrs) $ \(statusArr, statusCount) ->
    fmap (toEnum . fromIntegral) (c_reconcileTasks p statusArr (fromIntegral statusCount))