module Control.Distributed.Process.Internal.Types
  ( 
    NodeId(..)
  , LocalProcessId(..)
  , ProcessId(..)
  , Identifier(..)
  , nodeOf
  , firstNonReservedProcessId
  , nullProcessId
    
  , LocalNode(..)
  , Tracer(..)
  , MxEventBus(..)
  , LocalNodeState(..)
  , LocalProcess(..)
  , LocalProcessState(..)
  , Process(..)
  , runLocalProcess
  , ImplicitReconnect(..)
    
  , LocalSendPortId
  , SendPortId(..)
  , TypedChannel(..)
  , SendPort(..)
  , ReceivePort(..)
    
  , Message(..)
  , isEncoded
  , createMessage
  , createUnencodedMessage
  , unsafeCreateUnencodedMessage
  , messageToPayload
  , payloadToMessage
    
  , MonitorRef(..)
  , ProcessMonitorNotification(..)
  , NodeMonitorNotification(..)
  , PortMonitorNotification(..)
  , ProcessExitException(..)
  , ProcessLinkException(..)
  , NodeLinkException(..)
  , PortLinkException(..)
  , ProcessRegistrationException(..)
  , DiedReason(..)
  , DidUnmonitor(..)
  , DidUnlinkProcess(..)
  , DidUnlinkNode(..)
  , DidUnlinkPort(..)
  , SpawnRef(..)
  , DidSpawn(..)
  , WhereIsReply(..)
  , RegisterReply(..)
  , ProcessInfo(..)
  , ProcessInfoNone(..)
  , NodeStats(..)
    
  , NCMsg(..)
  , ProcessSignal(..)
    
  , localProcesses
  , localPidCounter
  , localPidUnique
  , localConnections
  , localProcessWithId
  , localConnectionBetween
  , monitorCounter
  , spawnCounter
  , channelCounter
  , typedChannels
  , typedChannelWithId
    
  , forever'
  ) where
import System.Mem.Weak (Weak)
import Data.Map (Map)
import Data.Int (Int32)
import Data.Data (Data)
import Data.Typeable (Typeable, typeOf)
import Data.Binary (Binary(put, get), putWord8, getWord8, encode)
import qualified Data.ByteString as BSS (ByteString, concat, copy)
import qualified Data.ByteString.Lazy as BSL
  ( ByteString
  , toChunks
  , splitAt
  , fromChunks
  , length
  )
import qualified Data.ByteString.Lazy.Internal as BSL (ByteString(..))
import Data.Accessor (Accessor, accessor)
import Control.Category ((>>>))
import Control.DeepSeq (NFData(..))
import Control.Exception (Exception)
import Control.Concurrent (ThreadId)
import Control.Concurrent.Chan (Chan)
import Control.Concurrent.STM (STM)
import Control.Concurrent.STM.TChan (TChan)
import qualified Network.Transport as NT (EndPoint, EndPointAddress, Connection)
import Control.Applicative (Applicative, Alternative, (<$>), (<*>))
import Control.Monad.Reader (MonadReader(..), ReaderT, runReaderT)
import Control.Monad.IO.Class (MonadIO)
import Control.Distributed.Process.Serializable
  ( Fingerprint
  , Serializable
  , fingerprint
  , encodeFingerprint
  , sizeOfFingerprint
  , decodeFingerprint
  , showFingerprint
  )
import Control.Distributed.Process.Internal.CQueue (CQueue)
import Control.Distributed.Process.Internal.StrictMVar (StrictMVar)
import Control.Distributed.Process.Internal.WeakTQueue (TQueue)
import Control.Distributed.Static (RemoteTable, Closure)
import qualified Control.Distributed.Process.Internal.StrictContainerAccessors as DAC (mapMaybe)
import Data.Hashable
import GHC.Generics
-- | Node identifier: a wrapper around the transport-level endpoint address.
newtype NodeId = NodeId { nodeAddress :: NT.EndPointAddress }
  deriving (Eq, Ord, Typeable, Data, Generic)

-- Serialisation uses the 'Binary' class defaults (no method is overridden).
instance Binary NodeId where

-- | 'rnf' is spelled out rather than left to the class default. The default
-- differs between deepseq releases (plain 'seq' before 1.4, a Generic-based
-- traversal from 1.4 on, which additionally demands an 'NFData' instance for
-- the wrapped address). Forcing the address to weak head normal form keeps the
-- behaviour of the older default on every deepseq version.
-- NOTE(review): assumes WHNF of 'NT.EndPointAddress' is already its normal
-- form (a newtype over a strict ByteString) -- confirm against
-- network-transport.
instance NFData NodeId where
  rnf (NodeId addr) = addr `seq` ()

-- Hashing uses the 'Hashable' class defaults.
instance Hashable NodeId where

-- | Rendered as @nid://@ followed by the shown endpoint address.
instance Show NodeId where
  show (NodeId addr) = "nid://" ++ show addr
-- | Node-local part of a process identifier: a pair of strict 'Int32' values.
data LocalProcessId = LocalProcessId
  { lpidUnique  ::  !Int32  -- ^ NOTE(review): presumably distinguishes reuses of the same counter value -- confirm
  , lpidCounter ::  !Int32  -- ^ The component printed by the 'Show' instances of 'ProcessId' and 'SendPortId'
  }
  deriving (Eq, Ord, Typeable, Data, Generic, Show)
