module Data.MediaBus.Rtp.Source
( type RtpStream
, rtpSource
, rtpPayloadDemux
, type RtpPayloadHandler
) where
import Conduit
import Control.Lens
import Control.Monad
import Control.Monad.Logger
import qualified Data.ByteString as B
import Data.Default
import qualified Data.List
import Data.Maybe
import Data.MediaBus
import Data.MediaBus.Rtp.Packet
import Data.String
import Text.Printf
-- | A 'Stream' specialised to RTP: the source id is an 'RtpSsrc', the
-- sequence number an 'RtpSeqNum', the timestamp an 'RtpTimestamp' and the
-- frame content an 'RtpPayload'. The stream parameter type @p@ is left open.
type RtpStream p = Stream RtpSsrc RtpSeqNum RtpTimestamp p RtpPayload
-- | Internal state carried by 'rtpSource' while it consumes packets.
data RRState ctx = MkRRState
  { _currCtx :: ctx
    -- ^ The frame context as of the most recently processed packet.
  , _isFirstPacket :: Bool
    -- ^ 'True' until the first successfully parsed packet has been seen.
  } deriving Show

-- Generates the lenses 'currCtx' and 'isFirstPacket'.
makeLenses ''RRState
-- | Turn a stream of raw, serialized RTP packets into an 'RtpStream'.
--
-- Every incoming 'Next' frame has its content passed to 'deserialize':
--
-- * On failure the error is logged together with the current frame context
--   and nothing is yielded for that frame.
-- * On success the internal state is updated from the RTP header. If the
--   update reports anything other than 'FrameCtxNotChanged', a stream start
--   is yielded first (via 'yieldStartFrameCtx'); afterwards a 'Next' frame
--   carrying the packet body is always yielded, stamped with the timestamp
--   and sequence number stored in the state.
--
-- Incoming 'Start' frames are only logged; they do not alter the state and
-- are not forwarded.
rtpSource
  :: ( Show i
     , Show s
     , Show t
     , Show p
     , Default i
     , Monad m
     , Default p
     , MonadLogger m
     )
  => Conduit (Stream i s t p B.ByteString) m (RtpStream p)
rtpSource =
  evalStateC (MkRRState (MkFrameCtx def def def def) True) $
  awaitForever processFrames
  where
    -- Upstream start frames are logged and otherwise ignored.
    processFrames frm@(MkStream (Start _)) =
      $logInfo (fromString ("state frame received: " ++ show frm))
    -- Payload frames are parsed as RTP packets; the frame's own timestamp and
    -- sequence number are discarded in favour of the values in the RTP header.
    processFrames (MkStream (Next (MkFrame _ _ !contentIn))) =
      case deserialize contentIn of
        Left rtpError -> logRtpError rtpError
        Right rtpPacket -> do
          let rtpHeader = header rtpPacket
          res <- updateState rtpHeader
          unless (res == FrameCtxNotChanged) (yieldStreamStart res)
          yieldStreamNext (body rtpPacket)
    -- Log a parse failure together with the frame context at that moment.
    logRtpError e = do
      ctx <- use currCtx
      $logError
        (fromString
           (printf
              "rtp packet parse error: %s, frame-context: %s"
              e
              (show ctx)))
    -- Store the header's sequence number and timestamp in the state and
    -- classify how the new packet relates to the previous one. The checks are
    -- made against the context as it was /before/ this update.
    updateState rtpHeader = do
      oldCtx <-
        currCtx <<%=
        ((frameCtxSeqNumRef .~ sequenceNumber rtpHeader) .
         (frameCtxTimestampRef .~ headerTimestamp rtpHeader))
      wasFirstPacket <- isFirstPacket <<.= False
      if oldCtx ^. frameCtxSourceId /= ssrc rtpHeader
        then do
          -- A different sender: remember the new SSRC.
          currCtx . frameCtxSourceId .= ssrc rtpHeader
          return RtpSsrcChanged
        else if sequenceNumbersDifferTooMuch
                  (oldCtx ^. frameCtxSeqNumRef)
                  (sequenceNumber rtpHeader)
               then return RtpSequenceNumberGap
               else if timestampsDifferTooMuch
                         (oldCtx ^. frameCtxTimestampRef)
                         (headerTimestamp rtpHeader)
                      then return RtpTimestampGap
                      else if wasFirstPacket
                             then return NewRtpSession
                             else return FrameCtxNotChanged
      where
        -- 'True' when the absolute distance between the two sequence numbers
        -- is at least 10.
        -- NOTE(review): counter wrap-around is not treated specially here;
        -- confirm whether that matters for 'RtpSeqNum'.
        sequenceNumbersDifferTooMuch oldSN currSN =
          let d =
                if currSN >= oldSN
                  then currSN - oldSN
                  else oldSN - currSN
              sequenceNumberMaxDelta = 10
          in d >= sequenceNumberMaxDelta
        -- 'True' when the absolute distance between the two timestamps is at
        -- least 2000.
        -- NOTE(review): counter wrap-around is not treated specially here;
        -- confirm whether that matters for 'RtpTimestamp'.
        timestampsDifferTooMuch oldTS currTS =
          let d =
                if currTS >= oldTS
                  then currTS - oldTS
                  else oldTS - currTS
              timestampMaxDelta = 2000
          in d >= timestampMaxDelta
    -- Announce a (re)start of the stream using the current frame context,
    -- logging the reason.
    yieldStreamStart why = do
      fx <- use currCtx
      $logDebug
        (fromString (show why ++ ": starting rtp stream: " ++ show fx))
      yieldStartFrameCtx fx
    -- Emit the payload as a frame stamped with the timestamp and sequence
    -- number currently held in the state.
    yieldStreamNext p = do
      ts <- use (currCtx . frameCtxTimestampRef)
      sn <- use (currCtx . frameCtxSeqNumRef)
      yield (MkStream (Next (MkFrame ts sn p)))
