-- | A mailbox is a separate process that buffers messages posted to it on
-- behalf of an owning process, up to a configurable limit, and hands them
-- over (or announces them) when asked to.
module Control.Distributed.Process.Execution.Mailbox
  (
    -- * Creating, starting, configuring and inspecting a mailbox
    Mailbox()
  , startMailbox
  , startSupervised
  , startSupervisedMailbox
  , createMailbox
  , resize
  , statistics
  , monitor
  , Limit
  , BufferType(..)
  , MailboxStats(..)
    -- * Posting mail
  , post
    -- * Obtaining mail and notifications
  , notify
  , deliver
  , active
  , NewMail(..)
  , Delivery(..)
  , FilterResult(..)
  , acceptEverything
  , acceptMatching
    -- * Remote table
  , __remoteTable
  ) where
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TChan
  ( TChan
  , newBroadcastTChanIO
  , dupTChan
  , readTChan
  , writeTChan
  )
import Control.Distributed.Process hiding (call, monitor)
import qualified Control.Distributed.Process as P (monitor)
import Control.Distributed.Process.Closure
  ( remotable
  , mkStaticClosure
  )
import Control.Distributed.Process.Serializable hiding (SerializableDict)
import Control.Distributed.Process.Extras.Internal.Types
  ( ExitReason(..)
  , Resolvable(..)
  , Routable(..)
  , Linkable(..)
  )
import Control.Distributed.Process.ManagedProcess
  ( call
  , sendControlMessage
  , channelControlPort
  , handleControlChan
  , handleInfo
  , handleRaw
  , continue
  , defaultProcess
  , UnhandledMessagePolicy(..)
  , InitHandler
  , InitResult(..)
  , ProcessAction
  , ProcessDefinition(..)
  , ControlChannel
  , ControlPort
  )
import qualified Control.Distributed.Process.ManagedProcess as MP
  ( chanServe
  )
import Control.Distributed.Process.ManagedProcess.Server
  ( stop
  )
import Control.Distributed.Process.ManagedProcess.Server.Restricted as Restricted
  ( getState
  , Result
  , RestrictedProcess
  )
import qualified Control.Distributed.Process.ManagedProcess.Server.Restricted as Restricted
  ( handleCall
  , reply
  )
import Control.Distributed.Process.Supervisor (SupervisorPid)
import Control.Distributed.Process.Extras.Time
import Control.Exception (SomeException)
import Data.Accessor
  ( Accessor
  , accessor
  , (^:)
  , (.>)
  , (^=)
  , (^.)
  )
import Data.Binary
import qualified Data.Foldable as Foldable
import Data.Sequence
  ( Seq
  , ViewL(EmptyL, (:<))
  , ViewR(EmptyR, (:>))
  , (<|)
  , (|>)
  )
import qualified Data.Sequence as Seq
import Data.Typeable (Typeable)
import GHC.Generics
#if ! MIN_VERSION_base(4,6,0)
import Prelude hiding (catch, drop)
#else
import Prelude hiding (drop)
#endif
-- | Handle to a running mailbox: the process id of the mailbox process
-- together with the control port used to send it 'ControlMessage's.
data Mailbox = Mailbox
  { pid   :: !ProcessId
  , cchan :: !(ControlPort ControlMessage)
  } deriving (Typeable, Generic, Eq)

instance Binary Mailbox

-- | Renders as @Mailbox:@ followed by the process id.
instance Show Mailbox where
  show mb = "Mailbox:" ++ show (pid mb)

-- | Linking to a mailbox links to its process.
instance Linkable Mailbox where
  linkTo mb = link (pid mb)

-- | A mailbox always resolves to its own process id.
instance Resolvable Mailbox where
  resolve mb = return (Just (pid mb))

-- | Both routing operations go through 'post'.
instance Routable Mailbox where
  sendTo mb msg       = post mb msg
  unsafeSendTo mb msg = post mb msg
-- | Send a control message to the mailbox over its control port.
sendCtrlMsg :: Mailbox
            -> ControlMessage
            -> Process ()
