{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE RankNTypes #-} -- | Streaming compression and decompression using conduits. -- -- Parts of this code were taken from zlib-enum and adapted for conduits. module Data.Conduit.Zlib ( -- * Conduits compress, decompress, gzip, ungzip, -- * Flushing compressFlush, decompressFlush, -- * Decompression combinators multiple, -- * Re-exported from zlib-bindings WindowBits (..), defaultWindowBits ) where import Data.Streaming.Zlib import Data.Conduit import Data.ByteString (ByteString) import qualified Data.ByteString as S import Control.Monad (unless, liftM) import Control.Monad.Trans.Class (lift, MonadTrans) import Control.Monad.Primitive (PrimMonad, unsafePrimToPrim) import Control.Monad.Base (MonadBase, liftBase) import Control.Monad.Trans.Resource (MonadThrow, monadThrow) import Data.Function (fix) -- | Gzip compression with default parameters. gzip :: (MonadThrow m, MonadBase base m, PrimMonad base) => Conduit ByteString m ByteString gzip = compress 1 (WindowBits 31) -- | Gzip decompression with default parameters. ungzip :: (MonadBase base m, PrimMonad base, MonadThrow m) => Conduit ByteString m ByteString ungzip = decompress (WindowBits 31) unsafeLiftIO :: (MonadBase base m, PrimMonad base, MonadThrow m) => IO a -> m a unsafeLiftIO = liftBase . unsafePrimToPrim -- | -- Decompress (inflate) a stream of 'ByteString's. For example: -- -- > sourceFile "test.z" $= decompress defaultWindowBits $$ sinkFile "test" decompress :: (MonadBase base m, PrimMonad base, MonadThrow m) => WindowBits -- ^ Zlib parameter (see the zlib-bindings package as well as the zlib C library) -> Conduit ByteString m ByteString decompress = helperDecompress (liftM (fmap Chunk) await) yield' leftover where yield' Flush = return () yield' (Chunk bs) = yield bs -- | Same as 'decompress', but allows you to explicitly flush the stream. decompressFlush :: (MonadBase base m, PrimMonad base, MonadThrow m) => WindowBits -- ^ Zlib parameter (see the zlib-bindings package as well as the zlib C library) -> Conduit (Flush ByteString) m (Flush ByteString) decompressFlush = helperDecompress await yield (leftover . Chunk) helperDecompress :: (Monad (t m), MonadBase base m, PrimMonad base, MonadThrow m, MonadTrans t) => t m (Maybe (Flush ByteString)) -> (Flush ByteString -> t m ()) -> (ByteString -> t m ()) -> WindowBits -> t m () helperDecompress await' yield' leftover' config = do -- Initialize the stateful inflater, which will be used below -- This inflater is never exposed outside of this function inf <- lift $ unsafeLiftIO $ initInflate config -- Some helper functions used by the main feeder loop below let -- Flush any remaining inflated bytes downstream flush = do chunk <- lift $ unsafeLiftIO $ flushInflate inf unless (S.null chunk) $ yield' $ Chunk chunk -- Get any input which is unused by the inflater getUnused = lift $ unsafeLiftIO $ getUnusedInflate inf -- If there is any unused data, return it as leftovers to the stream unused = do rem' <- getUnused unless (S.null rem') $ leftover' rem' -- Main loop: feed data from upstream into the inflater fix $ \feeder -> do mnext <- await' case mnext of -- No more data is available from upstream Nothing -> do -- Flush any remaining uncompressed data flush -- Return the rest of the unconsumed data as leftovers unused -- Another chunk of compressed data arrived Just (Chunk x) -> do -- Feed the compressed data into the inflater, returning a -- "popper" which will return chunks of decompressed data popper <- lift $ unsafeLiftIO $ feedInflate inf x -- Loop over the popper grabbing decompressed chunks and -- yielding them downstream fix $ \pop -> do mbs <- lift $ unsafeLiftIO popper case mbs of -- No more data from this popper PRDone -> do rem' <- getUnused if S.null rem' -- No data was unused by the inflater, so let's -- fill it up again and get more data out of it then feeder -- In this case, there is some unconsumed data, -- meaning the compressed stream is complete. -- At this point, we need to stop feeding, -- return the unconsumed data as leftovers, and -- flush any remaining content (which should be -- nothing) else do flush leftover' rem' -- Another chunk available, yield it downstream and -- loop again PRNext bs -> do yield' (Chunk bs) pop -- An error occurred inside zlib, throw it PRError e -> lift $ monadThrow e -- We've been asked to flush the stream Just Flush -> do -- Get any uncompressed data waiting for us flush -- Put a Flush in the stream yield' Flush -- Feed in more data feeder -- | -- Compress (deflate) a stream of 'ByteString's. The 'WindowBits' also control -- the format (zlib vs. gzip). compress :: (MonadBase base m, PrimMonad base, MonadThrow m) => Int -- ^ Compression level -> WindowBits -- ^ Zlib parameter (see the zlib-bindings package as well as the zlib C library) -> Conduit ByteString m ByteString compress = helperCompress (liftM (fmap Chunk) await) yield' where yield' Flush = return () yield' (Chunk bs) = yield bs -- | Same as 'compress', but allows you to explicitly flush the stream. compressFlush :: (MonadBase base m, PrimMonad base, MonadThrow m) => Int -- ^ Compression level -> WindowBits -- ^ Zlib parameter (see the zlib-bindings package as well as the zlib C library) -> Conduit (Flush ByteString) m (Flush ByteString) compressFlush = helperCompress await yield helperCompress :: (Monad (t m), MonadBase base m, PrimMonad base, MonadThrow m, MonadTrans t) => t m (Maybe (Flush ByteString)) -> (Flush ByteString -> t m ()) -> Int -> WindowBits -> t m () helperCompress await' yield' level config = await' >>= maybe (return ()) start where start input = do def <- lift $ unsafeLiftIO $ initDeflate level config push def input continue def = await' >>= maybe (close def) (push def) goPopper popper = do mbs <- lift $ unsafeLiftIO popper case mbs of PRDone -> return () PRNext bs -> yield' (Chunk bs) >> goPopper popper PRError e -> lift $ monadThrow e push def (Chunk x) = do popper <- lift $ unsafeLiftIO $ feedDeflate def x goPopper popper continue def push def Flush = do mchunk <- lift $ unsafeLiftIO $ flushDeflate def case mchunk of PRDone -> return () PRNext x -> yield' $ Chunk x PRError e -> lift $ monadThrow e yield' Flush continue def close def = do mchunk <- lift $ unsafeLiftIO $ finishDeflate def case mchunk of PRDone -> return () PRNext chunk -> yield' (Chunk chunk) >> close def PRError e -> lift $ monadThrow e -- | The standard 'decompress' and 'ungzip' functions will only decompress a -- single compressed entity from the stream. This combinator will exhaust the -- stream completely of all individual compressed entities. This is useful for -- cases where you have a concatenated archive, e.g. @cat file1.gz file2.gz > -- combined.gz@. -- -- Usage: -- -- > sourceFile "combined.gz" $$ multiple ungzip =$ consume -- -- This combinator will not fail on an empty stream. If you want to ensure that -- at least one compressed entity in the stream exists, consider a usage such -- as: -- -- > sourceFile "combined.gz" $$ (ungzip >> multiple ungzip) =$ consume -- -- @since 1.1.10 multiple :: Monad m => Conduit ByteString m a -> Conduit ByteString m a multiple inner = loop where loop = do mbs <- await case mbs of Nothing -> return () Just bs | S.null bs -> loop | otherwise -> do leftover bs inner loop