-- Hashing uses the 'Hashable' class defaults (no method is overridden).
instance Hashable LocalProcessId where
-- | Process identifier: the node a process belongs to plus its node-local id.
data ProcessId = ProcessId
  { -- | Identifier of the node component.
    processNodeId  :: !NodeId
    -- | Node-local identifier component.
  , processLocalId :: !LocalProcessId
  }
  deriving (Eq, Ord, Typeable, Data, Generic)

-- Serialisation uses the 'Binary' class defaults (no method is overridden).
instance Binary ProcessId where

-- | 'rnf' is written out instead of relying on the class default: from
-- deepseq 1.4 on the default is Generic-based and would require an 'NFData'
-- instance for 'LocalProcessId', which this module does not define. The node
-- id is forced through its own instance; the local id only has strict 'Int32'
-- fields, so evaluating it to WHNF evaluates it completely.
instance NFData ProcessId where
  rnf (ProcessId nid lpid) = rnf nid `seq` lpid `seq` ()

-- Hashing uses the 'Hashable' class defaults.
instance Hashable ProcessId where

-- | Rendered as @pid://ADDRESS:COUNTER@; the 'lpidUnique' component is not
-- part of the textual form.
instance Show ProcessId where
  show (ProcessId (NodeId addr) (LocalProcessId _ lid))
    = "pid://" ++ show addr ++ ":" ++ show lid
-- | Union of the three kinds of identifier used in this module: nodes,
-- processes and send ports.
data Identifier
  = NodeIdentifier !NodeId
  | ProcessIdentifier !ProcessId
  | SendPortIdentifier !SendPortId
  deriving (Eq, Ord, Generic)

-- Hashing uses the 'Hashable' class defaults.
instance Hashable Identifier where

-- | Shows exactly what the wrapped identifier shows; the constructor itself
-- is not printed.
instance Show Identifier where
  show ident = case ident of
    NodeIdentifier nodeIdent     -> show nodeIdent
    ProcessIdentifier procIdent  -> show procIdent
    SendPortIdentifier portIdent -> show portIdent
-- | Extract the 'NodeId' an identifier refers to. For a send port this is the
-- node of the process recorded in the port id.
nodeOf :: Identifier -> NodeId
nodeOf ident = case ident of
  NodeIdentifier nodeIdent     -> nodeIdent
  ProcessIdentifier procIdent  -> processNodeId procIdent
  SendPortIdentifier portIdent -> processNodeId . sendPortProcessId $ portIdent
-- | The constant 1.
-- NOTE(review): presumably the first 'lpidCounter' value handed out to
-- ordinary processes, with 0 (used by 'nullProcessId') reserved -- confirm
-- against the node controller.
firstNonReservedProcessId :: Int32
firstNonReservedProcessId = 1
-- | Build the 'ProcessId' on the given node whose local id has both
-- 'lpidUnique' and 'lpidCounter' set to zero.
nullProcessId :: NodeId -> ProcessId
nullProcessId nid = ProcessId nid zeroLocalId
  where
    -- Field order of 'LocalProcessId' is unique first, counter second.
    zeroLocalId = LocalProcessId 0 0
-- | Handle on a tracer: a process id paired with a weak reference to a
-- message queue.
data Tracer = Tracer
              {
                -- | Process id associated with the tracer.
                tracerPid :: !ProcessId
                -- | Weak reference to a queue of messages.
                -- NOTE(review): presumably the tracer process's mailbox -- confirm.
              , weakQ     :: !(Weak (CQueue Message))
              }
-- | Management event bus of a node. It is either a placeholder with no
-- fields or a fully populated record.
data MxEventBus =
    -- | Placeholder constructor carrying no data.
    -- NOTE(review): presumably used while the node is still starting -- confirm.
    MxEventBusInitialising
  | MxEventBus
    {
      -- | Process id of the bus agent.
      agent  :: !ProcessId
      -- | The tracer attached to the bus.
    , tracer :: !Tracer
      -- | Weak reference to a message queue.
      -- NOTE(review): presumably the agent's mailbox -- confirm.
    , evbuss :: !(Weak (CQueue Message))
      -- | Action that, given a handler taking a pair of message channels,
      -- produces a 'ProcessId' in 'IO'.
      -- NOTE(review): presumably spawns a new management agent -- confirm.
    , mxNew  :: !(((TChan Message, TChan Message) -> Process ()) -> IO ProcessId)
    }
-- | Everything a running local node carries around. All fields are strict.
data LocalNode = LocalNode
  { -- | Identifier of this node.
    localNodeId     :: !NodeId
    -- | Transport endpoint of this node.
  , localEndPoint   :: !NT.EndPoint
    -- | Mutable node state (see 'LocalNodeState').
  , localState      :: !(StrictMVar LocalNodeState)
    -- | Channel carrying 'NCMsg' control messages.
    -- NOTE(review): presumably read by the node controller -- confirm.
  , localCtrlChan   :: !(Chan NCMsg)
    -- | Management event bus of this node.
  , localEventBus   :: !MxEventBus
    -- | Table used to resolve static values and closures
    -- (type comes from "Control.Distributed.Static").
  , remoteTable     :: !RemoteTable
  }
