module Data.ZoomCache.Write (
-- * The ZoomWrite class
ZoomWrite(..)
-- * Writing data
, writeData
, writeDataVBR
-- * The ZoomW monad
, ZoomW
, withFileWrite
, flush
-- * ZoomWHandle IO functions
, ZoomWHandle
, openWrite
, closeWrite
-- * Watermarks
, watermark
, setWatermark
-- * TrackSpec helpers
, mkTrackSpec
, oneTrack
) where
import Blaze.ByteString.Builder hiding (flush)
import Codec.Compression.Zlib
import Control.Applicative ((<$>))
import Control.Exception (bracket, bracketOnError)
import Control.Monad.State
import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as C
import qualified Data.ByteString.Lazy as L
import Data.Dynamic
import qualified Data.Foldable as Fold
import Data.IntMap (IntMap)
import qualified Data.IntMap as IM
import Data.List (foldl')
import Data.Monoid
import System.IO

import Blaze.ByteString.Builder.ZoomCache
import Blaze.ByteString.Builder.ZoomCache.Internal
import Data.ZoomCache.Common
import Data.ZoomCache.Format
import Data.ZoomCache.Numeric.Delta
import Data.ZoomCache.Types
-- | Class of types whose values can be written to a track from within
-- the 'ZoomW' monad.
class ZoomWrite t where
    -- | Write a value of type @t@ to the track with the given number.
    write :: TrackNo -> t -> ZoomW ()
-- | The state carried by the 'ZoomW' monad while a file is being written.
data ZoomWHandle = ZoomWHandle
    { whHandle    :: Handle
      -- ^ File handle that headers, raw packets and summaries are written to.
    , whTrackWork :: !(IntMap TrackWork)
      -- ^ Working state of every track, keyed by track number.
    , whDeferred  :: IntMap Builder
      -- ^ Summary output accumulated by 'diskSummary'; written to the
      -- handle and reset to empty by 'diskTracks'.
    , whWriteData :: Bool
      -- ^ Whether raw data packets are written in addition to summaries.
    }
-- | Working state of a single track between two flushes.
data TrackWork = TrackWork
    { twSpec      :: TrackSpec
      -- ^ Specification the track was created with.
    , twBuilder   :: Builder
      -- ^ Encoded raw data appended by 'writeData' / 'writeDataVBR' when
      -- raw data writing is enabled.
    , twReverseTS :: [TimeStamp]
      -- ^ Timestamps recorded by 'writeDataVBR', most recent first.
    , twWriter    :: Maybe ZoomWork
      -- ^ Summary work in progress; 'Nothing' in a track built by
      -- 'mkTrackWork' until 'updateWork' creates it.
    , twCount     :: !Int
      -- ^ Number of data written since the track state was last reset.
    , twWatermark :: !Int
      -- ^ Count at which 'flushIfNeeded' triggers a 'flush'.
    , twEntryTime :: !TimeStamp
      -- ^ Timestamp of the first datum of the current packet.
    , twExitTime  :: !TimeStamp
      -- ^ Timestamp of the most recent datum.
    }
-- | The monad in which tracks are written: 'IO' extended with a
-- 'ZoomWHandle' as state.
type ZoomW = StateT ZoomWHandle IO
-- | Run a 'ZoomW' action against a file opened with 'openWrite'.
--
-- After the action has run, pending data is flushed ('flush') and the
-- remaining summary levels are written out ('finish'). The file handle is
-- closed with 'closeWrite' both on normal completion and when the action
-- (or the final flush) throws an exception, so the handle is never leaked.
withFileWrite :: TrackMap
              -> Bool -- ^ Whether or not to write raw data packets
              -> ZoomW ()
              -> FilePath
              -> IO ()
withFileWrite ztypes doRaw f path =
    bracket (openWrite ztypes doRaw path) closeWrite $
        -- The final state is not needed: 'closeWrite' only uses the handle,
        -- which is never replaced while the action runs.
        evalStateT (f >> flush >> finish)
-- | Write the pending packets of all tracks to disk, pushing one summary per
-- track built from the track's current entry and exit times.
--
-- 'writeData' and 'writeDataVBR' call this automatically (through
-- 'flushIfNeeded') once a track's count reaches its watermark.
flush :: ZoomW ()
flush = diskTracks $ \trackNo tw ->
    diskSummary (flushWork (twEntryTime tw) (twExitTime tw)) trackNo tw
-- | Like 'flush', but uses 'finishWork' so that the summaries still saved at
-- each level are written out and the saved levels are emptied.
finish :: ZoomW ()
finish = diskTracks (diskSummary finishWork)
-- | Write the current state of all tracks to the file handle.
--
-- In order: raw packets are written (only when raw data writing is enabled),
-- the supplied summary action is run for every track, the deferred summary
-- output collected in 'whDeferred' is written, and finally every track is
-- reset to an empty working state that starts at its previous exit time.
diskTracks :: (TrackNo -> TrackWork -> ZoomW ()) -> ZoomW ()
diskTracks fSummary = do
    st <- get
    let out      = whHandle st
        snapshot = whTrackWork st
    -- Raw packets first, one per track, in the map's traversal order.
    when (whWriteData st) . liftIO $
        Fold.forM_ (IM.mapWithKey bsFromTrack snapshot) (L.hPut out)
    -- The summary actions see the tracks as they were before this call.
    forM_ (IM.assocs snapshot) $ \(trackNo, tw) -> fSummary trackNo tw
    deferred <- gets whDeferred
    liftIO $ B.hPut out (toByteString (mconcat (IM.elems deferred)))
    modify $ \z -> z
        { whTrackWork = IM.map resetTrack (whTrackWork z)
        , whDeferred  = IM.empty
        }
  where
    -- Fresh working state with the same spec and watermark; the summary
    -- levels are kept but the summary work in progress is dropped.
    resetTrack :: TrackWork -> TrackWork
    resetTrack tw =
        (mkTrackWork (twSpec tw) (twExitTime tw) (twWatermark tw))
            { twWriter = fmap clearWork (twWriter tw) }
-- | Open a file in 'WriteMode' and write the global header followed by one
-- track header per entry of the track map.
--
-- Returns a 'ZoomWHandle' in which every track starts empty at timestamp 0
-- with a watermark of 1024. If writing any header throws, the handle is
-- closed again before the exception propagates.
openWrite :: TrackMap
          -> Bool -- ^ Whether or not to write raw data packets
          -> FilePath
          -> IO ZoomWHandle
openWrite trackMap doRaw path =
    bracketOnError (openFile path WriteMode) hClose $ \h -> do
        writeGlobalHeader h (mkGlobal (IM.size trackMap))
        mapM_ (uncurry (writeTrackHeader h)) (IM.assocs trackMap)
        return $ ZoomWHandle h tracks IM.empty doRaw
  where
    -- One working state per declared track, under the same key. A plain
    -- 'IM.map' replaces the former 'IM.foldWithKey', which newer versions
    -- of containers no longer provide.
    tracks :: IntMap TrackWork
    tracks = IM.map initialWork trackMap

    initialWork :: TrackSpec -> TrackWork
    initialWork spec = mkTrackWork spec (TS 0) 1024
-- | Close the file handle held by a 'ZoomWHandle'. Nothing is flushed first.
closeWrite :: ZoomWHandle -> IO ()
closeWrite = hClose . whHandle
-- | Build a 'TrackMap' holding a single track, numbered 1, from the
-- arguments accepted by 'mkTrackSpec'. The data-rate type, rate and name are
-- forced (in that order) before the map is built.
oneTrack :: (ZoomReadable a)
         => a -> Bool -> Bool -> DataRateType -> Rational -> ByteString
         -> TrackMap
oneTrack a delta zlib drType rate name =
    drType `seq` rate `seq` name `seq` IM.singleton 1 spec
  where
    spec = mkTrackSpec a delta zlib drType rate name
-- | Build a 'TrackSpec' whose codec is the 'Codec' wrapping of the given
-- value; the remaining arguments are passed to the 'TrackSpec' constructor
-- unchanged.
mkTrackSpec :: (ZoomReadable a)
            => a -> Bool -> Bool -> DataRateType -> Rational -> ByteString
            -> TrackSpec
mkTrackSpec = TrackSpec . Codec
-- | Query the watermark of a track, or 'Nothing' if there is no track with
-- that number.
watermark :: TrackNo -> ZoomW (Maybe Int)
watermark trackNo =
    gets (fmap twWatermark . IM.lookup trackNo . whTrackWork)
-- | Set the watermark of a track. A track number that is not present is
-- silently ignored, because 'modifyTrack' only adjusts existing entries.
setWatermark :: TrackNo -> Int -> ZoomW ()
setWatermark trackNo w =
    modifyTrack trackNo $ \tw -> tw { twWatermark = w }
-- | Serialise the global header with 'fromGlobal' and write it to the handle.
writeGlobalHeader :: Handle -> Global -> IO ()
writeGlobalHeader h global = B.hPut h (toByteString (fromGlobal global))
-- | Write the header of one track to the handle: the 'trackHeader' marker,
-- the serialised track number, codec, flags, rate and name length, and
-- finally the track name itself.
writeTrackHeader :: Handle -> Int -> TrackSpec -> IO ()
writeTrackHeader h trackNo spec =
    B.hPut h (mconcat [trackHeader, fields, name])
  where
    name   = specName spec
    fields = toByteString $ mconcat
        [ fromTrackNo trackNo
        , fromCodec (specType spec)
        , fromFlags (specDeltaEncode spec)
                    (specZlibCompress spec)
                    (specDRType spec)
        , fromRational64 (specRate spec)
        , fromIntegral32be (C.length name)
        ]
-- | Advance a timestamp by one, forcing the new value before wrapping it.
incTimeStamp :: TimeStamp -> TimeStamp
incTimeStamp (TS t) = TS $! t + 1
-- | Advance the exit time of a track by one step. The entry time is also
-- advanced, but only while the track has no data counted yet.
incTime :: TrackNo -> ZoomW ()
incTime trackNo = modifyTrack trackNo step
  where
    step tw = tw
        { twEntryTime = entry
        , twExitTime  = incTimeStamp (twExitTime tw)
        }
      where
        entry
            | twCount tw == 0 = incTimeStamp (twEntryTime tw)
            | otherwise       = twEntryTime tw
-- | Set the exit time of a track to the given timestamp. The entry time is
-- set as well, but only while the track has no data counted yet.
setTime :: TrackNo -> TimeStamp -> ZoomW ()
setTime trackNo t = modifyTrack trackNo step
  where
    step tw = tw { twEntryTime = entry, twExitTime = t }
      where
        entry
            | twCount tw == 0 = t
            | otherwise       = twEntryTime tw
-- | Call 'flush' if the count of the given track has reached its watermark.
--
-- Calls 'error' if there is no track with that number.
flushIfNeeded :: TrackNo -> ZoomW ()
flushIfNeeded trackNo =
    gets (IM.lookup trackNo . whTrackWork) >>= maybe missing check
  where
    missing = error "no such track"
    check track = when (twCount track >= twWatermark track) flush
-- | Write one datum to a track, stepping the track's time by one.
--
-- When raw data writing is enabled the (optionally delta-encoded) datum is
-- appended to the track's builder. The datum is then counted and folded
-- into the summary work, and the track is flushed if its watermark has been
-- reached.
writeData :: (Typeable a, ZoomWrite a, ZoomWritable a)
          => TrackNo -> a -> ZoomW ()
writeData trackNo d = do
    incTime trackNo
    writeRaw <- gets whWriteData
    when writeRaw $ modifyTrack trackNo appendRaw
    modifyTrack trackNo account
    flushIfNeeded trackNo
  where
    -- Encoded against the summary work as it was before this datum.
    appendRaw tw = tw { twBuilder = twBuilder tw <> encoded }
      where
        encoded =
            deltaEncodeWork (specDeltaEncode (twSpec tw)) (twWriter tw) d

    account tw =
        let seen = twCount tw
        in seen `seq` tw
            { twCount  = seen + 1
            , twWriter = updateWork (twExitTime tw) d (twWriter tw)
            }
-- | Write one timestamped datum to a track.
--
-- The track's exit time is set to the given timestamp. When raw data
-- writing is enabled the (optionally delta-encoded) datum is appended to the
-- track's builder and the timestamp is recorded. The datum is then counted
-- and folded into the summary work, and the track is flushed if its
-- watermark has been reached.
writeDataVBR :: (Typeable a, ZoomWrite a, ZoomWritable a)
             => TrackNo -> (TimeStamp, a) -> ZoomW ()
writeDataVBR trackNo (t, d) = do
    setTime trackNo t
    writeRaw <- gets whWriteData
    when writeRaw $ modifyTrack trackNo appendRaw
    modifyTrack trackNo account
    flushIfNeeded trackNo
  where
    -- Encoded against the summary work as it was before this datum;
    -- timestamps are kept most recent first.
    appendRaw tw = tw
        { twBuilder   = twBuilder tw <> encoded
        , twReverseTS = t : twReverseTS tw
        }
      where
        encoded =
            deltaEncodeWork (specDeltaEncode (twSpec tw)) (twWriter tw) d

    account tw =
        let seen = twCount tw
        in seen `seq` tw
            { twCount  = seen + 1
            , twWriter = updateWork t d (twWriter tw)
            }
-- | Serialise one raw datum, delta-encoding it when requested.
--
-- Delta encoding is applied only if the flag is set, summary work is in
-- progress, and the datum has the type of that summary work. In every other
-- case the datum is serialised unchanged with 'fromRaw'.
deltaEncodeWork :: (Typeable a, ZoomWritable a)
                => Bool -> Maybe ZoomWork -> a -> Builder
deltaEncodeWork useDelta work d
    | not useDelta = plain
    | otherwise    = case work of
        Just (ZoomWork _ (Just cw)) ->
            maybe plain (fromRaw . deltaEncodeRaw cw) (fromDynamic (toDyn d))
        _ -> plain
  where
    plain = fromRaw d
-- | Global header for a file with the given number of tracks: current
-- format version, presentation and base time 0, and no base UTC time.
mkGlobal :: Int -> Global
mkGlobal trackCount = Global
    { version          = Version versionMajor versionMinor
    , noTracks         = trackCount
    , presentationTime = 0
    , baseTime         = 0
    , baseUTC          = Nothing
    }
-- | Apply a function to the map of per-track working state.
modifyTracks :: (IntMap TrackWork -> IntMap TrackWork) -> ZoomW ()
modifyTracks f = modify update
  where
    update z = z { whTrackWork = f (whTrackWork z) }
-- | Apply a function to the working state of one track. Nothing happens if
-- the track number is not present ('IM.adjust').
modifyTrack :: TrackNo -> (TrackWork -> TrackWork) -> ZoomW ()
modifyTrack trackNo f =
    modifyTracks $ \tracks -> IM.adjust f trackNo tracks
-- | Serialise the raw data packet of a track.
--
-- The packet consists of the 'packetHeader' marker, a header (track number,
-- entry time, exit time, datum count, payload length) and the payload. The
-- payload is the track's builder followed by the delta-encoded timestamps
-- in the order they were written, zlib-compressed if the track spec asks
-- for it. The length field is the length of the payload as written, i.e.
-- after any compression.
bsFromTrack :: TrackNo -> TrackWork -> L.ByteString
bsFromTrack trackNo tw = mconcat [magic, header, payload]
  where
    magic = L.fromChunks [packetHeader]

    header = toLazyByteString $ mconcat
        [ fromIntegral32be trackNo
        , fromTimeStamp (twEntryTime tw)
        , fromTimeStamp (twExitTime tw)
        , fromIntegral32be (twCount tw)
        , fromIntegral32be (L.length payload)
        ]

    -- Timestamps are stored newest first, so restore writing order.
    timestamps =
        mconcat . map fromInt64be . deltaEncode . map unTS . reverse $
            twReverseTS tw

    serialised = toLazyByteString (twBuilder tw <> timestamps)

    payload
        | specZlibCompress (twSpec tw) = compress serialised
        | otherwise                    = serialised
-- | Empty working state for a track: no raw data, no timestamps, no summary
-- work, a count of 0, and entry and exit time both set to the given
-- timestamp. All three arguments are forced.
mkTrackWork :: TrackSpec -> TimeStamp -> Int -> TrackWork
mkTrackWork spec entry w = spec `seq` entry `seq` w `seq` TrackWork
    { twSpec      = spec
    , twBuilder   = mempty
    , twReverseTS = []
    , twWriter    = Nothing
    , twCount     = 0
    , twWatermark = w
    , twEntryTime = entry
    , twExitTime  = entry
    }
-- | Drop the summary work in progress, keeping the saved summary levels.
clearWork :: ZoomWork -> ZoomWork
clearWork work = case work of
    ZoomWork levels _ -> ZoomWork levels Nothing
-- | Fold one datum into the summary work of a track.
--
-- With no work yet, new work is created at the datum's type with no saved
-- levels. With existing work, the datum must have the type of that work: it
-- is folded into the summary in progress (or into a freshly initialised one
-- if none is in progress). A datum of any other type yields 'Nothing'.
updateWork :: (Typeable b, ZoomWritable b)
           => TimeStamp -> b
           -> Maybe ZoomWork
           -> Maybe ZoomWork
updateWork !t !d Nothing = Just (ZoomWork IM.empty (Just fresh))
  where
    fresh = updateSummaryData t d (initSummaryWork t)
updateWork !t !d (Just (ZoomWork l current)) =
    fmap step (fromDynamic (toDyn d))
  where
    -- Continue the summary in progress, or start a new one at this time.
    base = maybe (initSummaryWork t) id current
    step d' = ZoomWork l (Just (updateSummaryData t d' base))
