{-# LANGUAGE UndecidableInstances #-}
module Control.Eff.Concurrent.Protocol.Observer.Queue
( ObservationQueue(..)
, ObservationQueueReader
, readObservationQueue
, tryReadObservationQueue
, flushObservationQueue
, withObservationQueue
, spawnLinkObservationQueueWriter
)
where
import Control.Concurrent.STM
import Control.Eff
import Control.Eff.Concurrent.Protocol
import Control.Eff.Concurrent.Protocol.Observer
import Control.Eff.Concurrent.Protocol.StatefulServer
import Control.Eff.Concurrent.Process
import Control.Eff.ExceptionExtra ( )
import Control.Eff.Log
import Control.Eff.Reader.Strict
import Control.Exception.Safe as Safe
import Control.Monad.IO.Class
import Control.Monad ( unless )
import qualified Data.Text as T
import Data.Typeable
import GHC.Stack
-- | A thin wrapper around a bounded STM queue ('TBQueue') whose elements are
-- of type @a@.
--
-- Values of this type are provided to effectful code through
-- 'ObservationQueueReader' and are consumed by 'readObservationQueue',
-- 'tryReadObservationQueue' and 'flushObservationQueue'.
newtype ObservationQueue a = ObservationQueue (TBQueue a)
-- | The 'Reader' effect that makes an 'ObservationQueue' with elements of
-- type @a@ available to an effectful computation.
type ObservationQueueReader a = Reader (ObservationQueue a)
-- | Build the prefix used for log messages of this module.
--
-- The result is the fixed text @observation queue: @ followed by the rendered
-- 'typeRep' of the type selected by the proxy argument.
logPrefix :: forall o proxy . (HasCallStack, Typeable o) => proxy o -> T.Text
logPrefix px = T.pack ("observation queue: " ++ show (typeRep px))
-- | Take the next element out of the 'ObservationQueue' provided by the
-- 'ObservationQueueReader' effect.
--
-- The element is fetched with 'readTBQueue' inside a single 'atomically'
-- transaction, so this call blocks for as long as the queue is empty. Use
-- 'tryReadObservationQueue' or 'flushObservationQueue' when blocking is not
-- wanted.
readObservationQueue
  :: forall o r
   . ( Member (ObservationQueueReader o) r
     , HasCallStack
     , MonadIO (Eff r)
     , Typeable o
     , Member Logs r
     )
  => Eff r o
readObservationQueue =
  ask @(ObservationQueue o) >>= \(ObservationQueue queue) ->
    liftIO (atomically (readTBQueue queue))
-- | Take the next element out of the 'ObservationQueue' provided by the
-- 'ObservationQueueReader' effect, if there is one.
--
-- The queue is inspected with 'tryReadTBQueue' inside a single 'atomically'
-- transaction: the result is 'Just' the dequeued element, or 'Nothing' when
-- the queue is currently empty.
tryReadObservationQueue
  :: forall o r
   . ( Member (ObservationQueueReader o) r
     , HasCallStack
     , MonadIO (Eff r)
     , Typeable o
     , Member Logs r
     )
  => Eff r (Maybe o)
tryReadObservationQueue =
  ask @(ObservationQueue o) >>= \(ObservationQueue queue) ->
    liftIO (atomically (tryReadTBQueue queue))
-- | Remove and return everything that is currently stored in the
-- 'ObservationQueue' provided by the 'ObservationQueueReader' effect.
--
-- The whole content is taken with 'flushTBQueue' inside a single 'atomically'
-- transaction; an empty queue yields an empty list.
flushObservationQueue
  :: forall o r
   . ( Member (ObservationQueueReader o) r
     , HasCallStack
     , MonadIO (Eff r)
     , Typeable o
     , Member Logs r
     )
  => Eff r [o]
flushObservationQueue =
  ask @(ObservationQueue o) >>= \(ObservationQueue queue) ->
    liftIO (atomically (flushTBQueue queue))
-- | Run an action that has access to a freshly created 'ObservationQueue'.
--
-- Steps, in order:
--
-- 1. A new 'TBQueue' is created with 'newTBQueueIO'; its capacity is the
--    first argument converted with 'fromIntegral'.
-- 2. The given action is run with the 'ObservationQueueReader' effect
--    handled by 'runReader'. An interrupt passed to the handler of
--    'handleInterrupts' is kept as a 'Left' value instead of aborting here.
-- 3. Whatever is still in the queue is removed with 'flushTBQueue'. If that
--    list is not empty it is reported with 'logNotice' (this is what the
--    @Show o@ constraint is needed for).
-- 4. If step 2 ended with an interrupt, the interrupt is logged with
--    'logError' and then thrown in 'IO' via 'throwIO'; otherwise the result
--    of the action is returned.
withObservationQueue
  :: forall o b e len
   . ( HasCallStack
     , Typeable o
     , Show o
     , Member Logs e
     , Lifted IO e
     , Integral len
     , Member Interrupts e
     )
  => len
  -> Eff (ObservationQueueReader o ': e) b
  -> Eff e b
withObservationQueue queueLimit e = do
  queue   <- lift (newTBQueueIO (fromIntegral queueLimit))
  outcome <- handleInterrupts
    (pure . Left)
    (fmap Right (runReader (ObservationQueue queue) e))
  -- Drain the queue before reporting the outcome, so that unread elements are
  -- logged regardless of whether the action was interrupted.
  leftovers <- lift (atomically (flushTBQueue queue))
  unless
    (null leftovers)
    (logNotice
      (  logPrefix (Proxy @o)
      <> " unread observations: "
      <> T.pack (show leftovers)
      )
    )
  case outcome of
    Left interruption -> do
      logError (T.pack (show interruption))
      lift (throwIO interruption)
    Right result -> pure result
-- | Start a server process for the given 'ObservationQueue' and return it as
-- an 'Observer'.
--
-- The process is started with 'startLink', using the queue wrapped in
-- 'MkObservationQueue' as start argument. The endpoint returned by
-- 'startLink' is converted with 'toObserver'. The 'Server' instance for
-- 'ObservationQueue' writes the observations it handles into the queue.
spawnLinkObservationQueueWriter
  :: forall o q h
   . ( TangibleObserver o
     , IsPdu (Observer o) 'Asynchronous
     , Member Logs q
     , Lifted IO q
     , LogsTo h (Processes q)
     , HasCallStack)
  => ObservationQueue o
  -> Eff (Processes q) (Observer o)
spawnLinkObservationQueueWriter queue =
  toObserver <$> startLink (MkObservationQueue queue)
-- | A server for the 'Observer' protocol that stores the observations it
-- handles in the 'TBQueue' wrapped by the 'ObservationQueue' it was started
-- with.
instance (TangibleObserver o, IsPdu (Observer o) 'Asynchronous, Lifted IO q, Member Logs q) => Server (ObservationQueue o) (Processes q) where
  -- This server speaks the 'Observer' protocol for observations of type @o@.
  type Protocol (ObservationQueue o) = Observer o

  -- The only start argument is the queue that will be written to.
  data instance StartArgument (ObservationQueue o) (Processes q) =
    MkObservationQueue (ObservationQueue o)

  -- Casts are passed to 'handleObservations', which is given a callback that
  -- writes into the queue; every other event is reported with 'logError'.
  update (MkObservationQueue (ObservationQueue queue)) event =
    case event of
      OnCast request ->
        handleObservations
          -- 'writeTBQueue' blocks while the bounded queue is full.
          (\observation -> lift (atomically (writeTBQueue queue observation)))
          request
      otherMsg ->
        logError ("unexpected: " <> T.pack (show otherMsg))