{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs            #-}
{-# LANGUAGE Rank2Types       #-}
{-# LANGUAGE TypeFamilies     #-}
--------------------------------------------------------------------------------
-- |
-- Module    :  EventSource.Aggregate
-- Copyright :  (C) 2017 Yorick Laupa
-- License   :  (see the file LICENSE)
-- Maintainer:  Yorick Laupa <yo.eight@gmail.com>
-- Stability :  experimental
-- Portability: non-portable
--
-- Implementation of an aggregate abstraction.
-- Link: https://en.wikipedia.org/wiki/Domain-driven_design.
--------------------------------------------------------------------------------
module EventSource.Aggregate
  ( StreamId(..)
  -- * Aggregate
  , Aggregate(..)
  , Validate(..)
  , Decision
  , Agg
  , aggId
  , runAgg
  , newAgg
  , loadAgg
  , loadOrCreateAgg
  -- * Interactions
  , submitCmd
  , submitEvt
  , snapshot
  , route
  , closeAgg
  , execute
  -- * Internal
  , Action'(..)
  , Action
  , askEnv
  , getState
  , putState
  , AggEnv(..)
  , AggState(..)
  , persist
  ) where

--------------------------------------------------------------------------------
import Control.Exception (SomeException, throwIO)
import Control.Monad (ap)
import Data.Foldable (for_, traverse_)

--------------------------------------------------------------------------------
import           Control.Concurrent.Async.Lifted (wait)
import qualified Control.Concurrent.Lifted as Concurrent
import           Control.Concurrent.STM (atomically)
import qualified Control.Concurrent.STM.TBMQueue as Queue
import           Control.Exception.Enclosed (tryAny)
import           Control.Monad.Base (MonadBase, liftBase)
import           Control.Monad.Except (runExceptT)
import           Control.Monad.Loops (whileJust_)
import           Control.Monad.Trans (MonadTrans(..))
import           Control.Monad.Trans.Control (MonadBaseControl)
import           Data.IORef.Lifted (IORef, newIORef, readIORef, atomicWriteIORef)

--------------------------------------------------------------------------------
import EventSource

--------------------------------------------------------------------------------
-- | Maps an aggregate id to the 'StreamName' of the stream its events are
--   written to and replayed from.
class StreamId a where
  -- Returns the 'StreamName' associated with the given id.
  toStreamName :: a -> StreamName

--------------------------------------------------------------------------------
-- | Represents a stream aggregate. An aggregate can rebuild its internal state
--   by replaying all the events of the stream it is responsible for.
class Aggregate a where
  -- | Type of the id associated with the aggregate.
  type Id a  :: *

  -- | Type of the events handled by the aggregate.
  type Evt a :: *

  -- | Type of the monad stack used by the aggregate.
  type M a :: * -> *

  -- | Given the current aggregate state, returns the state updated according
  --   to the event the aggregate receives.
  apply :: a -> Evt a -> M a a

--------------------------------------------------------------------------------
-- | Outcome of validating a command: 'Right' carries the event emitted by a
--   valid command, 'Left' carries the error yielded by an invalid one.
type Decision a = Either (Err a) (Evt a)

--------------------------------------------------------------------------------
-- | Represents an aggregate that supports validation. Such an aggregate can
--   receive commands and decide whether they are valid or not. When the
--   validation is successful, the aggregate emits an event that will be
--   persisted and passed to the 'apply' function.
class Aggregate a => Validate a where
  -- | Type of the commands supported by the aggregate.
  type Cmd a :: *

  -- | Type of the errors the aggregate can yield.
  type Err a :: *

  -- | Validates a command against the current aggregate state. If the
  --   validation succeeds, it emits an event. Otherwise, it returns an error.
  validate :: a -> Cmd a -> M a (Decision a)

--------------------------------------------------------------------------------
-- | Internal aggregate action. An action is executed by an aggregate. An action
--   embodies fundamental operations like submitting an event, validating a
--   command or returning the current snapshot of an aggregate. Actions are
--   CPS-encoded so the execution model can be flexible: an action can be
--   performed synchronously or asynchronously.
--
--   Running an action takes an environment @e@, the current state @s@ and a
--   continuation that receives the (possibly updated) state together with the
--   action's result. The action itself produces no value in @m@; everything
--   it has to say goes through the continuation.
newtype Action' e s m a =
  Action' { runAction :: e
                      -> s
                      -> (s -> a -> m ())
                      -> m () }

--------------------------------------------------------------------------------
instance Functor (Action' e s m) where
  -- Transforms the result just before it reaches the continuation; the state
  -- handed to the continuation is passed through untouched.
  fmap f action = Action' $ \env st k ->
    runAction action env st (\st' -> k st' . f)

--------------------------------------------------------------------------------
instance Applicative (Action' e s m) where
  -- Canonical definition: 'pure' carries the implementation and 'return' is
  -- left to its default. It resumes the continuation with the state unchanged.
  pure a = Action' $ \_ s resp -> resp s a
  (<*>)  = ap

--------------------------------------------------------------------------------
instance Monad (Action' e s m) where
  -- 'return' is intentionally not defined here: its default is 'pure', which
  -- keeps these instances canonical.
  --
  -- Runs the first action, then feeds its result and the state it produced to
  -- the action built by @f@, sharing the same environment and final
  -- continuation.
  Action' k >>= f = Action' $ \e s resp ->
    k e s (\s' a -> runAction (f a) e s' resp)

--------------------------------------------------------------------------------
instance MonadTrans (Action' e s) where
  -- Runs the underlying computation, then resumes the continuation with its
  -- result and the state left as it was.
  lift inner = Action' $ \_ st k -> do
    a <- inner
    k st a

--------------------------------------------------------------------------------
-- | Yields the environment the action is run with. The state is left
--   unchanged.
askEnv :: Action' e s m e
askEnv = Action' (\env st k -> k st env)

--------------------------------------------------------------------------------
-- | Yields the current state of the action. The state is left unchanged.
getState :: Action' e s m s
getState = Action' (\_ st k -> k st st)

--------------------------------------------------------------------------------
-- | Replaces the state of the action with the given one. The previous state
--   is discarded.
putState :: s -> Action' e s m ()
putState new = Action' (\_ _ k -> k new ())

--------------------------------------------------------------------------------
-- | An action specialised to the internal types of the aggregate @a@: it
--   reads an 'AggEnv', threads an 'AggState' and runs in the aggregate's
--   monad stack @M a@.
type Action a r = Action' (AggEnv a) (AggState a) (M a) r

--------------------------------------------------------------------------------
-- | Aggregate internal environment. It is fixed when the aggregate is created
--   and handed, read-only, to every action the aggregate runs.
data AggEnv a =
  AggEnv { aggEnvStore :: SomeStore
           -- ^ Handle to the eventstore.
         , aggEnvId :: Id a
           -- ^ Id of the aggregate.
         }

--------------------------------------------------------------------------------
-- | Aggregate internal state. Both fields are strict.
data AggState a =
  AggState { aggStateVersion :: !ExpectedVersion
             -- ^ Expected version of the next write. It is maintained
             --   automatically: 'AnyVersion' on creation, then an
             --   'ExactVersion' after each replayed or persisted event.
           , aggState :: !a
             -- ^ Current state of the aggregate.
           }

--------------------------------------------------------------------------------
-- | A stream aggregate. An aggregate updates its internal state based on the
--  events it receives. You can read its current state by using 'snapshot'. If
--  it supports validation, through the 'Validate' typeclass, it can receive a
--  command and emit an event if the command was successful. Otherwise, it will
--  yield an error. When receiving a valid command, an aggregate will persist
--  the resulting event. An aggregate is only responsible for its own stream.
data Agg a where
  Agg :: AggEnv a
      -> M a () -- Action that closes the aggregate (see 'closeAgg').
      -> IORef (Maybe SomeException) -- Last exception ever captured.
      -> (forall r. Action a r -> (r -> M a ()) -> M a ())
         -- Hands an action and its continuation over to the aggregate
         -- (see 'runAgg').
      -> Agg a

--------------------------------------------------------------------------------
-- | Retrieves the id stored in the aggregate's environment.
aggId :: Agg a -> Id a
aggId agg =
  case agg of
    Agg env _ _ _ -> aggEnvId env

--------------------------------------------------------------------------------
-- | Hands an action over to an aggregate, along with the continuation that
--   will receive the action's result.
--
--   If the aggregate has recorded an exception, that exception is rethrown
--   here and the action is not handed over.
runAgg :: MonadBase IO (M a) => Agg a -> Action a r -> (r -> M a ()) -> M a ()
runAgg (Agg _ _ errRef dispatch) action resp =
  readIORef errRef
    >>= traverse_ (liftBase . throwIO)
    >>  dispatch action resp

--------------------------------------------------------------------------------
-- | Holds an existentially quantified action, paired with the continuation
--   that consumes its result, so it can be passed around easily through the
--   aggregate's internal concurrent channel.
data Msg a where
  Msg :: Action a r -> (r -> M a ()) -> Msg a

--------------------------------------------------------------------------------
-- | Creates a new aggregate given an eventstore handle, an id and an initial
--   state.
--
--   The aggregate owns a bounded message queue and a forked worker thread that
--   runs the queued actions one after the other. The state starts as the
--   given seed with 'AnyVersion' as expected version. When an action throws,
--   the exception is recorded (it is rethrown by later calls to 'runAgg'), the
--   queue is closed and the continuation of the failing action is not called.
newAgg :: (Aggregate a, MonadBaseControl IO (M a))
       => SomeStore
       -> Id a
       -> a
       -> M a (Agg a)
newAgg store aId seed = do
  mailbox  <- liftBase $ Queue.newTBMQueueIO 500 -- Max parked messages.
  stateRef <- newIORef (AggState AnyVersion seed)
  errRef   <- newIORef Nothing

  let env      = AggEnv store aId
      nextMsg  = liftBase . atomically $ Queue.readTBMQueue mailbox
      shutdown = liftBase . atomically $ Queue.closeTBMQueue mailbox

  -- Worker loop: stops as soon as reading from the queue yields 'Nothing'.
  _ <- Concurrent.fork $ whileJust_ nextMsg $ \(Msg action k) -> do
    current <- readIORef stateRef
    -- The new state is stored before the caller's continuation is resumed.
    outcome <- tryAny $ runAction action env current $ \next r ->
      atomicWriteIORef stateRef next >> k r

    case outcome of
      Right _  -> pure ()
      Left err -> atomicWriteIORef errRef (Just err) >> shutdown

  pure $ Agg env shutdown errRef $ \action k ->
    liftBase . atomically $ Queue.writeTBMQueue mailbox (Msg action k)

--------------------------------------------------------------------------------
-- | Creates an aggregate and replays its entire stream to rebuild its
--   internal state. When the replay fails, the freshly created aggregate is
--   closed and the failure is returned instead.
loadAgg :: (Aggregate a, StreamId (Id a), DecodeEvent (Evt a), MonadBaseControl IO (M a))
        => SomeStore
        -> Id a
        -> a
        -> M a (Either ForEventFailure (Agg a))
loadAgg store aId seed = do
  agg     <- newAgg store aId seed
  outcome <- execute agg (loadEventsAction aId)

  -- No point in keeping the aggregate open if its stream couldn't be
  -- replayed.
  either (const (closeAgg agg)) (const (pure ())) outcome

  pure (fmap (const agg) outcome)

--------------------------------------------------------------------------------
-- | Like 'loadAgg' but never reports a 'ForEventFailure'. The outcome of the
--   replay is ignored: when the replay fails, the aggregate is returned with
--   the seed as its state, as if it had just been created with 'newAgg'.
loadOrCreateAgg :: (Aggregate a, StreamId (Id a), DecodeEvent (Evt a), MonadBaseControl IO (M a))
                => SomeStore
                -> Id a
                -> a
                -> M a (Agg a)
loadOrCreateAgg store aId seed =
  newAgg store aId seed >>= \agg ->
    agg <$ execute agg (loadEventsAction aId)

--------------------------------------------------------------------------------
-- | Submits a command to the aggregate and waits for its decision. A valid
--   command yields an event, an invalid one yields an error. For a valid
--   command, the aggregate persists the resulting event to the eventstore and
--   updates its internal state accordingly.
submitCmd :: (Validate a, MonadBase IO (M a), StreamId (Id a), EncodeEvent (Evt a))
          => Agg a
          -> Cmd a
          -> M a (Decision a)
submitCmd agg = execute agg . submitCmdAction

--------------------------------------------------------------------------------
-- | Submits an event. The aggregate updates its internal state accordingly;
--   the event itself is not persisted.
submitEvt :: (Aggregate a, MonadBase IO (M a)) => Agg a -> Evt a -> M a ()
submitEvt agg = execute agg . submitEvtAction

--------------------------------------------------------------------------------
-- | Returns the current state of the aggregate.
snapshot :: MonadBase IO (M a) => Agg a -> M a a
snapshot agg = agg `execute` snapshotAction

--------------------------------------------------------------------------------
-- | Usually used by root aggregates, which tend to have unusual workflows and
--  make great use of a CPS-ed computation. The callback receives the
--  eventstore handle, the current aggregate state and a continuation expecting
--  the new aggregate state along with the result to return.
--   http://blog.sapiensworks.com/post/2016/07/14/DDD-Aggregate-Decoded-1
route :: MonadBase IO (M a)
      => Agg a
      -> (SomeStore -> a -> (a -> r -> M a ()) -> M a ())
      -> M a r
route agg = execute agg . routeAction

--------------------------------------------------------------------------------
-- | Executes an action and waits for its result. The action is handed to the
--   aggregate with a continuation that stores the result in a fresh 'MVar',
--   and the caller blocks until that 'MVar' is filled.
execute :: MonadBase IO (M a) => Agg a -> Action a r -> M a r
execute agg action =
  Concurrent.newEmptyMVar >>= \box ->
    runAgg agg action (Concurrent.putMVar box) >> Concurrent.takeMVar box

--------------------------------------------------------------------------------
-- | Persists an event to the eventstore, in the stream named after the given
--   id, and waits for the write to complete. Returns the 'EventNumber'
--   produced by that write.
persist :: (StreamId id, EncodeEvent event, MonadBase IO m)
        => SomeStore
        -> id
        -> ExpectedVersion
        -> event
        -> m EventNumber
persist store aid ver event = liftBase $ do
  pending <- appendEvent store (toStreamName aid) ver event
  wait pending

--------------------------------------------------------------------------------
-- | Closes an aggregate by running its close action. For an aggregate built
--   with 'newAgg', this closes its internal message queue.
closeAgg :: Agg a -> M a ()
closeAgg (Agg _ shutdown _ _) = shutdown

--------------------------------------------------------------------------------
-- // Internal commands.
--------------------------------------------------------------------------------
-- | Validates a command against the current aggregate state. When the command
--   is valid, the resulting event is persisted using the current expected
--   version, the expected version is bumped to the number of the written
--   event, and the event is applied to the aggregate state. The decision is
--   returned in both cases.
submitCmdAction :: (Validate a, MonadBase IO (M a), StreamId (Id a), EncodeEvent (Evt a))
                => Cmd a
                -> Action a (Decision a)
submitCmdAction cmd = do
  env      <- askEnv
  current  <- getState
  decision <- lift (validate (aggState current) cmd)

  case decision of
    Left _      -> pure ()
    Right event -> do
      written <- lift $ persist (aggEnvStore env)
                                (aggEnvId env)
                                (aggStateVersion current)
                                event

      -- Record the new version first; applying the event then builds on it.
      putState (current { aggStateVersion = ExactVersion written })
      submitEvtAction event

  pure decision

--------------------------------------------------------------------------------
-- | Applies an event to the current aggregate state and stores the result.
--   The expected version is left unchanged.
submitEvtAction :: (Aggregate a, Monad (M a)) => Evt a -> Action a ()
submitEvtAction event =
  getState >>= \current ->
    lift (apply (aggState current) event) >>= \updated ->
      putState (current { aggState = updated })

--------------------------------------------------------------------------------
-- | Yields the aggregate state held in the current 'AggState'.
snapshotAction :: Monad (M a) => Action a a
snapshotAction = aggState <$> getState

--------------------------------------------------------------------------------
-- | Replays the stream named after the given id, folding every event into the
--   current state with 'apply' and tracking the number of the last event as
--   the expected version. The action's state is only replaced when the whole
--   replay succeeds; on failure it is left as it was and the failure is
--   returned.
loadEventsAction :: (Aggregate a, StreamId (Id a), DecodeEvent (Evt a), MonadBase IO (M a))
                 => Id a
                 -> Action a (Either ForEventFailure a)
loadEventsAction aId = do
  initial <- getState
  env     <- askEnv

  let step num acc event = do
        updated <- apply (aggState acc) event
        pure acc { aggState        = updated
                 , aggStateVersion = ExactVersion num
                 }

  outcome <- lift . runExceptT $
    foldEventsWithNumberM (aggEnvStore env) (toStreamName aId) step initial

  traverse_ putState outcome
  pure (aggState <$> outcome)

--------------------------------------------------------------------------------
-- | Wraps a CPS-ed callback into an action. The callback is given the
--   eventstore handle, the current aggregate state and a continuation; the
--   aggregate state passed to that continuation replaces the current one,
--   while the expected version is kept as is.
routeAction :: (SomeStore -> a -> (a -> r -> M a ()) -> M a ())
            -> Action a r
routeAction k = Action' $ \env s resp ->
  k (aggEnvStore env) (aggState s) $ \a r ->
    resp (s { aggState = a }) r