-- | Two-valued flag stored next to each connection in '_localConnections'.
-- NOTE(review): presumably says whether a broken connection may be
-- re-established transparently -- confirm against the sending code.
data ImplicitReconnect = WithImplicitReconnect | NoImplicitReconnect
  deriving (Eq, Show)
-- | Mutable state of a local node (held in 'localState'). Fields are read and
-- written through the accessors 'localProcesses', 'localPidCounter',
-- 'localPidUnique' and 'localConnections'.
data LocalNodeState = LocalNodeState
  { -- | Processes known to the node, keyed by their local id.
    _localProcesses   :: !(Map LocalProcessId LocalProcess)
    -- | Counter value.
    -- NOTE(review): presumably the next 'lpidCounter' to assign -- confirm.
  , _localPidCounter  :: !Int32
    -- | Uniqueness value.
    -- NOTE(review): presumably the 'lpidUnique' stamped on new process ids
    -- -- confirm.
  , _localPidUnique   :: !Int32
    -- | Connections keyed by an (identifier, identifier) pair; see
    -- 'localConnectionBetween', which names the pair components from/to.
  , _localConnections :: !(Map (Identifier, Identifier)
                               (NT.Connection, ImplicitReconnect))
  }
-- | Run-time record of a process living on the local node. This is the
-- environment carried by the 'Process' monad.
data LocalProcess = LocalProcess
  { processQueue  :: !(CQueue Message)                -- ^ Queue of messages for this process
  , processWeakQ  :: !(Weak (CQueue Message))         -- ^ Weak reference to a message queue (NOTE(review): presumably 'processQueue' itself -- confirm)
  , processId     :: !ProcessId                       -- ^ Identifier of the process
  , processState  :: !(StrictMVar LocalProcessState)  -- ^ Mutable per-process state
  , processThread :: !ThreadId                        -- ^ Thread associated with the process
  , processNode   :: !LocalNode                       -- ^ Node the process belongs to
  }
-- | Run a 'Process' computation in 'IO', supplying the given 'LocalProcess'
-- as its reader environment.
runLocalProcess :: LocalProcess -> Process a -> IO a
runLocalProcess lproc (Process body) = runReaderT body lproc
-- | Mutable per-process state (held in 'processState'). Fields are accessed
-- through 'monitorCounter', 'spawnCounter', 'channelCounter' and
-- 'typedChannels'.
data LocalProcessState = LocalProcessState
  { _monitorCounter :: !Int32                            -- ^ Counter (NOTE(review): presumably feeds 'monitorRefCounter' -- confirm)
  , _spawnCounter   :: !Int32                            -- ^ Counter (NOTE(review): presumably feeds 'SpawnRef' -- confirm)
  , _channelCounter :: !Int32                            -- ^ Counter exposed as a 'LocalSendPortId' by 'channelCounter'
  , _typedChannels  :: !(Map LocalSendPortId TypedChannel) -- ^ Typed channels keyed by local send-port id
  }
-- | The process monad: 'IO' with read-only access to the 'LocalProcess'
-- record of the running process.
newtype Process a = Process {
    unProcess :: ReaderT LocalProcess IO a  -- unwrap to the underlying reader computation
  }
  -- All instances are taken from the wrapped 'ReaderT' stack.
  deriving (Functor, Monad, MonadIO, MonadReader LocalProcess, Typeable, Applicative)
-- | Process-local part of a send-port identifier; a plain alias for 'Int32'
-- (so it offers no extra type safety over the counters it is built from).
type LocalSendPortId = Int32
-- | Identifier of a send port: the process it is tied to plus an id local to
-- that process.
data SendPortId = SendPortId
  { -- | Process component of the port id.
    sendPortProcessId :: !ProcessId
    -- | Process-local component of the port id.
  , sendPortLocalId   :: !LocalSendPortId
  }
  deriving (Eq, Ord, Typeable, Generic)

-- Hashing uses the 'Hashable' class defaults.
instance Hashable SendPortId where

-- | Rendered as @cid://ADDRESS:PROCESS-COUNTER:PORT-ID@; the process's
-- 'lpidUnique' component is left out.
instance Show SendPortId where
  show (SendPortId owner portLocalId) =
    case owner of
      ProcessId (NodeId addr) (LocalProcessId _ procCounter) ->
        concat
          [ "cid://"
          , show addr
          , ":"
          , show procCounter
          , ":"
          , show portLocalId
          ]
-- | Existential wrapper hiding the element type of a channel: a weak
-- reference to a 'TQueue' of some 'Serializable' type.
data TypedChannel = forall a. Serializable a => TypedChannel (Weak (TQueue a))
-- | Sending end of a typed channel. The type parameter is phantom: at run
-- time a 'SendPort' is just its 'SendPortId'.
newtype SendPort a = SendPort {
    -- | The untyped identifier of this send port.
    sendPortId :: SendPortId
  }
  deriving (Typeable, Generic, Show, Eq, Ord)

-- Serialisation uses the 'Binary' class defaults (no method is overridden).
instance (Serializable a) => Binary (SendPort a) where

