module Control.Distributed.Process.Execution.Exchange.Internal
  ( Exchange(..)
  , Message(..)
  , ExchangeType(..)
  , startExchange
  , startSupervised
  , startSupervisedRef
  , runExchange
  , post
  , postMessage
  , configureExchange
  , createMessage
  , applyHandlers
  ) where
import Control.Concurrent.MVar (MVar, takeMVar, putMVar, newEmptyMVar)
import Control.DeepSeq (NFData)
import Control.Distributed.Process
  ( Process
  , ProcessMonitorNotification(..)
  , ProcessId
  , liftIO
  , spawnLocal
  , unsafeWrapMessage
  )
import qualified Control.Distributed.Process as P (Message, link)
import Control.Distributed.Process.Serializable hiding (SerializableDict)
import Control.Distributed.Process.Extras.Internal.Types
  ( Resolvable(..)
  )
import Control.Distributed.Process.Extras.Internal.Primitives
  ( Linkable(..)
  )
import Control.Distributed.Process.ManagedProcess
  ( channelControlPort
  , handleControlChan
  , handleInfo
  , handleRaw
  , continue
  , defaultProcess
  , InitHandler
  , InitResult(..)
  , ProcessAction
  , ProcessDefinition(..)
  , ControlChannel
  , ControlPort
  )
import qualified Control.Distributed.Process.ManagedProcess as MP
  ( chanServe
  )
import Control.Distributed.Process.ManagedProcess.UnsafeClient
  ( sendControlMessage
  )
import Control.Distributed.Process.Supervisor (SupervisorPid)
import Control.Distributed.Process.Extras.Time (Delay(Infinity))
import Data.Binary
import Data.Typeable (Typeable)
import GHC.Generics
import Prelude hiding (drop)
data Exchange = Exchange { pid   :: !ProcessId
                         , cchan :: !(ControlPort ControlMessage)
                         , xType :: !String
                         } deriving (Typeable, Generic, Eq)
instance Binary Exchange where
instance Show Exchange where
  show Exchange{..} = (xType ++ ":" ++ (show pid))
instance Resolvable Exchange where
  resolve = return . Just . pid
instance Linkable Exchange where
  linkTo = P.link . pid
sendCtrlMsg :: Exchange -> ControlMessage -> Process ()
sendCtrlMsg Exchange{..} = sendControlMessage cchan
data Message =
  Message { key     :: !String  
          , headers :: ![(String, String)] 
          , payload :: !P.Message  
          } deriving (Typeable, Generic, Show)
instance Binary Message where
instance NFData Message where
data ControlMessage =
    Configure !P.Message
  | Post      !Message
    deriving (Typeable, Generic)
instance Binary ControlMessage where
instance NFData ControlMessage where
data ExchangeType s =
  ExchangeType { name        :: String
               , state       :: s
               , configureEx :: s -> P.Message -> Process s
               , routeEx     :: s -> Message -> Process s
               }
startExchange :: forall s. ExchangeType s -> Process Exchange
startExchange = doStart Nothing
startSupervisedRef :: forall s . ExchangeType s
                   -> SupervisorPid
                   -> Process (ProcessId, P.Message)
startSupervisedRef t s = do
  ex <- startSupervised t s
  return (pid ex, unsafeWrapMessage ex)
startSupervised :: forall s . ExchangeType s
                -> SupervisorPid
                -> Process Exchange
startSupervised t s = doStart (Just s) t
doStart :: Maybe SupervisorPid -> ExchangeType s -> Process Exchange
doStart mSp t = do
  cchan <- liftIO $ newEmptyMVar
  spawnLocal (maybeLink mSp >> runExchange t cchan) >>= \pid -> do
    cc <- liftIO $ takeMVar cchan
    return $ Exchange pid cc (name t)
  where
    maybeLink Nothing   = return ()
    maybeLink (Just p') = P.link p'
runExchange :: forall s.
               ExchangeType s
            -> MVar (ControlPort ControlMessage)
            -> Process ()
runExchange t tc = MP.chanServe t exInit (processDefinition t tc)
exInit :: forall s. InitHandler (ExchangeType s) (ExchangeType s)
exInit t = return $ InitOk t Infinity
post :: Serializable a => Exchange -> a -> Process ()
post ex msg = postMessage ex $ Message "" [] (unsafeWrapMessage msg)
postMessage :: Exchange -> Message -> Process ()
postMessage ex msg = msg `seq` sendCtrlMsg ex $ Post msg
configureExchange :: Serializable m => Exchange -> m -> Process ()
configureExchange e m = sendCtrlMsg e $ Configure (unsafeWrapMessage m)
createMessage :: Serializable m => String -> [(String, String)] -> m -> Message
createMessage k h m = Message k h $ unsafeWrapMessage m
applyHandlers :: a
              -> P.Message
              -> [P.Message -> Process (Maybe a)]
              -> Process a
applyHandlers e _ []     = return e
applyHandlers e m (f:fs) = do
  r <- f m
  case r of
    Nothing -> applyHandlers e m fs
    Just r' -> return r'
processDefinition :: forall s.
                     ExchangeType s
                  -> MVar (ControlPort ControlMessage)
                  -> ControlChannel ControlMessage
                  -> Process (ProcessDefinition (ExchangeType s))
processDefinition _ tc cc = do
  liftIO $ putMVar tc $ channelControlPort cc
  return $
    defaultProcess {
        apiHandlers  = [ handleControlChan cc handleControlMessage ]
      , infoHandlers = [ handleInfo handleMonitor
                       , handleRaw convertToCC
                       ]
      } :: Process (ProcessDefinition (ExchangeType s))
handleMonitor :: forall s.
                 ExchangeType s
              -> ProcessMonitorNotification
              -> Process (ProcessAction (ExchangeType s))
handleMonitor ex m = do
  handleControlMessage ex (Configure (unsafeWrapMessage m))
convertToCC :: forall s.
               ExchangeType s
            -> P.Message
            -> Process (ProcessAction (ExchangeType s))
convertToCC ex msg = do
  liftIO $ putStrLn "convert to cc"
  handleControlMessage ex (Post $ Message "" [] msg)
handleControlMessage :: forall s.
                        ExchangeType s
                     -> ControlMessage
                     -> Process (ProcessAction (ExchangeType s))
handleControlMessage ex@ExchangeType{..} cm =
  let action = case cm of
                 Configure msg -> configureEx state msg
                 Post      msg -> routeEx     state msg
  in action >>= \s -> continue $ ex { state = s }