module Control.Eff.Concurrent.Api.Observer.Queue
( ObservationQueue()
, ObservationQueueReader
, readObservationQueue
, tryReadObservationQueue
, flushObservationQueue
, enqueueObservationsRegistered
, enqueueObservations
)
where
import Control.Concurrent.STM
import Control.Eff
import Control.Eff.ExceptionExtra ( )
import Control.Eff.Lift
import Control.Eff.Concurrent.Process
import Control.Eff.Log
import Control.Eff.Concurrent.Api
import Control.Eff.Concurrent.Api.Client
import Control.Eff.Concurrent.Api.Observer
import Control.Eff.Concurrent.Api.Server
import Control.Eff.Reader.Strict
import Control.Exception.Safe as Safe
import Control.Monad.IO.Class
import Control.Monad ( unless )
import Data.Typeable
import Text.Printf
import GHC.Stack
newtype ObservationQueue a = ObservationQueue (TBQueue (Observation a))
type ObservationQueueReader a = Reader (ObservationQueue a)
logPrefix :: forall o proxy . (HasCallStack, Typeable o) => proxy o -> String
logPrefix px = "observation queue: " ++ show (typeRep px)
readObservationQueue
:: forall o r
. ( Member (ObservationQueueReader o) r
, HasCallStack
, MonadIO (Eff r)
, Typeable o
, HasLogging IO r
)
=> Eff r (Observation o)
readObservationQueue = do
ObservationQueue q <- ask @(ObservationQueue o)
liftIO (atomically (readTBQueue q))
tryReadObservationQueue
:: forall o r
. ( Member (ObservationQueueReader o) r
, HasCallStack
, MonadIO (Eff r)
, Typeable o
, HasLogging IO r
)
=> Eff r (Maybe (Observation o))
tryReadObservationQueue = do
ObservationQueue q <- ask @(ObservationQueue o)
liftIO (atomically (tryReadTBQueue q))
flushObservationQueue
:: forall o r
. ( Member (ObservationQueueReader o) r
, HasCallStack
, MonadIO (Eff r)
, Typeable o
, HasLogging IO r
)
=> Eff r [Observation o]
flushObservationQueue = do
ObservationQueue q <- ask @(ObservationQueue o)
liftIO (atomically (flushTBQueue q))
enqueueObservationsRegistered
:: forall o r q a
. ( ServesApi o r q
, SetMember Process (Process q) r
, Typeable o
, Show (Observation o)
, Observable o
, HasLogging IO q
, HasLogging IO r
, Member Interrupts r
, Lifted IO r
, HasCallStack
, MonadCatch (Eff r)
)
=> SchedulerProxy q
-> Int
-> Eff (ObservationQueueReader o ': r) a
-> Eff r a
enqueueObservationsRegistered px queueLimit k = do
oSvr <- whereIsServer @o
enqueueObservations px oSvr queueLimit k
enqueueObservations
:: forall o r q a
. ( SetMember Process (Process q) r
, Typeable o
, Show (Observation o)
, Observable o
, HasLogging IO r
, HasLogging IO q
, Member Interrupts r
, Lifted IO q
, HasCallStack
, MonadCatch (Eff r)
)
=> SchedulerProxy q
-> Server o
-> Int
-> Eff (ObservationQueueReader o ': r) a
-> Eff r a
enqueueObservations px oSvr queueLimit k = withQueue
queueLimit
(do
ObservationQueue q <- ask @(ObservationQueue o)
do
logDebug
(printf "%s starting with queue limit: %d"
(logPrefix (Proxy @o))
queueLimit
)
cbo <- spawnCallbackObserver
px
(\_from observation -> do
liftIO (atomically (writeTBQueue q observation))
return HandleNextRequest
)
logDebug
(printf "%s started observer process %s"
(logPrefix (Proxy @o))
(show cbo)
)
registerObserver SchedulerProxy cbo oSvr
res <- k
forgetObserver SchedulerProxy cbo oSvr
sendShutdown px (_fromServer cbo) ExitNormally
logDebug (printf "%s stopped observer process" (logPrefix (Proxy @o)))
return res
)
withQueue
:: forall a b e len
. ( HasCallStack
, Typeable a
, Show (Observation a)
, HasLogging IO e
, MonadCatch (Eff e)
, Integral len
)
=> len
-> Eff (ObservationQueueReader a ': e) b
-> Eff e b
withQueue queueLimit e = do
q <- liftIO (newTBQueueIO (fromIntegral queueLimit))
res <- Safe.tryAny (runReader (ObservationQueue q) e)
rest <- liftIO (atomically (flushTBQueue q))
unless
(null rest)
(logNotice (logPrefix (Proxy @a) ++ " unread observations: " ++ show rest))
either (\em -> logError (show em) >> liftIO (throwIO em)) return res