-- | Capture 'Observation's and enqueue them into an STM 'TBQueue'.
module Control.Eff.Concurrent.Api.Observer.Queue
  ( ObservationQueue()
  , ObservationQueueReader
  , readObservationQueue
  , tryReadObservationQueue
  , flushObservationQueue
  , enqueueObservationsRegistered
  , enqueueObservations
  )
where

import Control.Concurrent.STM
import Control.Eff
import Control.Eff.ExceptionExtra ( )
import Control.Eff.Lift
import Control.Eff.Concurrent.Process
import Control.Eff.Log
import Control.Eff.Concurrent.Api
import Control.Eff.Concurrent.Api.Client
import Control.Eff.Concurrent.Api.Observer
import Control.Eff.Concurrent.Api.Server
import Control.Eff.Reader.Strict
import Control.Exception.Safe as Safe
import Control.Monad.IO.Class
import Control.Monad ( unless )
import Data.Typeable
import Text.Printf
import GHC.Stack

-- | Wrapper around the 'TBQueue' that holds the observations captured by
-- 'enqueueObservationsRegistered' or 'enqueueObservations'.
newtype ObservationQueue a = ObservationQueue (TBQueue (Observation a))

-- | The 'Reader' effect that provides access to an 'ObservationQueue'.
type ObservationQueueReader a = Reader (ObservationQueue a)

-- | Prefix for the log messages of this module; it contains the
-- 'typeRep' of the observed type.
logPrefix :: forall o proxy . (HasCallStack, Typeable o) => proxy o -> String
logPrefix = ("observation queue: " ++) . show . typeRep

-- | Take the next 'Observation' out of the 'ObservationQueue' provided by the
-- 'ObservationQueueReader'; the queue is filled by
-- 'enqueueObservationsRegistered' or 'enqueueObservations'.
--
-- This uses 'readTBQueue' and hence blocks until an 'Observation' is
-- available. For non-blocking variants see 'tryReadObservationQueue' and
-- 'flushObservationQueue'.
readObservationQueue
  :: forall o r
   . ( Member (ObservationQueueReader o) r
     , HasCallStack
     , MonadIO (Eff r)
     , Typeable o
     , HasLogging IO r
     )
  => Eff r (Observation o)
readObservationQueue =
  ask @(ObservationQueue o) >>= \(ObservationQueue queue) ->
    liftIO . atomically $ readTBQueue queue

-- | Take the next 'Observation' out of the 'ObservationQueue' provided by the
-- 'ObservationQueueReader' without blocking.
--
-- This uses 'tryReadTBQueue': the result is 'Nothing' when the queue is
-- currently empty. Use 'readObservationQueue' to wait for an 'Observation'.
tryReadObservationQueue
  :: forall o r
   . ( Member (ObservationQueueReader o) r
     , HasCallStack
     , MonadIO (Eff r)
     , Typeable o
     , HasLogging IO r
     )
  => Eff r (Maybe (Observation o))
tryReadObservationQueue =
  ask @(ObservationQueue o) >>= \(ObservationQueue queue) ->
    liftIO . atomically $ tryReadTBQueue queue

-- | Remove and return every 'Observation' that is currently in the
-- 'ObservationQueue' provided by the 'ObservationQueueReader'.
--
-- This uses 'flushTBQueue' and returns immediately. Use
-- 'readObservationQueue' for a blocking read of a single 'Observation'.
flushObservationQueue
  :: forall o r
   . ( Member (ObservationQueueReader o) r
     , HasCallStack
     , MonadIO (Eff r)
     , Typeable o
     , HasLogging IO r
     )
  => Eff r [Observation o]
flushObservationQueue =
  ask @(ObservationQueue o) >>= \(ObservationQueue queue) ->
    liftIO . atomically $ flushTBQueue queue

-- | Like 'enqueueObservations', but the observed 'Server' is the one
-- returned by 'whereIsServer'.
enqueueObservationsRegistered
  :: forall o r q a
   . ( ServesApi o r q
     , SetMember Process (Process q) r
     , Typeable o
     , Show (Observation o)
     , Observable o
     , HasLogging IO q
     , HasLogging IO r
     , Member Interrupts r
     , Lifted IO r
     , HasCallStack
     , MonadCatch (Eff r)
     )
  => SchedulerProxy q
  -> Int
  -> Eff (ObservationQueueReader o ': r) a
  -> Eff r a
enqueueObservationsRegistered px queueLimit k =
  whereIsServer @o >>= \server -> enqueueObservations px server queueLimit k

-- | Observe a 'Server' while running the given action.
--
-- A queue with the given size limit is created, a callback observer process
-- is spawned that writes every received 'Observation' into that queue, and
-- the observer is registered at the server. The action can obtain the
-- observations through the 'ObservationQueueReader', e.g. with
-- 'readObservationQueue'. After the action returned, the observer is
-- unregistered and sent a shutdown with 'ExitNormally'.
--
-- Exceptions are caught by the internal queue wrapper using 'Safe.tryAny',
-- which logs and re-throws them after reporting unread observations.
--
-- NOTE(review): unregistering and shutting down the observer are sequenced
-- after the action; it looks like they are skipped when the action throws -
-- confirm whether that is intended.
enqueueObservations
  :: forall o r q a
   . ( SetMember Process (Process q) r
     , Typeable o
     , Show (Observation o)
     , Observable o
     , HasLogging IO r
     , HasLogging IO q
     , Member Interrupts r
     , Lifted IO q
     , HasCallStack
     , MonadCatch (Eff r)
     )
  => SchedulerProxy q
  -> Server o
  -> Int
  -> Eff (ObservationQueueReader o ': r) a
  -> Eff r a
enqueueObservations px oSvr queueLimit k = withQueue queueLimit $ do
  ObservationQueue queue <- ask @(ObservationQueue o)
  let prefix = logPrefix (Proxy @o)
  logDebug $ printf "%s starting with queue limit: %d" prefix queueLimit
  -- The callback only forwards each observation into the bounded queue.
  observer <- spawnCallbackObserver px $ \_sender obs ->
    HandleNextRequest <$ liftIO (atomically (writeTBQueue queue obs))
  logDebug $ printf "%s started observer process %s" prefix (show observer)
  registerObserver SchedulerProxy observer oSvr
  result <- k
  forgetObserver SchedulerProxy observer oSvr
  sendShutdown px (_fromServer observer) ExitNormally
  logDebug $ printf "%s stopped observer process" prefix
  return result

-- | Create a fresh 'TBQueue' with the given size limit and run the action
-- with an 'ObservationQueue' reader wrapping it.
--
-- Exceptions caught by 'Safe.tryAny' do not prevent the final bookkeeping:
-- the queue is flushed, and a notice is logged if observations were left
-- unread. A caught exception is then logged as an error and re-thrown;
-- otherwise the result of the action is returned.
withQueue
  :: forall a b e len
   . ( HasCallStack
     , Typeable a
     , Show (Observation a)
     , HasLogging IO e
     , MonadCatch (Eff e)
     , Integral len
     )
  => len
  -> Eff (ObservationQueueReader a ': e) b
  -> Eff e b
withQueue queueLimit e = do
  queue     <- liftIO . newTBQueueIO $ fromIntegral queueLimit
  outcome   <- Safe.tryAny $ runReader (ObservationQueue queue) e
  leftovers <- liftIO . atomically $ flushTBQueue queue
  unless (null leftovers)
    $  logNotice
    $  logPrefix (Proxy @a)
    ++ " unread observations: "
    ++ show leftovers
  case outcome of
    Left err -> do
      logError (show err)
      liftIO (throwIO err)
    Right value -> return value