-- | Outcome of comparing a freshly received RTP header with the state kept
-- by 'rtpSource'.
data RRSourceChange
  = NewRtpSession
    -- ^ The packet was the first one processed and no other change applied.
  | RtpSsrcChanged
    -- ^ The SSRC differs from the one stored in the frame context.
  | RtpSequenceNumberGap
    -- ^ The sequence number is too far from the previous one.
  | RtpTimestampGap
    -- ^ The timestamp is too far from the previous one.
  | FrameCtxNotChanged
    -- ^ None of the above; no new stream start is required.
  deriving (Show, Eq)
-- | Convert the payload of every frame of an 'RtpStream' using a handler
-- selected by the frame's RTP payload type.
--
-- Timestamps are first converted from 'RtpTimestamp' to 'Ticks'. For each
-- 'Next' frame the payload type is looked up in @payloadTable@; the matching
-- handler is applied to the frame, and when there is no match the frame's
-- payload is replaced by @fallbackContent@. 'Start' events are forwarded
-- with their frame context.
rtpPayloadDemux
  :: (Integral t, Monad m)
  => [(RtpPayloadType, RtpPayloadHandler (Ticks r t) c)]
  -> c
  -> Conduit (RtpStream p) m (Stream RtpSsrc RtpSeqNum (Ticks r t) p c)
rtpPayloadDemux payloadTable fallbackContent =
  mapC (timestamp %~ (MkTicks . fromIntegral . _rtpTimestamp)) .|
  awaitForever dispatch
  where
    -- Handler used when the payload type is not listed in the table.
    useFallback = framePayload .~ fallbackContent
    dispatch (MkStream event) =
      case event of
        Start !ctx -> yieldStartFrameCtx ctx
        Next !frame ->
          let payloadType = frame ^. framePayload . rtpPayloadType
              convert =
                fromMaybe useFallback (Data.List.lookup payloadType payloadTable)
          -- Force the converted frame before handing it downstream.
          in yieldNextFrame $! convert frame
-- | A function converting a frame that carries an 'RtpPayload' into a frame
-- with content of type @c@, keeping the 'RtpSeqNum' sequence number type and
-- the timestamp type @t@.
type RtpPayloadHandler t c = Frame RtpSeqNum t RtpPayload -> Frame RtpSeqNum t c