-- | Asynchronous execution of conduits. This module contains a set of functions
-- to concurrently execute 'Stream' processing conduits and couple them using
-- 'TBQueue's.
module Data.MediaBus.Conduit.Async
  ( withAsyncPolledSource
  , FrameContentQ()
  , mkFrameContentQ
  , frameContentQSink
  , frameContentQSource
  ) where

import Conduit
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async.Lifted
import Control.Concurrent.STM
import Control.Exception (evaluate)
import Control.Lens
import Control.Monad.Logger
import Control.Monad.State
import Control.Parallel.Strategies (NFData, rdeepseq, withStrategy)
import Data.Default
import Data.MediaBus.Basics.Clock
import Data.MediaBus.Basics.Ticks
import Data.MediaBus.Conduit.Stream
import Data.MediaBus.Media.Discontinous
import Data.MediaBus.Media.Stream
import Data.Proxy
import Data.Time.Clock
import System.Random
import Text.Printf
import Data.String

-- | Internal state of the polling source: the sequence number and the
-- timestamp that the next generated frame will carry.
data PollFrameContentSourceSt s t = MkPollFrameContentSourceSt
  { _ppSeqNum :: !s
  , _ppTicks :: !t
  }

makeLenses ''PollFrameContentSourceSt

-- | Run a 'Source' in a background thread that feeds a freshly created
-- 'FrameContentQ', and hand the continuation a second source that polls that
-- queue periodically and produces 'Discontinous' content.
--
-- The first argument is the maximum queue length passed to 'mkFrameContentQ'.
-- The continuation receives the 'Async' handle of the background producer
-- (with its result discarded) together with the polling source.
withAsyncPolledSource
  :: ( MonadResource m
     , MonadLogger m
     , MonadBaseControl IO m
     , KnownRate r
     , Integral t
     , Integral s
     , Default p
     , HasStaticDuration c
     , HasDuration c
     , NFData c
     , NFData p
     , NFData s
     , NFData t
     , Random i
     , Random t
     , Random s
     , Show c
     )
  => Int
  -> Source m (Stream i s (Ticks r t) p c)
  -> ((Async (), Source m (Stream i s (Ticks r t) p (Discontinous c))) -> m o)
  -> m o
withAsyncPolledSource !frameQueueLen !src !f = do
  !queue <- mkFrameContentQ frameQueueLen
  -- The producer side: drain the given source into the shared queue.
  let feedQueue = runConduit (src .| frameContentQSink queue)
  withAsync feedQueue $ \worker ->
    f (void worker, frameContentQSource queue)

-- | A queue for 'frameContent' to decouple concurrent conduits carrying
-- 'Stream's. Under the hood a 'TBQueue' is used.
-- A queue also knows its default segment duration and preferred polling
-- interval.
data FrameContentQ a = MkFrameContentQ
  { _frameContentQSegmentDuration :: !NominalDiffTime
  , _frameContentQPollInterval :: !NominalDiffTime
  , _frameContentQRing :: !(TBQueue a)
  }

-- | Create a new 'FrameContentQ' with an upper bound on the queue length.
--
-- The segment duration is the static duration of the content type, and the
-- polling interval is half the time it takes to play out a completely filled
-- queue.
--
-- A non-positive length is clamped to @1@: a queue without capacity can never
-- accept an element, so 'frameContentQSink' would block forever, and the
-- resulting non-positive polling interval would turn the polling source into
-- a busy loop.
mkFrameContentQ
  :: forall m a.
     (HasStaticDuration a, MonadBaseControl IO m)
  => Int
  -> m (FrameContentQ a)
mkFrameContentQ qlen =
  MkFrameContentQ segmentDuration (fromIntegral capacity * 0.5 * segmentDuration)
    <$> liftBase (newTBQueueIO capacity)
  where
    -- Guard against zero or negative lengths, see above.
    capacity = max 1 qlen
    segmentDuration = getStaticDuration (Proxy :: Proxy a)