-- | Push a summary covering the track's current entry and exit times and
-- defer the resulting output ('flushWork' via 'diskSummary').
flushSummary :: TrackNo -> TrackWork -> ZoomW ()
flushSummary trackNo tw = diskSummary pushCurrent trackNo tw
  where
    pushCurrent = flushWork (twEntryTime tw) (twExitTime tw)
-- | Defer the output of the summaries still saved at each level of a track
-- ('finishWork' via 'diskSummary').
finishSummary :: TrackNo -> TrackWork -> ZoomW ()
finishSummary trackNo tw = diskSummary finishWork trackNo tw
-- | Run a summary step on the summary work of a track, if it has any.
--
-- The builders produced by the step are merged into 'whDeferred' (builders
-- under the same key are appended to the existing ones), and the updated
-- work is stored back in the track. A track without summary work is left
-- untouched.
diskSummary :: (TrackNo -> ZoomWork -> (ZoomWork, IntMap Builder))
            -> TrackNo -> TrackWork -> ZoomW ()
diskSummary fWork trackNo tw = Fold.forM_ (twWriter tw) $ \current -> do
    let (updated, output) = fWork trackNo current
    modify $ \z ->
        z { whDeferred = IM.unionWith mappend (whDeferred z) output }
    modifyTrack trackNo $ \track -> track { twWriter = Just updated }
