{-# LANGUAGE UndecidableInstances #-}
-- | Capture observations in a bounded 'TBQueue'.
--
-- 'observe' runs an action with an 'ObservationQueueReader' effect; while the
-- action runs, a writer process is registered as an observer of the given
-- event source and enqueues every observation it receives. Inside the action
-- the queued observations can be consumed with 'await', 'tryRead' and 'flush'.
module Control.Eff.Concurrent.Protocol.Observer.Queue
( ObservationQueue(..)
, ObservationQueueReader
, observe
, await
, tryRead
, flush
)
where
import Control.Concurrent.STM
import Control.Eff
import Control.Eff.Concurrent.Protocol
import Control.Eff.Concurrent.Protocol.Observer
import Control.Eff.Concurrent.Protocol.StatefulServer
import Control.Eff.Concurrent.Process
import Control.Eff.ExceptionExtra ( )
import Control.Eff.Log
import Control.Eff.Reader.Strict
import Control.Exception.Safe as Safe
import Control.Lens
import Control.Monad.IO.Class
import Control.Monad ( unless, when )
import qualified Data.Text as T
import Data.Typeable
import GHC.Stack
-- | Wrapper around the bounded 'TBQueue' that holds captured observations
-- of type @a@.
newtype ObservationQueue a = ObservationQueue (TBQueue a)
-- | A 'Reader' effect that provides access to an 'ObservationQueue'.
type ObservationQueueReader a = Reader (ObservationQueue a)
-- | Build the prefix used for log messages of this module: a fixed label
-- followed by the shown 'typeRep' of the event type.
logPrefix :: forall event proxy . (HasCallStack, Typeable event) => proxy event -> T.Text
logPrefix = ("observation queue: " <>) . T.pack . show . typeRep
-- | Take the next observation from the 'ObservationQueue' provided by the
-- 'ObservationQueueReader'.
--
-- Uses 'readTBQueue', i.e. the call blocks until an element is available.
-- For non-blocking variants see 'tryRead' and 'flush'.
await
  :: forall event r
   . ( Member (ObservationQueueReader event) r
     , HasCallStack
     , MonadIO (Eff r)
     , Typeable event
     , Member Logs r
     )
  => Eff r event
await =
  ask @(ObservationQueue event) >>= \(ObservationQueue queue) ->
    liftIO . atomically $ readTBQueue queue
-- | Take the next observation from the 'ObservationQueue' without blocking.
--
-- Uses 'tryReadTBQueue': the result is 'Nothing' when the queue is
-- currently empty.
tryRead
  :: forall event r
   . ( Member (ObservationQueueReader event) r
     , HasCallStack
     , MonadIO (Eff r)
     , Typeable event
     , Member Logs r
     )
  => Eff r (Maybe event)
tryRead =
  ask @(ObservationQueue event) >>= \(ObservationQueue queue) ->
    liftIO . atomically $ tryReadTBQueue queue
-- | Remove and return everything that is currently in the
-- 'ObservationQueue'.
--
-- Uses 'flushTBQueue': the result is the (possibly empty) list of queued
-- observations, and the queue is empty afterwards.
flush
  :: forall event r
   . ( Member (ObservationQueueReader event) r
     , HasCallStack
     , MonadIO (Eff r)
     , Typeable event
     , Member Logs r
     )
  => Eff r [event]
flush =
  ask @(ObservationQueue event) >>= \(ObservationQueue queue) ->
    liftIO . atomically $ flushTBQueue queue
-- | Run an action that can read observations of @eventSource@ from an
-- 'ObservationQueue'.
--
-- A new queue with the given size limit is created, a writer process is
-- registered as observer of the event source for the duration of the
-- action, and the 'ObservationQueueReader' effect is handled here.
observe
  :: forall event eventSource e q len b
   . ( HasCallStack
     , HasProcesses e q
     , LogIo q
     , LogIo e
     , IsObservable eventSource event
     , Integral len
     , Server (ObservationQueue event) (Processes q)
     , Tangible (Pdu eventSource 'Asynchronous)
     )
  => len
  -- ^ size limit of the queue
  -> Endpoint eventSource
  -- ^ the process whose observations are captured
  -> Eff (ObservationQueueReader event ': e) b
  -- ^ the action that consumes the observations
  -> Eff e b
observe capacity source action =
  withObservationQueue capacity $ withWriter @event source action
-- | Create a bounded queue and handle the 'ObservationQueueReader' effect of
-- the given action with it.
--
-- After the action has finished or was interrupted, the queue is flushed; if
-- observations were left unread, they are logged as a notice. An interrupt of
-- the action is logged as an error and then re-thrown via 'throwIO'.
withObservationQueue
  :: forall event b e len
   . ( HasCallStack
     , Typeable event
     , Show event
     , Member Logs e
     , Lifted IO e
     , Integral len
     , Member Interrupts e
     )
  => len
  -> Eff (ObservationQueueReader event ': e) b
  -> Eff e b
withObservationQueue queueLimit action = do
  queue <- lift (newTBQueueIO (fromIntegral queueLimit))
  -- Interrupts are captured so that the leftover check below always runs.
  outcome <- handleInterrupts
    (pure . Left)
    (fmap Right (runReader (ObservationQueue queue) action))
  leftovers <- lift (atomically (flushTBQueue queue))
  unless (null leftovers) $
    logNotice
      (logPrefix (Proxy @event) <> " unread observations: " <> T.pack (show leftovers))
  case outcome of
    Left interruption -> do
      logError (T.pack (show interruption))
      lift (throwIO interruption)
    Right result -> pure result
-- | Start the 'Server' for the given 'ObservationQueue' and return its
-- 'Observer' endpoint.
--
-- The queue is passed to the server as its 'StartArgument'.
spawnWriter
  :: forall event r q h
   . ( Member Logs q
     , Lifted IO q
     , LogsTo h (Processes q)
     , HasProcesses r q
     , Typeable event
     , HasCallStack
     , Server (ObservationQueue event) (Processes q)
     )
  => ObservationQueue event
  -> Eff r (Endpoint (Observer event))
spawnWriter queue =
  start @_ @r @q @h $ MkObservationQueue queue
-- | Run an action while a writer process feeds the observations of
-- @eventSource@ into the 'ObservationQueue' from the environment.
--
-- The writer is spawned and registered as observer before the action runs.
-- Afterwards it is unregistered and sent a normal shutdown.
--
-- The clean-up also runs when the action is interrupted: the interrupt is
-- captured, the writer is unregistered and shut down, and only then the
-- interrupt is raised again. Without this an interrupted action would leave
-- the writer process running and registered at the event source.
withWriter
  :: forall event eventSource e q b
   . ( HasCallStack
     , HasProcesses e q
     , LogIo q
     , IsObservable eventSource event
     , Member (ObservationQueueReader event) e
     , Tangible (Pdu eventSource 'Asynchronous)
     )
  => Endpoint eventSource
  -- ^ the process whose observations are captured
  -> Eff e b
  -- ^ the action to run while observations are captured
  -> Eff e b
withWriter eventSource e = do
  q <- ask @(ObservationQueue event)
  w <- spawnWriter @event q
  registerObserver @event eventSource w
  -- Capture an interrupt instead of letting it skip the clean-up below.
  res <- handleInterrupts (pure . Left) (Right <$> e)
  forgetObserver @event eventSource w
  sendShutdown (w ^. fromEndpoint) ExitNormally
  -- Propagate a captured interrupt to the caller, otherwise return the result.
  either interrupt pure res
-- | The writer process: it speaks the 'Observer' protocol and stores every
-- observation it receives in the wrapped 'TBQueue'.
--
-- When the queue is full the observation is dropped and a warning is logged;
-- the process never blocks on a full queue.
instance (Typeable event, Lifted IO q, Member Logs q) => Server (ObservationQueue event) (Processes q) where
  type instance Protocol (ObservationQueue event) = Observer event

  -- The queue to write to is handed over as the start argument.
  data instance StartArgument (ObservationQueue event) (Processes q) =
    MkObservationQueue (ObservationQueue event)

  update _ (MkObservationQueue (ObservationQueue queue)) = \case
    OnCast (Observed !observation) -> do
      -- Check and write in a single transaction, so the capacity check
      -- cannot be invalidated between the test and the write.
      dropped <- lift . atomically $ do
        full <- isFullTBQueue queue
        if full
          then pure True
          else writeTBQueue queue observation >> pure False
      when dropped $ logWarning "queue full"
    unexpected ->
      logError ("unexpected: " <> T.pack (show unexpected))