{-# LANGUAGE UndecidableInstances #-}
-- | Capture observations in a bounded STM 'TBQueue' and read them back
-- through a 'Reader' effect.
--
-- 'observe' sets up the queue and a writer process for an event source;
-- 'await', 'tryRead' and 'flush' read what has been enqueued so far.
module Control.Eff.Concurrent.Protocol.Observer.Queue
( ObservationQueue(..)
, Reader
, observe
, await
, tryRead
, flush
)
where
import Control.Concurrent.STM
import Control.Eff
import Control.Eff.Concurrent.Misc
import Control.Eff.Concurrent.Protocol
import Control.Eff.Concurrent.Protocol.Observer
import Control.Eff.Concurrent.Protocol.StatefulServer
import Control.Eff.Concurrent.Process
import Control.Eff.ExceptionExtra ( )
import Control.Eff.Log
import qualified Control.Eff.Reader.Strict as Eff
import Control.Exception.Safe as Safe
import Control.Lens
import Control.Monad.IO.Class
import Control.Monad ( unless, when )
import qualified Data.Text as T
import Data.Typeable
import GHC.Stack
import Data.Default (Default)
-- | Wrapper around the bounded 'TBQueue' that holds captured observations
-- of type @a@.
newtype ObservationQueue a = ObservationQueue (TBQueue a)
-- | The 'Eff.Reader' effect that gives access to an 'ObservationQueue'.
type Reader a = Eff.Reader (ObservationQueue a)
-- | Prefix for log messages of this module: the fixed text
-- @"observation queue: "@ followed by the rendered type of @event@.
--
-- The proxy argument only selects the @event@ type; its value is ignored.
logPrefix :: forall event proxy . (HasCallStack, Typeable event) => proxy event -> T.Text
logPrefix _ = T.append "observation queue: " eventTypeName
 where
  eventTypeName = T.pack (showSTypeable @event "")
-- | Take the next observation from the 'ObservationQueue' provided by the
-- 'Reader' effect.
--
-- Uses 'readTBQueue', so the call blocks while the queue is empty. See
-- 'tryRead' and 'flush' for non-blocking alternatives.
await
  :: forall event r
   . ( Member (Reader event) r
     , HasCallStack
     , MonadIO (Eff r)
     , Typeable event
     , Member Logs r
     )
  => Eff r event
await =
  Eff.ask @(ObservationQueue event) >>= \(ObservationQueue pending) ->
    liftIO (atomically (readTBQueue pending))
-- | Take the next observation from the 'ObservationQueue' if there is one.
--
-- Uses 'tryReadTBQueue': the result is 'Nothing' when the queue is
-- currently empty, so this never blocks waiting for an observation.
tryRead
  :: forall event r
   . ( Member (Reader event) r
     , HasCallStack
     , MonadIO (Eff r)
     , Typeable event
     , Member Logs r
     )
  => Eff r (Maybe event)
tryRead =
  Eff.ask @(ObservationQueue event) >>= \(ObservationQueue pending) ->
    liftIO $ atomically $ tryReadTBQueue pending
-- | Remove and return every observation currently held in the
-- 'ObservationQueue'.
--
-- Uses 'flushTBQueue'; the result is the empty list when nothing is queued.
flush
  :: forall event r
   . ( Member (Reader event) r
     , HasCallStack
     , MonadIO (Eff r)
     , Typeable event
     , Member Logs r
     )
  => Eff r [event]
flush =
  Eff.ask @(ObservationQueue event) >>= \(ObservationQueue pending) ->
    liftIO . atomically $ flushTBQueue pending
-- | Run an action that can read the observations emitted by an event source.
--
-- A bounded queue holding at most @queueLimit@ observations is created, and
-- a writer process is registered as observer of @eventSource@ while the
-- action runs. Inside the action the queue is reached through the 'Reader'
-- effect, i.e. via 'await', 'tryRead' and 'flush'.
observe
  :: forall event eventSource e q len b
   . ( HasCallStack
     , HasProcesses e q
     , FilteredLogging e
     , FilteredLogging q
     , FilteredLogging (Processes q)
     , Lifted IO e
     , Lifted IO q
     , IsObservable eventSource event
     , Integral len
     , Server (ObservationQueue event) (Processes q)
     , Tangible (Pdu eventSource 'Asynchronous)
     )
  => len                         -- ^ capacity of the observation queue
  -> Endpoint eventSource        -- ^ the process to observe
  -> Eff (Reader event ': e) b   -- ^ action with access to the queue
  -> Eff e b
observe queueLimit eventSource action =
  withObservationQueue queueLimit $ withWriter @event eventSource action
-- | Create a bounded observation queue and run an action with 'Reader'
-- access to it.
--
-- After the action has finished or has been interrupted, whatever is still
-- in the queue is flushed; if anything was left, a notice listing the unread
-- observations is logged.
--
-- When the action was interrupted, the interrupt is logged as an error and
-- then re-thrown as an 'IO' exception via 'throwIO'.
withObservationQueue
  :: forall event b e len
   . ( HasCallStack
     , Typeable event
     , Show event
     , Member Logs e
     , Lifted IO e
     , Integral len
     , Member Interrupts e
     )
  => len                         -- ^ capacity of the queue
  -> Eff (Reader event ': e) b   -- ^ action with access to the queue
  -> Eff e b
withObservationQueue queueLimit e = do
  tbq <- lift (newTBQueueIO (fromIntegral queueLimit))
  -- Capture an interrupt as a 'Left' so the leftover check below always runs.
  outcome <- handleInterrupts
    (pure . Left)
    (fmap Right (Eff.runReader (ObservationQueue tbq) e))
  leftovers <- lift (atomically (flushTBQueue tbq))
  unless (null leftovers) $
    logNotice
      (logPrefix (Proxy @event) <> " unread observations: " <> T.pack (show leftovers))
  case outcome of
    Right result -> pure result
    Left interruption -> do
      logError (T.pack (show interruption))
      lift (throwIO interruption)
-- | Start a linked server process for the given 'ObservationQueue' and
-- return its 'Observer' endpoint.
--
-- The process is started with 'startLink', using 'MkObservationQueue' as
-- the start argument.
spawnWriter
  :: forall event r q
   . ( Member Logs q
     , Lifted IO q
     , FilteredLogging (Processes q)
     , HasProcesses r q
     , Typeable event
     , HasCallStack
     , Server (ObservationQueue event) (Processes q)
     )
  => ObservationQueue event
  -> Eff r (Endpoint (Observer event))
spawnWriter queue = startLink @_ @r @q startArg
 where
  startArg = MkObservationQueue queue
-- | Run an action while a freshly spawned writer process is registered as
-- observer of @eventSource@.
--
-- The writer is spawned for the 'ObservationQueue' obtained from the
-- 'Reader' effect and registered with 'registerObserver'. Once the action is
-- done, the writer is unregistered with 'forgetObserver' and asked to shut
-- down with 'ExitNormally'.
--
-- The cleanup also runs when the action is interrupted: the interrupt is
-- captured, the writer is unregistered and shut down, and only then is the
-- interrupt raised again for the caller. Without this, an interrupted action
-- would skip the cleanup and leave the writer registered at the event source.
--
-- NOTE(review): exceptions thrown in 'IO' are not intercepted here, so they
-- still bypass the cleanup.
withWriter
  :: forall event eventSource e q b
   . ( HasCallStack
     , HasProcesses e q
     , Lifted IO q
     , FilteredLogging (Processes q)
     , Member Logs q
     , IsObservable eventSource event
     , Member (Reader event) e
     , Tangible (Pdu eventSource 'Asynchronous)
     )
  => Endpoint eventSource   -- ^ the process to observe
  -> Eff e b                -- ^ action to run while observing
  -> Eff e b
withWriter eventSource e = do
  q <- Eff.ask @(ObservationQueue event)
  w <- spawnWriter @event q
  registerObserver @event eventSource w
  -- Turn an interrupt into a value so that the cleanup below is not skipped.
  outcome <- handleInterrupts (pure . Left) (Right <$> e)
  forgetObserver @event eventSource w
  sendShutdown (w ^. fromEndpoint) ExitNormally
  -- Re-raise a captured interrupt, otherwise hand back the action's result.
  either interrupt pure outcome
-- | The writer process: a server speaking the 'Observer' protocol that
-- stores every received observation in the 'ObservationQueue' it was
-- started with.
--
-- When the queue is full the observation is dropped and a warning is
-- logged. Any other message is logged as an error.
instance (Typeable event, Lifted IO q, Member Logs q) => Server (ObservationQueue event) (Processes q) where
  type instance Protocol (ObservationQueue event) = Observer event

  -- The process is started with the queue it should write to.
  data instance StartArgument (ObservationQueue event) =
    MkObservationQueue (ObservationQueue event)

  -- The server carries no state of its own.
  newtype instance Model (ObservationQueue event) = MkObservationQueueModel () deriving Default

  update _ (MkObservationQueue (ObservationQueue q)) =
    \case
      OnCast (Observed !event) -> do
        enqueued <- lift (atomically (offer event))
        unless enqueued (logWarning "queue full")
      otherMsg ->
        logError ("unexpected: " <> T.pack (show otherMsg))
    where
      -- Check for space and write in one STM transaction; the result tells
      -- whether the observation was actually written.
      offer observation = do
        full <- isFullTBQueue q
        if full
          then pure False
          else True <$ writeTBQueue q observation