-- Hashing uses the 'Hashable' class defaults.
instance (Hashable a) => Hashable (SendPort a) where

-- | 'rnf' is written out instead of relying on the class default, whose
-- meaning differs between deepseq releases (plain 'seq' before 1.4,
-- Generic-based from 1.4 on). Evaluation is delegated to the 'NFData'
-- instance of the wrapped 'SendPortId'. The instance context is kept as it
-- was so existing users see the same instance head.
instance (NFData a) => NFData (SendPort a) where
  rnf (SendPort cid) = rnf cid
-- | Receiving end of a typed channel, represented as an 'STM' action that
-- yields a value of the channel's type. The derived instances are those of
-- the wrapped 'STM'.
newtype ReceivePort a = ReceivePort { receiveSTM :: STM a }
  deriving (Typeable, Functor, Applicative, Alternative, Monad)
-- | A message together with the fingerprint of its type. It comes in two
-- shapes: already serialised, or still holding the original Haskell value.
-- Both constructors share the 'messageFingerprint' field; 'messageEncoding'
-- and 'messagePayload' are partial selectors (each exists on one constructor
-- only).
data Message =
  -- Serialised form: fingerprint plus the encoded bytes.
  EncodedMessage
  { messageFingerprint :: !Fingerprint
  , messageEncoding    :: !BSL.ByteString
  } |
  -- Unserialised form: fingerprint plus the value itself, whose type is
  -- hidden behind the 'Serializable' constraint.
  forall a . Serializable a =>
  UnencodedMessage
  {
    messageFingerprint :: !Fingerprint
  , messagePayload     :: !a
  }
  deriving (Typeable)
-- | Encoded messages print their bytes followed by the rendered fingerprint;
-- unencoded ones print a fixed tag followed by the payload's runtime type.
instance Show Message where
  show msg = case msg of
    EncodedMessage fp enc ->
      show enc ++ " :: " ++ showFingerprint fp []
    UnencodedMessage _ payload ->
      "[unencoded message] :: " ++ show (typeOf payload)
-- | 'True' exactly for messages built with the 'EncodedMessage' constructor.
isEncoded :: Message -> Bool
isEncoded msg = case msg of
  EncodedMessage{}   -> True
  UnencodedMessage{} -> False
-- | Turn any serialisable value into an 'EncodedMessage' by pairing its
-- fingerprint with its binary encoding.
createMessage :: Serializable a => a -> Message
createMessage a = EncodedMessage fp body
  where
    fp   = fingerprint a
    body = encode a
-- | Wrap a value in an 'UnencodedMessage' without keeping its encoding.
-- The value is nevertheless run through 'encode' and the length of the
-- result is forced before the message is returned, so anything the encoder
-- needs to evaluate is evaluated here; the encoded bytes are then discarded.
createUnencodedMessage :: Serializable a => a -> Message
createUnencodedMessage a = encodedLength `seq` UnencodedMessage (fingerprint a) a
  where
    encodedLength = BSL.length (encode a)
-- | Wrap a value in an 'UnencodedMessage' directly. Unlike
-- 'createUnencodedMessage' the value is not passed through 'encode' first;
-- only the constructor's strict fields are forced.
unsafeCreateUnencodedMessage :: Serializable a => a -> Message
unsafeCreateUnencodedMessage a = UnencodedMessage typeFp a
  where
    typeFp = fingerprint a
-- | Serialise a message into transport chunks: the encoded fingerprint first,
-- then the chunks of the message body. An unencoded message has its payload
-- encoded on the spot.
messageToPayload :: Message -> [BSS.ByteString]
messageToPayload msg = case msg of
  EncodedMessage fp enc     -> frame fp enc
  UnencodedMessage fp value -> frame fp (encode value)
  where
    -- Fingerprint chunk followed by the strict chunks of the body.
    frame fp body = encodeFingerprint fp : BSL.toChunks body
-- | Rebuild an 'EncodedMessage' from transport chunks (the inverse of
-- 'messageToPayload' for encoded messages). The first 'sizeOfFingerprint'
-- bytes are decoded as the fingerprint; the remainder becomes the body,
-- copied into a single fresh chunk.
-- NOTE(review): the copy presumably avoids retaining the transport's larger
-- receive buffer -- confirm. Payloads shorter than a fingerprint are not
-- checked here.
payloadToMessage :: [BSS.ByteString] -> Message
payloadToMessage chunks = EncodedMessage decodedFp (compact body)
  where
    header :: BSL.ByteString
    body   :: BSL.ByteString
    (header, body) =
      BSL.splitAt (fromIntegral sizeOfFingerprint) (BSL.fromChunks chunks)

    decodedFp :: Fingerprint
    decodedFp = decodeFingerprint (BSS.concat (BSL.toChunks header))

    -- A single-chunk body is copied as is; anything else is flattened into
    -- one strict chunk first.
    compact :: BSL.ByteString -> BSL.ByteString
    compact (BSL.Chunk only BSL.Empty) = BSL.Chunk (BSS.copy only) BSL.Empty
    compact multi = BSL.fromChunks [BSS.concat (BSL.toChunks multi)]
