-- | An exchange type that relays posted messages to the processes bound to a
-- routing key. The key of each message is computed by a 'BindingSelector'.
module Control.Distributed.Process.Execution.Exchange.Router
  ( -- * Types
    HeaderName
  , Binding(..)
  , Bindable
  , BindingSelector
  , RelayType(..)
    -- * Starting a router
  , router
  , supervisedRouter
  , supervisedRouterRef
    -- * Posting messages to a router
  , route
  , routeMessage
    -- * Routing on the message key
  , messageKeyRouter
  , bindKey
    -- * Routing on a message header
  , headerContentRouter
  , bindHeader
  ) where
import Control.DeepSeq (NFData)
import Control.Distributed.Process
  ( Process
  , ProcessMonitorNotification(..)
  , ProcessId
  , monitor
  , handleMessage
  , unsafeWrapMessage
  )
import qualified Control.Distributed.Process as P
import Control.Distributed.Process.Serializable (Serializable)
import Control.Distributed.Process.Execution.Exchange.Internal
  ( startExchange
  , startSupervised
  , configureExchange
  , Message(..)
  , Exchange
  , ExchangeType(..)
  , post
  , postMessage
  , applyHandlers
  )
import Control.Distributed.Process.Extras.Internal.Primitives
  ( deliver
  , Resolvable(..)
  )
import Control.Distributed.Process.Supervisor (SupervisorPid)
import Data.Binary
import Data.Foldable (forM_)
import Data.Hashable
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as Map
import Data.HashSet (HashSet)
import qualified Data.HashSet as Set
import Data.Typeable (Typeable)
import GHC.Generics
-- | The name of a message header. 'headerContentRouter' looks this name up in
-- the headers of every message it routes.
type HeaderName = String
-- | The routing keys used by the routers defined in this module. Equality is
-- derived, so two bindings match only when they use the same constructor and
-- all fields are equal.
data Binding =
    -- Built by 'messageKeyRouter' from the @key@ of each routed message, and
    -- by 'bindKey' from the key a client asks to be bound to.
    BindKey    { bindingKey :: !String }
    -- 'bindHeader' stores the header value in 'bindingKey' and the header
    -- name in 'headerName'.
  | BindHeader { bindingKey :: !String
               , headerName :: !HeaderName
               }
    -- Produced by 'headerContentRouter' for a message that does not carry
    -- the header being routed on.
  | BindNone
  deriving (Typeable, Generic, Eq, Show)
-- No methods are defined for these instances, so each one uses the default
-- method implementations supplied by its class.
instance Binary Binding
instance NFData Binding
instance Hashable Binding
-- | Constraint shorthand for types that can be used as routing keys: they
-- must be hashable, comparable for equality and serializable.
class (Hashable k, Eq k, Serializable k) => Bindable k
-- Blanket instance: any type meeting the superclass constraints is 'Bindable'.
instance (Hashable k, Eq k, Serializable k) => Bindable k
-- | Computes, in the 'Process' monad, the routing key of type @k@ for a
-- message. The router looks the result up in its table of bindings.
type BindingSelector k = (Message -> Process k)
-- | What a router hands to its bound processes: with @PayloadOnly@ only the
-- message's @payload@ is forwarded, with @WholeMessage@ the entire 'Message'
-- is delivered.
data RelayType = PayloadOnly | WholeMessage
-- | Internal state of a router exchange, parameterised by the routing key
-- type @k@.
--
-- * 'bindings' maps each routing key to the set of processes bound to it.
--
-- * 'selector' computes the routing key of an incoming message.
--
-- * 'relayType' chooses whether bound processes get the whole message or
--   only its payload.
data State k = State { bindings  :: !(HashMap k (HashSet ProcessId))
                     , selector  :: !(BindingSelector k)
                     , relayType :: !RelayType
                     }
-- | The exchange type definition of a router whose routing keys have type @k@.
type Router k = ExchangeType (State k)
-- | Starts a router exchange that routes on the @key@ of each message: a
-- message is relayed to the processes bound to @BindKey@ of that key (see
-- 'bindKey'). The 'RelayType' decides whether they receive the whole message
-- or just its payload.
messageKeyRouter :: RelayType -> Process Exchange
messageKeyRouter relay = router relay selectByKey
  where
    -- The routing key is simply the message key wrapped in 'BindKey'.
    selectByKey :: BindingSelector Binding
    selectByKey = return . BindKey . key
-- | Starts a router exchange that routes on the value of a single header.
--
-- For every message the header named by the second argument is looked up in
-- the message's headers. If it is present, the message is relayed to the
-- processes bound (via 'bindHeader') to that header name and value; if it is
-- absent, the routing key is 'BindNone'.
--
-- The first argument decides whether bound processes receive the whole
-- message or only its payload.
headerContentRouter :: RelayType -> HeaderName -> Process Exchange
headerContentRouter t n = router t (checkHeaders n)
  where
    checkHeaders :: HeaderName -> Message -> Process Binding
    checkHeaders hn Message{..} =
      return $ case Map.lookup hn (Map.fromList headers) of
        Nothing -> BindNone
        -- Named fields keep this in step with 'bindHeader', which stores the
        -- header value in 'bindingKey' and the header name in 'headerName'.
        -- Building the key positionally as @BindHeader hn hv@ swapped the two
        -- fields, so it could never equal a binding made by 'bindHeader'.
        Just hv -> BindHeader { bindingKey = hv, headerName = hn }