-- | Produce the output for all saved summary levels ('finishLevels') and
-- return the work with its levels emptied. The summary work in progress is
-- kept as it is; the track number is not used.
finishWork :: TrackNo -> ZoomWork -> (ZoomWork, IntMap Builder)
finishWork _trackNo work = case work of
    ZoomWork levels cw -> (ZoomWork IM.empty cw, finishLevels levels)
-- | Serialise the summaries saved at each level, keyed by level.
--
-- Levels are visited from 1 up to the highest saved level. Once a saved
-- summary has been met, a summary is carried upwards: at every following
-- level it is appended to the summary saved there (if any), serialised
-- under that level's key, and carried on with its level incremented.
--
-- An empty map of levels yields an empty result.
finishLevels :: (Typeable a, ZoomWritable a)
             => IntMap (Summary a) -> IntMap Builder
finishLevels l
    -- 'IM.findMax' calls 'error' on an empty map, so handle that case first.
    | IM.null l = IM.empty
    | otherwise =
        snd $ foldl' propagate (Nothing, IM.empty) [1 .. fst $ IM.findMax l]
    where
        -- Nothing carried yet: start carrying at the first saved level.
        propagate (Nothing, bs) k = case IM.lookup k l of
            Nothing    ->
                (Nothing, bs)
            Just saved ->
                (Just (incLevel saved), IM.insert k (fromSummary saved) bs)
        -- Carrying a summary: merge it with whatever is saved at this level.
        propagate (Just bub, bs) k = case IM.lookup k l of
            Nothing    ->
                (Just (incLevel bub), IM.insert k (fromSummary bub) bs)
            Just saved ->
                let new = saved `appendSummary` bub in
                (Just (incLevel new), IM.insert k (fromSummary new) bs)