-- | Reference identifying a monitor: an identifier plus a counter value.
data MonitorRef = MonitorRef
  { -- | Identifier stored in the reference.
    -- NOTE(review): presumably the thing being monitored -- confirm.
    monitorRefIdent   :: !Identifier
    -- | Counter component that tells apart references sharing an identifier.
  , monitorRefCounter :: !Int32
  }
  deriving (Eq, Ord, Show, Typeable, Generic)
-- Hashing uses the 'Hashable' class defaults.
instance Hashable MonitorRef where
-- | Notification about a process: the monitor reference, the process id and
-- a 'DiedReason'.
data ProcessMonitorNotification =
    ProcessMonitorNotification !MonitorRef !ProcessId !DiedReason
  deriving (Typeable, Show)
-- | Notification about a node: the monitor reference, the node id and a
-- 'DiedReason'.
data NodeMonitorNotification =
    NodeMonitorNotification !MonitorRef !NodeId !DiedReason
  deriving (Typeable, Show)
-- | Notification about a send port: the monitor reference, the port id and a
-- 'DiedReason'.
data PortMonitorNotification =
    PortMonitorNotification !MonitorRef !SendPortId !DiedReason
  deriving (Typeable, Show)
-- | Exception value naming a process and a 'DiedReason' (an 'Exception'
-- instance is declared further down in this module).
-- NOTE(review): presumably thrown to processes linked to the named process
-- -- confirm.
data ProcessLinkException =
    ProcessLinkException !ProcessId !DiedReason
  deriving (Typeable, Show)
-- | Exception value naming a node and a 'DiedReason' (an 'Exception'
-- instance is declared further down in this module).
data NodeLinkException =
    NodeLinkException !NodeId !DiedReason
  deriving (Typeable, Show)
-- | Exception value naming a send port and a 'DiedReason' (an 'Exception'
-- instance is declared further down in this module).
data PortLinkException =
    PortLinkException !SendPortId !DiedReason
  deriving (Typeable, Show)
-- | Exception value carrying a single 'String' (an 'Exception' instance is
-- declared further down in this module).
-- NOTE(review): the string is presumably the registry label involved --
-- confirm.
data ProcessRegistrationException =
    ProcessRegistrationException !String
  deriving (Typeable, Show)
-- | Exception carrying a process id and an arbitrary 'Message'.
data ProcessExitException
  = ProcessExitException !ProcessId !Message
  deriving Typeable

instance Exception ProcessExitException

-- | Only the process id is rendered (as @exit-from=PID@); the attached
-- message is not shown.
instance Show ProcessExitException where
  show (ProcessExitException sender _) = "exit-from=" ++ show sender
-- The link and registration exception types all use the default 'Exception'
-- methods; their 'Show' instances are derived at the type declarations.
instance Exception ProcessLinkException
instance Exception NodeLinkException
instance Exception PortLinkException
instance Exception ProcessRegistrationException
-- | Reason attached to monitor notifications, link exceptions and 'Died'
-- signals. The 'Binary' instance encodes the constructors as tags 0 to 4 in
-- declaration order.
data DiedReason =
    -- | Normal termination.
    DiedNormal
    -- | Termination with an exception; the 'String' carries its textual
    -- description.
  | DiedException !String
    -- | NOTE(review): presumably a lost connection to the target -- confirm.
  | DiedDisconnect
    -- | NOTE(review): presumably the target's node went down -- confirm.
  | DiedNodeDown
    -- | NOTE(review): presumably the identifier was not recognised -- confirm.
  | DiedUnknownId
  deriving (Show, Eq)
-- | Wrapper around the 'MonitorRef' concerned; serialised exactly like the
-- wrapped value (newtype-derived 'Binary').
-- NOTE(review): presumably the acknowledgement of an unmonitor request --
-- confirm.
newtype DidUnmonitor = DidUnmonitor MonitorRef
  deriving (Typeable, Binary)
-- | Wrapper around a 'ProcessId'; serialised exactly like the wrapped value.
-- NOTE(review): presumably the acknowledgement of unlinking from that
-- process -- confirm.
newtype DidUnlinkProcess = DidUnlinkProcess ProcessId
  deriving (Typeable, Binary)
-- | Wrapper around a 'NodeId'; serialised exactly like the wrapped value.
-- NOTE(review): presumably the acknowledgement of unlinking from that node
-- -- confirm.
newtype DidUnlinkNode = DidUnlinkNode NodeId
  deriving (Typeable, Binary)
-- | Wrapper around a 'SendPortId'; serialised exactly like the wrapped value.
-- NOTE(review): presumably the acknowledgement of unlinking from that port
-- -- confirm.
newtype DidUnlinkPort = DidUnlinkPort SendPortId
  deriving (Typeable, Binary)
-- | Reference attached to a 'Spawn' signal and echoed in 'DidSpawn'; a
-- wrapper around an 'Int32'.
newtype SpawnRef = SpawnRef Int32
  deriving (Show, Binary, Typeable, Eq)
-- | Pairs a 'SpawnRef' with a 'ProcessId'.
-- NOTE(review): presumably the reply to a 'Spawn' signal, naming the new
-- process -- confirm.
data DidSpawn = DidSpawn SpawnRef ProcessId
  deriving (Show, Typeable)