sendCtrlMsg mb = sendControlMessage (cchan mb)
-- | Selects what happens when a message arrives at a full buffer
-- (see @insert@ and the @drop@ method of the @Buffered State@ instance).
data BufferType =
    Queue -- ^ the oldest buffered message is discarded to make room
  | Stack -- ^ the most recently buffered message is discarded to make room
  | Ring  -- ^ the incoming message is discarded and counted as dropped
  deriving (Typeable, Eq, Show)
-- | Maximum number of messages a mailbox buffers.
type Limit = Integer
-- | A closure over the function applied to each buffered message when the
-- mailbox delivers mail in active mode.
type Filter = Closure (Message -> Process FilterResult)
-- | Notification sent to the owning process by a mailbox in notify mode.
-- Carries the mailbox handle and the number of messages recorded as pending.
data NewMail = NewMail !Mailbox !Integer
  deriving (Typeable, Generic, Show)
instance Binary NewMail where
-- | Mail handed to the owning process by a mailbox in active mode.
data Delivery = Delivery { box          :: Mailbox -- ^ handle of the delivering mailbox
                         , messages     :: [Message] -- ^ messages the filter kept, oldest first
                         , count        :: Integer -- ^ number of messages the filter kept
                         , totalDropped :: Integer -- ^ messages skipped by the filter plus the dropped counter at delivery time
                         }
  deriving (Typeable, Generic)
instance Binary Delivery where
-- | Snapshot of a mailbox, as returned by 'statistics'. Each field is read
-- directly from the corresponding field of the internal buffer state.
data MailboxStats =
  MailboxStats { pendingMessages :: Integer -- ^ recorded number of buffered messages
               , droppedMessages :: Integer -- ^ dropped-message counter
               , currentLimit    :: Limit -- ^ buffer limit currently in force
               , owningProcess   :: ProcessId -- ^ process that receives notifications and deliveries
               } deriving (Typeable, Generic, Show)
instance Binary MailboxStats where
-- | Wire wrapper around a message submitted with 'post'.
data Post = Post !Message
  deriving (Typeable, Generic)
instance Binary Post where
-- | Request sent by 'statistics' to obtain a 'MailboxStats' reply.
data StatsReq = StatsReq
  deriving (Typeable, Generic)
instance Binary StatsReq where
-- | Verdict a filter returns for each buffered message during delivery.
--
-- * 'Keep' - remove the message from the buffer and include it in the delivery
--
-- * 'Skip' - remove the message from the buffer and count it as skipped
--
-- * 'Send' - stop filtering; this message and all later ones stay buffered
data FilterResult = Keep | Skip | Send
  deriving (Typeable, Generic)
instance Binary FilterResult
-- | How the mailbox reacts when mail is present.
data Mode =
    Active !Filter -- ^ run the filter over the buffer and send a 'Delivery'
  | Notify         -- ^ send a 'NewMail' notification to the owner
  | Passive        -- ^ only buffer incoming messages
  deriving (Typeable, Generic)

instance Binary Mode

-- | Shows the constructor name only; the filter closure is omitted.
instance Show Mode where
  show m = case m of
    Active _ -> "Active"
    Notify   -> "Notify"
    Passive  -> "Passive"
-- | Messages accepted on the mailbox control channel.
data ControlMessage =
    Resize !Integer -- ^ change the buffer limit
  | SetActiveMode !Mode -- ^ switch the mailbox to the given mode
  deriving (Typeable, Generic)
instance Binary ControlMessage where
-- | Operations required of a message buffer. The comments describe how the
-- only visible instance (for @State@) uses each method.
class Buffered a where
  -- | The 'BufferType' the buffer was configured with.
  tag    :: a -> BufferType
  -- | Add a message to the buffer.
  push   :: Message -> a -> a
  -- | Remove the next message, or 'Nothing' when the buffer is empty.
  pop    :: a -> Maybe (Message, a)
  -- | Apply a new limit.
  adjust :: Limit -> a -> a
  -- | Discard the given number of messages.
  drop   :: Integer -> a -> a
