-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | http, attoparsec, pipes and other utilities for the streaming libraries -- -- This package includes http-client, aeson, attoparsec, zlib and pipes -- utilities for use with the streaming and streaming -- bytestring libraries. The modules generally closely follow -- similarly named modules in the pipes 'ecosystem', using similar -- function names, where possible. -- -- Thus, for example, using the http client module, we might number the -- lines of a remote document thus: -- --
-- import Streaming -- import Streaming.Prelude (with, each) -- import qualified Streaming.Prelude as S -- import Data.ByteString.Streaming.HTTP -- import qualified Data.ByteString.Streaming.Char8 as Q -- -- main = runResourceT $ do -- let output = numbers <|> Q.lines (simpleHTTP "http://lpaste.net/raw/146542") -- Q.putStrLn $ Q.unlines output -- -- numbers :: Monad m => Stream (Q.ByteString m) m () -- numbers = with (each [1..]) $ \n -> Q.pack (each (show n ++ ". ")) -- -- ["1. ", "2. " ..] ---- -- The memory requirements of this Prelude-ish program will not -- be affected by the fact that, say, the third 'line' is 10 terabytes -- long. -- -- This package of course heaps together a number of dependencies, as it -- seemed best not to spam hackage with numerous packages. If it seems -- reasonable to detach some of it, please raise an issue on the github -- page. -- -- Questions about usage can be raised as issues, or addressed to the -- pipes list. @package streaming-utils @version 0.2.0.0 -- | Here is a simple use of parsed and standard Streaming -- segmentation devices to parse a file in which groups of numbers are -- separated by blank lines. Such a problem of 'nesting streams' is -- described in the conduit context in this StackOverflow -- question -- --
-- -- $ cat nums.txt -- -- 1 -- -- 2 -- -- 3 -- -- -- -- 4 -- -- 5 -- -- 6 -- -- -- -- 7 -- -- 8 ---- -- We will sum the groups and stream the results to standard output: -- --
-- import Streaming -- import qualified Streaming.Prelude as S -- import qualified Data.ByteString.Streaming.Char8 as Q -- import qualified Data.Attoparsec.ByteString.Char8 as A -- import qualified Data.Attoparsec.ByteString.Streaming as A -- import Data.Function ((&)) -- -- main = Q.getContents -- raw bytes -- & A.parsed lineParser -- stream of parsed `Maybe Int`s; blank lines are `Nothing` -- & void -- drop any unparsed nonsense at the end -- & S.split Nothing -- split on blank lines -- & S.maps S.concat -- keep `Just x` values in the sub-streams (cp. catMaybes) -- & S.mapped S.sum -- sum each substream -- & S.print -- stream results to stdout -- -- lineParser = Just <$> A.scientific <* A.endOfLine <|> Nothing <$ A.endOfLine ---- --
-- -- $ cat nums.txt | ./atto -- -- 6.0 -- -- 15.0 -- -- 15.0 --module Data.Attoparsec.ByteString.Streaming type Message = ([String], String) -- | The result of a parse (Either a ([String], String)), with the -- unconsumed byte stream. -- --
-- >>> :set -XOverloadedStrings -- the string literal below is a streaming bytestring -- -- >>> (r,rest1) <- AS.parse (A.scientific <* A.many' A.space) "12.3 4.56 78.3" -- -- >>> print r -- Left 12.3 -- -- >>> (s,rest2) <- AS.parse (A.scientific <* A.many' A.space) rest1 -- -- >>> print s -- Left 4.56 -- -- >>> (t,rest3) <- AS.parse (A.scientific <* A.many' A.space) rest2 -- -- >>> print t -- Left 78.3 -- -- >>> Q.putStrLn rest3 --parse :: Monad m => Parser a -> ByteString m x -> m (Either a Message, ByteString m x) -- | Apply a parser repeatedly to a stream of bytes, streaming the parsed -- values, but ending when the parser fails or the bytes run out. -- --
-- >>> S.print $ AS.parsed (A.scientific <* A.many' A.space) $ "12.3 4.56 78.9" -- 12.3 -- 4.56 -- 78.9 -- 18.282 --parsed :: Monad m => Parser a -> ByteString m r -> Stream (Of a) m (Either (Message, ByteString m r) r) -- | The encode, decode and decoded functions -- replicate the similar functions in Renzo Carbonara's -- pipes-aeson. Note that aeson accumulates a whole top -- level json array or object before coming to any conclusion. This is -- the only default that could cover all cases. -- -- The streamParse function accepts parsers from the -- json-streams library. The 'json-streams' parsers use aeson -- types, but will stream suitable elements as they arise. For this -- reason, of course, it cannot validate the entire json entity before -- acting, but carries on validation as it moves along, reporting failure -- when it comes. Though it is generally faster and accumulates less -- memory than the usual aeson parsers, it is by no means a universal -- replacement for aeson's behavior. It will certainly be sensible, for -- example, wherever you are executing a left fold over the subordinate -- elements, e.g. gathering certain statistics about them. -- -- Here we use a long top level array of objects from a file in the -- json-streams benchmarking directory. Each object in the top -- level array has a "friends" field with an associated array of friends; -- each of these has a "name". Here, we extract the name of each friend -- of each person recorded in the top level array, and enumerate them all: -- --
-- import Streaming
-- import qualified Streaming.Prelude as S
-- import Data.ByteString.Streaming.HTTP
-- import Data.ByteString.Streaming.Aeson (streamParse)
-- import Data.JsonStream.Parser (string, arrayOf, (.:), Parser)
-- import Data.Text (Text)
-- import Data.Function ((&))
-- main = do
-- req <- parseRequest "https://raw.githubusercontent.com/ondrap/json-stream/master/benchmarks/json-data/buffer-builder.json"
-- m <- newManager tlsManagerSettings
-- withHTTP req m $ \resp -> do
-- let names, friend_names :: Parser Text
-- names = arrayOf ("name" .: string)
-- friend_names = arrayOf ("friends" .: names)
-- responseBody resp -- raw bytestream from http-client
-- & streamParse friend_names -- find name fields in each sub-array of friends
-- & void -- drop material after any bad parse
-- & S.zip (S.each [1..]) -- number the friends' names
-- & S.print -- successively print to stdout
--
-- -- (1,"Joyce Jordan")
-- -- (2,"Ophelia Rosales")
-- -- (3,"Florine Stark")
-- -- ...
-- -- (287,"Hilda Craig")
-- -- (288,"Leola Higgins")
--
--
-- This program does not accumulate the whole byte stream, as an aeson
-- parser for a top-level json entity would. Rather it streams and
-- enumerates friends' names as soon as they come. With appropriate
-- instances, we could of course just stream the objects in the top-level
-- array instead.
module Data.ByteString.Streaming.Aeson
data DecodingError
-- | An attoparsec error that happened while parsing the raw JSON
-- string.
AttoparsecError :: ParsingError -> DecodingError
-- | An aeson error that happened while trying to convert a
-- Value to an FromJSON instance, as reported by
-- Error.
FromJSONError :: String -> DecodingError
-- | This instance allows using errorP with decoded and
-- decodedL instance Error (DecodingError, Producer a m r)
--
-- Consecutively parse a elements from the given
-- Producer using the given parser (such as decode or
-- parseValue), skipping any leading whitespace each time.
--
-- This Producer runs until it either runs out of input or until
-- a decoding failure occurs, in which case it returns Left with a
-- DecodingError and a Producer with any leftovers. You
-- can use errorP to turn the Either return value into an
-- ErrorT monad transformer.
--
-- Like encode, except it accepts any ToJSON instance, not
-- just Value or Value.
encode :: (Monad m, ToJSON a) => a -> ByteString m ()
-- | Given a bytestring, parse a top level json entity - returning any
-- leftover bytes.
decode :: (Monad m, FromJSON a) => StateT (ByteString m x) m (Either DecodingError a)
-- | Resolve a succession of top-level json items into a corresponding
-- stream of Haskell values.
decoded :: (Monad m, FromJSON a) => ByteString m r -> Stream (Of a) m (Either (DecodingError, ByteString m r) r)
-- | Experimental. Parse a bytestring with a json-streams parser.
-- The function will read through the whole of a single top level json
-- entity, streaming the valid parses as they arise. (It will thus for
-- example parse an infinite json bytestring, though these are rare in
-- practice ...)
--
-- If the parser is fitted to recognize only one thing, then zero or one
-- item will be yielded; if it uses combinators like arrayOf, it
-- will stream many values as they arise. See the example at the top of
-- this module, in which values inside a top level array are emitted as
-- they are parsed. Aeson would accumulate the whole bytestring before
-- declaring on the contents of the array. This of course makes sense,
-- since an attempt to parse a json array may end with a bad parse,
-- invalidating the json as a whole. With json-streams, a bad
-- parse will also of course emerge in the end, but only after the
-- initial good parses are streamed. This too makes sense though, but in
-- a smaller range of contexts -- for example, where one is folding over
-- the parsed material.
--
-- This function is closely modelled on parseByteString and
-- parseLazyByteString from Data.JsonStream.Parser.
streamParse :: Monad m => Parser a -> ByteString m r -> Stream (Of a) m (Maybe String, ByteString m r)
instance Data.Data.Data Data.ByteString.Streaming.Aeson.DecodingError
instance GHC.Classes.Eq Data.ByteString.Streaming.Aeson.DecodingError
instance GHC.Show.Show Data.ByteString.Streaming.Aeson.DecodingError
instance GHC.Exception.Type.Exception Data.ByteString.Streaming.Aeson.DecodingError
-- | This module replicates `pipes-http` as closely as will type-check,
-- adding a conduit-like http in ResourceT and a
-- primitive simpleHTTP that emits a streaming bytestring rather
-- than a lazy one.
--
-- Here is an example GET request that streams the response body to
-- standard output:
--
-- -- import qualified Data.ByteString.Streaming as Q -- import Data.ByteString.Streaming.HTTP -- -- main = do -- req <- parseRequest "https://www.example.com" -- m <- newManager tlsManagerSettings -- withHTTP req m $ \resp -> Q.stdout (responseBody resp) ---- -- Here is an example POST request that also streams the request body -- from standard input: -- --
-- {-#LANGUAGE OverloadedStrings #-}
-- import qualified Data.ByteString.Streaming as Q
-- import Data.ByteString.Streaming.HTTP
--
-- main = do
-- req <- parseRequest "https://httpbin.org/post"
-- let req' = req
-- { method = "POST"
-- , requestBody = stream Q.stdin
-- }
-- m <- newManager tlsManagerSettings
-- withHTTP req' m $ \resp -> Q.stdout (responseBody resp)
--
--
-- Here is the GET request modified to use http and write to a
-- file. runResourceT manages the file handle and the
-- interaction.
--
-- -- import qualified Data.ByteString.Streaming as Q -- import Data.ByteString.Streaming.HTTP -- -- main = do -- req <- parseUrlThrow "https://www.example.com" -- m <- newManager tlsManagerSettings -- runResourceT $ do -- resp <- http req m -- Q.writeFile "example.html" (responseBody resp) ---- -- simpleHTTP can be used in ghci like so: -- --
-- ghci> runResourceT $ Q.stdout $ Q.take 137 $ simpleHTTP "http://lpaste.net/raw/13" -- -- Adaptation and extension of a parser for data definitions given in -- -- appendix of G. Huttons's paper - Monadic Parser Combinators. -- -- --module Data.ByteString.Streaming.HTTP -- | Send an HTTP Request and wait for an HTTP Response withHTTP :: Request -> Manager -> (Response (ByteString IO ()) -> IO a) -> IO a http :: MonadResource m => Request -> Manager -> m (Response (ByteString m ())) -- | Create a RequestBody from a content length and an effectful -- ByteString streamN :: Int64 -> ByteString IO () -> RequestBody -- | Create a RequestBody from an effectful ByteString -- -- stream is more flexible than streamN, but requires the -- server to support chunked transfer encoding. stream :: ByteString IO () -> RequestBody -- | This is a quick method - oleg would call it 'unprofessional' - to -- bring a web page in view. It sparks its own internal manager and -- closes itself. Thus something like this makes sense -- --
-- >>> runResourceT $ Q.putStrLn $ simpleHTTP "http://lpaste.net/raw/12" -- chunk _ [] = [] -- chunk n xs = let h = take n xs in h : (chunk n (drop n xs)) ---- -- but if you try something like -- --
-- >>> rest <- runResourceT $ Q.putStrLn $ Q.splitAt 40 $ simpleHTTP "http://lpaste.net/raw/146532" -- import Data.ByteString.Streaming.HTTP ---- -- it will just be good luck if with -- --
-- >>> runResourceT $ Q.putStrLn rest ---- -- you get the rest of the file: -- --
-- import qualified Data.ByteString.Streaming.Char8 as Q -- main = runResourceT $ Q.putStrLn $ simpleHTTP "http://lpaste.net/raw/146532" ---- -- rather than -- --
-- *** Exception: <socket: 13>: hGetBuf: illegal operation (handle is closed) ---- -- Since, of course, the handle was already closed by the first use of -- runResourceT. The same applies of course to the more hygienic -- withHTTP above, which permits one to extract an IO -- (ByteString IO r), by using splitAt or the like. -- -- The reaction of some streaming-io libraries was simply to forbid -- operations like splitAt. That this paternalism was not viewed -- as simply outrageous is a consequence of the opacity of the older -- iteratee-io libraries. It is obvious that I can no more run an -- effectful bytestring after I have made its effects impossible by using -- runResourceT (which basically means -- closeEverythingDown). I might as well try to run it after -- tossing my machine into the flames. Similarly, it is obvious that I -- cannot read from a handle after I have applied hClose; there -- is simply no difference between the two cases. simpleHTTP :: MonadResource m => String -> ByteString m () -- | The Resource transformer. This transformer keeps track of all -- registered actions, and calls them upon exit (via -- runResourceT). Actions may be registered via -- register, or resources may be allocated atomically via -- allocate. allocate corresponds closely to -- bracket. -- -- Releasing may be performed before exit via the release -- function. This is a highly recommended optimization, as it will ensure -- that scarce resources are freed early. Note that calling -- release will deregister the action, so that a release action -- will only ever be called once. -- -- Since 0.3.0 data ResourceT (m :: Type -> Type) a -- | A Monad which allows for safe resource allocation. In theory, -- any monad transformer stack which includes a ResourceT can be -- an instance of MonadResource. -- -- Note: runResourceT has a requirement for a MonadUnliftIO -- m monad, which allows control operations to be lifted. A -- MonadResource does not have this requirement. 
This means that -- transformers such as ContT can be an instance of -- MonadResource. However, the ContT wrapper will need -- to be unwrapped before calling runResourceT. -- -- Since 0.3.0 class MonadIO m => MonadResource (m :: Type -> Type) -- | Lift a ResourceT IO action into the current Monad. -- -- Since 0.4.0 liftResourceT :: MonadResource m => ResourceT IO a -> m a -- | Unwrap a ResourceT transformer, and call all registered release -- actions. -- -- Note that there is some reference counting involved due to -- resourceForkIO. If multiple threads are sharing the same -- collection of resources, only the last call to runResourceT -- will deallocate the resources. -- -- NOTE Since version 1.2.0, this function will throw a -- ResourceCleanupException if any of the cleanup functions throw -- an exception. runResourceT :: MonadUnliftIO m => ResourceT m a -> m a -- | This hyper-minimal module closely follows the corresponding module in -- Renzo Carbonara' 'pipes-network' package. It is meant to be used -- together with the Network.Simple.TCP module from Carbonara's -- network-simple package, which is completely re-exported from -- this module. module Streaming.Network.TCP -- | Receives bytes from a connected socket with a maximum chunk size. The -- bytestream ends if the remote peer closes its side of the connection -- or EOF is received. The implementation is as follows: -- --
-- fromSocket sock nbytes = loop where -- loop = do -- bs <- liftIO (NSB.recv sock nbytes) -- if B.null bs -- then return () -- else Q.chunk bs >> loop --fromSocket :: MonadIO m => Socket -> Int -> ByteString m () -- | Connect a stream of bytes to the remote end. The implementation is -- again very simple: -- --
-- toSocket sock = loop where -- loop bs = do -- e <- Q.nextChunk bs -- case e of -- Left r -> return r -- Right (b,rest) -> send sock b >> loop rest --toSocket :: MonadIO m => Socket -> ByteString m r -> m r -- | A socket data type. Sockets are not GCed unless they are closed -- by close. data Socket -- | Either a host name e.g., "haskell.org" or a numeric host -- address string consisting of a dotted decimal IPv4 address or an IPv6 -- address e.g., "192.168.0.1". type HostName = String type ServiceName = String -- | With older versions of the network library (version 2.6.0.2 -- or earlier) on Windows operating systems, the networking subsystem -- must be initialised using withSocketsDo before any networking -- operations can be used. eg. -- --
-- main = withSocketsDo $ do {...}
--
--
-- It is fine to nest calls to withSocketsDo, and to perform
-- networking operations after withSocketsDo has returned.
--
-- In newer versions of the network library (version v2.6.1.0 or
-- later) it is only necessary to call withSocketsDo if you are
-- calling the MkSocket constructor directly. However, for
-- compatibility with older versions on Windows, it is good practice to
-- always call withSocketsDo (it's very cheap).
withSocketsDo :: () => IO a -> IO a
-- | The existence of a constructor does not necessarily imply that that
-- socket address type is supported on your system: see
-- isSupportedSockAddr.
data SockAddr
-- | Writes the given list of ByteStrings to the socket.
--
-- Note: This uses writev(2) on POSIX.
sendMany :: MonadIO m => Socket -> [ByteString] -> m ()
-- | Writes a lazy ByteString to the socket.
--
-- Note: This uses writev(2) on POSIX.
sendLazy :: MonadIO m => Socket -> ByteString -> m ()
-- | Writes a ByteString to the socket.
--
-- Note: On POSIX, calling sendLazy once is much more efficient
-- than repeatedly calling send on strict ByteStrings. Use
-- sendLazy sock . fromChunks if you have more
-- than one strict ByteString to send.
send :: MonadIO m => Socket -> ByteString -> m ()
-- | Read up to a limited number of bytes from a socket.
--
-- Returns Nothing if the remote end closed the connection or
-- end-of-input was reached. The number of returned bytes might be less
-- than the specified limit, but it will never be null.
recv :: MonadIO m => Socket -> Int -> m (Maybe ByteString)
-- | Shuts down and closes the Socket, silently ignoring any
-- synchronous exception that might happen.
closeSock :: MonadIO m => Socket -> m ()
-- | Obtain a Socket bound to the given host name and TCP service
-- port.
--
-- The obtained Socket should be closed manually using
-- closeSock when it's not needed anymore.
--
-- Prefer to use listen if you will be listening on this socket
-- and using it within a limited scope, and would like it to be closed
-- immediately after its usage or in case of exceptions.
--
-- Note: The NoDelay, KeepAlive and ReuseAddr
-- options are set on the socket.
bindSock :: MonadIO m => HostPreference -> ServiceName -> m (Socket, SockAddr)
-- | Obtain a Socket connected to the given host and TCP service
-- port.
--
-- The obtained Socket should be closed manually using
-- closeSock when it's not needed anymore, otherwise you risk
-- having the connection and socket open for much longer than needed.
--
-- Prefer to use connect if you will be using the socket within a
-- limited scope and would like it to be closed immediately after its
-- usage or in case of exceptions.
--
-- Note: The NoDelay and KeepAlive options are set on the
-- socket.
connectSock :: MonadIO m => HostName -> ServiceName -> m (Socket, SockAddr)
-- | Accept a single incoming connection and use it in a different thread.
--
-- The connection socket is shut down and closed when done or in case of
-- exceptions.
acceptFork :: MonadIO m => Socket -> ((Socket, SockAddr) -> IO ()) -> m ThreadId
-- | Accept a single incoming connection and use it.
--
-- The connection socket is shut down and closed when done or in case of
-- exceptions.
accept :: (MonadIO m, MonadMask m) => Socket -> ((Socket, SockAddr) -> m r) -> m r
-- | Bind a TCP listening socket and use it.
--
-- The listening socket is closed when done or in case of exceptions.
--
-- If you prefer to acquire and close the socket yourself, then use
-- bindSock and closeSock, as well as the listenSock
-- function.
--
-- Note: The NoDelay, KeepAlive and ReuseAddr
-- options are set on the socket. The maximum number of incoming queued
-- connections is 2048.
listen :: (MonadIO m, MonadMask m) => HostPreference -> ServiceName -> ((Socket, SockAddr) -> m r) -> m r
-- | Start a TCP server that accepts incoming connections and handles them
-- concurrently in different threads.
--
-- Any acquired sockets are properly shut down and closed when done or in
-- case of exceptions. Exceptions from the threads handling the
-- individual connections won't cause serve to die.
--
-- Note: This function performs listen, acceptFork, so
-- don't perform those manually.
serve :: MonadIO m => HostPreference -> ServiceName -> ((Socket, SockAddr) -> IO ()) -> m a
-- | Connect to a TCP server and use the connection.
--
-- The connection socket is shut down and closed when done or in case of
-- exceptions.
--
-- If you prefer to acquire and close the socket yourself, then use
-- connectSock and closeSock.
--
-- Note: The NoDelay and KeepAlive options are set on the
-- socket.
connect :: (MonadIO m, MonadMask m) => HostName -> ServiceName -> ((Socket, SockAddr) -> m r) -> m r
-- | Preferred host to bind.
data HostPreference
-- | Any available host.
HostAny :: HostPreference
-- | Any available IPv4 host.
HostIPv4 :: HostPreference
-- | Any available IPv6 host.
HostIPv6 :: HostPreference
-- | An explicit host name.
Host :: HostName -> HostPreference
-- | Pipes.Group.Tutorial is the correct introduction to the use of
-- this module, which is mostly just an optimized Pipes.Group,
-- replacing FreeT with Stream. The module also
-- includes optimized functions for interoperation:
--
-- -- fromStream :: Monad m => Stream (Of a) m r -> Producer' a m r -- toStream :: Monad m => Producer a m r -> Stream (Of a) m r ---- -- It is not a drop in replacement for Pipes.Group. The only -- systematic difference is that this simple module omits lenses. It is -- hoped that this will make elementary usage easier to grasp. The -- lenses exported by the pipes packages only come into their own with the -- simple StateT parsing procedure pipes promotes. We are not -- attempting here to replicate this advanced procedure, but only to make -- elementary forms of breaking and splitting possible in the simplest -- possible way. -- -- The pipes-group tutorial is framed as a hunt -- for a genuinely streaming threeGroups, which would collect -- the first three groups of matching items while never holding more than -- the present item in memory. The formulation it opts for in the end -- would be expressed here thus: -- --
-- import Pipes -- import Streaming.Pipes -- import qualified Pipes.Prelude as P -- -- threeGroups :: (Monad m, Eq a) => Producer a m () -> Producer a m () -- threeGroups = concats . takes 3 . groups ---- -- The program splits the initial producer into a connected stream of -- producers containing "equal" values; it takes three of those; and then -- erases the effects of splitting. So for example -- --
-- >>> runEffect $ threeGroups (each "aabccoooooo") >-> P.print -- 'a' -- 'a' -- 'b' -- 'c' -- 'c' ---- -- The new user might look at the examples of splitting, breaking and -- joining in Streaming.Prelude keeping in mind that -- Producer a m r is equivalent to Stream (Of a) m r. -- -- For the rest, the only part of the tutorial that would need revision is -- the bit at the end about writing explicit FreeT programs. -- Here one does not proceed by pattern matching, but uses inspect -- in place of runFreeT -- --
-- inspect :: (Monad m, Functor f) => Stream f m r -> m (Either r (f (Stream f m r))) ---- -- and for construction of a Stream (Producer a m) m r, the -- usual battery of combinators: -- --
-- wrap :: (Monad m, Functor f) => f (Stream f m r) -> Stream f m r -- effect :: (Monad m, Functor f) => m (Stream f m r) -> Stream f m r -- yields :: (Monad m, Functor f) => f r -> Stream f m r -- lift :: (Monad m, Functor f) => m r -> Stream f m r ---- -- and so on. module Streaming.Pipes -- | Construct an ordinary pipes Producer from a Stream of -- elements -- --
-- >>> runEffect $ fromStream (S.each [1..3]) >-> P.print -- 1 -- 2 -- 3 --fromStream :: Monad m => Stream (Of a) m r -> Producer' a m r -- | Construct a Stream of elements from a pipes -- Producer -- --
-- >>> S.print $ toStream $ P.each [1..3] -- 1 -- 2 -- 3 --toStream :: Monad m => Producer a m r -> Stream (Of a) m r -- | Link the chunks of a producer of bytestrings into a single byte stream toStreamingByteString :: Monad m => Producer ByteString m r -> ByteString m r -- | Successively yield the chunks hidden in a byte stream. fromStreamingByteString :: Monad m => ByteString m r -> Producer' ByteString m r -- | chunksOf splits a Producer into a Stream of -- Producers of a given length. Its inverse is concats. -- --
-- >>> let listN n = L.purely P.folds L.list . P.chunksOf n -- -- >>> runEffect $ listN 3 P.stdinLn >-> P.take 2 >-> P.map unwords >-> P.print -- 1<Enter> -- 2<Enter> -- 3<Enter> -- "1 2 3" -- 4<Enter> -- 5<Enter> -- 6<Enter> -- "4 5 6" -- -- >>> let stylish = P.concats . P.maps (<* P.yield "-*-") . P.chunksOf 2 -- -- >>> runEffect $ stylish (P.each $ words "one two three four five six") >-> P.stdoutLn -- one -- two -- -*- -- three -- four -- -*- -- five -- six -- -*- --chunksOf :: Monad m => Int -> Producer a m r -> Stream (Producer a m) m r groups :: (Monad m, Eq a) => Producer a m r -> Stream (Producer a m) m r -- | groupsBy splits a Producer into a Stream of -- Producers of equal items. Its inverse is concats groupsBy :: Monad m => (a -> a -> Bool) -> Producer a m r -> Stream (Producer a m) m r -- | groupsBy' splits a Producer into a Stream of -- Producers grouped using the given relation. Its inverse is -- concats -- -- This differs from groupsBy by comparing successive elements -- instead of comparing each element to the first member of the group -- --
-- >>> import Pipes (yield, each) -- -- >>> import Pipes.Prelude (toList) -- -- >>> let rel c1 c2 = succ c1 == c2 -- -- >>> (toList . intercalates (yield '|') . groupsBy' rel) (each "12233345") -- "12|23|3|345" -- -- >>> (toList . intercalates (yield '|') . groupsBy rel) (each "12233345") -- "122|3|3|34|5" --groupsBy' :: Monad m => (a -> a -> Bool) -> Producer a m r -> Stream (Producer a m) m r split :: (Eq a, Monad m) => a -> Producer a m r -> Stream (Producer a m) m r breaks :: (Eq a, Monad m) => (a -> Bool) -> Producer a m r -> Stream (Producer a m) m r -- | Join a stream of Producers into a single Producer concats :: Monad m => Stream (Producer a m) m r -> Producer a m r -- | Interpolate a layer at each segment. This specializes to e.g. -- --
-- intercalates :: (Monad m, Functor f) => Stream f m () -> Stream (Stream f m) m r -> Stream f m r --intercalates :: (Monad m, Monad (t m), MonadTrans t) => t m x -> Stream (t m) m r -> t m r -- | Fold each Producer in a producer Stream -- --
-- purely folds -- :: Monad m => Fold a b -> Stream (Producer a m) m r -> Producer b m r --folds :: Monad m => (x -> a -> x) -> x -> (x -> b) -> Stream (Producer a m) m r -> Producer b m r -- | Fold each Producer in a Producer stream, monadically -- --
-- impurely foldsM -- :: Monad m => FoldM a b -> Stream (Producer a m) m r -> Producer b m r --foldsM :: Monad m => (x -> a -> m x) -> m x -> (x -> m b) -> Stream (Producer a m) m r -> Producer b m r takes :: (Monad m, Functor f) => Int -> Stream f m r -> Stream f m () -- | (takes' n) only keeps the first n Producers -- of a linked Stream of Producers -- -- Unlike takes, takes' is not functor-general - it is -- aware that a Producer can be drained, as functors cannot -- generally be. Here, then, we drain the unused Producers in -- order to preserve the return value. This makes it a suitable argument -- for maps. takes' :: Monad m => Int -> Stream (Producer a m) m r -> Stream (Producer a m) m r -- | Map layers of one functor to another with a transformation. Compare -- hoist, which has a similar effect on the monadic parameter. -- --
-- maps id = id -- maps f . maps g = maps (f . g) --maps :: (Monad m, Functor f) => (forall x. () => f x -> g x) -> Stream f m r -> Stream g m r -- | span splits a Producer into two Producers; the -- outer Producer is the longest consecutive group of elements -- that satisfy the predicate. Its inverse is join span :: Monad m => (a -> Bool) -> Producer a m r -> Producer a m (Producer a m r) -- | break splits a Producer into two Producers; the -- outer Producer is the longest consecutive group of elements -- that fail the predicate. Its inverse is join break :: Monad m => (a -> Bool) -> Producer a m r -> Producer a m (Producer a m r) -- | splitAt divides a Producer into two Producers -- after a fixed number of elements. Its inverse is join splitAt :: Monad m => Int -> Producer a m r -> Producer a m (Producer a m r) -- | Like groupBy, where the equality predicate is (==) group :: (Monad m, Eq a) => Producer a m r -> Producer a m (Producer a m r) -- | groupBy splits a Producer into two Producers; the -- second producer begins where we meet an element that is different -- according to the equality predicate. Its inverse is join groupBy :: Monad m => (a -> a -> Bool) -> Producer a m r -> Producer a m (Producer a m r) -- | This module modifies material in Renzo Carbonara's pipes-zlib -- package. module Streaming.Zip -- | Decompress a streaming bytestring. WindowBits is from -- Codec.Compression.Zlib -- --
-- decompress defaultWindowBits :: MonadIO m => ByteString m r -> ByteString m r --decompress :: MonadIO m => WindowBits -> ByteString m r -> ByteString m r -- | Decompress a zipped byte stream, returning any leftover input that -- follows the compressed material. decompress' :: MonadIO m => WindowBits -> ByteString m r -> ByteString m (Either (ByteString m r) r) -- | Compress a byte stream. -- -- See the Codec.Compression.Zlib module for details about -- CompressionLevel and WindowBits. compress :: MonadIO m => CompressionLevel -> WindowBits -> ByteString m r -> ByteString m r -- | Decompress a gzipped byte stream. gunzip :: MonadIO m => ByteString m r -> ByteString m r -- | Decompress a gzipped byte stream, returning any leftover input that -- follows the compressed stream. gunzip' :: MonadIO m => ByteString m r -> ByteString m (Either (ByteString m r) r) -- | Compress a byte stream in the gzip format. gzip :: MonadIO m => CompressionLevel -> ByteString m r -> ByteString m r -- | How hard should we try to compress? data CompressionLevel defaultCompression :: CompressionLevel noCompression :: CompressionLevel bestSpeed :: CompressionLevel bestCompression :: CompressionLevel -- | A specific compression level between 0 and 9. compressionLevel :: Int -> CompressionLevel -- | The default WindowBits is 15 which is also the maximum size. defaultWindowBits :: WindowBits windowBits :: Int -> WindowBits instance GHC.Classes.Ord Streaming.Zip.CompressionLevel instance GHC.Classes.Eq Streaming.Zip.CompressionLevel instance GHC.Read.Read Streaming.Zip.CompressionLevel instance GHC.Show.Show Streaming.Zip.CompressionLevel