{-# LANGUAGE BangPatterns       #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE OverloadedStrings  #-}

-- | Stream operations on 'ByteString'.
module System.IO.Streams.ByteString
 ( -- * Counting bytes
   countInput
 , countOutput

   -- * Treating strings as streams
 , fromByteString
 , fromLazyByteString

   -- * Input and output
 , readExactly
 , takeBytesWhile
 , writeLazyByteString

   -- * Stream transformers
   -- ** Splitting/Joining
 , splitOn
 , lines
 , unlines
 , words
 , unwords

   -- ** Other
 , giveBytes
 , giveExactly
 , takeBytes
 , takeExactly
 , throwIfConsumesMoreThan
 , throwIfProducesMoreThan

   -- ** Rate limiting
 , throwIfTooSlow

   -- * String search
 , MatchInfo(..)
 , search

   -- * Exception types
 , RateTooSlowException
 , ReadTooShortException
 , TooManyBytesReadException
 , TooManyBytesWrittenException
 , TooFewBytesWrittenException

 ) where

------------------------------------------------------------------------------
import           Control.Exception                 (Exception, throwIO)
import           Control.Monad                     (when, (>=>))
import           Data.ByteString                   (ByteString)
import qualified Data.ByteString.Char8             as S
import qualified Data.ByteString.Lazy.Char8        as L
import qualified Data.ByteString.Unsafe            as S
import           Data.Char                         (isSpace)
import           Data.Int                          (Int64)
import           Data.IORef                        (IORef, newIORef, readIORef, writeIORef)
import           Data.Time.Clock.POSIX             (getPOSIXTime)
import           Data.Typeable                     (Typeable)
import           Prelude                           hiding (read, lines, unlines, words, unwords)
------------------------------------------------------------------------------
import           System.IO.Streams.Combinators     (filterM, intersperse, outputFoldM)
import           System.IO.Streams.Internal        (InputStream (..), OutputStream, makeInputStream, makeOutputStream, read, unRead, write)
import           System.IO.Streams.Internal.Search (MatchInfo (..), search)
import           System.IO.Streams.List            (fromList, writeList)


------------------------------------------------------------------------------
-- Strict in-place update of an 'IORef': the new value is forced (via '$!')
-- before it is stored, so repeated updates do not pile up thunks in the ref.
{-# INLINE modifyRef #-}
modifyRef :: IORef a -> (a -> a) -> IO ()
modifyRef ref f = readIORef ref >>= \old -> writeIORef ref $! f old


------------------------------------------------------------------------------
-- | Sends every chunk of a lazy 'ByteString' to an 'OutputStream'.
--
-- Example:
--
-- @
-- ghci> Streams.'writeLazyByteString' \"Test\\n\" Streams.'System.IO.Streams.stdout'
-- Test
-- @
writeLazyByteString :: L.ByteString             -- ^ string to write to output
                    -> OutputStream ByteString  -- ^ output stream
                    -> IO ()
writeLazyByteString lbs = writeList (L.toChunks lbs)
{-# INLINE writeLazyByteString #-}


------------------------------------------------------------------------------
-- | Builds an 'InputStream' that yields the given strict 'ByteString' as a
-- single chunk.
fromByteString :: ByteString -> IO (InputStream ByteString)
fromByteString bs = fromList [bs]


------------------------------------------------------------------------------
-- | Builds an 'InputStream' that yields the chunks of a lazy 'ByteString'.
fromLazyByteString :: L.ByteString -> IO (InputStream ByteString)
fromLazyByteString lbs = fromList (L.toChunks lbs)


------------------------------------------------------------------------------
-- | Wraps an 'InputStream' so that the number of bytes it produces is tallied
-- as a side effect. Returns the wrapping 'InputStream' together with an IO
-- action that reports the current byte count.
--
-- Strings pushed back to the returned 'InputStream' are pushed back to the
-- original stream as well, and their length is subtracted from the count.
--
-- Example:
--
-- @
-- ghci> is <- Streams.'System.IO.Streams.fromList' [\"abc\", \"def\", \"ghi\"::ByteString]
-- ghci> (is', getCount) <- Streams.'countInput' is
-- ghci> Streams.'read' is'
-- Just \"abc\"
-- ghci> getCount
-- 3
-- ghci> Streams.'unRead' \"bc\" is'
-- ghci> getCount
-- 1
-- ghci> Streams.'System.IO.Streams.peek' is
-- Just \"bc\"
-- ghci> Streams.'System.IO.Streams.toList' is'
-- [\"bc\",\"def\",\"ghi\"]
-- ghci> getCount
-- 9
-- @
--
countInput :: InputStream ByteString -> IO (InputStream ByteString, IO Int64)
countInput src = do
    counter <- newIORef (0 :: Int64)
    return $! ( InputStream (produce counter) (pushback counter)
              , readIORef counter
              )
  where
    -- Forward a read to the source, adding the chunk's length to the tally.
    produce counter = do
        mb <- read src
        case mb of
          Nothing -> return Nothing
          Just bs -> do
              modifyRef counter (+ fromIntegral (S.length bs))
              return $! Just bs

    -- Push data back onto the source, removing its length from the tally.
    pushback counter bs = do
        modifyRef counter (subtract (fromIntegral (S.length bs)))
        unRead bs src


------------------------------------------------------------------------------
-- | Wraps an 'OutputStream' so that the number of bytes it consumes is tallied
-- as a side effect. Returns the wrapping 'OutputStream' together with an IO
-- action that reports the count of bytes consumed.
--
-- Example:
--
-- @
-- ghci> (os :: OutputStream ByteString, getList) <- Streams.'System.IO.Streams.listOutputStream'
-- ghci> (os', getCount) <- Streams.'countOutput' os
-- ghci> Streams.'System.IO.Streams.fromList' [\"abc\", \"def\", \"ghi\"] >>= Streams.'System.IO.Streams.connectTo' os'
-- ghci> getList
-- [\"abc\",\"def\",\"ghi\"]
-- ghci> getCount
-- 9
-- @
countOutput :: OutputStream ByteString
            -> IO (OutputStream ByteString, IO Int64)
countOutput = outputFoldM step 0
  where
    -- Strict fold step: the running total is forced on every chunk.
    step !total bs = return $! total + fromIntegral (S.length bs)


------------------------------------------------------------------------------
-- | Wraps an 'InputStream', producing a new 'InputStream' that will produce at
-- most @n@ bytes, subsequently yielding end-of-stream forever.
--
-- Strings pushed back to the returned 'InputStream' will be propagated
-- upstream, modifying the count of taken bytes accordingly.
--
-- Example:
--
-- @
-- ghci> is <- Streams.'System.IO.Streams.fromList' [\"truncated\", \" string\"::ByteString]
-- ghci> is' <- Streams.'takeBytes' 9 is
-- ghci> Streams.'read' is'
-- Just \"truncated\"
-- ghci> Streams.'read' is'
-- Nothing
-- ghci> Streams.'System.IO.Streams.peek' is
-- Just \" string\"
-- ghci> Streams.'unRead' \"cated\" is'
-- ghci> Streams.'System.IO.Streams.peek' is
-- Just \"cated\"
-- ghci> Streams.'System.IO.Streams.peek' is'
-- Just \"cated\"
-- ghci> Streams.'read' is'
-- Just \"cated\"
-- ghci> Streams.'read' is'
-- Nothing
-- ghci> Streams.'read' is
-- Just \" string\"
-- @
takeBytes :: Int64                        -- ^ maximum number of bytes to read
          -> InputStream ByteString       -- ^ input stream to wrap
          -> IO (InputStream ByteString)
takeBytes k0 = takeBytes' k0 (return Nothing)
{-# INLINE takeBytes #-}


------------------------------------------------------------------------------
-- | Like @Streams.'takeBytes'@, but throws 'ReadTooShortException' when
-- there aren't enough bytes present on the source.
takeExactly :: Int64                        -- ^ number of bytes to read
            -> InputStream ByteString       -- ^ input stream to wrap
            -> IO (InputStream ByteString)
takeExactly k0 = takeBytes' k0 (throwIO $ ReadTooShortException k0)
{-# INLINE takeExactly #-}


------------------------------------------------------------------------------
-- Helper for the two above.
takeBytes' :: Int64
           -> IO (Maybe ByteString)
           -- ^ What to do if the input ends before having consumed the
           -- right amount of bytes.
           -> InputStream ByteString
           -> IO (InputStream ByteString)
takeBytes' k0 h src = do
    -- kref holds the number of bytes that may still be produced.
    kref <- newIORef k0
    return $! InputStream (prod kref) (pb kref)
  where
    prod kref = do
        k <- readIORef kref
        if k <= 0
          then return Nothing                    -- budget spent: end-of-stream
          else read src >>= maybe h (chunk k)    -- source EOF: run handler h
      where
        chunk k s = do
            let l  = fromIntegral $ S.length s
            let k' = k - l
            if k' <= 0
              -- This chunk reaches (or overshoots) the limit: yield only the
              -- first k bytes and hand any surplus back to the source.
              then let (a,b) = S.splitAt (fromIntegral k) s
                   in do
                       when (not $ S.null b) $ unRead b src
                       writeIORef kref 0
                       return $! Just a
              else writeIORef kref k' >> return (Just s)

    -- A pushback returns its bytes to the budget and is forwarded upstream.
    pb kref s = do
        modifyRef kref (+ (fromIntegral $ S.length s))
        unRead s src
{-# INLINE takeBytes' #-}


------------------------------------------------------------------------------
-- | Splits an 'InputStream' over 'ByteString's using a delimiter predicate.
--
-- Note that:
--
--   * data pushed back with 'unRead' is /not/ propagated upstream here.
--
--   * the resulting 'InputStream' may hold an unbounded amount of the
--     bytestring in memory waiting for the function to return true, so this
--     function should not be used in unsafe contexts.
--
--   * the delimiter is NOT included in the output.
--
--   * consecutive delimiters are not merged.
--
-- Example (note the two consecutive spaces in the second chunk, which produce
-- the empty segment in the output):
--
-- @
-- ghci> Streams.'System.IO.Streams.fromList' [\"the quick br\", \"own  fox\"::'ByteString'] >>=
--       Streams.'splitOn' (== \' \') >>= Streams.'System.IO.Streams.toList'
-- [\"the\",\"quick\",\"brown\",\"\",\"fox\"]
-- @
--
splitOn :: (Char -> Bool)               -- ^ predicate used to break the input
                                        -- stream into chunks
        -> InputStream ByteString       -- ^ input stream
        -> IO (InputStream ByteString)
splitOn p is = do
    -- ref holds a difference list ([ByteString] -> [ByteString]) of the
    -- pieces collected so far for the segment that is still open.
    ref <- newIORef id
    makeInputStream $ start ref
  where
    start ref = go
      where
        go = read is >>= maybe end chunk

        -- Source exhausted: flush whatever is pending as the final segment,
        -- or signal end-of-stream when nothing is pending.
        end = do
            dl <- readIORef ref
            case dl [] of
              [] -> return Nothing
              xs -> writeIORef ref id >> (return $! Just $! S.concat xs)

        chunk s = let (a, b) = S.break p s
                  in if S.null b
                       -- No delimiter in this chunk: stash it and read more.
                       then modifyRef ref (\f -> f . (a:)) >> go
                       else do
                           -- b is non-empty here, so dropping the single
                           -- delimiter byte with unsafeDrop is in bounds.
                           let !b' = S.unsafeDrop 1 b
                           dl <- readIORef ref
                           if S.null b'
                             then do
                                 -- The delimiter was the chunk's last byte:
                                 -- seed the accumulator with "" so that a
                                 -- trailing delimiter still yields a final
                                 -- empty segment if the source ends here.
                                 writeIORef ref ("" :)
                                 return $ Just $! S.concat $ dl [a]
                             else do
                                 -- Hand the remainder back to the source so
                                 -- the next read resumes after the delimiter.
                                 writeIORef ref id
                                 unRead b' is
                                 return $ Just $! S.concat $ dl [a]


------------------------------------------------------------------------------
-- | Splits a bytestring 'InputStream' into lines. See 'splitOn' and
-- 'Prelude.lines'.
--
-- Example:
--
-- @
-- ghci> is \<- Streams.'System.IO.Streams.fromList' [\"Hello,\\n world!\"] >>= Streams.'lines'
-- ghci> replicateM 3 (Streams.'read' is)
-- [Just \"Hello,\", Just \" world!\", Nothing]
-- @
--
-- Note that this may increase the chunk size if the input contains extremely
-- long lines.
lines :: InputStream ByteString -> IO (InputStream ByteString)
lines = splitOn (== '\n')


------------------------------------------------------------------------------
-- | Splits a bytestring 'InputStream' into words. See 'splitOn' and
-- 'Prelude.words'.
--
-- Example:
--
-- @
-- ghci> is \<- Streams.'System.IO.Streams.fromList' [\"Hello, world!\"] >>= Streams.'words'
-- ghci> replicateM 3 (Streams.'read' is)
-- [Just \"Hello,\", Just \"world!\", Nothing]
-- @
--
-- Note that this may increase the chunk size if the input contains extremely
-- long words.
words :: InputStream ByteString -> IO (InputStream ByteString)
words = splitOn isSpace >=> filterM (return . not . S.all isSpace)
  -- 'splitOn' does not merge consecutive delimiters, so runs of whitespace
  -- show up as empty segments; the filter discards them.


------------------------------------------------------------------------------
-- | Writes a newline after every string chunk sent to the given
-- 'OutputStream'. End-of-stream ('Nothing') is passed through to the wrapped
-- stream without a trailing newline. See 'Prelude.unlines'.
--
-- @
-- ghci> os <- Streams.'unlines' Streams.'System.IO.Streams.stdout'
-- ghci> Streams.'write' (Just \"Hello,\") os
-- Hello,
-- ghci> Streams.'write' Nothing os
-- ghci> Streams.'write' (Just \"world!\") os
-- world!
-- @
unlines :: OutputStream ByteString -> IO (OutputStream ByteString)
unlines os = makeOutputStream $ \m -> do
    -- Forward the value first, then terminate real chunks with a newline.
    write m os
    case m of
      Nothing -> return $! ()
      Just _  -> write (Just "\n") os


------------------------------------------------------------------------------
-- | Intersperses string chunks sent to the given 'OutputStream' with spaces.
-- See 'intersperse' and 'Prelude.unwords'.
--
-- @
-- ghci> os <- Streams.'unwords' Streams.'System.IO.Streams.stdout'
-- ghci> forM_ [Just \"Hello,\", Nothing, Just \"world!\\n\"] $ \w -> Streams.'write' w os
-- Hello, world!
-- @
unwords :: OutputStream ByteString -> IO (OutputStream ByteString)
unwords os = intersperse " " os


------------------------------------------------------------------------------
-- | Raised by 'throwIfProducesMoreThan' once the wrapped 'InputStream' has
-- produced more bytes than allowed.
data TooManyBytesReadException = TooManyBytesReadException
  deriving (Typeable)

instance Show TooManyBytesReadException where
    show TooManyBytesReadException = "Too many bytes read"

instance Exception TooManyBytesReadException


------------------------------------------------------------------------------
-- | Raised by 'giveExactly' when end-of-stream is written before the required
-- number of bytes has been sent to the produced 'OutputStream'.
data TooFewBytesWrittenException = TooFewBytesWrittenException
  deriving (Typeable)

instance Show TooFewBytesWrittenException where
    show TooFewBytesWrittenException = "Too few bytes written"

instance Exception TooFewBytesWrittenException


------------------------------------------------------------------------------
-- | Raised by 'throwIfConsumesMoreThan' and 'giveExactly' when more bytes
-- than allowed are sent to the produced 'OutputStream'.
data TooManyBytesWrittenException = TooManyBytesWrittenException
  deriving (Typeable)

instance Show TooManyBytesWrittenException where
    show TooManyBytesWrittenException = "Too many bytes written"

instance Exception TooManyBytesWrittenException


------------------------------------------------------------------------------
-- | Thrown by 'readExactly' and 'takeExactly' when not enough bytes were
-- available on the input.
data ReadTooShortException = ReadTooShortException Int64
  deriving (Typeable)

instance Show ReadTooShortException where
    -- The payload is the number of bytes that had been requested.
    show (ReadTooShortException x) = "Short read, expected " ++ show x
                                     ++ " bytes"

instance Exception ReadTooShortException


------------------------------------------------------------------------------
-- | Wraps an 'InputStream'. If more than @n@ bytes are produced by this
-- stream, 'read' will throw a 'TooManyBytesReadException'.
--
-- If a chunk yielded by the input stream would result in more than @n@ bytes
-- being produced, 'throwIfProducesMoreThan' will cut the generated string such
-- that exactly @n@ bytes are yielded by the returned stream, and the
-- /subsequent/ read will throw an exception. Example:
--
-- @
-- ghci> is \<- Streams.'System.IO.Streams.fromList' [\"abc\", \"def\", \"ghi\"] >>=
--             Streams.'throwIfProducesMoreThan' 5
-- ghci> 'Control.Monad.replicateM' 2 ('read' is)
-- [Just \"abc\",Just \"de\"]
-- ghci> Streams.'read' is
-- *** Exception: Too many bytes read
-- @
--
-- Strings pushed back to the returned 'InputStream' will be propagated
-- upstream, modifying the count of taken bytes accordingly. Example:
--
-- @
-- ghci> is <- Streams.'System.IO.Streams.fromList' [\"abc\", \"def\", \"ghi\"]
-- ghci> is' <- Streams.'throwIfProducesMoreThan' 5 is
-- ghci> Streams.'read' is'
-- Just \"abc\"
-- ghci> Streams.'unRead' \"xyz\" is'
-- ghci> Streams.'System.IO.Streams.peek' is
-- Just \"xyz\"
-- ghci> Streams.'read' is'
-- Just \"xyz\"
-- ghci> Streams.'read' is'
-- Just \"de\"
-- ghci> Streams.'read' is'
-- *** Exception: Too many bytes read
-- @
--
-- Empty chunks are always passed through and never trigger the exception. A
-- limit that is zero or negative makes the first non-empty chunk throw.
--
throwIfProducesMoreThan
    :: Int64                    -- ^ maximum number of bytes to read
    -> InputStream ByteString   -- ^ input stream
    -> IO (InputStream ByteString)
throwIfProducesMoreThan k0 src = do
    -- kref holds the number of bytes that may still be produced.
    kref <- newIORef k0
    return $! InputStream (prod kref) (pb kref)
  where
    prod kref = read src >>= maybe (return Nothing) chunk
      where
        chunk s = readIORef kref >>= step
          where
            l :: Int64
            l = fromIntegral (S.length s)

            step k
              -- Empty chunks cost nothing and are passed through untouched.
              | l == 0    = return (Just s)
              -- Budget already spent (or never positive): more data is an
              -- error.
              | k <= 0    = throwIO TooManyBytesReadException
              -- The whole chunk fits in the remaining budget.
              | l <= k    = writeIORef kref (k - l) >> return (Just s)
              -- The chunk overshoots: yield exactly the remaining k bytes and
              -- push the surplus back, so that the next read sees it and
              -- throws.
              | otherwise = do
                    let (!a, !b) = S.splitAt (fromIntegral k) s
                    writeIORef kref 0
                    unRead b src
                    return $! Just a

    -- A pushback is forwarded upstream and its bytes return to the budget.
    pb kref s = do
        unRead s src
        modifyRef kref (+ (fromIntegral $ S.length s))


------------------------------------------------------------------------------
-- | Reads an @n@-byte ByteString from an input stream. Throws a
-- 'ReadTooShortException' if fewer than @n@ bytes were available.
--
-- If @n@ is zero or negative, the empty string is returned without touching
-- the input stream.
--
-- Example:
--
-- @
-- ghci> Streams.'System.IO.Streams.fromList' [\"long string\"] >>= Streams.'readExactly' 6
-- \"long s\"
-- ghci> Streams.'System.IO.Streams.fromList' [\"short\"] >>= Streams.'readExactly' 6
-- *** Exception: Short read, expected 6 bytes
-- @
--
readExactly :: Int                     -- ^ number of bytes to read
            -> InputStream ByteString  -- ^ input stream
            -> IO ByteString
readExactly n input = go id n
  where
    -- dl is a difference list of the chunks gathered so far; k is the number
    -- of bytes still missing.
    go !dl k
        | k <= 0    = return $! S.concat $! dl []
        | otherwise =
            read input >>=
            maybe (throwIO $ ReadTooShortException (fromIntegral n))
                  (\s -> do
                       let l = S.length s
                       if l >= k
                         then do
                             -- Enough data: keep the first k bytes and give
                             -- any surplus back to the stream.
                             let (a, b) = S.splitAt k s
                             when (not $ S.null b) $ unRead b input
                             return $! S.concat $! dl [a]
                         else go (dl . (s:)) (k - l))


------------------------------------------------------------------------------
-- | Takes from a stream until the given predicate is no longer satisfied.
-- Returns Nothing on end-of-stream, or @Just \"\"@ if the predicate is never
-- satisfied. See 'Prelude.takeWhile' and 'Data.ByteString.Char8.takeWhile'.
--
-- Example:
--
-- @
-- ghci> Streams.'System.IO.Streams.fromList' [\"Hello, world!\"] >>= Streams.'takeBytesWhile' (/= ',')
-- Just \"Hello\"
-- ghci> import Data.Char
-- ghci> Streams.'System.IO.Streams.fromList' [\"7 Samurai\"] >>= Streams.'takeBytesWhile' isAlpha
-- Just \"\"
-- ghci> Streams.'System.IO.Streams.fromList' [] >>= Streams.'takeBytesWhile' isAlpha
-- Nothing
-- @
takeBytesWhile :: (Char -> Bool)          -- ^ predicate
               -> InputStream ByteString  -- ^ input stream
               -> IO (Maybe ByteString)
takeBytesWhile p input = do
    firstChunk <- read input
    case firstChunk of
      Nothing -> return Nothing
      Just s  -> collect id s
  where
    -- acc is a difference list of the fully matching chunks seen so far.
    collect acc !chunk
        -- The whole chunk matched: keep it and look at the next one.
        | S.null rest = do
              nextChunk <- read input
              case nextChunk of
                Nothing -> done
                Just s  -> collect (acc . (matched:)) s
        -- Hit a non-matching byte: give the remainder back and finish.
        | otherwise   = unRead rest input >> done
      where
        (matched, rest) = S.span p chunk
        done            = return $! Just $! S.concat $! acc [matched]


------------------------------------------------------------------------------
-- | Wraps an 'OutputStream' so that no more than @n@ bytes are forwarded to
-- it; anything written beyond that limit is silently discarded.
--
-- Example:
--
-- @
-- ghci> (os :: OutputStream ByteString, getList) <- Streams.'System.IO.Streams.listOutputStream'
-- ghci> os' <- Streams.'giveBytes' 6 os
-- ghci> Streams.'System.IO.Streams.fromList' [\"long \", \"string\"] >>= Streams.'System.IO.Streams.connectTo' os'
-- ghci> getList
-- [\"long \",\"s\"]
-- @
giveBytes :: Int64                        -- ^ maximum number of bytes to send
                                          -- to the wrapped stream
          -> OutputStream ByteString      -- ^ output stream to wrap
          -> IO (OutputStream ByteString)
giveBytes k0 str = newIORef k0 >>= makeOutputStream . sink
  where
    -- End-of-stream is always forwarded.
    sink _ Nothing = write Nothing str

    sink budget chunk@(Just bs) = do
        remaining <- readIORef budget
        let left = remaining - fromIntegral (S.length bs)
        if left >= 0
          then writeIORef budget left >> write chunk str
          else do
              -- Only part of the chunk fits: forward that prefix (if any)
              -- and mark the budget as exhausted.
              let prefix = S.take (fromIntegral remaining) bs
              when (not $ S.null prefix) $ write (Just prefix) str
              writeIORef budget 0


------------------------------------------------------------------------------
-- | Wraps an 'OutputStream', producing a new stream that will pass along
-- exactly @n@ bytes to the wrapped stream. If the stream is sent more or fewer
-- than the given number of bytes, the resulting stream will throw an exception
-- (either 'TooFewBytesWrittenException' or 'TooManyBytesWrittenException')
-- during a call to 'write'.
--
-- Example:
--
-- @
-- ghci> is <- Streams.'System.IO.Streams.fromList' [\"ok\"]
-- ghci> Streams.'System.IO.Streams.outputToList' (Streams.'giveExactly' 2 >=> Streams.'System.IO.Streams.connect' is)
-- [\"ok\"]
-- ghci> is <- Streams.'System.IO.Streams.fromList' [\"ok\"]
-- ghci> Streams.'System.IO.Streams.outputToList' (Streams.'giveExactly' 1 >=> Streams.'System.IO.Streams.connect' is)
-- *** Exception: Too many bytes written
-- ghci> is <- Streams.'System.IO.Streams.fromList' [\"ok\"]
-- ghci> Streams.'System.IO.Streams.outputToList' (Streams.'giveExactly' 3 >=> Streams.'System.IO.Streams.connect' is)
-- *** Exception: Too few bytes written
-- @
giveExactly :: Int64
            -> OutputStream ByteString
            -> IO (OutputStream ByteString)
giveExactly k0 os = newIORef k0 >>= makeOutputStream . sink
  where
    -- End-of-stream: only legal once every expected byte has been written.
    -- Note that this handler itself does not write 'Nothing' to the wrapped
    -- stream.
    sink ref Nothing = do
        !outstanding <- readIORef ref
        when (outstanding /= 0) $ throwIO TooFewBytesWrittenException

    -- Data: reject the chunk outright if it would exceed the expected total;
    -- otherwise record the new outstanding count and forward it unchanged.
    sink ref chunk@(Just bs) = do
        !outstanding <- readIORef ref
        let left = outstanding - fromIntegral (S.length bs)
        when (left < 0) $ throwIO TooManyBytesWrittenException
        writeIORef ref left
        write chunk os


------------------------------------------------------------------------------
-- | Wraps an 'OutputStream', producing a new stream that will pass along at
-- most @n@ bytes to the wrapped stream. If more than @n@ bytes are sent to the
-- outer stream, a 'TooManyBytesWrittenException' will be thrown.
--
-- /Note/: if more than @n@ bytes are sent to the outer stream,
-- 'throwIfConsumesMoreThan' will not necessarily send the first @n@ bytes
-- through to the wrapped stream before throwing the exception.
--
-- Example:
--
-- @
-- ghci> (os :: OutputStream ByteString, getList) <- Streams.'System.IO.Streams.listOutputStream'
-- ghci> os' <- Streams.'throwIfConsumesMoreThan' 5 os
-- ghci> Streams.'System.IO.Streams.fromList' [\"short\"] >>= Streams.'System.IO.Streams.connectTo' os'
-- ghci> getList
-- [\"short\"]
-- ghci> os'' <- Streams.'throwIfConsumesMoreThan' 5 os
-- ghci> Streams.'System.IO.Streams.fromList' [\"long\", \"string\"] >>= Streams.'System.IO.Streams.connectTo' os''
-- *** Exception: Too many bytes written
-- @
throwIfConsumesMoreThan
    :: Int64                    -- ^ maximum number of bytes to send to the
                                --   wrapped stream
    -> OutputStream ByteString  -- ^ output stream to wrap
    -> IO (OutputStream ByteString)
throwIfConsumesMoreThan k0 str = newIORef k0 >>= makeOutputStream . sink
  where
    -- End-of-stream is always forwarded.
    sink _ Nothing = write Nothing str

    -- A chunk that does not fit in the remaining budget is rejected whole;
    -- none of it reaches the wrapped stream.
    sink budget chunk@(Just bs) = do
        remaining <- readIORef budget
        let left = remaining - fromIntegral (S.length bs)
        if left >= 0
          then writeIORef budget left >> write chunk str
          else throwIO TooManyBytesWrittenException


------------------------------------------------------------------------------
-- | Gets the current posix time
getTime :: IO Double
getTime = fmap realToFrac getPOSIXTime


------------------------------------------------------------------------------
-- | Raised by 'throwIfTooSlow' when the wrapped 'InputStream' does not
-- produce input quickly enough.
--
data RateTooSlowException = RateTooSlowException
  deriving (Typeable)

instance Show RateTooSlowException where
    show RateTooSlowException = "Input rate too slow"

instance Exception RateTooSlowException


------------------------------------------------------------------------------
-- | Rate-limits an input stream. If the input stream is not read from faster
-- than the given rate, reading from the wrapped stream will throw a
-- 'RateTooSlowException'.
--
-- Strings pushed back to the returned 'InputStream' will be propagated up to
-- the original stream.
throwIfTooSlow
    :: IO ()                   -- ^ action to bump timeout
    -> Double                  -- ^ minimum data rate, in bytes per second
    -> Int                     -- ^ amount of time in seconds to wait before
                               --   data rate calculation takes effect
    -> InputStream ByteString  -- ^ input stream
    -> IO (InputStream ByteString)
throwIfTooSlow !bump !minRate !minSeconds' !stream = do
    !_    <- bump
    t0    <- getTime
    tally <- newIORef (0 :: Int64)
    return $! InputStream (produce t0 tally) (pushback tally)
  where
    -- Grace period, converted once to the same unit as the clock readings.
    grace :: Double
    grace = fromIntegral minSeconds'

    produce t0 tally = do
        mb <- read stream
        case mb of
          Nothing -> return Nothing
          Just bs -> do
              now <- getTime
              let !elapsed = now - t0
              seen <- readIORef tally
              let total = seen + fromIntegral (S.length bs)
                  -- Average rate over the time elapsed since the grace
                  -- period ended; only evaluated when the first half of the
                  -- condition below holds.
                  rate  = fromIntegral total / (elapsed - grace)
              when (elapsed > grace + 1 && rate < minRate) $
                  throwIO RateTooSlowException

              -- Fast enough: bump the timeout, record the new total and
              -- hand the chunk to the caller.
              !_ <- bump
              writeIORef tally total
              return $! Just bs

    -- Pushed-back bytes no longer count as read; forward them upstream.
    pushback tally bs = do
        modifyRef tally $ \x -> x - (fromIntegral $ S.length bs)
        unRead bs stream