module Data.ZoomCache.Write (
ZoomWrite(..)
, writeData
, writeDataVBR
, ZoomW
, withFileWrite
, flush
, ZoomWHandle
, openWrite
, closeWrite
, watermark
, setWatermark
, mkTrackSpec
, oneTrack
) where
import Blaze.ByteString.Builder hiding (flush)
import Control.Applicative ((<$>))
import Control.Monad.State
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as C
import Data.Dynamic
import qualified Data.Foldable as Fold
import Data.IntMap (IntMap)
import qualified Data.IntMap as IM
import Data.Monoid
import System.IO
import Blaze.ByteString.Builder.ZoomCache
import Blaze.ByteString.Builder.ZoomCache.Internal
import Data.ZoomCache.Common
import Data.ZoomCache.Format
import Data.ZoomCache.Types
class ZoomWrite t where
write :: TrackNo -> t -> ZoomW ()
data ZoomWHandle = ZoomWHandle
{ whHandle :: Handle
, whTrackWork :: !(IntMap TrackWork)
, whDeferred :: IntMap Builder
, whWriteData :: Bool
}
data TrackWork = TrackWork
{ twSpec :: TrackSpec
, twBuilder :: Builder
, twTSBuilder :: Builder
, twWriter :: Maybe ZoomWork
, twCount :: !Int
, twWatermark :: !Int
, twEntryTime :: !TimeStamp
, twExitTime :: !TimeStamp
}
type ZoomW = StateT ZoomWHandle IO
withFileWrite :: TrackMap
-> Bool
-> ZoomW ()
-> FilePath
-> IO ()
withFileWrite ztypes doRaw f path = do
z <- openWrite ztypes doRaw path
z' <- execStateT (f >> flush) z
hClose (whHandle z')
flush :: ZoomW ()
flush = do
h <- gets whHandle
tracks <- gets whTrackWork
doRaw <- gets whWriteData
when doRaw $
liftIO $ Fold.mapM_ (B.hPut h) $ IM.mapWithKey bsFromTrack tracks
mapM_ (uncurry flushSummary) (IM.assocs tracks)
pending <- mconcat . IM.elems <$> gets whDeferred
liftIO . B.hPut h . toByteString $ pending
modify $ \z -> z
{ whTrackWork = IM.map flushTrack (whTrackWork z)
, whDeferred = IM.empty
}
where
flushTrack :: TrackWork -> TrackWork
flushTrack tw = d{twWriter = clearWork <$> (twWriter tw)}
where
d = mkTrackWork (twSpec tw) (twExitTime tw) (twWatermark tw)
openWrite :: TrackMap
-> Bool
-> FilePath
-> IO ZoomWHandle
openWrite trackMap doRaw path = do
h <- openFile path WriteMode
let global = mkGlobal (IM.size trackMap)
writeGlobalHeader h global
let tracks = IM.foldWithKey addTrack IM.empty trackMap
mapM_ (uncurry (writeTrackHeader h)) (IM.assocs trackMap)
return $ ZoomWHandle h tracks IM.empty doRaw
where
addTrack :: TrackNo -> TrackSpec
-> IntMap TrackWork
-> IntMap TrackWork
addTrack trackNo spec = IM.insert trackNo trackState
where
trackState = mkTrackWork spec (TS 0) 1024
closeWrite :: ZoomWHandle -> IO ()
closeWrite z = hClose (whHandle z)
oneTrack :: (ZoomReadable a) => a -> DataRateType -> Rational -> ByteString -> TrackMap
oneTrack a !drType !rate !name = IM.singleton 1 (mkTrackSpec a drType rate name)
mkTrackSpec :: (ZoomReadable a) => a -> DataRateType -> Rational -> ByteString -> TrackSpec
mkTrackSpec a = TrackSpec (Codec a)
watermark :: TrackNo -> ZoomW (Maybe Int)
watermark trackNo = do
track <- IM.lookup trackNo <$> gets whTrackWork
return (twWatermark <$> track)
setWatermark :: TrackNo -> Int -> ZoomW ()
setWatermark trackNo w = modifyTrack trackNo f
where
f :: TrackWork -> TrackWork
f tw = tw { twWatermark = w }
writeGlobalHeader :: Handle -> Global -> IO ()
writeGlobalHeader h = B.hPut h . toByteString . fromGlobal
writeTrackHeader :: Handle -> Int -> TrackSpec -> IO ()
writeTrackHeader h trackNo TrackSpec{..} = do
B.hPut h . mconcat $
[ trackHeader
, toByteString $ mconcat
[ fromTrackNo trackNo
, fromCodec specType
, fromDataRateType specDRType
, fromRational64 specRate
, fromIntegral32be . C.length $ specName
]
, specName
]
incTimeStamp :: TimeStamp -> TimeStamp
incTimeStamp (TS t) = let t' = (t+1) in t' `seq` (TS t')
incTime :: TrackNo -> ZoomW ()
incTime trackNo = modifyTrack trackNo $ \tw -> tw
{ twEntryTime = if twCount tw == 0
then (incTimeStamp (twEntryTime tw))
else twEntryTime tw
, twExitTime = incTimeStamp (twExitTime tw)
}
setTime :: TrackNo -> TimeStamp -> ZoomW ()
setTime trackNo t = modifyTrack trackNo $ \tw -> tw
{ twEntryTime = if twCount tw == 0 then t else twEntryTime tw
, twExitTime = t
}
flushIfNeeded :: TrackNo -> ZoomW ()
flushIfNeeded trackNo = do
zt <- IM.lookup trackNo <$> gets whTrackWork
case zt of
Just track -> when (flushNeeded track) flush
Nothing -> error "no such track"
where
flushNeeded :: TrackWork -> Bool
flushNeeded TrackWork{..} = twCount >= twWatermark
writeData :: (Typeable a, ZoomWrite a, ZoomWritable a)
=> TrackNo -> a -> ZoomW ()
writeData trackNo d = do
incTime trackNo
doRaw <- gets whWriteData
when doRaw $
modifyTrack trackNo $ \z -> z { twBuilder = twBuilder z <> fromRaw d }
modifyTrack trackNo $ \z -> let c = (twCount z) in c `seq` z
{ twCount = c + 1
, twWriter = updateWork (twExitTime z) d (twWriter z)
}
flushIfNeeded trackNo
writeDataVBR :: (Typeable a, ZoomWrite a, ZoomWritable a)
=> TrackNo -> (TimeStamp, a) -> ZoomW ()
writeDataVBR trackNo (t, d) = do
setTime trackNo t
doRaw <- gets whWriteData
when doRaw $
modifyTrack trackNo $ \z -> z
{ twBuilder = twBuilder z <> fromRaw d
, twTSBuilder = twTSBuilder z <> fromTimeStamp t
}
modifyTrack trackNo $ \z -> let c = (twCount z) in c `seq` z
{ twCount = c + 1
, twWriter = updateWork t d (twWriter z)
}
flushIfNeeded trackNo
mkGlobal :: Int -> Global
mkGlobal n = Global
{ version = Version versionMajor versionMinor
, noTracks = n
, presentationTime = 0
, baseTime = 0
, baseUTC = Nothing
}
modifyTracks :: (IntMap TrackWork -> IntMap TrackWork) -> ZoomW ()
modifyTracks f = modify (\z -> z { whTrackWork = f (whTrackWork z) })
modifyTrack :: TrackNo -> (TrackWork -> TrackWork) -> ZoomW ()
modifyTrack trackNo f = modifyTracks (IM.adjust f trackNo)
bsFromTrack :: TrackNo -> TrackWork -> ByteString
bsFromTrack trackNo TrackWork{..} = toByteString $ mconcat
[ fromByteString packetHeader
, fromIntegral32be trackNo
, fromTimeStamp twEntryTime
, fromTimeStamp twExitTime
, fromIntegral32be twCount
, fromIntegral32be (len twBuilder + len twTSBuilder)
, twBuilder
, twTSBuilder
]
where
len = B.length . toByteString
mkTrackWork :: TrackSpec -> TimeStamp -> Int -> TrackWork
mkTrackWork !spec !entry !w = TrackWork
{ twSpec = spec
, twBuilder = mempty
, twTSBuilder = mempty
, twCount = 0
, twWatermark = w
, twEntryTime = entry
, twExitTime = entry
, twWriter = Nothing
}
clearWork :: ZoomWork -> ZoomWork
clearWork (ZoomWork l _) = ZoomWork l Nothing
updateWork :: (Typeable b, ZoomWritable b)
=> TimeStamp -> b
-> Maybe ZoomWork
-> Maybe ZoomWork
updateWork !t !d Nothing = Just (ZoomWork IM.empty (Just cw))
where
cw = updateSummaryData t d (initSummaryWork t)
updateWork !t !d (Just (ZoomWork l Nothing)) =
case cw'm of
Just _ -> Just (ZoomWork l cw'm)
Nothing -> Nothing
where
cw'm = case (fromDynamic . toDyn $ d) of
Just d' -> Just (updateSummaryData t d' (initSummaryWork t))
Nothing -> Nothing
updateWork !t !d (Just (ZoomWork l (Just cw))) =
case cw'm of
Just _ -> Just (ZoomWork l cw'm)
Nothing -> Nothing
where
cw'm = case (fromDynamic . toDyn $ d) of
Just d' -> Just (updateSummaryData t d' cw)
Nothing -> Nothing
flushSummary :: TrackNo -> TrackWork -> ZoomW ()
flushSummary trackNo TrackWork{..} = case twWriter of
Just writer -> do
let (writer', bs) = flushWork trackNo twEntryTime twExitTime writer
modify $ \z -> z { whDeferred = IM.unionWith mappend (whDeferred z) bs }
modifyTrack trackNo (\ztt -> ztt { twWriter = Just writer' } )
_ -> return ()
flushWork :: TrackNo -> TimeStamp -> TimeStamp
-> ZoomWork -> (ZoomWork, IntMap Builder)
flushWork _ _ _ op@(ZoomWork _ Nothing) = (op, IM.empty)
flushWork trackNo entryTime exitTime (ZoomWork l (Just cw)) =
(ZoomWork l' (Just cw), bs)
where
(bs, l') = pushSummary s IM.empty l
s = Summary
{ summaryTrack = trackNo
, summaryLevel = 1
, summaryEntryTime = entryTime
, summaryExitTime = exitTime
, summaryData = toSummaryData dur cw
}
dur = TSDiff $ (unTS exitTime) (unTS entryTime)
pushSummary :: (ZoomWritable a)
=> Summary a
-> IntMap Builder -> IntMap (Summary a -> Summary a)
-> (IntMap Builder, IntMap (Summary a -> Summary a))
pushSummary s bs l = do
case IM.lookup (summaryLevel s) l of
Just g -> pushSummary (g s) bs' cleared
Nothing -> (bs', inserted)
where
bs' = IM.insert (summaryLevel s) (fromSummary s) bs
f next = (s `appendSummary` next) { summaryLevel = summaryLevel s + 1 }
inserted = IM.insert (summaryLevel s) f l
cleared = IM.delete (summaryLevel s) l
appendSummary :: (ZoomWritable a) => Summary a -> Summary a -> Summary a
appendSummary s1 s2 = Summary
{ summaryTrack = summaryTrack s1
, summaryLevel = summaryLevel s1
, summaryEntryTime = summaryEntryTime s1
, summaryExitTime = summaryExitTime s2
, summaryData = appendSummaryData (dur s1) (summaryData s1)
(dur s2) (summaryData s2)
}
where
dur = summaryDuration
(<>) :: Monoid a => a -> a -> a
(<>) = mappend