-- | Observer Effects
--
-- This module supports the implementation of observers and observables. One use
-- case is event propagation. The tools in this module are tailored towards
-- 'Api' servers/clients.
{-# LANGUAGE IncoherentInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeOperators #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE GADTs #-}
module Control.Eff.Concurrent.Api.Observer
  ( Observer(..)
  , Observable(..)
  , notifyObserver
  , registerObserver
  , forgetObserver
  , SomeObserver(..)
  , notifySomeObserver
  , Observers()
  , manageObservers
  , addObserver
  , removeObserver
  , notifyObservers
  , CallbackObserver
  , spawnCallbackObserver
  , spawnLoggingObserver
  ) where

import GHC.Stack
import Data.Dynamic
import Data.Set (Set)
import qualified Data.Set as Set
import Control.Eff
import Control.Eff.Concurrent.Process
import Control.Eff.Concurrent.Api
import Control.Eff.Concurrent.Api.Client
import Control.Eff.Concurrent.Api.Server
import Control.Eff.Log
import Control.Eff.State.Lazy
import Control.Lens
import Control.Monad

-- * Observation API

-- | An 'Api' index @p@ that supports observing another 'Api' @o@ that is
-- 'Observable'.
class (Typeable p, Observable o) => Observer p o where
  -- | Wrap an 'Observation' and the 'Server' of the observed 'Api' @o@
  -- (i.e. the origin of the observation) into an asynchronous 'Api' value
  -- for @p@, that is, into a message that the 'Observer' understands.
  observationMessage :: Server o -> Observation o -> Api p 'Asynchronous

-- | An 'Api' index that supports registration and de-registration of
-- 'Observer's.
class (Typeable o, Typeable (Observation o)) => Observable o where
  -- | Type of the observations visible on this observable. This is an
  -- associated data family, so every instance declares its own constructors.
  data Observation o
  -- | Return the asynchronous 'Api' value that asks this observable to
  -- register the given observer.
  registerObserverMessage :: SomeObserver o -> Api o 'Asynchronous
  -- | Return the asynchronous 'Api' value that asks this observable to
  -- de-register the given observer.
  forgetObserverMessage :: SomeObserver o -> Api o 'Asynchronous

-- | Deliver a single 'Observation' to one 'Observer'.
--
-- The observation is wrapped via 'observationMessage', together with the
-- 'Server' it originates from, and the result is 'cast' to the observer.
notifyObserver :: ( SetMember Process (Process q) r
                 , Observable o
                 , Observer p o
                 , HasCallStack
                 )
               => SchedulerProxy q -> Server p -> Server o -> Observation o -> Eff r ()
notifyObserver px observer observed observation = cast px observer message
 where
  -- The observer-specific encoding of this observation.
  message = observationMessage observed observation

-- | Send the 'registerObserverMessage' for the given observer to the
-- observed server. The observer is wrapped into 'SomeObserver' first.
registerObserver :: ( SetMember Process (Process q) r
                 , Observable o
                 , Observer p o
                 , HasCallStack
                 )
               => SchedulerProxy q -> Server p -> Server o -> Eff r ()
registerObserver px observer observed =
  cast px observed . registerObserverMessage $ SomeObserver observer

-- | Send the 'forgetObserverMessage' for the given observer to the observed
-- server. The observer is wrapped into 'SomeObserver' first.
--
-- Carries a 'HasCallStack' constraint like 'notifyObserver' and
-- 'registerObserver', so that a call stack captured further down includes
-- the caller of this function.
forgetObserver :: ( SetMember Process (Process q) r
                  , Observable o
                  , Observer p o
                  , HasCallStack
                  )
               => SchedulerProxy q -> Server p -> Server o -> Eff r ()
forgetObserver px observer observed =
  cast px observed (forgetObserverMessage (SomeObserver observer))

-- ** Generalized observation

-- | An existential wrapper around the 'Server' of an 'Observer'.
-- Needed so that observers with different 'Api' types can observe the
-- same 'Observable' and be handled in a general fashion.
--
-- The constructor captures the 'Show', 'Typeable' and 'Observer' evidence
-- for the hidden observer type @p@.
data SomeObserver o where
  SomeObserver :: (Show (Server p), Typeable p, Observer p o) => Server p -> SomeObserver o

deriving instance Show (SomeObserver o)

-- | Observers are ordered by the value wrapped in their 'Server's; the hidden
-- observer 'Api' type does not take part in the comparison.
instance Ord (SomeObserver o) where
  compare (SomeObserver (Server lhs)) (SomeObserver (Server rhs)) =
    lhs `compare` rhs

-- | Two observers are equal when their 'Server's wrap equal values,
-- regardless of the hidden observer 'Api' type.
instance Eq (SomeObserver o) where
  SomeObserver (Server lhs) == SomeObserver (Server rhs) = lhs == rhs

-- | Deliver an 'Observation' to an observer hidden inside 'SomeObserver'.
--
-- Unpacks the existential and delegates to 'notifyObserver'.
notifySomeObserver :: ( SetMember Process (Process q) r
                     , Observable o
                     , HasCallStack
                     )
                   => SchedulerProxy q
                   -> Server o
                   -> Observation o
                   -> SomeObserver o
                   -> Eff r ()
notifySomeObserver px observed observation wrapped =
  case wrapped of
    SomeObserver observer -> notifyObserver px observer observed observation

-- ** Managing 'Observers'

-- | Internal state for 'manageObservers': the set of currently registered
-- observers.
data Observers o =
  Observers { _observers :: Set (SomeObserver o) }

-- | Isomorphism between the 'Observers' wrapper and the 'Set' it contains.
observers :: Iso' (Observers o) (Set (SomeObserver o))
observers = iso unwrap Observers
  where unwrap (Observers registered) = registered

-- | Keep track of registered 'Observer's. Observers can be added and removed,
-- and an 'Observation' can be sent to all registered observers at once.
--
-- Runs the given action with an initially empty set of observers and
-- discards the final 'Observers' state.
manageObservers :: Eff (State (Observers o) ': r) a -> Eff r a
manageObservers action = evalState action noObservers
  where noObservers = Observers Set.empty

-- | Insert an observer into the 'Observers' state managed by
-- 'manageObservers'.
addObserver
  :: ( SetMember Process (Process q) r
    , Member (State (Observers o)) r
    , Observable o)
  => SomeObserver o -> Eff r ()
addObserver newObserver = modify (observers %~ Set.insert newObserver)

-- | Remove an observer from the 'Observers' state managed by
-- 'manageObservers'.
removeObserver
  ::  ( SetMember Process (Process q) r
    , Member (State (Observers o)) r
    , Observable o)
  => SomeObserver o -> Eff r ()
removeObserver obsolete = modify (observers %~ Set.delete obsolete)


-- | Broadcast an 'Observation' to every 'SomeObserver' currently held in the
-- 'Observers' state.
--
-- The calling process (obtained via 'self') is reported to the observers as
-- the 'Server' the observation originates from.
notifyObservers
  :: forall o r q
    . ( Observable o
      , SetMember Process (Process q) r
      , Member (State (Observers o)) r)
  => SchedulerProxy q -> Observation o -> Eff r ()
notifyObservers px observation = do
  thisServer <- asServer @o <$> self px
  registered <- get
  forM_ (registered ^. observers) (notifySomeObserver px thisServer observation)

-- * Callback 'Observer'

-- | An 'Observer' that schedules the observations to an effectful callback.
--
-- The type has no constructors; in this module it only serves as an index
-- for 'Api' and 'Server'.
data CallbackObserver o
  deriving Typeable

-- | The messages of a 'CallbackObserver': a single asynchronous
-- notification.
data instance Api (CallbackObserver o) r where
  -- | An 'Observation' together with the 'Server' it was observed on.
  CbObserved :: (Typeable o, Typeable (Observation o)) =>
             Server o -> Observation o -> Api (CallbackObserver o) 'Asynchronous
  deriving Typeable

deriving instance Show (Observation o) => Show (Api (CallbackObserver o) r)

-- | Every 'Observable' can be observed by a 'CallbackObserver' for it; the
-- origin and the observation are passed on unchanged inside 'CbObserved'.
instance (Observable o) => Observer (CallbackObserver o) o where
  observationMessage origin observation = CbObserved origin observation

-- | Spawn a new process that serves the 'CallbackObserver' 'Api' and hands
-- every received observation to an effectful callback.
--
-- The callback receives the 'Server' the observation came from and the
-- 'Observation' itself. When it returns 'True' the serve loop is entered
-- again; when it returns 'False' it is not.
-- NOTE(review): presumably that ends the observer process - confirm against
-- the semantics of 'serve'.
spawnCallbackObserver
  :: forall o r q .
  ( SetMember Process (Process q) r
  , Typeable o
  , Show (Observation o)
  , Observable o
  , Member (Logs String) q)
  => SchedulerProxy q
  -> (Server o -> Observation o -> Eff (Process q ': q) Bool)
  -> Eff r (Server (CallbackObserver o))
spawnCallbackObserver px onObserve =
  fmap (asServer @(CallbackObserver o)) (spawn @r @q serveLoop)
 where
  -- The body of the spawned process; 'onCast' re-enters it on demand.
  serveLoop =
    serve px
      (ApiHandler @(CallbackObserver o)
        (onCast serveLoop)
        (unhandledCallError px)
        (defaultTermination px))
  -- Run the callback and continue only if it asks for more observations.
  onCast continue (CbObserved origin observation) = do
    keepGoing <- onObserve origin observation
    when keepGoing continue

-- | Use 'spawnCallbackObserver' to create a universal logging observer.
--
-- Every observation is logged via 'logMsg', rendered with the 'Show'
-- instances of the observed 'Server' and of the 'Observation'. The callback
-- always returns 'True'.
--
-- @since 0.3.0.0
spawnLoggingObserver
  :: forall o r q .
  ( SetMember Process (Process q) r
  , Typeable o
  , Show (Observation o)
  , Observable o
  , Member (Logs String) q)
  => SchedulerProxy q
  -> Eff r (Server (CallbackObserver o))
spawnLoggingObserver px =
  spawnCallbackObserver px $ \origin observation -> do
    logMsg (concat [show origin, " OBSERVED: ", show observation])
    return True