-- | A label together with an optional 'ProcessId' (the 'Binary' instance
-- calls the fields @label@ and @mPid@).
-- NOTE(review): presumably the answer to a 'WhereIs' signal -- confirm.
data WhereIsReply = WhereIsReply String (Maybe ProcessId)
  deriving (Show, Typeable)
-- | A label together with a success flag (the 'Binary' instance calls the
-- fields @label@ and @ok@).
-- NOTE(review): presumably the answer to a 'Register' signal -- confirm.
data RegisterReply = RegisterReply String Bool
  deriving (Show, Typeable)
-- | Per-node counters. Unlike most records in this module the fields are
-- lazy.
data NodeStats = NodeStats {
     nodeStatsNode            :: NodeId  -- ^ Node the figures belong to
   , nodeStatsRegisteredNames :: Int     -- ^ Count of registered names
   , nodeStatsMonitors        :: Int     -- ^ Count of monitors
   , nodeStatsLinks           :: Int     -- ^ Count of links
   , nodeStatsProcesses       :: Int     -- ^ Count of processes
   }
   deriving (Show, Eq, Typeable)
-- | Snapshot of information about one process. Fields are lazy.
data ProcessInfo = ProcessInfo {
    infoNode               :: NodeId                      -- ^ Node recorded for the process
  , infoRegisteredNames    :: [String]                    -- ^ Registered names
  , infoMessageQueueLength :: Maybe Int                   -- ^ Queue length, when available ('Nothing' otherwise)
  , infoMonitors           :: [(ProcessId, MonitorRef)]   -- ^ Monitors, as (process, reference) pairs
  , infoLinks              :: [ProcessId]                 -- ^ Linked processes
  } deriving (Show, Eq, Typeable)
-- | Carries a 'DiedReason' only.
-- NOTE(review): presumably returned instead of 'ProcessInfo' when no
-- information is available -- confirm.
data ProcessInfoNone = ProcessInfoNone DiedReason
    deriving (Show, Typeable)
-- | Control message: a 'ProcessSignal' tagged with the identifier of its
-- sender. This is the element type of 'localCtrlChan'.
data NCMsg = NCMsg
  { ctrlMsgSender :: !Identifier      -- ^ Who sent the signal
  , ctrlMsgSignal :: !ProcessSignal   -- ^ The signal itself
  }
  deriving Show
-- | Signals carried inside 'NCMsg'. The wire tag of each constructor is fixed
-- by the 'Binary' instance in this module. Every field is strict except the
-- identifier of 'Died'.
data ProcessSignal =
    Link !Identifier
  | Unlink !Identifier
  | Monitor !MonitorRef
  | Unmonitor !MonitorRef
    -- Identifier plus the reason it died; the identifier field is lazy.
  | Died Identifier !DiedReason
    -- Closure of the process to run, plus the reference to echo back.
  | Spawn !(Closure (Process ())) !SpawnRef
    -- Argument is a label.
  | WhereIs !String
    -- Label, node, optional process and a flag that the 'Binary' instance
    -- names @force@.
  | Register !String !NodeId !(Maybe ProcessId) !Bool 
    -- Label plus the message.
  | NamedSend !String !Message
  | LocalSend !ProcessId !Message
  | LocalPortSend !SendPortId !Message
    -- Process plus a textual reason.
  | Kill !ProcessId !String
    -- Process plus a reason carried as a 'Message'.
  | Exit !ProcessId !Message
  | GetInfo !ProcessId
  | SigShutdown
  | GetNodeStats !NodeId
  deriving Show
-- | A message is serialised as its list of payload chunks (see
-- 'messageToPayload'); decoding always yields an 'EncodedMessage' through
-- 'payloadToMessage'.
instance Binary Message where
  put = put . messageToPayload
  get = fmap payloadToMessage get
-- | Wire format: 'lpidUnique' followed by 'lpidCounter'.
instance Binary LocalProcessId where
  put lpid = do
    put (lpidUnique lpid)
    put (lpidCounter lpid)
  get = do
    unique  <- get
    counter <- get
    return (LocalProcessId unique counter)
-- | Wire format: monitor reference, process id, reason, in that order.
instance Binary ProcessMonitorNotification where
  put (ProcessMonitorNotification monRef who why) = do
    put monRef
    put who
    put why
  get = do
    monRef <- get
    who    <- get
    why    <- get
    return (ProcessMonitorNotification monRef who why)
-- | Wire format: monitor reference, node id, reason, in that order.
instance Binary NodeMonitorNotification where
  put (NodeMonitorNotification monRef nid why) = do
    put monRef
    put nid
    put why
  get = do
    monRef <- get
    nid    <- get
    why    <- get
    return (NodeMonitorNotification monRef nid why)
-- | Wire format: monitor reference, send-port id, reason, in that order.
instance Binary PortMonitorNotification where
  put (PortMonitorNotification monRef cid why) = do
    put monRef
    put cid
    put why
  get = do
    monRef <- get
    cid    <- get
    why    <- get
    return (PortMonitorNotification monRef cid why)
-- | Wire format: sender identifier followed by the signal.
instance Binary NCMsg where
  put ctrl = do
    put (ctrlMsgSender ctrl)
    put (ctrlMsgSignal ctrl)
  get = do
    sender <- get
    signal <- get
    return (NCMsg sender signal)
