{-# LANGUAGE DeriveGeneric       #-}
{-# LANGUAGE DeriveDataTypeable  #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE FlexibleInstances   #-}

-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Distributed.Process.Supervisor.Management
-- Copyright   :  (c) Tim Watson 2017
-- License     :  BSD3 (see the file LICENSE)
--
-- Maintainer  :  Tim Watson <watson.timothy@gmail.com>
-- Stability   :  experimental
-- Portability :  non-portable (requires concurrency)
--
-----------------------------------------------------------------------------

module Control.Distributed.Process.Supervisor.Management
  ( supervisionAgentId
  , supervisionMonitor
  , monitorSupervisor
  , unmonitorSupervisor
    -- * Mx Event Type
  , MxSupervisor(..)
  )
where
import Control.DeepSeq (NFData)
import Control.Distributed.Process
  ( ProcessId
  , Process()
  , ReceivePort()
  , newChan
  , sendChan
  , getSelfPid
  , unwrapMessage
  )
import Control.Distributed.Process.Internal.Types (SendPort(..))
import Control.Distributed.Process.Management
  ( MxAgentId(..)
  , MxAgent()
  , MxEvent(MxProcessDied, MxUser)
  , mxAgent
  , mxSink
  , mxReady
  , liftMX
  , mxGetLocal
  , mxSetLocal
  , mxNotify
  )
import Control.Distributed.Process.Supervisor.Types
  ( MxSupervisor(..)
  , SupervisorPid
  )
import Data.Binary
import Data.Foldable (mapM_)
import Data.Hashable (Hashable(..))
import Control.Distributed.Process.Extras.Internal.Containers.MultiMap (MultiMap)
import qualified Control.Distributed.Process.Extras.Internal.Containers.MultiMap as Map

import Data.Typeable (Typeable)
import GHC.Generics

data Register = Register !SupervisorPid !ProcessId !(SendPort MxSupervisor)
  deriving (Typeable, (forall x. Register -> Rep Register x)
-> (forall x. Rep Register x -> Register) -> Generic Register
forall x. Rep Register x -> Register
forall x. Register -> Rep Register x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. Register -> Rep Register x
from :: forall x. Register -> Rep Register x
$cto :: forall x. Rep Register x -> Register
to :: forall x. Rep Register x -> Register
Generic)
instance Binary Register where
instance NFData Register where

data UnRegister = UnRegister !SupervisorPid !ProcessId
  deriving (Typeable, (forall x. UnRegister -> Rep UnRegister x)
-> (forall x. Rep UnRegister x -> UnRegister) -> Generic UnRegister
forall x. Rep UnRegister x -> UnRegister
forall x. UnRegister -> Rep UnRegister x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. UnRegister -> Rep UnRegister x
from :: forall x. UnRegister -> Rep UnRegister x
$cto :: forall x. Rep UnRegister x -> UnRegister
to :: forall x. Rep UnRegister x -> UnRegister
Generic)
instance Binary UnRegister where
instance NFData UnRegister where

newtype SupMxChan = SupMxChan { SupMxChan -> SendPort MxSupervisor
smxc :: SendPort MxSupervisor }
  deriving (Typeable, (forall x. SupMxChan -> Rep SupMxChan x)
-> (forall x. Rep SupMxChan x -> SupMxChan) -> Generic SupMxChan
forall x. Rep SupMxChan x -> SupMxChan
forall x. SupMxChan -> Rep SupMxChan x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. SupMxChan -> Rep SupMxChan x
from :: forall x. SupMxChan -> Rep SupMxChan x
$cto :: forall x. Rep SupMxChan x -> SupMxChan
to :: forall x. Rep SupMxChan x -> SupMxChan
Generic, Int -> SupMxChan -> ShowS
[SupMxChan] -> ShowS
SupMxChan -> String
(Int -> SupMxChan -> ShowS)
-> (SupMxChan -> String)
-> ([SupMxChan] -> ShowS)
-> Show SupMxChan
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SupMxChan -> ShowS
showsPrec :: Int -> SupMxChan -> ShowS
$cshow :: SupMxChan -> String
show :: SupMxChan -> String
$cshowList :: [SupMxChan] -> ShowS
showList :: [SupMxChan] -> ShowS
Show)
instance Binary SupMxChan
instance NFData SupMxChan
instance Hashable SupMxChan where
  hashWithSalt :: Int -> SupMxChan -> Int
hashWithSalt Int
salt SupMxChan
sp = Int -> SendPortId -> Int
forall a. Hashable a => Int -> a -> Int
hashWithSalt Int
salt (SendPortId -> Int) -> SendPortId -> Int
forall a b. (a -> b) -> a -> b
$ SendPort MxSupervisor -> SendPortId
forall a. SendPort a -> SendPortId
sendPortId (SupMxChan -> SendPort MxSupervisor
smxc SupMxChan
sp)
instance Eq SupMxChan where
  == :: SupMxChan -> SupMxChan -> Bool
(==) SupMxChan
a SupMxChan
b = (SendPort MxSupervisor -> SendPortId
forall a. SendPort a -> SendPortId
sendPortId (SendPort MxSupervisor -> SendPortId)
-> SendPort MxSupervisor -> SendPortId
forall a b. (a -> b) -> a -> b
$ SupMxChan -> SendPort MxSupervisor
smxc SupMxChan
a) SendPortId -> SendPortId -> Bool
forall a. Eq a => a -> a -> Bool
== (SendPort MxSupervisor -> SendPortId
forall a. SendPort a -> SendPortId
sendPortId (SendPort MxSupervisor -> SendPortId)
-> SendPort MxSupervisor -> SendPortId
forall a b. (a -> b) -> a -> b
$ SupMxChan -> SendPort MxSupervisor
smxc SupMxChan
b)

type State = MultiMap SupervisorPid (ProcessId, SupMxChan)

-- | The @MxAgentId@ for the node monitoring agent.
supervisionAgentId :: MxAgentId
supervisionAgentId :: MxAgentId
supervisionAgentId = String -> MxAgentId
MxAgentId String
"service.monitoring.supervision"

-- | Monitor the supervisor for the given pid. Binds a typed channel to the
-- calling process, to which the resulting @ReceivePort@ belongs.
--
-- Multiple monitors can be created for any @calling process <-> sup@ pair.
-- Each monitor maintains its own typed channel, which will only contain
-- "MxSupervisor" entries obtained /after/ the channel was established.
--
monitorSupervisor :: SupervisorPid -> Process (ReceivePort MxSupervisor)
monitorSupervisor :: SupervisorPid -> Process (ReceivePort MxSupervisor)
monitorSupervisor SupervisorPid
sup = do
  SupervisorPid
us <- Process SupervisorPid
getSelfPid
  (SendPort MxSupervisor
sp, ReceivePort MxSupervisor
rp) <- Process (SendPort MxSupervisor, ReceivePort MxSupervisor)
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan
  Register -> Process ()
forall a. Serializable a => a -> Process ()
mxNotify (Register -> Process ()) -> Register -> Process ()
forall a b. (a -> b) -> a -> b
$ SupervisorPid -> SupervisorPid -> SendPort MxSupervisor -> Register
Register SupervisorPid
sup SupervisorPid
us SendPort MxSupervisor
sp
  ReceivePort MxSupervisor -> Process (ReceivePort MxSupervisor)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ReceivePort MxSupervisor
rp

-- | Removes all monitors for @sup@, associated with the calling process.
-- It is not possible to delete individual monitors (i.e. typed channels).
--
unmonitorSupervisor :: SupervisorPid -> Process ()
unmonitorSupervisor :: SupervisorPid -> Process ()
unmonitorSupervisor SupervisorPid
sup = Process SupervisorPid
getSelfPid Process SupervisorPid
-> (SupervisorPid -> Process ()) -> Process ()
forall a b. Process a -> (a -> Process b) -> Process b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= UnRegister -> Process ()
forall a. Serializable a => a -> Process ()
mxNotify (UnRegister -> Process ())
-> (SupervisorPid -> UnRegister) -> SupervisorPid -> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SupervisorPid -> SupervisorPid -> UnRegister
UnRegister SupervisorPid
sup

-- | Starts the supervision monitoring agent.
supervisionMonitor :: Process ProcessId
supervisionMonitor :: Process SupervisorPid
supervisionMonitor = do
  MxAgentId -> State -> [MxSink State] -> Process SupervisorPid
forall s. MxAgentId -> s -> [MxSink s] -> Process SupervisorPid
mxAgent MxAgentId
supervisionAgentId State
initState [
        ((Register -> MxAgent State MxAction) -> MxSink State
forall s m. Serializable m => (m -> MxAgent s MxAction) -> MxSink s
mxSink ((Register -> MxAgent State MxAction) -> MxSink State)
-> (Register -> MxAgent State MxAction) -> MxSink State
forall a b. (a -> b) -> a -> b
$ \(Register SupervisorPid
sup SupervisorPid
pid SendPort MxSupervisor
sp) -> do
          State -> MxAgent State ()
forall s. s -> MxAgent s ()
mxSetLocal (State -> MxAgent State ())
-> (State -> State) -> State -> MxAgent State ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SupervisorPid -> (SupervisorPid, SupMxChan) -> State -> State
forall k v.
(Insertable k, Insertable v) =>
k -> v -> MultiMap k v -> MultiMap k v
Map.insert SupervisorPid
sup (SupervisorPid
pid, SendPort MxSupervisor -> SupMxChan
SupMxChan SendPort MxSupervisor
sp) (State -> MxAgent State ())
-> MxAgent State State -> MxAgent State ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< MxAgent State State
forall s. MxAgent s s
mxGetLocal
          MxAgent State MxAction
forall s. MxAgent s MxAction
mxReady)
      , ((UnRegister -> MxAgent State MxAction) -> MxSink State
forall s m. Serializable m => (m -> MxAgent s MxAction) -> MxSink s
mxSink ((UnRegister -> MxAgent State MxAction) -> MxSink State)
-> (UnRegister -> MxAgent State MxAction) -> MxSink State
forall a b. (a -> b) -> a -> b
$ \(UnRegister SupervisorPid
sup SupervisorPid
pid) -> do
          State
st <- MxAgent State State
forall s. MxAgent s s
mxGetLocal
          State -> MxAgent State ()
forall s. s -> MxAgent s ()
mxSetLocal (State -> MxAgent State ()) -> State -> MxAgent State ()
forall a b. (a -> b) -> a -> b
$ (SupervisorPid -> (SupervisorPid, SupMxChan) -> Bool)
-> State -> State
forall k v.
Insertable k =>
(k -> v -> Bool) -> MultiMap k v -> MultiMap k v
Map.filterWithKey (\SupervisorPid
k (SupervisorPid, SupMxChan)
v -> if SupervisorPid
k SupervisorPid -> SupervisorPid -> Bool
forall a. Eq a => a -> a -> Bool
== SupervisorPid
sup then ((SupervisorPid, SupMxChan) -> SupervisorPid
forall a b. (a, b) -> a
fst (SupervisorPid, SupMxChan)
v) SupervisorPid -> SupervisorPid -> Bool
forall a. Eq a => a -> a -> Bool
/= SupervisorPid
pid else Bool
True) State
st
          MxAgent State MxAction
forall s. MxAgent s MxAction
mxReady)
      , ((MxEvent -> MxAgent State MxAction) -> MxSink State
forall s m. Serializable m => (m -> MxAgent s MxAction) -> MxSink s
mxSink ((MxEvent -> MxAgent State MxAction) -> MxSink State)
-> (MxEvent -> MxAgent State MxAction) -> MxSink State
forall a b. (a -> b) -> a -> b
$ \(MxEvent
ev :: MxEvent) -> do
          case MxEvent
ev of
            MxUser Message
msg          -> Message -> MxAgent State ()
goNotify Message
msg MxAgent State ()
-> MxAgent State MxAction -> MxAgent State MxAction
forall a b. MxAgent State a -> MxAgent State b -> MxAgent State b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> MxAgent State MxAction
forall s. MxAgent s MxAction
mxReady
            MxProcessDied SupervisorPid
pid DiedReason
_ -> do State
st <- MxAgent State State
forall s. MxAgent s s
mxGetLocal
                                      State -> MxAgent State ()
forall s. s -> MxAgent s ()
mxSetLocal (State -> MxAgent State ()) -> State -> MxAgent State ()
forall a b. (a -> b) -> a -> b
$ ((SupervisorPid, SupMxChan) -> Bool) -> State -> State
forall k v.
Insertable k =>
(v -> Bool) -> MultiMap k v -> MultiMap k v
Map.filter ((SupervisorPid -> SupervisorPid -> Bool
forall a. Eq a => a -> a -> Bool
/= SupervisorPid
pid) (SupervisorPid -> Bool)
-> ((SupervisorPid, SupMxChan) -> SupervisorPid)
-> (SupervisorPid, SupMxChan)
-> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (SupervisorPid, SupMxChan) -> SupervisorPid
forall a b. (a, b) -> a
fst) State
st
                                      MxAgent State MxAction
forall s. MxAgent s MxAction
mxReady
            MxEvent
_                   -> MxAgent State MxAction
forall s. MxAgent s MxAction
mxReady)
    ]
  where
    initState :: State
    initState :: State
initState = State
forall k v. MultiMap k v
Map.empty

    goNotify :: Message -> MxAgent State ()
goNotify Message
msg = do
      Maybe MxSupervisor
ev <- Process (Maybe MxSupervisor) -> MxAgent State (Maybe MxSupervisor)
forall a s. Process a -> MxAgent s a
liftMX (Process (Maybe MxSupervisor)
 -> MxAgent State (Maybe MxSupervisor))
-> Process (Maybe MxSupervisor)
-> MxAgent State (Maybe MxSupervisor)
forall a b. (a -> b) -> a -> b
$ Message -> Process (Maybe MxSupervisor)
forall (m :: * -> *) a.
(Monad m, Serializable a) =>
Message -> m (Maybe a)
unwrapMessage Message
msg :: MxAgent State (Maybe MxSupervisor)
      case Maybe MxSupervisor
ev of
        Just MxSupervisor
ev' -> do State
st <- MxAgent State State
forall s. MxAgent s s
mxGetLocal
                       ((SupervisorPid, SupMxChan) -> MxAgent State ())
-> [(SupervisorPid, SupMxChan)] -> MxAgent State ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Process () -> MxAgent State ()
forall a s. Process a -> MxAgent s a
liftMX (Process () -> MxAgent State ())
-> ((SupervisorPid, SupMxChan) -> Process ())
-> (SupervisorPid, SupMxChan)
-> MxAgent State ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((SendPort MxSupervisor -> MxSupervisor -> Process ())
-> MxSupervisor -> SendPort MxSupervisor -> Process ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip SendPort MxSupervisor -> MxSupervisor -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan) MxSupervisor
ev' (SendPort MxSupervisor -> Process ())
-> ((SupervisorPid, SupMxChan) -> SendPort MxSupervisor)
-> (SupervisorPid, SupMxChan)
-> Process ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SupMxChan -> SendPort MxSupervisor
smxc (SupMxChan -> SendPort MxSupervisor)
-> ((SupervisorPid, SupMxChan) -> SupMxChan)
-> (SupervisorPid, SupMxChan)
-> SendPort MxSupervisor
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (SupervisorPid, SupMxChan) -> SupMxChan
forall a b. (a, b) -> b
snd)
                             ([(SupervisorPid, SupMxChan)]
-> ([(SupervisorPid, SupMxChan)] -> [(SupervisorPid, SupMxChan)])
-> Maybe [(SupervisorPid, SupMxChan)]
-> [(SupervisorPid, SupMxChan)]
forall b a. b -> (a -> b) -> Maybe a -> b
maybe [] [(SupervisorPid, SupMxChan)] -> [(SupervisorPid, SupMxChan)]
forall a. a -> a
id (Maybe [(SupervisorPid, SupMxChan)]
 -> [(SupervisorPid, SupMxChan)])
-> Maybe [(SupervisorPid, SupMxChan)]
-> [(SupervisorPid, SupMxChan)]
forall a b. (a -> b) -> a -> b
$ SupervisorPid -> State -> Maybe [(SupervisorPid, SupMxChan)]
forall k v. Insertable k => k -> MultiMap k v -> Maybe [v]
Map.lookup (MxSupervisor -> SupervisorPid
supervisorPid MxSupervisor
ev') State
st)
        Maybe MxSupervisor
Nothing  -> () -> MxAgent State ()
forall a. a -> MxAgent State a
forall (m :: * -> *) a. Monad m => a -> m a
return ()