{-# LANGUAGE UndecidableInstances #-}
-- | A small process to capture and _share_ observation's by enqueueing them into an STM 'TBQueue'.
module Control.Eff.Concurrent.Protocol.Observer.Queue
  ( ObservationQueue(..)
  , Reader
  , observe
  , await
  , tryRead
  , flush
  )
where

import           Control.Concurrent.STM
import           Control.Eff
import           Control.Eff.Concurrent.Misc
import           Control.Eff.Concurrent.Protocol
import           Control.Eff.Concurrent.Protocol.Observer
import           Control.Eff.Concurrent.Protocol.StatefulServer
import           Control.Eff.Concurrent.Process
import           Control.Eff.ExceptionExtra     ( )
import           Control.Eff.Log
import qualified Control.Eff.Reader.Strict     as Eff
import           Control.Exception.Safe        as Safe
import           Control.Lens
import           Control.Monad.IO.Class
import           Control.Monad                  ( unless, when )
import qualified Data.Text                     as T
import           Data.Typeable
import           GHC.Stack
import Data.Default (Default)

-- | Contains a 'TBQueue' capturing observations.
-- See 'observe'.
newtype ObservationQueue a = ObservationQueue (TBQueue a)

-- | A 'Reader' for an 'ObservationQueue'.
type Reader a = Eff.Reader (ObservationQueue a)

logPrefix :: forall event proxy . (HasCallStack, Typeable event) => proxy event -> T.Text
logPrefix _px = "observation queue: " <> T.pack (showSTypeable @event "")

-- | Read queued observations captured and enqueued in the shared 'ObservationQueue' by 'observe'.
--
-- This blocks until something was captured or an interrupt or exceptions was thrown. For a non-blocking
-- variant use 'tryRead' or 'flush'.
--
-- @since 0.28.0
await
  :: forall event r
   . ( Member (Reader event) r
     , HasCallStack
     , MonadIO (Eff r)
     , Typeable event
     , Member Logs r
     )
  => Eff r event
await = do
  ObservationQueue q <- Eff.ask @(ObservationQueue event)
  liftIO (atomically (readTBQueue q))

-- | Read queued observations captured and enqueued in the shared 'ObservationQueue' by 'observe'.
--
-- Return the oldest enqueued observation immediately or 'Nothing' if the queue is empty.
-- Use 'await' to block until an observation is observerRegistryNotify.
--
-- @since 0.28.0
tryRead
  :: forall event r
   . ( Member (Reader event) r
     , HasCallStack
     , MonadIO (Eff r)
     , Typeable event
     , Member Logs r
     )
  => Eff r (Maybe event)
tryRead = do
  ObservationQueue q <- Eff.ask @(ObservationQueue event)
  liftIO (atomically (tryReadTBQueue q))

-- | Read at once all currently queued observations captured and enqueued
-- in the shared 'ObservationQueue' by 'observe'.
--
-- This returns immediately all currently enqueued observations.
-- For a blocking variant use 'await'.
--
-- @since 0.28.0
flush
  :: forall event r
   . ( Member (Reader event) r
     , HasCallStack
     , MonadIO (Eff r)
     , Typeable event
     , Member Logs r
     )
  => Eff r [event]
flush = do
  ObservationQueue q <- Eff.ask @(ObservationQueue event)
  liftIO (atomically (flushTBQueue q))

-- | Listen to, and capture observations in an 'ObservationQueue'.
--
-- Fork an 'Observer' process that runs only while the body expression is executed.
-- Register the observer to the observable process passed to this function.
--
-- The captured observations can be obtained by 'await',
-- 'tryRead' and 'flush'.
--
-- The queue size is limited to the given number.
--
-- ==== __Example__
--
-- @
--
-- import qualified Control.Eff.Concurrent.Observer.Queue as OQ
--
-- foo =
--   do
--     observed <- startLink SomeObservable
--     OQ.observe 100 observed $ do
--       ...
--       cast observed DoSomething
--       evt <- OQ.await \@TestEvent
--       ...
-- @
--
-- @since 0.28.0
observe
  :: forall event eventSource e q len b
  . ( HasCallStack
    , HasProcesses e q
    , FilteredLogging e
    , FilteredLogging q
    , FilteredLogging (Processes q)
    , Lifted IO e
    , Lifted IO q
    , IsObservable eventSource event
    , Integral len
    , Server (ObservationQueue event) (Processes q)
    , Tangible (Pdu eventSource 'Asynchronous)
    )
  => len
  -> Endpoint eventSource
  -> Eff (Reader event ': e) b
  -> Eff e b
observe queueLimit eventSource e =
  withObservationQueue queueLimit (withWriter @event eventSource e)


withObservationQueue
  :: forall event b e len
   . ( HasCallStack
     , Typeable event
     , Show event
     , Member Logs e
     , Lifted IO e
     , Integral len
     , Member Interrupts e
     )
  => len
  -> Eff (Reader event ': e) b
  -> Eff e b
withObservationQueue queueLimit e = do
  q   <- lift (newTBQueueIO (fromIntegral queueLimit))
  res <- handleInterrupts (return . Left)
                          (Right <$> Eff.runReader (ObservationQueue q) e)
  rest <- lift (atomically (flushTBQueue q))
  unless
    (null rest)
    (logNotice (logPrefix (Proxy @event) <> " unread observations: " <> T.pack (show rest)))
  either (\em -> logError (T.pack (show em)) >> lift (throwIO em)) return res

-- | Spawn a process that can be used as an 'Observer' that enqueues the observations into an
--   'ObservationQueue'. See 'withObservationQueue' for an example.
--
-- The observations can be obtained by 'await'. All observations are captured up to
-- the queue size limit, such that the first message received will be first message
-- returned by 'await'.
--
-- @since 0.28.0
spawnWriter
  :: forall event r q
   . ( Member Logs q
     , Lifted IO q
     , FilteredLogging (Processes q)
     , HasProcesses r q
     , Typeable event
     , HasCallStack
     , Server (ObservationQueue event) (Processes q)
     )
  => ObservationQueue event
  -> Eff r (Endpoint (Observer event))
spawnWriter q =
  startLink @_ @r @q (MkObservationQueue q)

-- | Spawn a process that can be used as an 'Observer' that enqueues the observations into an
--   'ObservationQueue'. See 'withObservationQueue'.
--
-- The observations can be obtained by 'await'. All observations are captured up to
-- the queue size limit, such that the first message received will be first message
-- returned by 'await'.
--
-- @since 0.28.0
withWriter
  :: forall event eventSource e q b
  . ( HasCallStack
    , HasProcesses e q
    , Lifted IO q
    , FilteredLogging (Processes q)
    , Member Logs q
    , IsObservable eventSource event
    , Member (Reader event) e
    , Tangible (Pdu eventSource 'Asynchronous)
    )
  => Endpoint eventSource
  -> Eff e b
  -> Eff e b
withWriter eventSource e = do
  q <- Eff.ask @(ObservationQueue event)
  w <- spawnWriter @event q
  registerObserver @event eventSource w
  res <- e
  forgetObserver @event eventSource w
  sendShutdown (w^.fromEndpoint) ExitNormally
  pure res


instance (Typeable event, Lifted IO q, Member Logs q) => Server (ObservationQueue event) (Processes q) where
  type instance Protocol (ObservationQueue event) = Observer event

  data instance StartArgument (ObservationQueue event) =
     MkObservationQueue (ObservationQueue event)

  newtype instance Model (ObservationQueue event) = MkObservationQueueModel () deriving Default

  update _ (MkObservationQueue (ObservationQueue q)) =
    \case
      OnCast (Observed !event) -> do
        isFull <- lift . atomically $ do
          isFull <- isFullTBQueue q
          unless isFull (writeTBQueue q event)
          pure isFull
        when isFull $
          logWarning "queue full"
      otherMsg ->
        logError ("unexpected: " <> T.pack (show otherMsg))