-- | Turn the summary work in progress into a level-1 summary and push it
-- into the saved levels with 'pushSummary'.
--
-- Returns the work with its updated levels together with the builders that
-- 'pushSummary' produced. Work with no summary in progress is returned
-- unchanged with no output. The summary in progress is not cleared here.
flushWork :: TimeStamp -> TimeStamp
          -> TrackNo -> ZoomWork -> (ZoomWork, IntMap Builder)
flushWork _ _ _ op@(ZoomWork _ Nothing) = (op, IM.empty)
flushWork entryTime exitTime trackNo (ZoomWork l (Just cw)) =
    (ZoomWork l' (Just cw), bs)
    where
        (bs, l') = pushSummary s IM.empty l
        s = Summary
            { summaryTrack = trackNo
            , summaryLevel = 1
            , summaryEntryTime = entryTime
            , summaryExitTime = exitTime
            , summaryData = toSummaryData dur cw
            }
        -- Duration covered by the summary: exit time minus entry time.
        dur = TSDiff $ (unTS exitTime) - (unTS entryTime)
-- | Push a summary into the saved levels.
--
-- The summary is always serialised under its own level. If no summary is
-- saved at that level, it is saved there with its level incremented.
-- Otherwise the saved summary is removed, appended with the new one, and
-- the result is pushed again.
pushSummary :: (ZoomWritable a)
            => Summary a
            -> IntMap Builder -> IntMap (Summary a)
            -> (IntMap Builder, IntMap (Summary a))
pushSummary s bs l =
    maybe (bs', IM.insert level (incLevel s) l) merge (IM.lookup level l)
  where
    level = summaryLevel s
    bs'   = IM.insert level (fromSummary s) bs
    merge saved =
        pushSummary (saved `appendSummary` s) bs' (IM.delete level l)
-- | The same summary, one level higher.
incLevel :: Summary a -> Summary a
incLevel s = s { summaryLevel = next }
  where
    next = summaryLevel s + 1
-- | Combine two summaries into one.
--
-- Track, level and entry time come from the first summary and the exit time
-- from the second. The data of both are merged with 'appendSummaryData',
-- each paired with its own 'summaryDuration'.
appendSummary :: (ZoomWritable a) => Summary a -> Summary a -> Summary a
appendSummary s1 s2 = Summary
    { summaryTrack     = summaryTrack s1
    , summaryLevel     = summaryLevel s1
    , summaryEntryTime = summaryEntryTime s1
    , summaryExitTime  = summaryExitTime s2
    , summaryData      = merged
    }
  where
    merged = appendSummaryData (summaryDuration s1) (summaryData s1)
                               (summaryDuration s2) (summaryData s2)
-- | Infix synonym for 'mappend'.
(<>) :: Monoid a => a -> a -> a
x <> y = x `mappend` y