-- | Starts a router exchange that uses the given 'BindingSelector' to compute
-- the routing key of each message and relays according to the 'RelayType'.
router :: (Bindable k) => RelayType -> BindingSelector k -> Process Exchange
router relay selectKey = do
  exchangeType <- routerT relay selectKey
  startExchange exchangeType
-- | Like 'supervisedRouter', but returns the resolved 'ProcessId' of the
-- exchange together with the 'Exchange' handle wrapped as a raw message
-- (via 'unsafeWrapMessage').
--
-- If the freshly started exchange cannot be resolved to a 'ProcessId', the
-- @Just@ pattern match fails inside the 'Process' monad.
supervisedRouterRef :: Bindable k
                    => RelayType
                    -> BindingSelector k
                    -> SupervisorPid
                    -> Process (ProcessId, P.Message)
supervisedRouterRef relay selectKey sup = do
  exchange   <- supervisedRouter relay selectKey sup
  Just exPid <- resolve exchange
  return (exPid, unsafeWrapMessage exchange)
-- | Builds the same exchange type as 'router', but starts it with
-- 'startSupervised', passing the given 'SupervisorPid'.
supervisedRouter :: Bindable k
                 => RelayType
                 -> BindingSelector k
                 -> SupervisorPid
                 -> Process Exchange
supervisedRouter relay selectKey sup = do
  exchangeType <- routerT relay selectKey
  startSupervised exchangeType sup
-- | Builds the exchange type definition for a router. It is named
-- @\"Router\"@, starts with no bindings, and uses 'apiConfigure' and
-- 'apiRoute' as its configuration and routing callbacks.
routerT :: Bindable k
        => RelayType
        -> BindingSelector k
        -> Process (Router k)
routerT relay selectKey =
  return ExchangeType
    { name        = "Router"
    , state       = State { bindings  = Map.empty
                          , selector  = selectKey
                          , relayType = relay
                          }
    , configureEx = apiConfigure
    , routeEx     = apiRoute
    }
-- | Asks the exchange to bind the calling process to the given routing key.
-- The request is the pair of the caller's 'ProcessId' and a 'BindKey',
-- passed to 'configureExchange'.
bindKey :: String -> Exchange -> Process ()
bindKey k ex =
  P.getSelfPid >>= \subscriber ->
    configureExchange ex (subscriber, BindKey k)
-- | Asks the exchange to bind the calling process to a header name and
-- value. The request is the pair of the caller's 'ProcessId' and a
-- 'BindHeader' whose 'bindingKey' is the header value and whose 'headerName'
-- is the header name, passed to 'configureExchange'.
bindHeader :: HeaderName -> String -> Exchange -> Process ()
bindHeader n v ex =
  P.getSelfPid >>= \subscriber ->
    configureExchange ex
      (subscriber, BindHeader { bindingKey = v, headerName = n })
-- | Posts a serializable value to the exchange. Synonym for 'post'.
route :: Serializable m => Exchange -> m -> Process ()
route ex msg = post ex msg
-- | Posts an already constructed 'Message' to the exchange. Synonym for
-- 'postMessage'.
routeMessage :: Exchange -> Message -> Process ()
routeMessage ex msg = postMessage ex msg
-- | Routing callback of the router exchange.
--
-- Runs the state's selector on the message to get its routing key and relays
-- the message to every process bound to that key. Nothing is sent when the
-- key has no bindings. The state is returned unchanged in both cases.
apiRoute :: forall k. Bindable k
         => State k
         -> Message
         -> Process (State k)
apiRoute st@State{..} msg = do
  routingKey <- selector msg
  -- The outer loop runs at most once (over the Maybe), the inner one visits
  -- each process bound to the key.
  forM_ (Map.lookup routingKey bindings) $ \recipients ->
    forM_ recipients relay
  return st
  where
    -- How a single recipient is served depends on the router's relay type.
    relay :: ProcessId -> Process ()
    relay = case relayType of
      WholeMessage -> deliver msg
      PayloadOnly  -> P.forward (payload msg)
-- | Configuration callback of the router exchange.
--
-- Two kinds of message are understood, tried in this order:
--
-- * a @(ProcessId, k)@ pair adds the process to the set bound to that
--   routing key;
--
-- * a 'ProcessMonitorNotification' removes the reported process from every
--   binding.
--
-- Handler selection and the result for unrecognised messages are left to
-- 'applyHandlers'.
apiConfigure :: forall k. Bindable k
             => State k
             -> P.Message
             -> Process (State k)
apiConfigure st msg =
  applyHandlers st msg [ \m -> handleMessage m (createBinding st)
                       , \m -> handleMessage m (handleMonitorSignal st)
                       ]
  where
    -- Record the binding and make sure the subscriber is monitored.
    --
    -- A monitor is set up exactly when the process is not yet a member of any
    -- binding. Monitoring only when the /key/ was new (as before) left
    -- processes that joined an existing key unmonitored, so they were never
    -- removed after they died, and monitored the same process again for
    -- every new key it created.
    createBinding s@State{..} (pid, bind) = do
      if isBound pid bindings
        then return ()
        else monitor pid >> return ()
      return $ s { bindings = Map.insertWith Set.union
                                             bind
                                             (Set.singleton pid)
                                             bindings }

    -- True when the process appears in at least one binding set.
    isBound p = any (Set.member p) . Map.elems

    -- Drop the terminated process from every binding and discard keys that
    -- are left without subscribers, so the table does not accumulate empty
    -- sets.
    handleMonitorSignal s@State{..} (ProcessMonitorNotification _ p _) =
      return $ s { bindings = Map.filter (not . Set.null)
                                         (Map.map (Set.delete p) bindings) }