-- | Wire format: identifier followed by the counter.
instance Binary MonitorRef where
  put monRef = do
    put (monitorRefIdent monRef)
    put (monitorRefCounter monRef)
  get = do
    ident   <- get
    counter <- get
    return (MonitorRef ident counter)
-- | Wire format of 'ProcessSignal': a one-byte tag followed by the
-- constructor's fields in declaration order. Tags 0-12 and 30-32 are in use;
-- 13-29 are unassigned. The tag values are part of the protocol between
-- nodes, so they must stay in step between 'put' and 'get'.
instance Binary ProcessSignal where
  put (Link pid)              = putWord8 0 >> put pid
  put (Unlink pid)            = putWord8 1 >> put pid
  put (Monitor ref)           = putWord8 2 >> put ref
  put (Unmonitor ref)         = putWord8 3 >> put ref
  put (Died who reason)       = putWord8 4 >> put who >> put reason
  put (Spawn proc ref)        = putWord8 5 >> put proc >> put ref
  put (WhereIs label)         = putWord8 6 >> put label
  put (Register label nid pid force) = putWord8 7 >> put label >> put nid >> put pid >> put force
  -- Embedded messages are written as their payload chunk list, which is
  -- also what the 'Binary Message' instance in this module produces.
  put (NamedSend label msg)   = putWord8 8 >> put label >> put (messageToPayload msg)
  put (Kill pid reason)       = putWord8 9 >> put pid >> put reason
  put (Exit pid reason)       = putWord8 10 >> put pid >> put (messageToPayload reason)
  put (LocalSend to' msg)      = putWord8 11 >> put to' >> put (messageToPayload msg)
  put (LocalPortSend sid msg) = putWord8 12 >> put sid >> put (messageToPayload msg)
  put (GetInfo about)         = putWord8 30 >> put about
  put (SigShutdown)         = putWord8 31
  put (GetNodeStats nid)         = putWord8 32 >> put nid
  -- Decoding dispatches on the tag byte; embedded messages come back as
  -- 'EncodedMessage' values via 'payloadToMessage'.
  get = do
    header <- getWord8
    case header of
      0  -> Link <$> get
      1  -> Unlink <$> get
      2  -> Monitor <$> get
      3  -> Unmonitor <$> get
      4  -> Died <$> get <*> get
      5  -> Spawn <$> get <*> get
      6  -> WhereIs <$> get
      7  -> Register <$> get <*> get <*> get <*> get
      8  -> NamedSend <$> get <*> (payloadToMessage <$> get)
      9  -> Kill <$> get <*> get
      10 -> Exit <$> get <*> (payloadToMessage <$> get)
      11 -> LocalSend <$> get <*> (payloadToMessage <$> get)
      12 -> LocalPortSend <$> get <*> (payloadToMessage <$> get)
      30 -> GetInfo <$> get
      31 -> return SigShutdown
      32 -> GetNodeStats <$> get
      -- Any other tag aborts the decoder.
      _ -> fail "ProcessSignal.get: invalid"
-- | Wire format: a one-byte tag (0-4, in constructor order); only
-- 'DiedException' is followed by data, namely its string.
instance Binary DiedReason where
  put reason = case reason of
    DiedNormal        -> putWord8 0
    DiedException err -> putWord8 1 >> put err
    DiedDisconnect    -> putWord8 2
    DiedNodeDown      -> putWord8 3
    DiedUnknownId     -> putWord8 4
  get = getWord8 >>= fromTag
    where
      fromTag tag = case tag of
        0 -> return DiedNormal
        1 -> fmap DiedException get
        2 -> return DiedDisconnect
        3 -> return DiedNodeDown
        4 -> return DiedUnknownId
        _ -> fail "DiedReason.get: invalid"
-- | Wire format: spawn reference followed by the process id.
instance Binary DidSpawn where
  put (DidSpawn spawnRef spawned) = do
    put spawnRef
    put spawned
  get = do
    spawnRef <- get
    spawned  <- get
    return (DidSpawn spawnRef spawned)
-- | Wire format: owning process id followed by the local port id.
instance Binary SendPortId where
  put port = do
    put (sendPortProcessId port)
    put (sendPortLocalId port)
  get = do
    owner   <- get
    localId <- get
    return (SendPortId owner localId)
-- | Forces both fields of the port id to weak head normal form.
instance NFData SendPortId where
  rnf (SendPortId owner localId) = owner `seq` localId `seq` ()
-- | Wire format: a one-byte tag followed by the wrapped identifier.
-- Tag 0 is a process, 1 a node, 2 a send port (note this differs from the
-- constructor declaration order).
instance Binary Identifier where
  put ident = case ident of
    ProcessIdentifier procIdent  -> putWord8 0 >> put procIdent
    NodeIdentifier nodeIdent     -> putWord8 1 >> put nodeIdent
    SendPortIdentifier portIdent -> putWord8 2 >> put portIdent
  get = getWord8 >>= fromTag
    where
      fromTag tag = case tag of
        0 -> fmap ProcessIdentifier get
        1 -> fmap NodeIdentifier get
        2 -> fmap SendPortIdentifier get
        _ -> fail "Identifier.get: invalid"
-- | Wire format: label followed by the optional process id.
instance Binary WhereIsReply where
  put (WhereIsReply name found) = do
    put name
    put found
  get = do
    name  <- get
    found <- get
    return (WhereIsReply name found)
-- | Wire format: label followed by the success flag.
instance Binary RegisterReply where
  put (RegisterReply name succeeded) = do
    put name
    put succeeded
  get = do
    name      <- get
    succeeded <- get
    return (RegisterReply name succeeded)
-- | Wire format: the five record fields in declaration order.
instance Binary ProcessInfo where
  put info = do
    put (infoNode info)
    put (infoRegisteredNames info)
    put (infoMessageQueueLength info)
    put (infoMonitors info)
    put (infoLinks info)
  get = do
    node     <- get
    names    <- get
    queueLen <- get
    monitors <- get
    links    <- get
    return (ProcessInfo node names queueLen monitors links)
-- | Wire format: the five record fields in declaration order.
instance Binary NodeStats where
  put stats = do
    put (nodeStatsNode stats)
    put (nodeStatsRegisteredNames stats)
    put (nodeStatsMonitors stats)
    put (nodeStatsLinks stats)
    put (nodeStatsProcesses stats)
  get = do
    node      <- get
    names     <- get
    monitors  <- get
    links     <- get
    processes <- get
    return (NodeStats node names monitors links processes)
-- | Serialised exactly like the wrapped 'DiedReason'.
instance Binary ProcessInfoNone where
  put (ProcessInfoNone why) = put why
  get = fmap ProcessInfoNone get
-- | Accessor for the '_localProcesses' field of 'LocalNodeState'.
localProcesses :: Accessor LocalNodeState (Map LocalProcessId LocalProcess)
localProcesses = accessor _localProcesses setProcesses
  where
    setProcesses newProcs nodeState = nodeState { _localProcesses = newProcs }
-- | Accessor for the '_localPidCounter' field of 'LocalNodeState'.
localPidCounter :: Accessor LocalNodeState Int32
localPidCounter = accessor _localPidCounter setCounter
  where
    setCounter newCounter nodeState = nodeState { _localPidCounter = newCounter }
-- | Accessor for the '_localPidUnique' field of 'LocalNodeState'.
localPidUnique :: Accessor LocalNodeState Int32
localPidUnique = accessor _localPidUnique setUnique
  where
    setUnique newUnique nodeState = nodeState { _localPidUnique = newUnique }
-- | Accessor for the '_localConnections' field of 'LocalNodeState'.
localConnections :: Accessor LocalNodeState (Map (Identifier, Identifier) (NT.Connection, ImplicitReconnect))
localConnections = accessor _localConnections setConnections
  where
    setConnections newConns nodeState = nodeState { _localConnections = newConns }
-- | Accessor focusing on the map entry for one local process id inside
-- 'localProcesses'; the focus is 'Nothing' when the key is absent.
localProcessWithId :: LocalProcessId -> Accessor LocalNodeState (Maybe LocalProcess)
localProcessWithId lpid = localProcesses >>> entryFor
  where
    entryFor = DAC.mapMaybe lpid
-- | Accessor focusing on the connection stored under the key
-- @(from', to')@ inside 'localConnections'.
localConnectionBetween :: Identifier -> Identifier -> Accessor LocalNodeState (Maybe (NT.Connection, ImplicitReconnect))
localConnectionBetween from' to' = localConnections >>> entryFor
  where
    endpoints = (from', to')
    entryFor  = DAC.mapMaybe endpoints
-- | Accessor for the '_monitorCounter' field of 'LocalProcessState'.
monitorCounter :: Accessor LocalProcessState Int32
monitorCounter = accessor _monitorCounter setCounter
  where
    setCounter newCount procState = procState { _monitorCounter = newCount }
-- | Accessor for the '_spawnCounter' field of 'LocalProcessState'.
spawnCounter :: Accessor LocalProcessState Int32
spawnCounter = accessor _spawnCounter setCounter
  where
    setCounter newCount procState = procState { _spawnCounter = newCount }
-- | Accessor for the '_channelCounter' field of 'LocalProcessState'.
channelCounter :: Accessor LocalProcessState LocalSendPortId
channelCounter = accessor _channelCounter setCounter
  where
    setCounter newCount procState = procState { _channelCounter = newCount }
-- | Accessor for the '_typedChannels' field of 'LocalProcessState'.
typedChannels :: Accessor LocalProcessState (Map LocalSendPortId TypedChannel)
typedChannels = accessor _typedChannels setChannels
  where
    setChannels newChannels procState = procState { _typedChannels = newChannels }
-- | Accessor focusing on the typed channel stored under one local send-port
-- id inside 'typedChannels'.
typedChannelWithId :: LocalSendPortId -> Accessor LocalProcessState (Maybe TypedChannel)
typedChannelWithId cid = typedChannels >>> entryFor
  where
    entryFor = DAC.mapMaybe cid
-- | Repeat an action indefinitely. The loop is tied through a single local
-- binding that refers to itself, so every iteration reuses the same
-- computation. It only stops if the monad itself short-circuits.
forever' :: Monad m => m a -> m b
forever' action = loop
  where
    loop = action >> loop