-- | Bookkeeping kept alongside the buffered messages.
data BufferState =
  BufferState { _mode    :: Mode -- ^ current delivery mode
              , _bufferT :: BufferType -- ^ overflow policy
              , _limit   :: Limit -- ^ maximum number of buffered messages
              , _size    :: Integer -- ^ recorded number of buffered messages
              , _dropped :: Integer -- ^ messages discarded since the counter was last reset
              , _owner   :: ProcessId -- ^ process that receives notifications and deliveries
              , ctrlChan :: ControlPort ControlMessage -- ^ control port, used to rebuild a 'Mailbox' handle
              }
-- | Initial bookkeeping for a fresh mailbox: passive mode, nothing buffered
-- and nothing dropped.
defaultState :: BufferType
             -> Limit
             -> ProcessId
             -> ControlPort ControlMessage
             -> BufferState
defaultState bufferT limit' pid cc =
  -- positional order: mode, buffer type, limit, size, dropped, owner, port
  BufferState Passive bufferT limit' 0 0 pid cc
-- | Server state of the mailbox process: the buffered messages plus their
-- bookkeeping.
data State = State { _buffer :: Seq Message -- ^ buffered messages
                   , _state  :: BufferState -- ^ mode, limit and counters
                   }
-- | 'Buffered' implementation over the 'Seq' held in 'State'.
--
-- Messages are prepended on the left and popped from the right, so the
-- right-hand end of the sequence always holds the oldest message.
instance Buffered State where
  tag q  = _bufferT $ _state q

  -- Prepend the message and increment the recorded size.
  push m = (state .> size ^: (+1)) . (buffer ^: (m <|))

  -- Remove the oldest (right-most) message, if any, and decrement the
  -- recorded size by one.
  pop q = maybe Nothing
                (\(s' :> a) -> Just (a, ( (buffer ^= s')
                                        . (state .> size ^: subtract 1)
                                        $ q))) $ getR (q ^. buffer)

  -- Install the new limit; when more messages are recorded than the new
  -- limit allows, discard the surplus first.
  adjust sz q = (state .> limit ^= sz) $ maybeDrop
    where
      maybeDrop
        | size' <- (q ^. state ^. size),
          size' > sz = (state .> size ^= sz) $ drop (size' - sz) q
        | otherwise  = q

  -- Discard n messages one at a time. A queue loses its oldest message
  -- (right end), every other buffer type its newest (left end). Each
  -- discarded message decrements the size and increments the dropped count.
  drop n q
    | n < 1     = q  -- nothing to discard
    | n > 1     = drop (n - 1) $ drop 1 q
    | isQueue q = dropR q
    | otherwise = dropL q
    where
      dropR q' = maybe q' (\(s' :> _) -> dropOne q' s') $ getR (q' ^. buffer)
      dropL q' = maybe q' (\(_ :< s') -> dropOne q' s') $ getL (q' ^. buffer)
      dropOne q' s = ( (buffer ^= s)
                     . (state .> size ^: subtract 1)
                     . (state .> dropped ^: (+1))
                     $ q' )
-- | Start a mailbox owned by the calling process.
createMailbox :: BufferType -> Limit -> Process Mailbox
createMailbox buffT maxSz = do
  self <- getSelfPid
  startMailbox self buffT maxSz
-- | Start an unsupervised mailbox on behalf of the given owner process.
startMailbox :: ProcessId -> BufferType -> Limit -> Process Mailbox
startMailbox ownerPid buffT maxSz = doStartMailbox Nothing ownerPid buffT maxSz
-- | Like 'startSupervisedMailbox', but returns the mailbox process id paired
-- with the 'Mailbox' handle wrapped as a 'Message'.
startSupervised :: ProcessId
                -> BufferType
                -> Limit
                -> SupervisorPid
                -> Process (ProcessId, Message)
startSupervised p b l s =
  startSupervisedMailbox p b l s >>= \mb ->
    return (pid mb, unsafeWrapMessage mb)
-- | Start a mailbox whose process links to the given supervisor.
startSupervisedMailbox :: ProcessId
                       -> BufferType
                       -> Limit
                       -> SupervisorPid
                       -> Process Mailbox
startSupervisedMailbox ownerPid buffT maxSz sup =
  doStartMailbox (Just sup) ownerPid buffT maxSz
-- | Spawn the mailbox process and wait for it to publish its control port.
--
-- The port travels over a broadcast 'TChan'. The read end is duplicated
-- before the process is spawned, so the value written later is observed.
doStartMailbox :: Maybe SupervisorPid
               -> ProcessId
               -> BufferType
               -> Limit
               -> Process Mailbox
doStartMailbox mSp p b l = do
  bchan <- liftIO newBroadcastTChanIO
  rchan <- liftIO (atomically (dupTChan bchan))
  mbPid <- spawnLocal $ do
    linkToSupervisor
    runMailbox bchan p b l
  -- blocks until the new process has written its control port
  cc <- liftIO (atomically (readTChan rchan))
  return (Mailbox mbPid cc)
  where
    -- link to the supervisor when one was supplied
    linkToSupervisor = maybe (return ()) link mSp
-- | Body of the mailbox process: link to the owner, then run the managed
-- process server.
--
-- The channel is duplicated before the server starts. The duplicate is given
-- to the init handler, which reads the control port from it; the original is
-- given to the process definition, which writes the port.
runMailbox :: TChan (ControlPort ControlMessage)
           -> ProcessId
           -> BufferType
           -> Limit
           -> Process ()
runMailbox portChan ownerPid kind maxSz = do
  link ownerPid
  initChan <- liftIO (atomically (dupTChan portChan))
  MP.chanServe (ownerPid, kind, maxSz)
               (mboxInit initChan)
               (processDefinition ownerPid portChan)
-- | Init handler: read the control port from the channel and start with an
-- empty buffer in the default state and no timeout.
mboxInit :: TChan (ControlPort ControlMessage)
         -> InitHandler (ProcessId, BufferType, Limit) State
mboxInit tc (pid, buffT, maxSz) =
  liftIO (atomically (readTChan tc)) >>= \cc ->
    return (InitOk (State Seq.empty (defaultState buffT maxSz pid cc)) Infinity)
-- | Monitor the mailbox process.
monitor :: Mailbox -> Process MonitorRef
monitor mb = P.monitor (pid mb)
-- | Ask the mailbox to switch to notify mode.
notify :: Mailbox -> Process ()
notify mb = sendCtrlMsg mb (SetActiveMode Notify)
-- | Ask the mailbox to switch to active mode with the given filter.
active :: Mailbox -> Filter -> Process ()
active mb f = sendCtrlMsg mb (SetActiveMode (Active f))
-- | Ask the mailbox to change its buffer limit.
resize :: Mailbox -> Integer -> Process ()
resize mb sz = sendCtrlMsg mb (Resize sz)
-- | Wrap a value as a message and send it, inside a 'Post', to the mailbox
-- process.
post :: Serializable a => Mailbox -> a -> Process ()
post mb m = send (pid mb) (Post (wrapMessage m))
-- | Synchronously request the current 'MailboxStats' from the mailbox.
statistics :: Mailbox -> Process MailboxStats
statistics mb = mb `call` StatsReq
-- | Filter that keeps every message.
everything :: Message -> Process FilterResult
everything = const (return Keep)
-- | Filter that resolves the given closure and applies it to the message via
-- 'handleMessage'. When 'handleMessage' yields no result the message is
-- skipped.
matching :: Closure (Message -> Process FilterResult)
         -> Message
         -> Process FilterResult
matching predicate msg =
  unClosure predicate >>= \pred' ->
    handleMessage msg (pred' :: Message -> Process FilterResult)
      >>= maybe (return Skip) return
-- | Publish the control port on the channel, then build the server
-- definition: control messages and statistics calls as API handlers, posted
-- and raw messages as info handlers, and dead-lettering of anything
-- unhandled to the given process.
processDefinition :: ProcessId
                  -> TChan (ControlPort ControlMessage)
                  -> ControlChannel ControlMessage
                  -> Process (ProcessDefinition State)
processDefinition pid tc cc = do
  liftIO (atomically (writeTChan tc (channelControlPort cc)))
  return definition
  where
    definition :: ProcessDefinition State
    definition = defaultProcess
      { apiHandlers =
          [ handleControlChan     cc handleControlMessages
          , Restricted.handleCall handleGetStats
          ]
      , infoHandlers =
          [ handleInfo handlePost
          , handleRaw  handleRawInputs
          ]
      , unhandledMessagePolicy = DeadLetter pid
      }
-- | Handle a message from the control channel.
--
-- A resize adjusts the buffer. A mode change is recorded; if mail is already
-- pending, a notification or delivery is sent immediately for the notify and
-- active modes, while a switch to passive terminates the process.
handleControlMessages :: State
                      -> ControlMessage
                      -> Process (ProcessAction State)
handleControlMessages st cm
  | SetActiveMode new <- cm = switchMode new
  | Resize sz'        <- cm = continue (adjust sz' st)
  | otherwise               = stop (ExitOther "IllegalState")
  where
    switchMode :: Mode -> Process (ProcessAction State)
    switchMode new =
      let switched = (state .> mode ^= new) st
      in if st ^. state ^. size == 0
           then continue switched
           else case new of
                  Notify   -> sendNotification switched >> continue switched
                  Active _ -> sendMail switched >>= continue
                  Passive  -> die "IllegalState"
-- | Reply to a statistics request with a snapshot of the current state.
handleGetStats :: StatsReq -> RestrictedProcess State (Result MailboxStats)
handleGetStats _ = do
  st <- getState
  Restricted.reply (st ^. stats)
-- | Treat a raw message as if it had been posted.
handleRawInputs :: State -> Message -> Process (ProcessAction State)
handleRawInputs st = handlePost st . Post
-- | Buffer a posted message, notify or deliver according to the mode that
-- was in force when the message arrived, then fall back to passive mode.
handlePost :: State -> Post -> Process (ProcessAction State)
handlePost st (Post msg) = do
  forwarded <- forward (insert msg st)
  continue ((state .> mode ^= Passive) forwarded)
  where
    forward s =
      case st ^. state ^. mode of
        Notify   -> sendNotification s >> return s
        Active _ -> sendMail s
        Passive  -> return s
-- | Send the owner a 'NewMail' carrying a handle to this mailbox and the
-- recorded number of pending messages.
sendNotification :: State -> Process ()
sendNotification st = do
  self <- getSelfPid
  let bs     = st ^. state
      handle = Mailbox self (ctrlChan bs)
  send (bs ^. owner) (NewMail handle (bs ^. size))
-- | Number of messages a filter has kept during a delivery pass.
type Count = Integer
-- | Number of messages a filter has skipped during a delivery pass.
type Skipped = Integer
-- | Run the active-mode filter over the buffer and send the kept messages
-- to the owner as a 'Delivery'.
--
-- Returns the state unchanged when the mailbox is not in active mode or the
-- filter closure cannot be resolved. Otherwise returns the state after
-- filtering, with the dropped counter reset and the recorded size set to the
-- number of messages still in the buffer.
sendMail :: State -> Process State
sendMail st =
    case st ^. state ^. mode of
      Active f -> do
        -- Best effort: a closure that fails to resolve leaves the mail
        -- buffered instead of crashing the mailbox process.
        unCl <- catch (unClosure f >>= return . Just)
                      (\(_ :: SomeException) -> return Nothing)
        maybe (return st) deliverWith unCl
      _ -> return st
  where
    -- Filter the buffer, ship the result and update the bookkeeping.
    deliverWith f' = do
      (st', cnt, skipped, msgs) <- filterMessages f' (st, 0, 0, Seq.empty)
      us <- getSelfPid
      send ownerPid $ Delivery { box          = Mailbox us (ctrlChan $ st ^. state)
                               , messages     = Foldable.toList msgs
                               , count        = cnt
                               , totalDropped = skipped + droppedMsgs
                               }
      -- Take the pending count from the buffer that is actually left, so it
      -- cannot drift from the contents.
      return $ ( (state .> dropped ^= 0)
               . (state .> size ^= fromIntegral (Seq.length (st' ^. buffer)))
               $ st' )

    -- Pop messages oldest first, collecting the kept ones in order, until
    -- the buffer is empty or the filter answers Send. On Send the state from
    -- before the pop is returned, so that message stays buffered.
    filterMessages :: (Message -> Process FilterResult)
                   -> (State, Count, Skipped, Seq Message)
                   -> Process (State, Count, Skipped, Seq Message)
    filterMessages f accIn@(buff, cnt, drp, acc) = do
      case pop buff of
        Nothing         -> return accIn
        Just (m, buff') -> do
          res <- f m
          case res of
            Keep -> filterMessages f (buff', cnt + 1, drp, acc |> m)
            Skip -> filterMessages f (buff', cnt, drp + 1, acc)
            Send -> return accIn

    ownerPid    = st ^. state ^. owner
    droppedMsgs = st ^. state ^. dropped
-- | Buffer an incoming message. While the recorded size differs from the
-- limit the message is simply pushed. Otherwise a ring buffer counts the
-- message as dropped, and any other buffer drops one message to make room.
insert :: Message -> State -> State
insert msg st
  | _size bs /= _limit bs = push msg st
  | Ring <- _bufferT bs   = (state .> dropped ^: (+1)) st
  | otherwise             = push msg (drop 1 st)
  where
    bs = _state st
-- | True when the buffer was configured as a 'Queue'.
isQueue :: State -> Bool
isQueue st = _bufferT (_state st) == Queue
-- | True when the buffer was configured as a 'Stack'.
isStack :: State -> Bool
isStack st = _bufferT (_state st) == Stack
-- | Right-hand view of a sequence, or 'Nothing' when it is empty.
getR :: Seq a -> Maybe (ViewR a)
getR s
  | Seq.null s = Nothing
  | otherwise  = Just (Seq.viewr s)
-- | Left-hand view of a sequence, or 'Nothing' when it is empty.
getL :: Seq a -> Maybe (ViewL a)
getL s
  | Seq.null s = Nothing
  | otherwise  = Just (Seq.viewl s)
-- | Accessor for the delivery mode.
mode :: Accessor BufferState Mode
mode = accessor _mode put
  where put m st = st { _mode = m }

-- | Accessor for the overflow policy.
bufferType :: Accessor BufferState BufferType
bufferType = accessor _bufferT put
  where put t st = st { _bufferT = t }

-- | Accessor for the buffer limit.
limit :: Accessor BufferState Limit
limit = accessor _limit put
  where put l st = st { _limit = l }

-- | Accessor for the recorded number of buffered messages.
size :: Accessor BufferState Integer
size = accessor _size put
  where put s st = st { _size = s }

-- | Accessor for the dropped-message counter.
dropped :: Accessor BufferState Integer
dropped = accessor _dropped put
  where put d st = st { _dropped = d }

-- | Accessor for the owning process.
owner :: Accessor BufferState ProcessId
owner = accessor _owner put
  where put o st = st { _owner = o }
-- | Accessor for the buffered messages.
buffer :: Accessor State (Seq Message)
buffer = accessor _buffer put
  where put b qb = qb { _buffer = b }

-- | Accessor for the bookkeeping record.
state :: Accessor State BufferState
state = accessor _state put
  where put s qb = qb { _state = s }
-- | Read-only accessor producing a 'MailboxStats' snapshot. Setting through
-- it leaves the state untouched.
stats :: Accessor State MailboxStats
stats = accessor snapshot (const id)
  where
    snapshot st =
      let bs = _state st
      in MailboxStats (_size bs) (_dropped bs) (_limit bs) (_owner bs)
-- Template Haskell: make the two filter functions usable in static closures.
$(remotable ['everything, 'matching])
-- | Static closure for the filter that keeps every message.
acceptEverything :: Closure (Message -> Process FilterResult)
acceptEverything = $(mkStaticClosure 'everything)
-- | Static closure for the filter that resolves a nested filter closure and
-- applies it to each message; messages it yields no result for are skipped.
acceptMatching :: Closure (Closure (Message -> Process FilterResult)
                           -> Message -> Process FilterResult)
acceptMatching = $(mkStaticClosure 'matching)
-- | Switch the mailbox to active mode with the filter that keeps everything.
deliver :: Mailbox -> Process ()
deliver = (`active` acceptEverything)