{-# LANGUAGE UndecidableInstances #-}
-- | A process broker spawns and monitors child processes.
--
-- The child processes are mapped to symbolic identifier values: Child-IDs.
--
-- This is the barest, most minimal version of a broker. Children
-- can be started, but not restarted.
--
-- Children can efficiently be looked-up by an id-value, and when
-- the broker is shutdown, all children will be shutdown these
-- are actually all the features of this broker implementation.
--
-- Also, this minimalist broker only knows how to spawn a
-- single kind of child process.
--
-- When a broker spawns a new child process, it expects the
-- child process to return a 'ProcessId'. The broker will
-- 'monitor' the child process.
--
-- This is in stark contrast to how Erlang/OTP handles things;
-- In the OTP Supervisor, the child has to link to the parent.
-- This allows the child spec to be more flexible in that no
-- @pid@ has to be passed from the child start function to the
-- broker process, and also, a child may break free from
-- the broker by unlinking.
--
-- Now while this seems nice at first, this might actually
-- cause surprising results, since it is usually expected that
-- stopping a broker also stops the children, or that a child
-- exit shows up in the logging originating from the former
-- broker.
--
-- The approach here is to allow any child to link to the
-- broker to realize when the broker was violently killed,
-- and otherwise give the child no chance to unlink itself from
-- its broker.
--
-- This module is far simpler than the Erlang/OTP counter part,
-- of a @simple_one_for_one@ supervisor.
--
-- The future of this broker might not be a-lot more than
-- it currently is. The ability to restart processes might be
-- implemented outside of this broker module.
--
-- One way to do that is to implement the restart logic in
-- a separate module, since the child-id can be reused when a child
-- exits.
--
-- @since 0.23.0
module Control.Eff.Concurrent.Protocol.Broker
  ( startLink
  , statefulChild
  , stopBroker
  , isBrokerAlive
  , monitorBroker
  , getDiagnosticInfo
  , spawnChild
  , spawnOrLookup
  , lookupChild
  , callById
  , castById
  , stopChild
  , ChildNotFound(..)
  , Broker()
  , Pdu(StartC, StopC, LookupC, GetDiagnosticInfo)
  , ChildId
  , Stateful.StartArgument(MkBrokerConfig)
  , brokerConfigChildStopTimeout
  , SpawnErr(AlreadyStarted)
  , ChildEvent(OnChildSpawned, OnChildDown, OnBrokerShuttingDown)
  ) where

import Control.DeepSeq (NFData(rnf))
import Control.Eff as Eff
import Control.Eff.Concurrent.Protocol
import Control.Eff.Concurrent.Protocol.Client
import Control.Eff.Concurrent.Protocol.Wrapper
import Control.Eff.Concurrent.Protocol.Observer
import qualified Control.Eff.Concurrent.Protocol.EffectfulServer as Effectful
import qualified Control.Eff.Concurrent.Protocol.StatefulServer as Stateful
import Control.Eff.Concurrent.Protocol.Broker.InternalState
import Control.Eff.Concurrent.Process
import Control.Eff.Concurrent.Process.Timer
import Control.Eff.Extend (raise)
import Control.Eff.Log
import Control.Eff.State.Strict as Eff
import Control.Lens hiding ((.=), use)
import Data.Default
import Data.Dynamic
import Data.Foldable
import Data.Kind
import qualified Data.Map as Map
import Data.Text (Text, pack)
import Data.Type.Pretty
import GHC.Generics (Generic)
import GHC.Stack
import Control.Applicative ((<|>))

-- * Broker Server API

-- ** Functions

-- | Start and link a new broker process with the given 'SpawnFun'unction.
--
-- To spawn new child processes use 'spawnChild'.
--
-- @since 0.23.0
startLink
  :: forall p e
  . ( HasCallStack
    , IoLogging (Processes e)
    , TangibleBroker p
    , Stateful.Server (Broker p) (Processes e)
    )
  => Stateful.StartArgument (Broker p)
  -> Eff (Processes e) (Endpoint (Broker p))
startLink = Stateful.startLink

-- | A smart constructor for 'MkBrokerConfig' that makes it easy to start a 'Stateful.Server' instance.
--
-- The user needs to instantiate @'ChildId' p@.
--
-- @since 0.30.0
statefulChild
  :: forall p e
  . ( HasCallStack
    , IoLogging e
    , TangibleBroker (Stateful.Stateful p)
    , Stateful.Server (Broker (Stateful.Stateful p)) e
    )
  => Timeout
  -> (ChildId p -> Stateful.StartArgument p)
  -> Stateful.StartArgument (Broker (Stateful.Stateful p))
statefulChild t f = MkBrokerConfig t (Stateful.Init . f)

-- | Stop the broker and shutdown all processes.
--
-- Block until the broker has finished.
--
-- @since 0.23.0
stopBroker
  :: ( HasCallStack
     , HasProcesses e q
     , Member Logs e
     , Lifted IO e
     , TangibleBroker p
     )
  => Endpoint (Broker p)
  -> Eff e ()
stopBroker ep = do
  logInfo ("stopping broker: " <> pack (show ep))
  mr <- monitor (_fromEndpoint ep)
  sendInterrupt (_fromEndpoint ep) NormalExitRequested
  r <- receiveSelectedMessage (selectProcessDown mr)
  logInfo ("broker stopped: " <> pack (show ep) <> " " <> pack (show r))

-- | Check if a broker process is still alive.
--
-- @since 0.23.0
isBrokerAlive
  :: forall p q0 e .
     ( HasCallStack
     , Member Logs e
     , Typeable p
     , HasProcesses e q0
     )
  => Endpoint (Broker p)
  -> Eff e Bool
isBrokerAlive x = isProcessAlive (_fromEndpoint x)

-- | Monitor a broker process.
--
-- @since 0.23.0
monitorBroker
  :: forall p q0 e .
     ( HasCallStack
     , Member Logs e
     , HasProcesses e q0
     , TangibleBroker p
     )
  => Endpoint (Broker p)
  -> Eff e MonitorReference
monitorBroker x = monitor (_fromEndpoint x)

-- | Start and monitor a new child process using the 'SpawnFun' passed
-- to 'startLink'.
--
-- @since 0.23.0
spawnChild
  :: forall p q0 e .
     ( HasCallStack
     , Member Logs e
     , HasProcesses e q0
     , TangibleBroker p
     , Typeable (Effectful.ServerPdu p)
     )
  => Endpoint (Broker p)
  -> ChildId p
  -> Eff e (Either (SpawnErr p) (Endpoint (Effectful.ServerPdu p)))
spawnChild ep cId = call ep (StartC cId)

-- | Start and monitor a new child process using the 'SpawnFun' passed
-- to 'startLink'.
--
-- Call 'spawnChild' and unpack the 'Either' result,
-- ignoring the 'AlreadyStarted' error.
--
-- @since 0.29.2
spawnOrLookup
  :: forall p q0 e .
     ( HasCallStack
     , Member Logs e
     , HasProcesses e q0
     , TangibleBroker p
     , Typeable (Effectful.ServerPdu p)
     )
  => Endpoint (Broker p)
  -> ChildId p
  -> Eff e (Endpoint (Effectful.ServerPdu p))
spawnOrLookup supEp cId =
  do res <- spawnChild supEp cId
     pure $
      case res of
        Left (AlreadyStarted _ ep) -> ep
        Right ep -> ep

-- | Lookup the given child-id and return the output value of the 'SpawnFun'
--   if the client process exists.
--
-- @since 0.23.0
lookupChild ::
    forall p e q0 .
     ( HasCallStack
     , Member Logs e
     , HasProcesses e q0
     , TangibleBroker p
     , Typeable (Effectful.ServerPdu p)
     )
  => Endpoint (Broker p)
  -> ChildId p
  -> Eff e (Maybe (Endpoint (Effectful.ServerPdu p)))
lookupChild ep cId = call ep (LookupC @p cId)

-- | Stop a child process, and block until the child has exited.
--
-- Return 'True' if a process with that ID was found, 'False' if no process
-- with the given ID was running.
--
-- @since 0.23.0
stopChild ::
    forall p e q0 .
     ( HasCallStack
     , Member Logs e
     , HasProcesses e q0
     , TangibleBroker p
     )
  => Endpoint (Broker p)
  -> ChildId p
  -> Eff e Bool
stopChild ep cId = call ep (StopC @p cId (TimeoutMicros 4000000))

callById ::
    forall destination protocol result e q0 .
     ( HasCallStack
     , Member Logs e
     , HasProcesses e q0
     , Lifted IO e
     , Lifted IO q0
     , TangibleBroker protocol
     , TangiblePdu destination ( 'Synchronous result)
     , TangiblePdu protocol ( 'Synchronous result)
     , Embeds (Effectful.ServerPdu destination) protocol
     , Ord (ChildId destination)
     , Tangible (ChildId destination)
     , Typeable (Effectful.ServerPdu destination)
     , Tangible result
     , NFData (Pdu protocol ( 'Synchronous result))
     , NFData (Pdu (Effectful.ServerPdu destination) ( 'Synchronous result))
     , Show (Pdu (Effectful.ServerPdu destination) ( 'Synchronous result))
     )
  => Endpoint (Broker destination)
  -> ChildId destination
  -> Pdu protocol ( 'Synchronous result)
  -> Timeout
  -> Eff e result
callById broker cId pdu tMax =
  lookupChild broker cId
   >>=
    maybe
      (do logError ("callById failed for: " <> pack (show pdu))
          interrupt (InterruptedBy (ChildNotFound cId broker))
      )
      (\cEp -> callWithTimeout cEp pdu tMax)

data ChildNotFound child where
  ChildNotFound :: ChildId child -> Endpoint (Broker child) -> ChildNotFound child
  deriving (Typeable)

instance (Show (ChildId child), Typeable child) => Show (ChildNotFound child) where
  showsPrec d (ChildNotFound cId broker)
    = showParen (d >= 10) ( showString "child not found: "
                           . showsPrec 10 cId
                           . showString " at: "
                           . showsPrec 10 broker
                           )

instance NFData (ChildId c) => NFData (ChildNotFound c) where
  rnf (ChildNotFound cId broker) = rnf cId `seq` rnf broker `seq` ()

castById ::
    forall destination protocol e q0 .
     ( HasCallStack
     , Member Logs e
     , HasProcesses e q0
     , TangibleBroker protocol
     , TangiblePdu destination 'Asynchronous
     , TangiblePdu protocol 'Asynchronous
     )
  => Endpoint (Broker destination)
  -> ChildId destination
  -> Pdu protocol 'Asynchronous
  -> Eff e ()
castById = error "TODO"


-- | Return a 'Text' describing the current state of the broker.
--
-- @since 0.23.0
getDiagnosticInfo
  :: forall p e q0 .
     ( HasCallStack
     , HasProcesses e q0
     , TangibleBroker p
     )
  => Endpoint (Broker p)
  -> Eff e Text
getDiagnosticInfo s = call s (GetDiagnosticInfo @p)

-- ** Types

-- | The index type of 'Server' supervisors.
--
-- A @'Broker' p@ manages the life cycle of the processes, running the @'Server' p@
-- methods of that specific type.
--
-- The broker maps an identifier value of type @'ChildId' p@ to an @'Endpoint' p@.
--
-- @since 0.24.0
data Broker (p :: Type) deriving Typeable

instance Typeable p => HasPdu (Broker p) where
  type instance EmbeddedPduList (Broker p) = '[ObserverRegistry (ChildEvent p)]
  -- | The 'Pdu' instance contains methods to start, stop and lookup a child
  -- process, as well as a diagnostic callback.
  --
  -- @since 0.23.0
  data instance  Pdu (Broker p) r where
          StartC :: ChildId p -> Pdu (Broker p) ('Synchronous (Either (SpawnErr p) (Endpoint (Effectful.ServerPdu p))))
          StopC :: ChildId p -> Timeout -> Pdu (Broker p) ('Synchronous Bool)
          LookupC :: ChildId p -> Pdu (Broker p) ('Synchronous (Maybe (Endpoint (Effectful.ServerPdu p))))
          GetDiagnosticInfo :: Pdu (Broker p) ('Synchronous Text)
          ChildEventObserverRegistry :: Pdu (ObserverRegistry (ChildEvent p)) r -> Pdu (Broker p) r
      deriving Typeable

instance (Typeable p, Show (ChildId p)) => Show (Pdu (Broker p) r) where
  showsPrec d (StartC c) = showParen (d >= 10) (showString "StartC " . showsPrec 10 c)
  showsPrec d (StopC c t) = showParen (d >= 10) (showString "StopC " . showsPrec 10 c . showChar ' ' . showsPrec 10 t)
  showsPrec d (LookupC c) = showParen (d >= 10) (showString "LookupC " . showsPrec 10 c)
  showsPrec _ GetDiagnosticInfo = showString "GetDiagnosticInfo"
  showsPrec d (ChildEventObserverRegistry c) = showParen (d >= 10) (showString "ChildEventObserverRegistry " . showsPrec 10 c)

instance (NFData (ChildId p)) => NFData (Pdu (Broker p) r) where
  rnf (StartC ci) = rnf ci
  rnf (StopC ci t) = rnf ci `seq` rnf t
  rnf (LookupC ci) = rnf ci
  rnf GetDiagnosticInfo = ()
  rnf (ChildEventObserverRegistry x) = rnf x

instance Typeable p => HasPduPrism (Broker p) (ObserverRegistry (ChildEvent p)) where
 embedPdu = ChildEventObserverRegistry
 fromPdu (ChildEventObserverRegistry x) = Just x
 fromPdu _ = Nothing

type instance ToPretty (Broker p) = "broker" <:> ToPretty p

-- | The event type to indicate that a child was started or stopped.
--
-- The need for this type originated for the watchdog functionality introduced
-- in 0.30.0.
-- The watch dog shall restart a crashed child, and in order to do so, it
-- must somehow monitor the child.
-- Since no order is specified in which processes get the 'ProcessDown' events,
-- a watchdog cannot monitor a child and restart it immediately, because it might
-- have received the process down event before the broker.
-- So instead the watchdog can simply use the broker events, and monitor only the
-- broker process.
--
-- @since 0.30.0
data ChildEvent p where
  OnChildSpawned :: Endpoint (Broker p) -> ChildId p -> Endpoint (Effectful.ServerPdu p) -> ChildEvent p
  OnChildDown :: Endpoint (Broker p) -> ChildId p -> Endpoint (Effectful.ServerPdu p) -> Interrupt 'NoRecovery -> ChildEvent p
  OnBrokerShuttingDown :: Endpoint (Broker p) -> ChildEvent p -- ^ The broker is shutting down and will soon begin stopping/killing its children
  deriving (Typeable, Generic)

instance (NFData (ChildId p)) => NFData (ChildEvent p)

instance (Typeable p, Typeable (Effectful.ServerPdu p), Show (ChildId p)) => Show (ChildEvent p) where
  showsPrec d x =
    case x of
     OnChildSpawned s i e ->
        showParen (d >= 10)
          (shows s . showString ": child-spawned: " . shows i . showChar ' ' . shows e)
     OnChildDown s i e r ->
        showParen (d >= 10)
          (shows s . showString ": child-down: " . shows i . showChar ' ' . shows e . showChar ' ' . showsPrec 10 r)
     OnBrokerShuttingDown s ->
        shows s . showString ": shutting down"

-- | The type of value used to index running 'Server' processes managed by a 'Broker'.
--
-- Note, that the type you provide must be 'Tangible'.
--
-- @since 0.24.0
type family ChildId p

type instance ChildId (Stateful.Stateful p) = ChildId p

-- | Constraints on the parameters to 'Broker'.
--
-- @since 0.24.0
type TangibleBroker p =
  ( Tangible (ChildId p)
  , Ord (ChildId p)
  , Typeable p
  )


instance
  ( IoLogging q
  , TangibleBroker p
  , Tangible (ChildId p)
  , Typeable (Effectful.ServerPdu p)
  , Effectful.Server p (Processes q)
  , HasProcesses (Effectful.ServerEffects p (Processes q)) q
  ) => Stateful.Server (Broker p) (Processes q) where

  -- | Options that control the 'Broker p' process.
  --
  -- This contains:
  --
  -- * a 'SpawnFun'
  -- * the 'Timeout' after requesting a normal child exit before brutally killing the child.
  --
  -- @since 0.24.0
  data instance StartArgument (Broker p) = MkBrokerConfig
    {
      brokerConfigChildStopTimeout :: Timeout
    , brokerConfigStartFun :: ChildId p -> Effectful.Init p
    }

  data instance Model (Broker p) =
    BrokerModel { _children :: Children (ChildId p) p
                , _childEventObserver :: ObserverRegistry (ChildEvent p)
                }
      deriving Typeable

  setup _ _cfg = pure (BrokerModel def emptyObserverRegistry, ())
  update _ _brokerConfig (Stateful.OnCast req) =
    case  req of
      ChildEventObserverRegistry x ->
        Stateful.zoomModel @(Broker p) childEventObserverLens (observerRegistryHandlePdu x)

  update me brokerConfig (Stateful.OnCall rt req) =
    case req of
      ChildEventObserverRegistry x ->
        logEmergency ("unreachable: " <> pack (show x))

      GetDiagnosticInfo -> zoomToChildren @p $ do
        p <- (pack . show <$> getChildren @(ChildId p) @p)
        sendReply rt p

      LookupC i -> zoomToChildren @p $ do
        p <- fmap (view childEndpoint) <$> lookupChildById @(ChildId p) @p i
        sendReply rt p

      StopC i t -> zoomToChildren @p $ do
        mExisting <- lookupAndRemoveChildById @(ChildId p) @p i
        case mExisting of
          Nothing -> sendReply rt False
          Just existingChild -> do
            reason <- stopOrKillChild i existingChild t
            sendReply rt True
            Stateful.zoomModel @(Broker p)
              childEventObserverLens
              (observerRegistryNotify @(ChildEvent p) (OnChildDown me i (existingChild^.childEndpoint) reason))

      StartC i -> do
        mExisting <- zoomToChildren @p $ lookupChildById @(ChildId p) @p i
        case mExisting of
          Nothing -> do
            childEp <- raise (raise (Effectful.startLink (brokerConfigStartFun brokerConfig i)))
            let childPid = _fromEndpoint childEp
            cMon <- monitor childPid
            zoomToChildren @p $ putChild i (MkChild @p childEp cMon)
            sendReply rt (Right childEp)
            Stateful.zoomModel @(Broker p)
              childEventObserverLens
              (observerRegistryNotify @(ChildEvent p) (OnChildSpawned me i childEp))
          Just existingChild ->
            sendReply rt (Left (AlreadyStarted i (existingChild ^. childEndpoint)))

  update me _brokerConfig (Stateful.OnDown pd) = do
      wasObserver <- Stateful.zoomModel @(Broker p)
        childEventObserverLens
        (observerRegistryRemoveProcess @(ChildEvent p) (downProcess pd))
      if wasObserver
       then logInfo ("observer process died: " <> pack (show pd))
       else do
        oldEntry <- zoomToChildren @p $ lookupAndRemoveChildByMonitor @(ChildId p) @p (downReference pd)
        case oldEntry of
          Nothing -> logWarning ("unexpected: " <> pack (show pd))
          Just (i, c) -> do
            logInfo (  pack (show pd)
                    <> " for child "
                    <> pack (show i)
                    <> " => "
                    <> pack (show (c^.childEndpoint))
                    )
            Stateful.zoomModel @(Broker p)
                childEventObserverLens
                (observerRegistryNotify @(ChildEvent p) (OnChildDown me i (c^.childEndpoint) (downReason pd)))
  update me brokerConfig (Stateful.OnInterrupt e) =
    case e of
      NormalExitRequested -> do
        logDebug ("broker stopping: " <> pack (show e))
        Stateful.zoomModel @(Broker p)
            childEventObserverLens
            (observerRegistryNotify @(ChildEvent p) (OnBrokerShuttingDown me))
        stopAllChildren @p me (brokerConfigChildStopTimeout brokerConfig)
        exitNormally
      LinkedProcessCrashed linked ->
        logNotice (pack (show linked))
      _ -> do
        logWarning ("broker interrupted: " <> pack (show e))
        Stateful.zoomModel @(Broker p)
            childEventObserverLens
            (observerRegistryNotify @(ChildEvent p) (OnBrokerShuttingDown me))
        stopAllChildren @p me (brokerConfigChildStopTimeout brokerConfig)
        exitBecause (interruptToExit e)

  update _ _brokerConfig o = logWarning ("unexpected: " <> pack (show o))


zoomToChildren :: forall p c e
  . Member (Stateful.ModelState (Broker p)) e
  => Eff (State (Children (ChildId p) p) ': e) c
  -> Eff e c
zoomToChildren = Stateful.zoomModel @(Broker p) childrenLens

childrenLens :: Lens' (Stateful.Model (Broker p)) (Children (ChildId p) p)
childrenLens = lens _children (\(BrokerModel _ o) c -> BrokerModel c o)

childEventObserverLens :: Lens' (Stateful.Model (Broker p)) (ObserverRegistry (ChildEvent p))
childEventObserverLens = lens _childEventObserver (\(BrokerModel o _) c -> BrokerModel o c)

-- | Runtime-Errors occurring when spawning child-processes.
--
-- @since 0.23.0
data SpawnErr p = AlreadyStarted (ChildId p) (Endpoint (Effectful.ServerPdu p))
  deriving (Typeable, Generic)

deriving instance Eq (ChildId p) => Eq (SpawnErr p)
deriving instance Ord (ChildId p) => Ord (SpawnErr p)
deriving instance (Typeable (Effectful.ServerPdu p), Show (ChildId p)) => Show (SpawnErr p)

instance NFData (ChildId p) => NFData (SpawnErr p)

-- Internal Functions

stopOrKillChild
  :: forall p e q0 .
     ( HasCallStack
     , HasProcesses e q0
     , Lifted IO e
     , Lifted IO q0
     , Member Logs e
     , Member (Stateful.ModelState (Broker p)) e
     , TangibleBroker p
     , Typeable (Effectful.ServerPdu p)
     )
  => ChildId p
  -> Child p
  -> Timeout
  -> Eff e (Interrupt 'NoRecovery)
stopOrKillChild cId c stopTimeout =
      do
        broker <- asEndpoint @(Broker p) <$> self
        t <- startTimerWithTitle
                (MkProcessTitle ("child-exit-timer-" <> pack (show broker) <> "-" <> pack (show cId)))
                stopTimeout
        sendInterrupt (_fromEndpoint (c^.childEndpoint)) NormalExitRequested
        r1 <- receiveSelectedMessage (   Right <$> selectProcessDown (c^.childMonitoring)
                                     <|> Left  <$> selectTimerElapsed t )
        demonitor (c^.childMonitoring)
        unlinkProcess (_fromEndpoint (c^.childEndpoint))
        case r1 of
          Left timerElapsed -> do
            logWarning (pack (show timerElapsed) <> ": child "<> pack (show cId) <>" => " <> pack(show (c^.childEndpoint)) <>" did not shutdown in time")
            let reason = interruptToExit (TimeoutInterrupt
                                           ("child did not shut down in time and was terminated by the "
                                             ++ show broker))
            sendShutdown (_fromEndpoint (c^.childEndpoint)) reason
            return reason
          Right downMsg -> do
            logInfo ("child "<> pack (show cId) <>" => " <> pack(show (c^.childEndpoint)) <>" terminated: " <> pack (show (downReason downMsg)))
            return (downReason downMsg)

stopAllChildren
  :: forall p e q0 .
     ( HasCallStack
     , HasProcesses e q0
     , Lifted IO e
     , Lifted IO q0
     , Member Logs e
     , Member (Stateful.ModelState (Broker p)) e
     , TangibleBroker p
     , Typeable (Effectful.ServerPdu p)
     )
  => Endpoint (Broker p) -> Timeout -> Eff e ()
stopAllChildren me stopTimeout =
  zoomToChildren @p
    (removeAllChildren @(ChildId p) @p)
  >>= pure . Map.assocs
  >>= traverse_ killAndNotify
  where
    killAndNotify (cId, c) = do
      reason <- provideInterrupts (stopOrKillChild cId c stopTimeout) >>= either crash return
      Stateful.zoomModel @(Broker p)
          childEventObserverLens
          (observerRegistryNotify @(ChildEvent p) (OnChildDown me cId (c^.childEndpoint) reason))

      where
        crash e = do
          logError (pack (show e) <> " while stopping child: " <> pack (show cId) <> " " <> pack (show c))
          return (interruptToExit e)