module Data.MediaBus.Conduit.Reorder
( reorderFramesBySeqNumC
, reorderFramesByC
) where
import Conduit
import Control.Lens
import Control.Monad.State.Strict
import Data.Default
import Data.MediaBus.Basics.OrderedBy
import Data.MediaBus.Basics.Sequence
import Data.MediaBus.Basics.Series
import Data.MediaBus.Media.Stream
import qualified Data.Set as Set
data ReorderSt a b c = MkReorderSt
{ _expectedRank :: !a
, _frameQueue :: !(Set.Set (OrderedBy a b))
, _frameDrops :: !Int
, _lastFrameCtx :: !c
}
makeLenses ''ReorderSt
reorderFramesBySeqNumC
:: (Default s, Default i, Default t, Default p, Num s, Ord s, Monad m)
=> Int
-> Conduit (Stream i s t p c) m (Stream i s t p c)
reorderFramesBySeqNumC = reorderFramesByC seqNum (+ 1)
reorderFramesByC
:: ( Monad m
, Ord rank
, Default i
, Default t
, Default s
, Default p
, Default rank
)
=> Lens' (Stream i s t p c) rank
-> (rank -> rank)
-> Int
-> Conduit (Stream i s t p c) m (Stream i s t p c)
reorderFramesByC !frameRank !getNextRank !maxQueueLen =
evalStateC (MkReorderSt def Set.empty 0 def) go
where
maxDrops = maxQueueLen
go = do
awaitForever handleNext
flushQueue
where
handleNext s@(MkStream (Start !ctx)) = do
flushQueue
yield s
put (MkReorderSt (s ^. frameRank) Set.empty 0 ctx)
handleNext !frm = do
!expRank <- use expectedRank
let !currRank = frm ^. frameRank
case compare currRank expRank of
EQ -> do
yieldNext frm
maybeYieldNextFromQueue
LT
-> do
framesDropped <- frameDrops <+= 1
when (framesDropped == maxDrops) $ do
flushQueue
ctx <- use lastFrameCtx
let start =
MkStream (Start ctx) & frameRank .~ (frm ^. frameRank)
MkStream (Start ctx') = start
lastFrameCtx .= ctx'
yield start
yieldNext frm
GT -> do
frameQueue %= Set.insert (MkOrderedBy (view frameRank frm) frm)
maybeYieldNextFromQueue
yieldNext !frm = do
expectedRank .= frm ^. frameRank
updateExpectedRank
frameDrops .= 0
yield frm
flushQueue = do
!q <- frameQueue <<.= Set.empty
mapM_ (yieldNext . orderedByValue) (Set.toAscList q)
maybeYieldNextFromQueue = do
!q <- use frameQueue
!expRank <- use expectedRank
case Set.minView q of
Nothing -> return ()
Just (MkOrderedBy !currRank !candidate, !q') ->
let !isQueueFull = Set.size q == maxQueueLen
!isNextInQueue = currRank <= expRank
in when (isQueueFull || isNextInQueue) $ do
frameQueue .= q'
yieldNext candidate
maybeYieldNextFromQueue
updateExpectedRank = expectedRank %= getNextRank