-- | Consume the 'frameContent's of a 'Stream' and write them into a
-- 'FrameContentQ'. When the queue is full, **drop the oldest element** and push
-- in the new element, anyway.
--
-- Stream elements without frame content are ignored. Every time an element
-- had to be dropped, @queue full@ is logged at info level.
frameContentQSink
  :: (NFData a, MonadBaseControl IO m, Show a, MonadLogger m)
  => FrameContentQ a
  -> Sink (Stream i s t p a) m ()
frameContentQSink (MkFrameContentQ _ _ !ringRef) = awaitForever go
  where
    go !x = maybe (return ()) pushInRing (x ^? eachFrameContent)
    pushInRing !buf' = do
      droppedOldest <-
        liftBase $ do
          -- Fully evaluate the content in this thread, before the
          -- transaction, so the queue only ever holds evaluated values.
          !buf <- evaluate $ withStrategy rdeepseq buf'
          atomically $ do
            full <- isFullTBQueue ringRef
            -- Make room by discarding the oldest element.
            when full (void $ readTBQueue ringRef)
            writeTBQueue ringRef buf
            return full
      when droppedOldest $ $logInfo "queue full"

-- | Periodically poll a 'FrameContentQ' and yield the content as frames with
-- newly generated timestamp and sequence number values.
--
-- The source first yields a start frame context built from a random value,
-- the initial timestamp and sequence number (both @0@) and 'def'. After that
-- it loops forever, racing a blocking read from the queue against a sleep of
-- one polling interval:
--
-- * If content arrives first it is yielded as a 'Got' frame.
--
-- * If the sleep finishes first, one 'Missing' frame is yielded for every
--   whole segment duration that fits into the measured sleep time. The
--   underflow is logged at debug level only for the first of several
--   consecutive timeouts.
--
-- Every yielded frame advances the timestamp by the frame's duration and the
-- sequence number by one, and is fully evaluated before being passed on.
frameContentQSource
  :: ( Random i
     , NFData c
     , NFData p
     , Default p
     , HasStaticDuration c
     , HasDuration c
     , MonadBaseControl IO m
     , MonadLogger m
     , KnownRate r
     , Integral t
     , Integral s
     , NFData t
     , NFData s
     )
  => FrameContentQ c
  -> Source m (Stream i s (Ticks r t) p (Discontinous c))
frameContentQSource (MkFrameContentQ frameDuration pollInterval queue) =
  evalStateC (MkPollFrameContentSourceSt 0 0) (announceStart >> pollLoop False)
  where
    -- Main loop; the flag records whether the previous round timed out.
    pollLoop underflowing =
      liftBase (race dequeue sleepOnce) >>= either onContent onTimeout
      where
        onContent content = emitFrame (Got content) >> pollLoop False
        onTimeout elapsed = emitGap elapsed underflowing >> pollLoop True
    -- Blocking read of the next queue element.
    dequeue = atomically (readTBQueue queue)
    -- Sleep for one polling interval and report how long that really took.
    sleepOnce =
      liftBase $ do
        !(started :: ClockTime UtcClock) <- now
        threadDelay (_ticks pollMicros)
        !finished <- now
        return (diffTime finished started ^. utcClockTimeDiff)
    -- Fill the elapsed time with 'Missing' frames.
    emitGap !elapsed !alreadyUnderflowing = do
      unless alreadyUnderflowing $
        $logDebug (fromString (printf "underflow: %s" (show elapsed)))
      replicateM_ (floor (elapsed / frameDuration)) (emitFrame Missing)
    -- Open the stream with a frame context taken from the current state.
    announceStart = do
      ctx <-
        MkFrameCtx <$> liftBase randomIO <*> use ppTicks <*> use ppSeqNum <*>
        pure def
      yieldStartFrameCtx ctx
    -- The polling interval expressed in microseconds for 'threadDelay'.
    pollMicros :: Ticks (Hz 1000000) Int
    pollMicros = nominalDiffTime # pollInterval
    -- Stamp the content with the current timestamp and sequence number,
    -- advance both counters and yield the fully evaluated frame.
    emitFrame !content = do
      let !duration = nominalDiffTime # getDuration content
      !timestamp <- ppTicks <<+= duration
      !seqNum <- ppSeqNum <<+= 1
      liftBase (evaluate (withStrategy rdeepseq (MkFrame timestamp seqNum content))) >>=
        yieldNextFrame