{-# LANGUAGE CPP #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MultiParamTypeClasses     #-}
{-# LANGUAGE NoImplicitPrelude         #-}
{-# LANGUAGE NoMonomorphismRestriction #-}
{-# LANGUAGE BangPatterns #-}
-- | This module is meant as a replacement for Data.Conduit.List.
-- That module follows a naming scheme which was originally inspired
-- by its enumerator roots. This module is meant to introduce a naming
-- scheme which encourages conduit best practices.
--
-- There are two versions of functions in this module. Those with a trailing
-- E work on the individual elements of a chunk of data, e.g., the bytes of
-- a ByteString, the Chars of a Text, or the Ints of a Vector Int. Those
-- without a trailing E work on unchunked streams.
--
-- FIXME: discuss overall naming, usage of mono-traversable, etc
--
-- Mention take (Conduit) vs drop (Consumer)
module Data.Conduit.Combinators
    ( -- * Producers
      -- ** Pure
      yieldMany
    , unfold
    , enumFromTo
    , iterate
    , repeat
    , replicate
    , sourceLazy

      -- ** Monadic
    , repeatM
    , repeatWhileM
    , replicateM

      -- ** I\/O
    , sourceFile
    , sourceFileBS
    , sourceHandle
    , sourceHandleUnsafe
    , sourceIOHandle
    , stdin
    , withSourceFile

      -- ** Filesystem
    , sourceDirectory
    , sourceDirectoryDeep

      -- * Consumers
      -- ** Pure
    , drop
    , dropE
    , dropWhile
    , dropWhileE
    , fold
    , foldE
    , foldl
    , foldl1
    , foldlE
    , foldMap
    , foldMapE
    , foldWhile
    , all
    , allE
    , any
    , anyE
    , and
    , andE
    , or
    , orE
    , asum
    , elem
    , elemE
    , notElem
    , notElemE
    , sinkLazy
    , sinkList
    , sinkVector
    , sinkVectorN
    , sinkLazyBuilder
    , sinkNull
    , awaitNonNull
    , head
    , headDef
    , headE
    , peek
    , peekE
    , last
    , lastDef
    , lastE
    , length
    , lengthE
    , lengthIf
    , lengthIfE
    , maximum
    , maximumE
    , minimum
    , minimumE
    , null
    , nullE
    , sum
    , sumE
    , product
    , productE
    , find

      -- ** Monadic
    , mapM_
    , mapM_E
    , foldM
    , foldME
    , foldMapM
    , foldMapME

      -- ** I\/O
    , sinkFile
    , sinkFileCautious
    , sinkTempFile
    , sinkSystemTempFile
    , sinkFileBS
    , sinkHandle
    , sinkIOHandle
    , print
    , stdout
    , stderr
    , withSinkFile
    , withSinkFileBuilder
    , withSinkFileCautious
    , sinkHandleBuilder
    , sinkHandleFlush

      -- * Transformers
      -- ** Pure
    , map
    , mapE
    , omapE
    , concatMap
    , concatMapE
    , take
    , takeE
    , takeWhile
    , takeWhileE
    , takeExactly
    , takeExactlyE
    , concat
    , filter
    , filterE
    , mapWhile
    , conduitVector
    , scanl
    , mapAccumWhile
    , concatMapAccum
    , intersperse
    , slidingWindow
    , chunksOfE
    , chunksOfExactlyE

      -- ** Monadic
    , mapM
    , mapME
    , omapME
    , concatMapM
    , filterM
    , filterME
    , iterM
    , scanlM
    , mapAccumWhileM
    , concatMapAccumM

      -- ** Textual
    , encodeUtf8
    , decodeUtf8
    , decodeUtf8Lenient
    , line
    , lineAscii
    , unlines
    , unlinesAscii
    , takeExactlyUntilE
    , linesUnbounded
    , linesUnboundedAscii
    , splitOnUnboundedE

      -- ** Builders
    , builderToByteString
    , unsafeBuilderToByteString
    , builderToByteStringWith
    , builderToByteStringFlush
    , builderToByteStringWithFlush
    , BufferAllocStrategy
    , allNewBuffersStrategy
    , reuseBufferStrategy

      -- * Special
    , vectorBuilder
    , mapAccumS
    , peekForever
    , peekForeverE
    ) where

-- BEGIN IMPORTS

import           Data.ByteString.Builder     (Builder, toLazyByteString, hPutBuilder)
import qualified Data.ByteString.Builder.Internal as BB (flush)
import qualified Data.ByteString.Builder.Extra as BB (runBuilder, Next(Done, More, Chunk))
import qualified Data.NonNull as NonNull
import qualified Data.Traversable
import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as BL
import           Data.ByteString.Lazy.Internal (defaultChunkSize)
import           Control.Applicative         (Alternative(..), (<$>))
import           Control.Exception           (catch, throwIO, finally, bracket, try, evaluate)
import           Control.Category            (Category (..))
import           Control.Monad               (unless, when, (>=>), liftM, forever)
import           Control.Monad.IO.Unlift     (MonadIO (..), MonadUnliftIO, withRunInIO)
import           Control.Monad.Primitive     (PrimMonad, PrimState, unsafePrimToPrim)
import           Control.Monad.Trans.Class   (lift)
import           Control.Monad.Trans.Resource (MonadResource, MonadThrow, allocate, throwM)
import           Data.Conduit
import           Data.Conduit.Internal       (ConduitT (..), Pipe (..))
import qualified Data.Conduit.List           as CL
import           Data.IORef
import           Data.Maybe                  (fromMaybe, isNothing, isJust)
import           Data.Monoid                 (Monoid (..))
import           Data.MonoTraversable
import qualified Data.Sequences              as Seq
import qualified Data.Vector.Generic         as V
import qualified Data.Vector.Generic.Mutable as VM
import           Data.Void                   (absurd)
import           Prelude                     (Bool (..), Eq (..), Int,
                                              Maybe (..), Either (..), Monad (..), Num (..),
                                              Ord (..), fromIntegral, maybe, either,
                                              ($), Functor (..), Enum, seq, Show, Char,
                                              otherwise, Either (..), not,
                                              ($!), succ, FilePath, IO, String)
import Data.Word (Word8)
import qualified Prelude
import qualified System.IO                   as IO
import           System.IO.Error             (isDoesNotExistError)
import           System.IO.Unsafe            (unsafePerformIO)
import Data.ByteString (ByteString)
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE
import qualified Data.Text.Encoding.Error as TEE
import Data.Conduit.Combinators.Stream
import Data.Conduit.Internal.Fusion
import           Data.Primitive.MutVar       (MutVar, newMutVar, readMutVar,
                                              writeMutVar)
import qualified Data.Streaming.FileRead as FR
import qualified Data.Streaming.Filesystem as F
import           GHC.ForeignPtr (mallocPlainForeignPtrBytes, unsafeForeignPtrToPtr)
import           Foreign.ForeignPtr (touchForeignPtr, ForeignPtr)
import           Foreign.Ptr (Ptr, plusPtr, minusPtr)
import           Data.ByteString.Internal (ByteString (PS), mallocByteString)
import           System.FilePath ((</>), (<.>), takeDirectory, takeFileName)
import           System.Directory (renameFile, getTemporaryDirectory, removeFile)

import qualified Data.Sequences as DTE
import           Data.Sequences (LazySequence (..))

-- Defines INLINE_RULE0, INLINE_RULE, STREAMING0, and STREAMING.
#include "fusion-macros.h"

-- END IMPORTS

-- TODO:
--
--   * The functions that sourceRandom* are based on, initReplicate and
--   initRepeat, have specialized versions for when they're used with
--   ($$).  How does this interact with stream fusion?
--
--   * Is it possible to implement fusion for vectorBuilder?  Since it
--   takes a Sink yielding function as an input, the rewrite rule
--   would need to trigger when that parameter looks something like
--   (\x -> unstream (...)).  I don't see anything preventing doing
--   this, but it would be quite a bit of code.

-- NOTE: Fusion isn't possible for the following operations:
--
--   * Due to a lack of leftovers:
--     - dropE, dropWhile, dropWhileE
--     - headE
--     - peek, peekE
--     - null, nullE
--     - takeE, takeWhile, takeWhileE
--     - mapWhile
--     - codeWith
--     - line
--     - lineAscii
--
--   * Due to a use of leftover in a dependency:
--     - Due to "codeWith": encodeBase64, decodeBase64, encodeBase64URL, decodeBase64URL, decodeBase16
--     - due to "CT.decode": decodeUtf8, decodeUtf8Lenient
--
--   * Due to lack of resource cleanup (e.g. bracketP):
--     - sourceDirectory
--     - sourceDirectoryDeep
--     - sourceFile
--
--   * takeExactly / takeExactlyE - no monadic bind.  Another way to
--   look at this is that subsequent streams drive stream evaluation,
--   so there's no way for the conduit to guarantee a certain amount
--   of demand from the upstream.

-- | Yield each of the values contained by the given @MonoFoldable@.
--
-- This will work on many data structures, including lists, @ByteString@s, and @Vector@s.
--
-- Subject to fusion
--
-- @since 1.3.0
yieldMany, yieldManyC :: (Monad m, MonoFoldable mono)
                      => mono
                      -> ConduitT i (Element mono) m ()
-- Every element is turned into a single 'yield'; the individual yields are
-- combined through the Monoid instance of the conduit via 'ofoldMap'.
yieldManyC = ofoldMap yield
{-# INLINE yieldManyC #-}
STREAMING(yieldMany, yieldManyC, yieldManyS, x)

-- | Generate a producer from a seed value.
--
-- The step function receives the current seed and returns either @Nothing@
-- or @Just (value, nextSeed)@. Both arguments are forwarded unchanged to
-- @Data.Conduit.List.unfold@.
--
-- NOTE(review): presumably @Nothing@ ends the stream and @Just@ yields the
-- value and continues with the new seed, confirm against
-- @Data.Conduit.List.unfold@.
--
-- Subject to fusion
--
-- @since 1.3.0
unfold :: Monad m
       => (b -> Maybe (a, b))
       -> b
       -> ConduitT i a m ()
INLINE_RULE(unfold, f x, CL.unfold f x)

-- | Enumerate from a value to a final value, inclusive, via 'succ'.
--
-- This is generally more efficient than using @Prelude@\'s @enumFromTo@ and
-- combining with @sourceList@ since this avoids any intermediate data
-- structures.
--
-- Both bounds are forwarded unchanged to @Data.Conduit.List.enumFromTo@.
--
-- Subject to fusion
--
-- @since 1.3.0
enumFromTo :: (Monad m, Enum a, Ord a) => a -> a -> ConduitT i a m ()
INLINE_RULE(enumFromTo, f t, CL.enumFromTo f t)

-- | Produces an infinite stream of repeated applications of f to x.
--
-- Both arguments are forwarded unchanged to @Data.Conduit.List.iterate@.
--
-- NOTE(review): presumably the first value yielded is the start value
-- itself, as with @Prelude.iterate@, confirm against
-- @Data.Conduit.List.iterate@.
--
-- Subject to fusion
--
-- @since 1.3.0
iterate :: Monad m => (a -> a) -> a -> ConduitT i a m ()
INLINE_RULE(iterate, f t, CL.iterate f t)

-- | Produce an infinite stream consisting entirely of the given value.
--
-- Defined as @iterate id x@, i.e. 'iterate' with the identity function as
-- its step.
--
-- Subject to fusion
--
-- @since 1.3.0
repeat :: Monad m => a -> ConduitT i a m ()
INLINE_RULE(repeat, x, iterate id x)

-- | Produce a finite stream consisting of n copies of the given value.
--
-- The count and the value are forwarded unchanged to
-- @Data.Conduit.List.replicate@.
--
-- NOTE(review): the behaviour for a zero or negative count is decided by
-- that function, presumably nothing is yielded, confirm.
--
-- Subject to fusion
--
-- @since 1.3.0
replicate :: Monad m
          => Int
          -> a
          -> ConduitT i a m ()
INLINE_RULE(replicate, n x, CL.replicate n x)

-- | Generate a producer by yielding each of the strict chunks in a @LazySequence@.
--
-- For more information, see 'toChunks'.
--
-- Defined as @yieldMany (toChunks x)@: the lazy value is split with
-- 'toChunks' and the resulting chunks are handed to 'yieldMany'.
--
-- Subject to fusion
--
-- @since 1.3.0
sourceLazy :: (Monad m, LazySequence lazy strict)
           => lazy
           -> ConduitT i strict m ()
INLINE_RULE(sourceLazy, x, yieldMany (toChunks x))

-- | Repeatedly run the given action and yield all values it produces.
--
-- This producer has no terminating condition of its own: the action is run
-- and its result yielded inside 'forever'.
--
-- Subject to fusion
--
-- @since 1.3.0
repeatM, repeatMC :: Monad m
                  => m a
                  -> ConduitT i a m ()
repeatMC m = forever $ lift m >>= yield
{-# INLINE repeatMC #-}
STREAMING(repeatM, repeatMC, repeatMS, m)

-- | Repeatedly run the given action and yield all values it produces, until
-- the provided predicate returns @False@.
--
-- The first value that fails the predicate is not yielded, and the stream
-- ends at that point.
--
-- Subject to fusion
--
-- @since 1.3.0
repeatWhileM, repeatWhileMC :: Monad m
                            => m a
                            -> (a -> Bool)
                            -> ConduitT i a m ()
repeatWhileMC m f =
    loop
  where
    loop = do
        x <- lift m
        -- Only recurse while the predicate holds; otherwise fall through and
        -- finish the conduit without yielding the rejected value.
        when (f x) $ yield x >> loop
STREAMING(repeatWhileM, repeatWhileMC, repeatWhileMS, m f)

-- | Perform the given action n times, yielding each result.
--
-- The count and the action are forwarded unchanged to
-- @Data.Conduit.List.replicateM@.
--
-- Subject to fusion
--
-- @since 1.3.0
replicateM :: Monad m
           => Int
           -> m a
           -> ConduitT i a m ()
INLINE_RULE(replicateM, n m, CL.replicateM n m)

-- | Stream the contents of a file as binary data.
--
-- The file is opened with @FR.openFile@ and registered through 'bracketP'
-- with @FR.closeFile@ as its release action. Chunks are read with
-- @FR.readChunk@ until an empty chunk is returned, which is treated as the
-- end of the file.
--
-- @since 1.3.0
sourceFile :: MonadResource m
           => FilePath
           -> ConduitT i S.ByteString m ()
sourceFile fp =
    bracketP
        (FR.openFile fp)
         FR.closeFile
         loop
  where
    loop h = do
        bs <- liftIO $ FR.readChunk h
        -- An empty chunk signals end of input; stop without yielding it.
        unless (S.null bs) $ do
            yield bs
            loop h

-- | Stream the contents of a 'IO.Handle' as binary data. Note that this
-- function will /not/ automatically close the @Handle@ when processing
-- completes, since it did not acquire the @Handle@ in the first place.
--
-- Data is read with @S.hGetSome@ in requests of at most 'defaultChunkSize'
-- bytes; an empty read ends the stream.
--
-- @since 1.3.0
sourceHandle :: MonadIO m
             => IO.Handle
             -> ConduitT i S.ByteString m ()
sourceHandle h =
    loop
  where
    loop = do
        bs <- liftIO (S.hGetSome h defaultChunkSize)
        if S.null bs
            then return ()
            else yield bs >> loop

-- | Same as @sourceHandle@, but instead of allocating a new buffer for each
-- incoming chunk of data, reuses the same buffer. Therefore, the @ByteString@s
-- yielded by this function are not referentially transparent between two
-- different @yield@s.
--
-- This function will be slightly more efficient than @sourceHandle@ by
-- avoiding allocations and reducing garbage collections, but should only be
-- used if you can guarantee that you do not reuse a @ByteString@ (or any slice
-- thereof) between two calls to @await@.
--
-- @since 1.3.0
sourceHandleUnsafe :: MonadIO m => IO.Handle -> ConduitT i ByteString m ()
sourceHandleUnsafe handle = do
    -- One buffer of 'defaultChunkSize' bytes is allocated up front and
    -- shared by every chunk that is yielded below.
    fptr <- liftIO $ mallocPlainForeignPtrBytes defaultChunkSize
    let ptr = unsafeForeignPtrToPtr fptr
        loop = do
            count <- liftIO $ IO.hGetBuf handle ptr defaultChunkSize
            -- A read of zero bytes ends the stream.
            when (count > 0) $ do
                -- The yielded ByteString aliases the shared buffer: offset 0,
                -- length equal to the number of bytes just read.
                yield (PS fptr 0 count)
                loop

    loop

    -- The raw pointer obtained above is only valid while the ForeignPtr is
    -- alive, so touch it after the last read.
    liftIO $ touchForeignPtr fptr

-- | An alternative to 'sourceHandle'.
-- Instead of taking a pre-opened 'IO.Handle', it takes an action that opens
-- a 'IO.Handle' (in read mode), so that it can open it only when needed
-- and close it as soon as possible.
--
-- The opening action is registered through 'bracketP' with @IO.hClose@ as
-- its release action, and the opened handle is streamed with 'sourceHandle'.
--
-- @since 1.3.0
sourceIOHandle :: MonadResource m
               => IO IO.Handle
               -> ConduitT i S.ByteString m ()
sourceIOHandle alloc = bracketP alloc IO.hClose sourceHandle

-- | Same as 'sourceFile'. The alternate name is a holdover from an older
-- version, when 'sourceFile' was more polymorphic than it is today.
--
-- @since 1.3.0
sourceFileBS :: MonadResource m => FilePath -> ConduitT i ByteString m ()
sourceFileBS = sourceFile
{-# INLINE sourceFileBS #-}

-- | @sourceHandle@ applied to @stdin@.
--
-- Defined as @sourceHandle IO.stdin@, so the notes on 'sourceHandle' about
-- not closing the handle apply here as well.
--
-- Subject to fusion
--
-- @since 1.3.0
stdin :: MonadIO m => ConduitT i ByteString m ()
INLINE_RULE0(stdin, sourceHandle IO.stdin)

-- | Stream all incoming data to the given file.
--
-- The file is opened in binary mode with @IO.WriteMode@; opening and closing
-- of the handle is delegated to 'sinkIOHandle'.
--
-- @since 1.3.0
sinkFile :: MonadResource m
         => FilePath
         -> ConduitT S.ByteString o m ()
sinkFile fp = sinkIOHandle (IO.openBinaryFile fp IO.WriteMode)

-- | Cautious version of 'sinkFile'. The idea here is to stream the
-- values to a temporary file in the same directory of the destination
-- file, and only on successfully writing the entire file, moves it
-- atomically to the destination path.
--
-- In the event of an exception occurring, the temporary file will be
-- deleted and no move will be made. If the application shuts down
-- without running exception handling (such as machine failure or a
-- SIGKILL), the temporary file will remain and the destination file
-- will be untouched.
--
-- @since 1.3.0
sinkFileCautious
  :: MonadResource m
  => FilePath
  -> ConduitM S.ByteString o m ()
sinkFileCautious fp =
    bracketP (cautiousAcquire fp) cautiousCleanup inner
  where
    inner (tmpFP, h) = do
        sinkHandle h
        -- Close the handle before renaming so the temporary file is complete
        -- when it is moved. The cleanup registered above still runs later and
        -- tolerates the temporary file no longer existing.
        liftIO $ do
            IO.hClose h
            renameFile tmpFP fp

-- | Like 'sinkFileCautious', but uses the @with@ pattern instead of
-- @MonadResource@.
--
-- The temporary file is acquired and cleaned up with 'bracket'. The rename
-- to the destination path only happens after the supplied continuation has
-- returned normally.
--
-- @since 1.3.0
withSinkFileCautious
  :: (MonadUnliftIO m, MonadIO n)
  => FilePath
  -> (ConduitM S.ByteString o n () -> m a)
  -> m a
withSinkFileCautious fp inner =
  withRunInIO $ \run -> bracket
    (cautiousAcquire fp)
    cautiousCleanup
    (\(tmpFP, h) -> do
        a <- run $ inner $ sinkHandle h
        -- Close before renaming so the temporary file is complete when it is
        -- moved into place.
        IO.hClose h
        renameFile tmpFP fp
        return a)

-- | Helper function for Cautious functions above, do not export!
--
-- Opens a binary temporary file in the directory of the destination path,
-- using the destination file name with a @tmp@ extension appended as the
-- name template. Returns the path of the temporary file and its handle.
cautiousAcquire :: FilePath -> IO (FilePath, IO.Handle)
cautiousAcquire fp = IO.openBinaryTempFile (takeDirectory fp) (takeFileName fp <.> "tmp")

-- | Helper function for Cautious functions above, do not export!
--
-- Closes the handle and removes the temporary file. A missing temporary
-- file (for example because it was already renamed to its destination) is
-- not an error; any other 'removeFile' failure is rethrown.
--
-- The removal runs under 'finally', so the temporary file is still deleted
-- when closing the handle throws (e.g. a failing flush). This mirrors the
-- release action used by 'sinkTempFile'.
cautiousCleanup :: (FilePath, IO.Handle) -> IO ()
cautiousCleanup (tmpFP, h) =
  IO.hClose h `finally` removeTmp
  where
    -- Remove the temporary file, ignoring only the "does not exist" case.
    removeTmp =
      removeFile tmpFP `Control.Exception.catch` \e ->
        if isDoesNotExistError e
          then return ()
          else throwIO e

-- | Stream data into a temporary file in the given directory with the
-- given filename pattern, and return the temporary filename. The
-- temporary file will be automatically deleted when exiting the
-- active 'ResourceT' block, if it still exists.
--
-- The handle is closed explicitly once all input has been written, before
-- the path is returned.
--
-- @since 1.3.0
sinkTempFile :: MonadResource m
             => FilePath -- ^ temp directory
             -> String -- ^ filename pattern
             -> ConduitM ByteString o m FilePath
sinkTempFile tmpdir pattern = do
    -- The release action closes the handle and then removes the file even if
    -- closing fails; a file that is already gone is not treated as an error.
    (_releaseKey, (fp, h)) <- allocate
        (IO.openBinaryTempFile tmpdir pattern)
        (\(fp, h) -> IO.hClose h `finally` (removeFile fp `Control.Exception.catch` \e ->
            if isDoesNotExistError e
                then return ()
                else throwIO e))
    sinkHandle h
    liftIO $ IO.hClose h
    return fp

-- | Same as 'sinkTempFile', but will use the default temp file
-- directory for the system as the first argument.
--
-- The directory is looked up with 'getTemporaryDirectory' when the conduit
-- runs.
--
-- @since 1.3.0
sinkSystemTempFile
    :: MonadResource m
    => String -- ^ filename pattern
    -> ConduitM ByteString o m FilePath
sinkSystemTempFile pattern = do
    dir <- liftIO getTemporaryDirectory
    sinkTempFile dir pattern

-- | Stream all incoming data to the given 'IO.Handle'. Note that this function
-- does /not/ flush and will /not/ close the @Handle@ when processing completes.
--
-- Each incoming chunk is written with @S.hPut@.
--
-- @since 1.3.0
sinkHandle :: MonadIO m
           => IO.Handle
           -> ConduitT S.ByteString o m ()
sinkHandle h = awaitForever (liftIO . S.hPut h)

-- | Stream incoming builders, executing them directly on the buffer of the
-- given 'IO.Handle'. Note that this function does /not/ automatically close the
-- @Handle@ when processing completes.
-- Pass 'Data.ByteString.Builder.Extra.flush' to flush the buffer.
--
-- Each incoming builder is written with 'hPutBuilder'.
--
-- @since 1.3.0
sinkHandleBuilder :: MonadIO m => IO.Handle -> ConduitM Builder o m ()
sinkHandleBuilder h = awaitForever (liftIO . hPutBuilder h)

-- | Stream incoming @Flush@es, executing them on @IO.Handle@
-- Note that this function does /not/ automatically close the @Handle@ when
-- processing completes
--
-- A @Chunk@ is written with @S.hPut@; a @Flush@ calls @IO.hFlush@ on the
-- handle.
--
-- @since 1.3.0
sinkHandleFlush :: MonadIO m
                => IO.Handle
                -> ConduitM (Flush S.ByteString) o m ()
sinkHandleFlush h =
  awaitForever $ \mbs -> liftIO $
  case mbs of
    Chunk bs -> S.hPut h bs
    Flush -> IO.hFlush h

-- | An alternative to 'sinkHandle'.
-- Instead of taking a pre-opened 'IO.Handle', it takes an action that opens
-- a 'IO.Handle' (in write mode), so that it can open it only when needed
-- and close it as soon as possible.
--
-- The opening action is registered through 'bracketP' with @IO.hClose@ as
-- its release action, and the opened handle is written to with 'sinkHandle'.
--
-- @since 1.3.0
sinkIOHandle :: MonadResource m
             => IO IO.Handle
             -> ConduitT S.ByteString o m ()
sinkIOHandle alloc = bracketP alloc IO.hClose sinkHandle

-- | Like 'IO.withBinaryFile', but provides a source to read bytes from.
--
-- The file is opened in @IO.ReadMode@ and the handle is wrapped with
-- 'sourceHandle' before being passed to the continuation. The source must
-- only be used inside the continuation, since the handle is managed by
-- 'IO.withBinaryFile'.
--
-- @since 1.3.0
withSourceFile
  :: (MonadUnliftIO m, MonadIO n)
  => FilePath
  -> (ConduitM i ByteString n () -> m a)
  -> m a
withSourceFile fp inner =
  withRunInIO $ \run ->
  IO.withBinaryFile fp IO.ReadMode $
  run . inner . sourceHandle

-- | Like 'IO.withBinaryFile', but provides a sink to write bytes to.
--
-- The file is opened in 'IO.WriteMode'. The resulting handle is wrapped with
-- 'sinkHandle' and handed to the inner action, which is run in 'IO' via
-- 'withRunInIO'.
--
-- @since 1.3.0
withSinkFile
  :: (MonadUnliftIO m, MonadIO n)
  => FilePath
  -> (ConduitM ByteString o n () -> m a)
  -> m a
withSinkFile fp inner =
  withRunInIO $ \run ->
  IO.withBinaryFile fp IO.WriteMode $
  run . inner . sinkHandle

-- | Same as 'withSinkFile', but lets you use a 'BB.Builder'.
--
-- The file is opened in 'IO.WriteMode' and every incoming builder is written
-- to the handle with 'hPutBuilder'.
--
-- @since 1.3.0
withSinkFileBuilder
  :: (MonadUnliftIO m, MonadIO n)
  => FilePath
  -> (ConduitM Builder o n () -> m a)
  -> m a
withSinkFileBuilder fp inner =
  withRunInIO $ \run ->
  IO.withBinaryFile fp IO.WriteMode $ \h ->
  run $ inner $ CL.mapM_ (liftIO . hPutBuilder h)

-- | Stream the contents of the given directory, without traversing deeply.
--
-- This function will return /all/ of the contents of the directory, whether
-- they be files, directories, etc.
--
-- Note that the generated filepaths will be the complete path, not just the
-- filename. In other words, if you have a directory @foo@ containing files
-- @bar@ and @baz@, and you use @sourceDirectory@ on @foo@, the results will be
-- @foo/bar@ and @foo/baz@.
--
-- The directory stream is opened and closed through 'bracketP'.
--
-- @since 1.3.0
sourceDirectory :: MonadResource m => FilePath -> ConduitT i FilePath m ()
sourceDirectory dir =
    bracketP (F.openDirStream dir) F.closeDirStream go
  where
    go ds =
        loop
      where
        -- Read one entry at a time until the stream reports no more entries.
        loop = do
            mfp <- liftIO $ F.readDirStream ds
            case mfp of
                Nothing -> return ()
                Just fp -> do
                    -- Entries are yielded prefixed with the directory path.
                    yield $ dir </> fp
                    loop

-- | Deeply stream the contents of the given directory.
--
-- This works the same as @sourceDirectory@, but will not return directories at
-- all. This function also takes an extra parameter to indicate whether
-- symlinks will be followed.
--
-- Regular files and symlinks to files are yielded; directories are descended
-- into; entries of any other type are skipped.
--
-- @since 1.3.0
sourceDirectoryDeep :: MonadResource m
                    => Bool -- ^ Follow directory symlinks
                    -> FilePath -- ^ Root directory
                    -> ConduitT i FilePath m ()
sourceDirectoryDeep followSymlinks =
    start
  where
    -- List one directory and classify each of its entries.
    start :: MonadResource m => FilePath -> ConduitT i FilePath m ()
    start dir = sourceDirectory dir .| awaitForever go

    -- Decide what to do with a single path based on its file type.
    go :: MonadResource m => FilePath -> ConduitT i FilePath m ()
    go fp = do
        ft <- liftIO $ F.getFileType fp
        case ft of
            F.FTFile -> yield fp
            F.FTFileSym -> yield fp
            F.FTDirectory -> start fp
            -- Symlinked directories are only traversed when requested.
            F.FTDirectorySym
                | followSymlinks -> start fp
                | otherwise -> return ()
            F.FTOther -> return ()

-- | Ignore a certain number of values in the stream.
--
-- Defined in terms of 'CL.drop'.
--
-- Note: since this function doesn't produce anything, you probably want to
-- use it with ('>>') instead of directly plugging it into a pipeline:
--
-- >>> runConduit $ yieldMany [1..5] .| drop 2 .| sinkList
-- []
-- >>> runConduit $ yieldMany [1..5] .| (drop 2 >> sinkList)
-- [3,4,5]
--
-- @since 1.3.0
drop :: Monad m
     => Int
     -> ConduitT a o m ()
INLINE_RULE(drop, n, CL.drop n)

-- | Drop a certain number of elements from a chunked stream.
--
-- Chunks are consumed until the requested number of elements has been
-- dropped or the stream ends. If a chunk is only partially consumed, its
-- non-empty remainder is pushed back onto the stream with 'leftover'.
--
-- Note: you likely want to use it with monadic composition. See the docs
-- for 'drop'.
--
-- @since 1.3.0
dropE :: (Monad m, Seq.IsSequence seq)
      => Seq.Index seq
      -> ConduitT seq o m ()
dropE =
    loop
  where
    -- Stop once nothing remains to be dropped, or when the stream is exhausted.
    loop i = if i <= 0
        then return ()
        else await >>= maybe (return ()) (go i)

    -- Split the chunk at the remaining count: @x@ is discarded and @y@, when
    -- non-empty, is returned to the stream.
    go i sq = do
        unless (onull y) $ leftover y
        loop i'
      where
        (x, y) = Seq.splitAt i sq
        -- Only the elements actually removed from this chunk are counted.
        i' = i - fromIntegral (olength x)
{-# INLINEABLE dropE #-}

-- | Drop all values which match the given predicate.
--
-- Consumption stops at the first value that does not match; that value is
-- pushed back onto the stream with 'leftover'.
--
-- Note: you likely want to use it with monadic composition. See the docs
-- for 'drop'.
--
-- @since 1.3.0
dropWhile :: Monad m
          => (a -> Bool)
          -> ConduitT a o m ()
dropWhile f =
    loop
  where
    -- End of stream terminates the loop.
    loop = await >>= maybe (return ()) go
    -- Keep dropping while the predicate holds; otherwise restore the value.
    go x = if f x then loop else leftover x
{-# INLINE dropWhile #-}

-- | Drop all elements in the chunked stream which match the given predicate.
--
-- Each chunk has its matching prefix removed with 'Seq.dropWhile'. If the
-- whole chunk is removed the next chunk is examined; otherwise the remainder
-- is pushed back onto the stream with 'leftover'.
--
-- Note: you likely want to use it with monadic composition. See the docs
-- for 'drop'.
--
-- @since 1.3.0
dropWhileE :: (Monad m, Seq.IsSequence seq)
           => (Element seq -> Bool)
           -> ConduitT seq o m ()
dropWhileE f =
    loop
  where
    -- End of stream terminates the loop.
    loop = await >>= maybe (return ()) go

    go sq =
        if onull x then loop else leftover x
      where
        -- What is left of the chunk after removing the matching prefix.
        x = Seq.dropWhile f sq
{-# INLINE dropWhileE #-}

-- | Monoidally combine all values in the stream.
--
-- Defined as @CL.foldMap id@.
--
-- Subject to fusion
--
-- @since 1.3.0
fold :: (Monad m, Monoid a)
     => ConduitT a o m a
INLINE_RULE0(fold, CL.foldMap id)

-- | Monoidally combine all elements in the chunked stream.
--
-- Each chunk is collapsed with @ofoldMap id@ and appended to the running
-- accumulator with 'mappend', starting from 'mempty'.
--
-- Subject to fusion
--
-- @since 1.3.0
foldE :: (Monad m, MonoFoldable mono, Monoid (Element mono))
      => ConduitT mono o m (Element mono)
INLINE_RULE0(foldE, CL.fold (\accum mono -> accum `mappend` ofoldMap id mono) mempty)

-- | A strict left fold.
--
-- Defined in terms of 'CL.fold': the first argument is the step function and
-- the second is the initial accumulator.
--
-- Subject to fusion
--
-- @since 1.3.0
foldl :: Monad m => (a -> b -> a) -> a -> ConduitT b o m a
INLINE_RULE(foldl, f x, CL.fold f x)

-- | A strict left fold on a chunked stream.
--
-- Every chunk is folded into the accumulator with 'ofoldlPrime', and the
-- chunks themselves are folded with 'CL.fold'.
--
-- Subject to fusion
--
-- @since 1.3.0
foldlE :: (Monad m, MonoFoldable mono)
       => (a -> Element mono -> a)
       -> a
       -> ConduitT mono o m a
INLINE_RULE(foldlE, f x, CL.fold (ofoldlPrime f) x)

-- Work around CPP not supporting identifiers with primes...
--
-- This is a plain alias for the primed strict left fold from mono-traversable,
-- so that it can be named inside the CPP macro invocations in this module.
ofoldlPrime :: MonoFoldable mono => (a -> Element mono -> a) -> a -> mono -> a
ofoldlPrime = ofoldl'

-- | Apply the provided mapping function and monoidally combine all values.
--
-- Defined in terms of 'CL.foldMap'.
--
-- Subject to fusion
--
-- @since 1.3.0
foldMap :: (Monad m, Monoid b)
        => (a -> b)
        -> ConduitT a o m b
INLINE_RULE(foldMap, f, CL.foldMap f)

-- | Apply the provided mapping function and monoidally combine all elements
-- of the chunked stream.
--
-- Defined as @CL.foldMap (ofoldMap f)@: the mapping function is applied to
-- the elements of each chunk via @ofoldMap@.
--
-- Subject to fusion
--
-- @since 1.3.0
foldMapE :: (Monad m, MonoFoldable mono, Monoid w)
         => (Element mono -> w)
         -> ConduitT mono o m w
INLINE_RULE(foldMapE, f, CL.foldMap (ofoldMap f))

-- | A strict left fold with no starting value.  Returns 'Nothing'
-- when the stream is empty.
--
-- The first value of the stream seeds the accumulator; every following value
-- is combined into it with the supplied function.
--
-- Subject to fusion
foldl1, foldl1C :: Monad m => (a -> a -> a) -> ConduitT a o m (Maybe a)
foldl1C f =
    await >>= maybe (return Nothing) loop
  where
    -- The bang pattern forces the accumulator on every iteration.
    loop !prev = await >>= maybe (return $ Just prev) (loop . f prev)
STREAMING(foldl1, foldl1C, foldl1S, f)

-- | A strict left fold on a chunked stream, with no starting value.
-- Returns 'Nothing' when the stream is empty.
--
-- Defined as a 'foldl' over the chunks, starting from 'Nothing' and using
-- 'foldMaybeNull' as the per-chunk step function.
--
-- Subject to fusion
--
-- @since 1.3.0
foldl1E :: (Monad m, MonoFoldable mono, a ~ Element mono)
        => (a -> a -> a)
        -> ConduitT mono o m (Maybe a)
INLINE_RULE(foldl1E, f, foldl (foldMaybeNull f) Nothing)

-- Helper for foldl1E
--
-- Folds one chunk into an optional accumulator:
--
-- * with an accumulator and a non-null chunk, the chunk is folded into it;
-- * with no accumulator and a non-null chunk, the chunk is folded on its own;
-- * a null chunk leaves the accumulator unchanged.
foldMaybeNull :: (MonoFoldable mono, e ~ Element mono)
              => (e -> e -> e)
              -> Maybe e
              -> mono
              -> Maybe e
foldMaybeNull f macc mono =
    case (macc, NonNull.fromNullable mono) of
        (Just acc, Just nn) -> Just $ ofoldl' f acc nn
        (Nothing, Just nn) -> Just $ NonNull.ofoldl1' f nn
        _ -> macc
{-# INLINE foldMaybeNull #-}

-- | Check that all values in the stream return True.
--
-- Subject to shortcut logic: at the first False, consumption of the stream
-- will stop.
--
-- Implemented by using 'find' to look for a value that fails the predicate;
-- the result is True exactly when no such value is found.
--
-- Subject to fusion
--
-- @since 1.3.0
all, allC :: Monad m
          => (a -> Bool)
          -> ConduitT a o m Bool
allC f = fmap isNothing $ find (Prelude.not . f)
{-# INLINE allC #-}
STREAMING(all, allC, allS, f)

-- | Check that all elements in the chunked stream return True.
--
-- Subject to shortcut logic: at the first False, consumption of the stream
-- will stop.
--
-- Defined as @all (oall f)@, i.e. 'all' applied chunk by chunk.
--
-- Subject to fusion
--
-- @since 1.3.0
allE :: (Monad m, MonoFoldable mono)
     => (Element mono -> Bool)
     -> ConduitT mono o m Bool
INLINE_RULE(allE, f, all (oall f))

-- | Check that at least one value in the stream returns True.
--
-- Subject to shortcut logic: at the first True, consumption of the stream
-- will stop.
--
-- Implemented by using 'find' to look for a value that satisfies the
-- predicate; the result is True exactly when such a value is found.
--
-- Subject to fusion
--
-- @since 1.3.0
any, anyC :: Monad m
          => (a -> Bool)
          -> ConduitT a o m Bool
anyC = fmap isJust . find
{-# INLINE anyC #-}
STREAMING(any, anyC, anyS, f)

-- | Check that at least one element in the chunked stream returns True.
--
-- Subject to shortcut logic: at the first True, consumption of the stream
-- will stop.
--
-- Defined as @any (oany f)@, i.e. 'any' applied chunk by chunk.
--
-- Subject to fusion
--
-- @since 1.3.0
anyE :: (Monad m, MonoFoldable mono)
     => (Element mono -> Bool)
     -> ConduitT mono o m Bool
INLINE_RULE(anyE, f, any (oany f))

-- | Are all values in the stream True?
--
-- Consumption stops once the first False is encountered.
--
-- Defined as @all id@.
--
-- Subject to fusion
--
-- @since 1.3.0
and :: Monad m => ConduitT Bool o m Bool
INLINE_RULE0(and, all id)

-- | Are all elements in the chunked stream True?
--
-- Consumption stops once the first False is encountered.
--
-- Defined as @allE id@.
--
-- Subject to fusion
--
-- @since 1.3.0
andE :: (Monad m, MonoFoldable mono, Element mono ~ Bool)
     => ConduitT mono o m Bool
INLINE_RULE0(andE, allE id)

-- | Are any values in the stream True?
--
-- Consumption stops once the first True is encountered.
--
-- Defined as @any id@.
--
-- Subject to fusion
--
-- @since 1.3.0
or :: Monad m => ConduitT Bool o m Bool
INLINE_RULE0(or, any id)

-- | Are any elements in the chunked stream True?
--
-- Consumption stops once the first True is encountered.
--
-- Defined as @anyE id@.
--
-- Subject to fusion
--
-- @since 1.3.0
orE :: (Monad m, MonoFoldable mono, Element mono ~ Bool)
    => ConduitT mono o m Bool
INLINE_RULE0(orE, anyE id)

-- | 'Alternative'ly combine all values in the stream.
--
-- Defined as a 'foldl' that combines values with '<|>', starting from
-- 'empty'.
--
-- @since 1.3.0
asum :: (Monad m, Alternative f)
     => ConduitT (f a) o m (f a)
INLINE_RULE0(asum, foldl (<|>) empty)

-- | Are any values in the stream equal to the given value?
--
-- Stops consuming as soon as a match is found.
--
-- Defined as @any (== x)@.
--
-- Subject to fusion
--
-- @since 1.3.0
elem :: (Monad m, Eq a) => a -> ConduitT a o m Bool
INLINE_RULE(elem, x, any (== x))

-- | Are any elements in the chunked stream equal to the given element?
--
-- Stops consuming as soon as a match is found.
--
-- Defined as 'any' applied chunk by chunk with @oelem@. Note that the macro
-- argument named @f@ below is the element being searched for, not a function.
--
-- Subject to fusion
--
-- @since 1.3.0
elemE :: (Monad m, Seq.IsSequence seq, Eq (Element seq))
      => Element seq
      -> ConduitT seq o m Bool
INLINE_RULE(elemE, f, any (oelem f))

-- | Are no values in the stream equal to the given value?
--
-- Stops consuming as soon as a match is found.
--
-- Defined as @all (/= x)@.
--
-- Subject to fusion
--
-- @since 1.3.0
notElem :: (Monad m, Eq a) => a -> ConduitT a o m Bool
INLINE_RULE(notElem, x, all (/= x))

-- | Are no elements in the chunked stream equal to the given element?
--
-- Stops consuming as soon as a match is found.
--
-- Defined as 'all' applied chunk by chunk with @onotElem@.
--
-- Subject to fusion
--
-- @since 1.3.0
notElemE :: (Monad m, Seq.IsSequence seq, Eq (Element seq))
         => Element seq
         -> ConduitT seq o m Bool
INLINE_RULE(notElemE, x, all (onotElem x))

-- | Consume all incoming strict chunks into a lazy sequence.
-- Note that the entirety of the sequence will be resident in memory.
--
-- This can be used to consume a stream of strict ByteStrings into a lazy
-- ByteString, for example.
--
-- The chunks are accumulated as a function from list to list (each new chunk
-- is composed onto the end), which is finally applied to the empty list and
-- passed to 'fromChunks'.
--
-- Subject to fusion
--
-- @since 1.3.0
sinkLazy, sinkLazyC :: (Monad m, LazySequence lazy strict)
                    => ConduitT strict o m lazy
sinkLazyC = (fromChunks . ($ [])) <$> CL.fold (\front next -> front . (next:)) id
{-# INLINE sinkLazyC #-}
STREAMING0(sinkLazy, sinkLazyC, sinkLazyS)

-- | Consume all values from the stream and return as a list. Note that this
-- will pull all values into memory.
--
-- Defined in terms of 'CL.consume'.
--
-- Subject to fusion
--
-- @since 1.3.0
sinkList :: Monad m => ConduitT a o m [a]
INLINE_RULE0(sinkList, CL.consume)

-- | Sink incoming values into a vector, growing the vector as necessary to fit
-- more elements.
--
-- Note that using this function is more memory efficient than @sinkList@ and
-- then converting to a @Vector@, as it avoids intermediate list constructors.
--
-- The mutable buffer starts with room for 10 elements and its capacity is
-- doubled whenever it is full. At the end of the stream the buffer is frozen
-- and sliced down to the number of elements actually written.
--
-- Subject to fusion
--
-- @since 1.3.0
sinkVector, sinkVectorC :: (V.Vector v a, PrimMonad m)
                        => ConduitT a o m (v a)
sinkVectorC = do
    let initSize = 10
    mv0 <- VM.new initSize
    -- @maxSize@ is the current capacity, @i@ the next index to write.
    let go maxSize i mv | i >= maxSize = do
            -- Buffer is full: grow it by its current size, doubling capacity.
            let newMax = maxSize * 2
            mv' <- VM.grow mv maxSize
            go newMax i mv'
        go maxSize i mv = do
            mx <- await
            case mx of
                -- End of stream: keep only the first @i@ written slots.
                Nothing -> V.slice 0 i <$> V.unsafeFreeze mv
                Just x -> do
                    VM.write mv i x
                    go maxSize (i + 1) mv
    go initSize 0 mv0
{-# INLINEABLE sinkVectorC #-}
STREAMING0(sinkVector, sinkVectorC, sinkVectorS)

-- | Sink incoming values into a vector, up until size @maxSize@.  Subsequent
-- values will be left in the stream. If there are fewer than @maxSize@ values
-- present, returns a @Vector@ of smaller size.
--
-- Note that using this function is more memory efficient than @sinkList@ and
-- then converting to a @Vector@, as it avoids intermediate list constructors.
--
-- A mutable buffer of @maxSize@ elements is allocated up front and frozen
-- once it is full or the stream ends.
--
-- Subject to fusion
--
-- @since 1.3.0
sinkVectorN, sinkVectorNC :: (V.Vector v a, PrimMonad m)
                          => Int -- ^ maximum allowed size
                          -> ConduitT a o m (v a)
sinkVectorNC maxSize = do
    mv <- VM.new maxSize
    -- @i@ is the next index to write. Once the buffer is full no further
    -- value is awaited.
    let go i | i >= maxSize = V.unsafeFreeze mv
        go i = do
            mx <- await
            case mx of
                -- Stream ended early: keep only the first @i@ written slots.
                Nothing -> V.slice 0 i <$> V.unsafeFreeze mv
                Just x -> do
                    VM.write mv i x
                    go (i + 1)
    go 0
{-# INLINEABLE sinkVectorNC #-}
STREAMING(sinkVectorN, sinkVectorNC, sinkVectorNS, maxSize)

-- | Same as @sinkBuilder@, but afterwards convert the builder to its lazy
-- representation.
--
-- This can also be seen as an alternative to @sinkLazy@, with the following
-- differences:
--
-- * This function will allow multiple input types, not just the strict version
-- of the lazy structure.
--
-- * Some buffer copying may occur in this version.
--
-- Subject to fusion
--
-- @since 1.3.0
sinkLazyBuilder, sinkLazyBuilderC :: Monad m => ConduitT Builder o m BL.ByteString
-- Monoidally combine every incoming builder with 'fold', then run the
-- combined builder once at the end.
sinkLazyBuilderC = fmap toLazyByteString fold
{-# INLINE sinkLazyBuilderC #-}
STREAMING0(sinkLazyBuilder, sinkLazyBuilderC, sinkLazyBuilderS)

-- | Consume and discard all remaining values in the stream.
--
-- Subject to fusion
--
-- @since 1.3.0
sinkNull :: Monad m => ConduitT a o m ()
-- Thin alias: the CPP macro on the next line binds this name to @CL.sinkNull@.
INLINE_RULE0(sinkNull, CL.sinkNull)

-- | Same as @await@, but discards any leading 'onull' values.
--
-- Returns 'Nothing' only when upstream is exhausted before a non-null value
-- arrives.
--
-- @since 1.3.0
awaitNonNull :: (Monad m, MonoFoldable a) => ConduitT a o m (Maybe (NonNull.NonNull a))
awaitNonNull =
    go
  where
    -- Wait for the next value; end of stream yields Nothing.
    go = await >>= maybe (return Nothing) go'

    -- A null value cannot be converted, so go back to waiting; otherwise
    -- return the value wrapped as NonNull.
    go' = maybe go (return . Just) . NonNull.fromNullable
{-# INLINE awaitNonNull #-}

-- | Take a single value from the stream, if available.
--
-- Thin alias for @CL.head@.
--
-- @since 1.3.0
head :: Monad m => ConduitT a o m (Maybe a)
head = CL.head

-- | Same as 'head', but returns a default value if none are available from the stream.
--
-- The argument is the default, returned only when 'head' yields 'Nothing'.
--
-- @since 1.3.0
headDef :: Monad m => a -> ConduitT a o m a
headDef a = fromMaybe a <$> head

-- | Get the next element in the chunked stream.
--
-- Empty chunks are skipped. The rest of the chunk the element was taken from
-- is pushed back as a leftover, unless that remainder is empty.
--
-- @since 1.3.0
headE :: (Monad m, Seq.IsSequence seq) => ConduitT seq o m (Maybe (Element seq))
headE =
    loop
  where
    -- Wait for the next chunk; end of stream yields Nothing.
    loop = await >>= maybe (return Nothing) go
    go x =
        case Seq.uncons x of
            -- Empty chunk: keep looking.
            Nothing -> loop
            Just (y, z) -> do
                -- Never push back an empty chunk.
                unless (onull z) $ leftover z
                return $ Just y
{-# INLINE headE #-}

-- | View the next value in the stream without consuming it.
--
-- Thin alias for @CL.peek@.
--
-- @since 1.3.0
peek :: Monad m => ConduitT a o m (Maybe a)
peek = CL.peek
{-# INLINE peek #-}

-- | View the next element in the chunked stream without consuming it.
--
-- Empty leading chunks are dropped while searching; the chunk containing the
-- returned element is pushed back whole as a leftover.
--
-- @since 1.3.0
peekE :: (Monad m, MonoFoldable mono) => ConduitT mono o m (Maybe (Element mono))
peekE =
    loop
  where
    -- Wait for the next chunk; end of stream yields Nothing.
    loop = await >>= maybe (return Nothing) go
    go x =
        case headMay x of
            -- Empty chunk: discard it and keep looking.
            Nothing -> loop
            Just y -> do
                -- Restore the untouched chunk so nothing is consumed.
                leftover x
                return $ Just y
{-# INLINE peekE #-}

-- | Retrieve the last value in the stream, if present.
--
-- The whole stream is consumed; 'Nothing' is returned only for an empty
-- stream.
--
-- Subject to fusion
--
-- @since 1.3.0
last, lastC :: Monad m => ConduitT a o m (Maybe a)
lastC =
    await >>= maybe (return Nothing) loop
  where
    -- Remember the most recent value until upstream is exhausted.
    loop prev = await >>= maybe (return $ Just prev) loop
STREAMING0(last, lastC, lastS)

-- | Same as 'last', but returns a default value if none are available from the stream.
--
-- The argument is the default, returned only when 'last' yields 'Nothing'.
--
-- @since 1.3.0
lastDef :: Monad m => a -> ConduitT a o m a
lastDef a = fromMaybe a <$> last

-- | Retrieve the last element in the chunked stream, if present.
--
-- Empty chunks are ignored (chunks are read with 'awaitNonNull'), so the
-- result is the last element of the last non-empty chunk.
--
-- Subject to fusion
--
-- @since 1.3.0
lastE, lastEC :: (Monad m, Seq.IsSequence seq) => ConduitT seq o m (Maybe (Element seq))
lastEC =
    awaitNonNull >>= maybe (return Nothing) (loop . NonNull.last)
  where
    -- Carry the last element of the most recent non-empty chunk until
    -- upstream is exhausted.
    loop prev = awaitNonNull >>= maybe (return $ Just prev) (loop . NonNull.last)
STREAMING0(lastE, lastEC, lastES)

-- | Count how many values are in the stream.
--
-- The count is returned in whichever 'Num' type the caller selects.
--
-- Subject to fusion
--
-- @since 1.3.0
length :: (Monad m, Num len) => ConduitT a o m len
-- Left fold from 0 that ignores each value and adds 1 to the counter.
INLINE_RULE0(length, foldl (\x _ -> x + 1) 0)

-- | Count how many elements are in the chunked stream.
--
-- The count is returned in whichever 'Num' type the caller selects.
--
-- Subject to fusion
--
-- @since 1.3.0
lengthE :: (Monad m, Num len, MonoFoldable mono) => ConduitT mono o m len
-- Left fold from 0 that adds the 'olength' of each chunk, converted to the
-- result type with 'fromIntegral'.
INLINE_RULE0(lengthE, foldl (\x y -> x + fromIntegral (olength y)) 0)

-- | Count how many values in the stream pass the given predicate.
--
-- Subject to fusion
--
-- @since 1.3.0
lengthIf :: (Monad m, Num len) => (a -> Bool) -> ConduitT a o m len
-- Left fold from 0; the counter grows by 1 only for values where the
-- predicate holds and is left unchanged otherwise.
INLINE_RULE(lengthIf, f, foldl (\cnt a -> if f a then (cnt + 1) else cnt) 0)

-- | Count how many elements in the chunked stream pass the given predicate.
--
-- Subject to fusion
--
-- @since 1.3.0
lengthIfE :: (Monad m, Num len, MonoFoldable mono)
          => (Element mono -> Bool) -> ConduitT mono o m len
-- Same counting step as the unchunked variant, but driven by 'foldlE' so it
-- is applied to every element of every chunk.
INLINE_RULE(lengthIfE, f, foldlE (\cnt a -> if f a then (cnt + 1) else cnt) 0)

-- | Get the largest value in the stream, if present.
--
-- Subject to fusion
--
-- @since 1.3.0
maximum :: (Monad m, Ord a) => ConduitT a o m (Maybe a)
-- Combines the values pairwise with 'max' through 'foldl1'.
INLINE_RULE0(maximum, foldl1 max)

-- | Get the largest element in the chunked stream, if present.
--
-- Subject to fusion
--
-- @since 1.3.0
maximumE :: (Monad m, Seq.IsSequence seq, Ord (Element seq)) => ConduitT seq o m (Maybe (Element seq))
-- Combines the elements pairwise with 'max' through 'foldl1E'.
INLINE_RULE0(maximumE, foldl1E max)

-- | Get the smallest value in the stream, if present.
--
-- Subject to fusion
--
-- @since 1.3.0
minimum :: (Monad m, Ord a) => ConduitT a o m (Maybe a)
-- Combines the values pairwise with 'min' through 'foldl1'.
INLINE_RULE0(minimum, foldl1 min)

-- | Get the smallest element in the chunked stream, if present.
--
-- Subject to fusion
--
-- @since 1.3.0
minimumE :: (Monad m, Seq.IsSequence seq, Ord (Element seq)) => ConduitT seq o m (Maybe (Element seq))
-- Combines the elements pairwise with 'min' through 'foldl1E'.
INLINE_RULE0(minimumE, foldl1E min)

-- | True if there are no values in the stream.
--
-- This function does not modify the stream.
--
-- @since 1.3.0
null :: Monad m => ConduitT a o m Bool
-- Built on 'peek': an absent next value means the stream is empty, while
-- any present value (whatever it is) means it is not.
null = (maybe True (\_ -> False)) `fmap` peek
{-# INLINE null #-}

-- | True if there are no elements in the chunked stream.
--
-- This function may remove empty leading chunks from the stream, but otherwise
-- will not modify it.
--
-- @since 1.3.0
nullE :: (Monad m, MonoFoldable mono)
      => ConduitT mono o m Bool
nullE =
    go
  where
    -- End of stream without seeing a non-empty chunk: no elements at all.
    go = await >>= maybe (return True) go'
    -- Empty chunks are dropped; the first non-empty chunk is pushed back
    -- untouched before reporting False.
    go' x = if onull x then go else leftover x >> return False
{-# INLINE nullE #-}

-- | Get the sum of all values in the stream.
--
-- Subject to fusion
--
-- @since 1.3.0
sum :: (Monad m, Num a) => ConduitT a o m a
-- Left fold with (+) starting from 0, so an empty stream gives 0.
INLINE_RULE0(sum, foldl (+) 0)

-- | Get the sum of all elements in the chunked stream.
--
-- Subject to fusion
--
-- @since 1.3.0
sumE :: (Monad m, MonoFoldable mono, Num (Element mono)) => ConduitT mono o m (Element mono)
-- Element-wise left fold ('foldlE') with (+) starting from 0.
INLINE_RULE0(sumE, foldlE (+) 0)

-- | Get the product of all values in the stream.
--
-- Subject to fusion
--
-- @since 1.3.0
product :: (Monad m, Num a) => ConduitT a o m a
-- Left fold with (*) starting from 1, so an empty stream gives 1.
INLINE_RULE0(product, foldl (*) 1)

-- | Get the product of all elements in the chunked stream.
--
-- Subject to fusion
--
-- @since 1.3.0
productE :: (Monad m, MonoFoldable mono, Num (Element mono)) => ConduitT mono o m (Element mono)
-- Element-wise left fold ('foldlE') with (*) starting from 1.
INLINE_RULE0(productE, foldlE (*) 1)

-- | Find the first matching value.
--
-- Values are consumed up to and including the match; non-matching values
-- seen on the way are discarded. Returns 'Nothing' if the stream ends without
-- a match.
--
-- Subject to fusion
--
-- @since 1.3.0
find, findC :: Monad m => (a -> Bool) -> ConduitT a o m (Maybe a)
findC f =
    loop
  where
    -- Wait for the next value; end of stream yields Nothing.
    loop = await >>= maybe (return Nothing) go
    -- Stop at the first value satisfying the predicate.
    go x = if f x then return (Just x) else loop
{-# INLINE findC #-}
STREAMING(find, findC, findS, f)

-- | Apply the action to all values in the stream.
--
-- Note: if you want to /pass/ the values instead of /consuming/ them, use
-- 'iterM' instead.
--
-- Subject to fusion
--
-- @since 1.3.0
mapM_ :: Monad m => (a -> m ()) -> ConduitT a o m ()
-- Thin alias: the CPP macro on the next line forwards to @CL.mapM_@.
INLINE_RULE(mapM_, f, CL.mapM_ f)

-- | Apply the action to all elements in the chunked stream.
--
-- Note: the same caveat as with 'mapM_' applies. If you don't want to
-- consume the values, you can use 'iterM':
--
-- > iterM (omapM_ f)
--
-- Subject to fusion
--
-- @since 1.3.0
mapM_E :: (Monad m, MonoFoldable mono) => (Element mono -> m ()) -> ConduitT mono o m ()
-- Each incoming chunk is traversed with 'omapM_', running the action once
-- per element.
INLINE_RULE(mapM_E, f, CL.mapM_ (omapM_ f))

-- | A monadic strict left fold.
--
-- Takes the step function and the initial accumulator, in that order.
--
-- Subject to fusion
--
-- @since 1.3.0
foldM :: Monad m => (a -> b -> m a) -> a -> ConduitT b o m a
-- Thin alias: the CPP macro on the next line forwards to @CL.foldM@.
INLINE_RULE(foldM, f x, CL.foldM f x)

-- | A monadic strict left fold on a chunked stream.
--
-- Subject to fusion
--
-- @since 1.3.0
foldME :: (Monad m, MonoFoldable mono)
       => (a -> Element mono -> m a) -- ^ step applied to each element
       -> a                          -- ^ initial accumulator
       -> ConduitT mono o m a
-- Each chunk is folded with 'ofoldlM'; 'foldM' threads the accumulator from
-- one chunk to the next.
INLINE_RULE(foldME, f x, foldM (ofoldlM f) x)

-- | Apply the provided monadic mapping function and monoidally combine all values.
--
-- Subject to fusion
--
-- @since 1.3.0
foldMapM :: (Monad m, Monoid w) => (a -> m w) -> ConduitT a o m w
-- Thin alias: the CPP macro on the next line forwards to @CL.foldMapM@.
INLINE_RULE(foldMapM, f, CL.foldMapM f)

-- | Apply the provided monadic mapping function and monoidally combine all
-- elements in the chunked stream.
--
-- Subject to fusion
--
-- @since 1.3.0
foldMapME :: (Monad m, MonoFoldable mono, Monoid w)
          => (Element mono -> m w)
          -> ConduitT mono o m w
-- Starts from 'mempty'. For every element the mapped result is appended on
-- the right of the accumulator with 'mappend', chunk by chunk.
INLINE_RULE(foldMapME, f, CL.foldM (ofoldlM (\accum e -> mappend accum `liftM` f e)) mempty)

-- | 'sinkFile' specialized to 'ByteString' to help with type
-- inference.
--
-- Behaves exactly like 'sinkFile'; only the input type is fixed.
--
-- @since 1.3.0
sinkFileBS :: MonadResource m => FilePath -> ConduitT ByteString o m ()
sinkFileBS = sinkFile
{-# INLINE sinkFileBS #-}

-- | Print all incoming values to stdout.
--
-- Subject to fusion
--
-- @since 1.3.0
print :: (Show a, MonadIO m) => ConduitT a o m ()
-- Runs @Prelude.print@ (lifted with 'liftIO') on every value via 'mapM_'.
INLINE_RULE0(print, mapM_ (liftIO . Prelude.print))

-- | @sinkHandle@ applied to @stdout@.
--
-- Subject to fusion
--
-- @since 1.3.0
stdout :: MonadIO m => ConduitT ByteString o m ()
-- The handle is @IO.stdout@; the qualifier avoids a clash with this
-- definition's own name.
INLINE_RULE0(stdout, sinkHandle IO.stdout)

-- | @sinkHandle@ applied to @stderr@.
--
-- Subject to fusion
--
-- @since 1.3.0
stderr :: MonadIO m => ConduitT ByteString o m ()
-- The handle is @IO.stderr@; the qualifier avoids a clash with this
-- definition's own name.
INLINE_RULE0(stderr, sinkHandle IO.stderr)

-- | Apply a transformation to all values in a stream.
--
-- Subject to fusion
--
-- @since 1.3.0
map :: Monad m => (a -> b) -> ConduitT a b m ()
-- Thin alias: the CPP macro on the next line forwards to @CL.map@.
INLINE_RULE(map, f, CL.map f)

-- | Apply a transformation to all elements in a chunked stream.
--
-- Chunks may be any 'Functor'; for monomorphic containers see 'omapE'.
--
-- Subject to fusion
--
-- @since 1.3.0
mapE :: (Monad m, Functor f) => (a -> b) -> ConduitT (f a) (f b) m ()
-- Every chunk is transformed as a whole with 'fmap'.
INLINE_RULE(mapE, f, CL.map (fmap f))

-- | Apply a monomorphic transformation to all elements in a chunked stream.
--
-- Unlike @mapE@, this will work on types like @ByteString@ and @Text@ which
-- are @MonoFunctor@ but not @Functor@.
--
-- Subject to fusion
--
-- @since 1.3.0
omapE :: (Monad m, MonoFunctor mono) => (Element mono -> Element mono) -> ConduitT mono mono m ()
-- Every chunk is transformed as a whole with 'omap', so the chunk type is
-- unchanged.
INLINE_RULE(omapE, f, CL.map (omap f))

-- | Apply the function to each value in the stream, resulting in a foldable
-- value (e.g., a list). Then yield each of the individual values in that
-- foldable value separately.
--
-- Generalizes concatMap, mapMaybe, and mapFoldable.
--
-- Subject to fusion
--
-- @since 1.3.0
concatMap, concatMapC :: (Monad m, MonoFoldable mono)
                      => (a -> mono)
                      -> ConduitT a (Element mono) m ()
-- For every input, apply the function and yield each element of the result
-- downstream with 'yieldMany'.
concatMapC f = awaitForever (yieldMany . f)
{-# INLINE concatMapC #-}
STREAMING(concatMap, concatMapC, concatMapS, f)

-- | Apply the function to each element in the chunked stream and monoidally
-- combine the results chunk by chunk.
--
-- Unlike 'concatMap', the results are not yielded individually: each
-- incoming chunk is mapped to a single combined value of the 'Monoid' result
-- type, obtained with 'ofoldMap'.
--
-- Subject to fusion
--
-- @since 1.3.0
concatMapE :: (Monad m, MonoFoldable mono, Monoid w)
           => (Element mono -> w)
           -> ConduitT mono w m ()
-- One 'ofoldMap' result per incoming chunk, passed through @CL.map@.
INLINE_RULE(concatMapE, f, CL.map (ofoldMap f))

-- | Stream up to n values downstream.
--
-- Note that, if downstream terminates early, not all values will be consumed.
-- If you want to force /exactly/ the given number of values to be consumed,
-- see 'takeExactly'.
--
-- Subject to fusion
--
-- @since 1.3.0
take :: Monad m => Int -> ConduitT a a m ()
-- Thin alias: the CPP macro on the next line forwards to @CL.isolate@.
INLINE_RULE(take, n, CL.isolate n)

-- | Stream up to n elements downstream in a chunked stream.
--
-- Chunks are split as needed: the part of a chunk beyond the requested count
-- is pushed back as a leftover, and empty chunks are never yielded.
--
-- Note that, if downstream terminates early, not all values will be consumed.
-- If you want to force /exactly/ the given number of values to be consumed,
-- see 'takeExactlyE'.
--
-- @since 1.3.0
takeE :: (Monad m, Seq.IsSequence seq)
      => Seq.Index seq
      -> ConduitT seq seq m ()
takeE =
    loop
  where
    -- The argument is the number of elements still allowed through.
    loop i = if i <= 0
        then return ()
        else await >>= maybe (return ()) (go i)

    go i sq = do
        -- Forward the permitted prefix and return any surplus upstream,
        -- skipping either step when that part is empty.
        unless (onull x) $ yield x
        unless (onull y) $ leftover y
        loop i'
      where
        (x, y) = Seq.splitAt i sq
        -- Remaining budget after the elements just forwarded.
        i' = i - fromIntegral (olength x)
{-# INLINEABLE takeE #-}

-- | Stream all values downstream that match the given predicate.
--
-- Streaming stops at the first value that fails the predicate; that value is
-- pushed back as a leftover rather than discarded.
--
-- Same caveats regarding downstream termination apply as with 'take'.
--
-- @since 1.3.0
takeWhile :: Monad m
          => (a -> Bool)
          -> ConduitT a a m ()
takeWhile f =
    loop
  where
    -- Wait for the next value; end of stream terminates.
    loop = await >>= maybe (return ()) go
    go x = if f x
        then yield x >> loop
        else leftover x
{-# INLINE takeWhile #-}

-- | Stream all elements downstream that match the given predicate in a chunked stream.
--
-- Each chunk is split at the first failing element: the matching prefix is
-- yielded (if non-empty) and the rest is pushed back as a leftover, which
-- ends the streaming.
--
-- Same caveats regarding downstream termination apply as with 'takeE'.
--
-- @since 1.3.0
takeWhileE :: (Monad m, Seq.IsSequence seq)
           => (Element seq -> Bool)
           -> ConduitT seq seq m ()
takeWhileE f =
    loop
  where
    -- Wait for the next chunk; end of stream terminates.
    loop = await >>= maybe (return ()) go

    go sq = do
        unless (onull x) $ yield x
        -- An empty remainder means the whole chunk matched, so keep going;
        -- otherwise return the unmatched part upstream and stop.
        if onull y
            then loop
            else leftover y
      where
        (x, y) = Seq.span f sq
{-# INLINE takeWhileE #-}

-- | Consume precisely the given number of values and feed them downstream.
--
-- This function is in contrast to 'take', which will only consume up to the
-- given number of values, and will terminate early if downstream terminates
-- early. This function will discard any additional values in the stream if
-- they are unconsumed.
--
-- Note that this function takes a downstream @ConduitT@ as a parameter, as
-- opposed to working with normal fusion. For more information, see
-- <http://www.yesodweb.com/blog/2013/10/core-flaw-pipes-conduit>, the section
-- titled \"pipes and conduit: isolate\".
--
-- @since 1.3.0
takeExactly :: Monad m
            => Int
            -> ConduitT a b m r
            -> ConduitT a b m r
takeExactly count inner = take count .| do
    r <- inner
    -- Drain whatever the inner conduit left unread of the isolated values,
    -- so that all of them are consumed from upstream.
    CL.sinkNull
    return r

-- | Same as 'takeExactly', but for chunked streams.
--
-- The count is measured in elements (via 'takeE'), not in chunks.
--
-- @since 1.3.0
takeExactlyE :: (Monad m, Seq.IsSequence a)
             => Seq.Index a
             -> ConduitT a b m r
             -> ConduitT a b m r
takeExactlyE count inner = takeE count .| do
    r <- inner
    -- Drain whatever the inner conduit left unread of the isolated elements.
    CL.sinkNull
    return r
{-# INLINE takeExactlyE #-}

-- | Flatten out a stream by yielding the values contained in an incoming
-- @MonoFoldable@ as individually yielded values.
--
-- Every awaited container is passed to 'yieldMany'. The @STREAMING0@ macro
-- ties @concat@ to this conduit implementation (@concatC@) and to @concatS@.
--
-- Subject to fusion
--
-- @since 1.3.0
concat, concatC :: (Monad m, MonoFoldable mono)
                => ConduitT mono (Element mono) m ()
concatC = awaitForever yieldMany
STREAMING0(concat, concatC, concatS)

-- | Keep only values in the stream passing a given predicate.
--
-- The definition is generated by the @INLINE_RULE@ macro and delegates to
-- 'CL.filter'.
--
-- Subject to fusion
--
-- @since 1.3.0
filter :: Monad m => (a -> Bool) -> ConduitT a a m ()
INLINE_RULE(filter, f, CL.filter f)

-- | Keep only elements in the chunked stream passing a given predicate.
--
-- The definition is generated by the @INLINE_RULE@ macro: every chunk is
-- mapped through @Seq.filter f@, so the number of chunks is unchanged and
-- only their contents are filtered.
--
-- Subject to fusion
--
-- @since 1.3.0
filterE :: (Seq.IsSequence seq, Monad m) => (Element seq -> Bool) -> ConduitT seq seq m ()
INLINE_RULE(filterE, f, CL.map (Seq.filter f))

-- | Map values as long as the result is @Just@.
--
-- The first input for which the function returns @Nothing@ is pushed back
-- with 'leftover' and the conduit terminates. It also terminates when
-- upstream is exhausted.
--
-- @since 1.3.0
mapWhile :: Monad m => (a -> Maybe b) -> ConduitT a b m ()
mapWhile f =
    loop
  where
    loop = await >>= maybe (return ()) go
    go x =
        case f x of
            Just y -> yield y >> loop
            -- Hand the rejected input back so a later consumer can see it.
            Nothing -> leftover x
{-# INLINE mapWhile #-}

-- | Break up a stream of values into vectors of size n. The final vector may
-- be smaller than n if the total number of values is not a strict multiple of
-- n. No empty vectors will be yielded.
--
-- Each vector is collected with 'sinkVectorN'; the loop stops as soon as that
-- returns an empty vector.
--
-- @since 1.3.0
conduitVector :: (V.Vector v a, PrimMonad m)
              => Int -- ^ maximum allowed size
              -> ConduitT a (v a) m ()
conduitVector size =
    loop
  where
    loop = do
        v <- sinkVectorN size
        -- An empty vector ends the loop, so it is never yielded.
        unless (V.null v) $ do
            yield v
            loop
{-# INLINE conduitVector #-}

-- | Analog of 'Prelude.scanl' for lists.
--
-- The current accumulator is yielded each time a new input arrives, and the
-- final accumulator is yielded once upstream is exhausted; an empty input
-- stream therefore yields just the initial seed. The next accumulator is
-- forced to weak head normal form before the current one is yielded.
--
-- Subject to fusion
--
-- @since 1.3.0
scanl, scanlC :: Monad m => (a -> b -> a) -> a -> ConduitT b a m ()
scanlC f =
    loop
  where
    loop seed =
        await >>= maybe (yield seed) go
      where
        go b = do
            let seed' = f seed b
            -- Force the new accumulator so thunks do not pile up.
            seed' `seq` yield seed
            loop seed'
STREAMING(scanl, scanlC, scanlS, f x)

-- | 'mapWhile' with a break condition dependent on a strict accumulator.
-- Equivalently, 'CL.mapAccum' as long as the result is @Right@. Instead of
-- producing a leftover, the breaking input determines the resulting
-- accumulator via @Left@.
--
-- The step function receives the input first and the accumulator second.
-- The conduit returns the accumulator carried by @Left@, or the current
-- accumulator if upstream is exhausted first. The accumulator is kept in
-- weak head normal form throughout.
--
-- Subject to fusion
mapAccumWhile, mapAccumWhileC :: Monad m => (a -> s -> Either s (s, b)) -> s -> ConduitT a b m s
mapAccumWhileC f =
    loop
  where
    loop !s = await >>= maybe (return s) go
      where
        -- Left: stop and return the (forced) final accumulator.
        -- Right: emit the output and continue with the new accumulator.
        go a = either (return $!) (\(s', b) -> yield b >> loop s') $ f a s
{-# INLINE mapAccumWhileC #-}
STREAMING(mapAccumWhile, mapAccumWhileC, mapAccumWhileS, f s)


-- | Specialized version of 'mapAccumWhile' that does not provide values downstream.
--
-- The step function receives the input first and the accumulator second.
-- A @Left e@ from the step stops the fold and is returned as @Left e@ (with
-- @e@ forced to weak head normal form); a @Right s@ continues with @s@ as the
-- new accumulator. If upstream is exhausted, the current accumulator is
-- returned as @Right@.
--
-- @since 1.3.4
foldWhile :: Monad m => (a -> s -> Either e s) -> s -> ConduitT a o m (Either e s)
foldWhile f = loop
  where
    loop !s = await >>= maybe (return $ Right s) go
      where
        go a = either (return . Left $!) loop $ f a s
{-# INLINE foldWhile #-}


-- | 'concatMap' with an accumulator.
--
-- The definition is generated by the @INLINE_RULE0@ macro and is a direct
-- alias for 'CL.concatMapAccum'.
--
-- Subject to fusion
--
-- @since 1.3.0
concatMapAccum :: Monad m => (a -> accum -> (accum, [b])) -> accum -> ConduitT a b m ()
INLINE_RULE0(concatMapAccum, CL.concatMapAccum)

-- | Insert the given value between each two values in the stream.
--
-- The first value is yielded as-is; every later value is preceded by the
-- separator. An empty stream yields nothing.
--
-- Subject to fusion
--
-- @since 1.3.0
intersperse, intersperseC :: Monad m => a -> ConduitT a a m ()
intersperseC x =
    -- 'omapM_' over the @Maybe@: do nothing when upstream is already empty.
    await >>= omapM_ go
  where
    go y = yield y >> concatMap (\z -> [x, z])
STREAMING(intersperse, intersperseC, intersperseS, x)

-- | Sliding window of values
-- 1,2,3,4,5 with window size 2 gives
-- [1,2],[2,3],[3,4],[4,5]
--
-- Best used with structures that support O(1) snoc.
--
-- A window size below 1 is treated as 1. If the stream ends before a full
-- window has been collected, whatever was gathered so far is yielded once
-- (for an empty stream this is an empty sequence).
--
-- Subject to fusion
--
-- @since 1.3.0
slidingWindow, slidingWindowC :: (Monad m, Seq.IsSequence seq, Element seq ~ a) => Int -> ConduitT a seq m ()
slidingWindowC sz = go (max 1 sz) mempty
    where -- Steady state: @st@ holds the previous window minus its first
          -- element; append the next input, yield, and slide again.
          goContinue st = await >>=
                          maybe (return ())
                                (\x -> do
                                   let st' = Seq.snoc st x
                                   yield st' >> goContinue (Seq.unsafeTail st')
                                )
          -- Fill phase: the first argument counts the elements still missing
          -- from the first window.
          go 0 st = yield st >> goContinue (Seq.unsafeTail st)
          go !n st = CL.head >>= \m ->
                     case m of
                       Nothing -> yield st
                       Just x -> go (n-1) (Seq.snoc st x)
STREAMING(slidingWindow, slidingWindowC, slidingWindowS, sz)


-- | Split input into chunks of size 'chunkSize'
--
-- The last element may be smaller than the 'chunkSize' (see also
-- 'chunksOfExactlyE' which will not yield this last element)
--
-- Runs 'chunksOfExactlyE' and then yields the next awaited value, if any, as
-- the final chunk; this is where the remainder left over by
-- 'chunksOfExactlyE' is picked up.
--
-- @since 1.3.0
chunksOfE :: (Monad m, Seq.IsSequence seq) => Seq.Index seq -> ConduitT seq seq m ()
chunksOfE chunkSize = chunksOfExactlyE chunkSize >> (await >>= maybe (return ()) yield)

-- | Split input into chunks of size 'chunkSize'
--
-- If the input does not split into chunks exactly, the remainder will be
-- leftover (see also 'chunksOfE')
--
-- Incoming chunks are re-sliced: large chunks are split with 'Seq.splitAt',
-- and chunks that are too small are buffered and concatenated until at least
-- 'chunkSize' elements are available. Empty incoming chunks are skipped.
--
-- NOTE(review): a 'chunkSize' of zero or less looks like it would make the
-- splitting loop yield empty chunks forever without consuming input; callers
-- should presumably pass a positive size. Confirm before relying on it.
--
-- @since 1.3.0
chunksOfExactlyE :: (Monad m, Seq.IsSequence seq) => Seq.Index seq -> ConduitT seq seq m ()
chunksOfExactlyE chunkSize = await >>= maybe (return ()) start
    where
        -- Process one buffer: skip it when empty, accumulate more input when
        -- it is too short, otherwise emit a full chunk and carry on with the
        -- rest.
        start b
            | onull b = chunksOfExactlyE chunkSize
            | Seq.lengthIndex b < chunkSize = continue (Seq.lengthIndex b) [b]
            | otherwise = let (first,rest) = Seq.splitAt chunkSize b in
                            yield first >> start rest
        -- Accumulate pieces (kept in reverse order) until @sofar@, their
        -- combined length, reaches 'chunkSize'. If upstream ends first, the
        -- pieces are concatenated and handed back as a leftover.
        continue !sofar bs = do
            next <- await
            case next of
                Nothing -> leftover (mconcat $ Prelude.reverse bs)
                Just next' ->
                    let !sofar' = Seq.lengthIndex next' + sofar
                        bs' = next':bs
                    in if sofar' < chunkSize
                            then continue sofar' bs'
                            else start (mconcat (Prelude.reverse bs'))

-- | Apply a monadic transformation to all values in a stream.
--
-- If you do not need the transformed values, and instead just want the monadic
-- side-effects of running the action, see 'mapM_'.
--
-- The definition is generated by the @INLINE_RULE@ macro and delegates to
-- 'CL.mapM'.
--
-- Subject to fusion
--
-- @since 1.3.0
mapM :: Monad m => (a -> m b) -> ConduitT a b m ()
INLINE_RULE(mapM, f, CL.mapM f)

-- | Apply a monadic transformation to all elements in a chunked stream.
--
-- The definition is generated by the @INLINE_RULE@ macro: each chunk is
-- traversed with @Data.Traversable.mapM f@ inside 'CL.mapM', so chunk
-- boundaries are preserved.
--
-- Subject to fusion
--
-- @since 1.3.0
mapME :: (Monad m, Data.Traversable.Traversable f) => (a -> m b) -> ConduitT (f a) (f b) m ()
INLINE_RULE(mapME, f, CL.mapM (Data.Traversable.mapM f))

-- | Apply a monadic monomorphic transformation to all elements in a chunked stream.
--
-- Unlike @mapME@, this will work on types like @ByteString@ and @Text@ which
-- are @MonoFunctor@ but not @Functor@.
--
-- The definition is generated by the @INLINE_RULE@ macro: each chunk is
-- traversed with @omapM f@ inside 'CL.mapM', so chunk boundaries are
-- preserved.
--
-- Subject to fusion
--
-- @since 1.3.0
omapME :: (Monad m, MonoTraversable mono)
       => (Element mono -> m (Element mono))
       -> ConduitT mono mono m ()
INLINE_RULE(omapME, f, CL.mapM (omapM f))

-- | Apply the monadic function to each value in the stream, resulting in a
-- foldable value (e.g., a list). Then yield each of the individual values in
-- that foldable value separately.
--
-- Generalizes concatMapM, mapMaybeM, and mapFoldableM.
--
-- For every input the action is run in the base monad via 'lift' and its
-- result is passed to 'yieldMany'.
--
-- Subject to fusion
--
-- @since 1.3.0
concatMapM, concatMapMC :: (Monad m, MonoFoldable mono)
                        => (a -> m mono)
                        -> ConduitT a (Element mono) m ()
concatMapMC f = awaitForever (lift . f >=> yieldMany)
STREAMING(concatMapM, concatMapMC, concatMapMS, f)

-- | Keep only values in the stream passing a given monadic predicate.
--
-- The predicate is run in the base monad once per input; the input is
-- yielded only when the predicate returns @True@.
--
-- Subject to fusion
--
-- @since 1.3.0
filterM, filterMC :: Monad m
                  => (a -> m Bool)
                  -> ConduitT a a m ()
filterMC f =
    awaitForever go
  where
    go x = do
        b <- lift $ f x
        when b $ yield x
STREAMING(filterM, filterMC, filterMS, f)

-- | Keep only elements in the chunked stream passing a given monadic predicate.
--
-- The definition is generated by the @INLINE_RULE@ macro: each chunk is
-- filtered with @Seq.filterM f@ inside 'CL.mapM', so the number of chunks is
-- unchanged and only their contents are filtered.
--
-- Subject to fusion
--
-- @since 1.3.0
filterME :: (Monad m, Seq.IsSequence seq) => (Element seq -> m Bool) -> ConduitT seq seq m ()
INLINE_RULE(filterME, f, CL.mapM (Seq.filterM f))

-- | Apply a monadic action on all values in a stream.
--
-- This @Conduit@ can be used to perform a monadic side-effect for every
-- value, whilst passing the value through the @Conduit@ as-is.
--
-- > iterM f = mapM (\a -> f a >>= \() -> return a)
--
-- The definition is generated by the @INLINE_RULE@ macro and delegates to
-- 'CL.iterM'.
--
-- Subject to fusion
--
-- @since 1.3.0
iterM :: Monad m => (a -> m ()) -> ConduitT a a m ()
INLINE_RULE(iterM, f, CL.iterM f)

-- | Analog of 'Prelude.scanl' for lists, monadic.
--
-- The current accumulator is yielded each time a new input arrives (after
-- the step action has run), and the final accumulator is yielded once
-- upstream is exhausted; an empty input stream therefore yields just the
-- initial seed. The next accumulator is forced to weak head normal form
-- before the current one is yielded.
--
-- Subject to fusion
--
-- @since 1.3.0
scanlM, scanlMC :: Monad m => (a -> b -> m a) -> a -> ConduitT b a m ()
scanlMC f =
    loop
  where
    loop seed =
        await >>= maybe (yield seed) go
      where
        go b = do
            seed' <- lift $ f seed b
            -- Force the new accumulator so thunks do not pile up.
            seed' `seq` yield seed
            loop seed'
STREAMING(scanlM, scanlMC, scanlMS, f x)

-- | Monadic 'mapAccumWhile'.
--
-- The step action receives the input first and the accumulator second and is
-- run in the base monad. A @Left@ result stops the conduit and returns its
-- accumulator; a @Right@ result yields the output and continues with the new
-- accumulator. If upstream is exhausted, the current accumulator is
-- returned. The accumulator is kept in weak head normal form throughout.
--
-- Subject to fusion
mapAccumWhileM, mapAccumWhileMC :: Monad m => (a -> s -> m (Either s (s, b))) -> s -> ConduitT a b m s
mapAccumWhileMC f =
    loop
  where
    loop !s = await >>= maybe (return s) go
      where
        go a = lift (f a s) >>= either (return $!) (\(s', b) -> yield b >> loop s')
{-# INLINE mapAccumWhileMC #-}
STREAMING(mapAccumWhileM, mapAccumWhileMC, mapAccumWhileMS, f s)

-- | 'concatMapM' with an accumulator.
--
-- The definition is generated by the @INLINE_RULE@ macro and delegates to
-- 'CL.concatMapAccumM'.
--
-- Subject to fusion
--
-- @since 1.3.0
concatMapAccumM :: Monad m => (a -> accum -> m (accum, [b])) -> accum -> ConduitT a b m ()
INLINE_RULE(concatMapAccumM, f x, CL.concatMapAccumM f x)

-- | Encode a stream of text as UTF8.
--
-- The definition is generated by the @INLINE_RULE0@ macro: every incoming
-- chunk is converted with @DTE.encodeUtf8@ via 'map', one output chunk per
-- input chunk.
--
-- Subject to fusion
--
-- @since 1.3.0
encodeUtf8 :: (Monad m, DTE.Utf8 text binary) => ConduitT text binary m ()
INLINE_RULE0(encodeUtf8, map DTE.encodeUtf8)

-- | Decode a stream of binary data as UTF8.
--
-- Decoding is incremental: each incoming chunk is fed to the continuation
-- returned by the previous decoding step, starting from
-- 'TE.streamDecodeUtf8'. When a chunk cannot be decoded, the chunk is pushed
-- back with 'leftover' and the 'TEE.UnicodeException' is rethrown with
-- 'throwM'.
--
-- NOTE(review): at end of input, bytes still pending in the decoder are not
-- reported as an error; one U+FFFD replacement character is yielded per
-- pending byte. Confirm this is the intended behaviour for the strict
-- decoder.
--
-- @since 1.3.0
decodeUtf8 :: MonadThrow m => ConduitT ByteString Text m ()
decodeUtf8 =
    loop TE.streamDecodeUtf8
  where
    loop parse =
        await >>= maybe done go
      where
        -- Run the pure decoder and capture a 'TEE.UnicodeException' as a
        -- @Left@ ('evaluate' forces the result inside 'try').
        -- 'unsafePerformIO' is used only to observe that exception; no other
        -- effect is performed here.
        parse' = unsafePerformIO . try . evaluate . parse

        -- Upstream is exhausted: flush the decoder with an empty chunk.
        done =
          case parse' mempty of
            Left e -> throwM (e :: TEE.UnicodeException)
            Right (TE.Some t bs _) -> do
              unless (T.null t) (yield t)
              unless (S.null bs) (yield $ T.replicate (S.length bs) (T.singleton '\xFFFD'))

        go bs = do
          case parse' bs of
            Left e -> do
              -- Return the offending chunk to the stream before failing.
              leftover bs
              throwM (e :: TEE.UnicodeException)
            Right (TE.Some t _ next) -> do
              unless (T.null t) (yield t)
              loop next

-- | Decode a stream of binary data as UTF8, replacing any invalid bytes with
-- the Unicode replacement character.
--
-- Decoding is incremental: each incoming chunk is fed to the continuation
-- returned by the previous decoding step, starting from
-- @'TE.streamDecodeUtf8With' 'TEE.lenientDecode'@. At end of input, one
-- U+FFFD replacement character is yielded per byte still pending in the
-- decoder. Empty pieces of decoded text are never yielded.
--
-- @since 1.3.0
decodeUtf8Lenient :: Monad m => ConduitT ByteString Text m ()
decodeUtf8Lenient =
    loop (TE.streamDecodeUtf8With TEE.lenientDecode)
  where
    loop parse =
        await >>= maybe done go
      where
        -- Upstream is exhausted: flush the decoder with an empty chunk.
        done = do
          let TE.Some t bs _ = parse mempty
          unless (T.null t) (yield t)
          unless (S.null bs) (yield $ T.replicate (S.length bs) (T.singleton '\xFFFD'))

        go bs = do
          let TE.Some t _ next = parse bs
          unless (T.null t) (yield t)
          loop next

-- | Stream in the entirety of a single line.
--
-- The inner conduit receives the input up to, but not including, the first
-- newline character. Like @takeExactly@, this will consume the entirety of
-- the line regardless of the behavior of the inner Conduit.
--
-- @since 1.3.0
line :: (Monad m, Seq.IsSequence seq, Element seq ~ Char)
     => ConduitT seq o m r
     -> ConduitT seq o m r
line = takeExactlyUntilE (== '\n')
{-# INLINE line #-}

-- | Same as 'line', but operates on ASCII/binary data.
--
-- The line terminator is the byte 10 (line feed).
--
-- @since 1.3.0
lineAscii :: (Monad m, Seq.IsSequence seq, Element seq ~ Word8)
          => ConduitT seq o m r
          -> ConduitT seq o m r
lineAscii = takeExactlyUntilE (== 10)
{-# INLINE lineAscii #-}

-- | Stream in the chunked input until an element matches a predicate.
--
-- The inner conduit is fed everything before the first matching element.
-- The matching element itself is dropped, and whatever follows it in the
-- same chunk is pushed back as leftover input.
--
-- Like @takeExactly@, this will consume the entirety of the prefix
-- regardless of the behavior of the inner Conduit.
takeExactlyUntilE :: (Monad m, Seq.IsSequence seq)
                  => (Element seq -> Bool)
                  -> ConduitT seq o m r
                  -> ConduitT seq o m r
takeExactlyUntilE f inner =
    loop .| do
        x <- inner
        -- Drain whatever part of the prefix the inner conduit did not read.
        sinkNull
        return x
  where
    -- Stops silently when upstream is exhausted.
    loop = await >>= omapM_ go

    go t =
        if onull y
            -- No match in this chunk: forward all of it and keep reading.
            then yield x >> loop
            else do
                unless (onull x) $ yield x
                -- Skip the matching element; return the rest to upstream.
                let y' = Seq.drop 1 y
                unless (onull y') $ leftover y'
      where
        (x, y) = Seq.break f t
{-# INLINE takeExactlyUntilE #-}

-- | Insert a newline character after each incoming chunk of data.
--
-- Every input chunk is mapped to two output chunks: the chunk itself,
-- followed by a single-element chunk holding the newline character.
--
-- Subject to fusion
--
-- @since 1.3.0
unlines :: (Monad m, Seq.IsSequence seq, Element seq ~ Char) => ConduitT seq seq m ()
INLINE_RULE0(unlines, concatMap (:[Seq.singleton '\n']))

-- | Same as 'unlines', but operates on ASCII/binary data.
--
-- The inserted element is the byte 10 (line feed), emitted as its own
-- single-element chunk after each input chunk.
--
-- Subject to fusion
--
-- @since 1.3.0
unlinesAscii :: (Monad m, Seq.IsSequence seq, Element seq ~ Word8) => ConduitT seq seq m ()
INLINE_RULE0(unlinesAscii, concatMap (:[Seq.singleton 10]))

-- | Split a stream of arbitrarily-chunked data, based on a predicate
-- on elements.  Elements that satisfy the predicate will cause chunks
-- to be split, and aren't included in these output chunks.  Note
-- that, if you have unknown or untrusted input, this function is
-- /unsafe/, since it would allow an attacker to form chunks of
-- massive length and exhaust memory.
--
-- Input chunks without a matching element are accumulated until a match
-- or the end of the stream is seen. The final piece is only emitted when
-- it is non-empty.
splitOnUnboundedE, splitOnUnboundedEC :: (Monad m, Seq.IsSequence seq) => (Element seq -> Bool) -> ConduitT seq seq m ()
splitOnUnboundedEC f =
    start
  where
    start = await >>= maybe (return ()) (loop id)

    -- @bldr@ is a difference list holding the chunks collected so far for
    -- the piece being assembled; @t@ is the chunk currently inspected.
    loop bldr t =
        if onull y
            -- No separator in @t@: need more input to finish this piece.
            then do
                mt <- await
                case mt of
                    Nothing -> let finalChunk = mconcat $ bldr [t]
                               in  unless (onull finalChunk) $ yield finalChunk
                    Just t' -> loop (bldr . (t:)) t'
            -- Separator found: emit the piece, drop the separator and
            -- restart on the remainder with an empty accumulator.
            else yield (mconcat $ bldr [x]) >> loop id (Seq.drop 1 y)
      where
        (x, y) = Seq.break f t
STREAMING(splitOnUnboundedE, splitOnUnboundedEC, splitOnUnboundedES, f)

-- | Convert a stream of arbitrarily-chunked textual data into a stream of data
-- where each chunk represents a single line. Note that, if you have
-- unknown or untrusted input, this function is /unsafe/, since it would allow an
-- attacker to form lines of massive length and exhaust memory.
--
-- Lines are delimited by the newline character, using 'splitOnUnboundedE'.
--
-- Subject to fusion
--
-- @since 1.3.0
linesUnbounded :: (Monad m, Seq.IsSequence seq, Element seq ~ Char)
               => ConduitT seq seq m ()
INLINE_RULE0(linesUnbounded, splitOnUnboundedE (== '\n'))

-- | Same as 'linesUnbounded', but for ASCII/binary data.
--
-- Lines are delimited by the byte 10 (line feed).
--
-- Subject to fusion
--
-- @since 1.3.0
linesUnboundedAscii :: (Monad m, Seq.IsSequence seq, Element seq ~ Word8)
                    => ConduitT seq seq m ()
INLINE_RULE0(linesUnboundedAscii, splitOnUnboundedE (== 10))

-- | Incrementally execute builders and pass on the filled chunks as
-- bytestrings.
--
-- Uses 'defaultStrategy' for buffer allocation.
--
-- @since 1.3.0
builderToByteString :: PrimMonad m => ConduitT Builder S.ByteString m ()
builderToByteString = builderToByteStringWith defaultStrategy
{-# INLINE builderToByteString #-}

-- | Same as 'builderToByteString', but input and output are wrapped in
-- 'Flush'.
--
-- Uses 'defaultStrategy' for buffer allocation.
--
-- @since 1.3.0
builderToByteStringFlush :: PrimMonad m
                         => ConduitT (Flush Builder) (Flush S.ByteString) m ()
builderToByteStringFlush = builderToByteStringWithFlush defaultStrategy
{-# INLINE builderToByteStringFlush #-}

-- | Incrementally execute builders on the given buffer and pass on the filled
-- chunks as bytestrings. Note that, if the given buffer is too small for the
-- execution of a build step, a larger one will be allocated.
--
-- The buffer is obtained from 'reuseBufferStrategy', starting with a buffer
-- of @defaultChunkSize@ bytes.
--
-- WARNING: This conduit yields bytestrings that are NOT
-- referentially transparent. Their content will be overwritten as soon
-- as control is returned from the inner sink!
--
-- @since 1.3.0
unsafeBuilderToByteString :: PrimMonad m
                          => ConduitT Builder S.ByteString m ()
unsafeBuilderToByteString =
    builderToByteStringWith (reuseBufferStrategy (allocBuffer defaultChunkSize))
{-# INLINE unsafeBuilderToByteString #-}


-- | A conduit that incrementally executes builders and passes on the
-- filled chunks as bytestrings to an inner sink.
--
-- Implemented on top of the 'Flush'-aware helper: every incoming builder is
-- wrapped in 'Chunk', and only 'Chunk' outputs are forwarded downstream.
--
-- INV: All bytestrings passed to the inner sink are non-empty.
--
-- @since 1.3.0
builderToByteStringWith :: PrimMonad m
                        => BufferAllocStrategy
                        -> ConduitT Builder S.ByteString m ()
builderToByteStringWith =
    bbhelper (liftM (fmap Chunk) await) yield'
  where
    -- Flush markers have no meaning on a plain bytestring stream.
    yield' Flush = return ()
    yield' (Chunk bs) = yield bs
{-# INLINE builderToByteStringWith #-}

-- | Same as 'builderToByteStringWith', but input and output are wrapped in
-- 'Flush'.
--
-- An incoming 'Flush' causes the bytes buffered so far to be emitted as a
-- 'Chunk', after which a 'Flush' is passed downstream.
--
-- @since 1.3.0
builderToByteStringWithFlush
    :: PrimMonad m
    => BufferAllocStrategy
    -> ConduitT (Flush Builder) (Flush S.ByteString) m ()
builderToByteStringWithFlush = bbhelper await yield
{-# INLINE builderToByteStringWithFlush #-}

-- | Shared driver behind the builder-to-bytestring conduits.
--
-- Repeatedly obtains a @Flush Builder@ with the first argument, runs it
-- against the buffers supplied by the allocation strategy and hands every
-- non-empty bytestring produced to the second argument. When the input is
-- exhausted, whatever remains in the current buffer is emitted as a final
-- 'Chunk'.
bbhelper
  :: PrimMonad m
  => m (Maybe (Flush Builder))
  -> (Flush S.ByteString -> m ())
  -> BufferAllocStrategy
  -> m ()
bbhelper await' yield' strat = do
    -- The buffer machinery lives in IO; it is lifted into @m@ throughout.
    (recv, finish) <- unsafePrimToPrim $ newByteStringBuilderRecv strat
    let loop = await' >>= maybe finish' cont
        -- End of input: emit the remaining buffered bytes, if any.
        finish' = do
            mbs <- unsafePrimToPrim finish
            maybe (return ()) (yield' . Chunk) mbs
        cont fbuilder = do
            let builder =
                    case fbuilder of
                        Flush -> BB.flush
                        Chunk b -> b
            popper <- unsafePrimToPrim $ recv builder
            -- Pop bytestrings until the popper signals completion with an
            -- empty one.
            let cont' = do
                    bs <- unsafePrimToPrim popper
                    unless (S.null bs) $ do
                        yield' (Chunk bs)
                        cont'
            cont'
            -- Propagate the flush only after its bytes have been emitted.
            case fbuilder of
                Flush -> yield' Flush
                Chunk _ -> return ()
            loop
    loop
{-# INLINE bbhelper #-}

-- | Provides a series of @ByteString@s until empty, at which point it provides
-- an empty @ByteString@.
--
-- @since 1.3.0
--
type BuilderPopper = IO S.ByteString

-- | Accepts a 'Builder' and returns the 'BuilderPopper' used to retrieve the
-- bytestrings produced by running it (see 'newByteStringBuilderRecv').
type BuilderRecv = Builder -> IO BuilderPopper

-- | Retrieves what is left in the current buffer: @Nothing@ when nothing has
-- been written to it, otherwise the remaining bytes (see
-- 'newByteStringBuilderRecv').
type BuilderFinish = IO (Maybe S.ByteString)

-- | Set up the mutable state for running builders against buffers obtained
-- from the given allocation strategy.
--
-- Returns a 'BuilderRecv' for feeding builders in and a 'BuilderFinish' for
-- retrieving the bytes left in the current buffer once no more builders
-- will be supplied. Both share a reference to the action yielding the
-- current buffer.
newByteStringBuilderRecv :: BufferAllocStrategy -> IO (BuilderRecv, BuilderFinish)
newByteStringBuilderRecv (ioBufInit, nextBuf) = do
    refBuf <- newIORef ioBufInit
    return (push refBuf, finish refBuf)
  where
    -- Freeze the written slice of the current buffer, if it is non-empty.
    finish refBuf = do
        ioBuf <- readIORef refBuf
        buf <- ioBuf
        return $ unsafeFreezeNonEmptyBuffer buf

    -- Start running a builder; its progress is tracked in @refWri@.
    push refBuf builder = do
        refWri <- newIORef $ Left $ BB.runBuilder builder
        return $ popper refBuf refWri

    -- @refWri@ holds either the writer to resume (@Left@) or a pending
    -- action to run on the next pop (@Right@).
    popper refBuf refWri = do
        ioBuf <- readIORef refBuf
        ebWri <- readIORef refWri
        case ebWri of
            Left bWri -> do
                !buf@(Buffer _ _ op ope) <- ioBuf
                -- Let the writer fill the free space @op..ope@.
                (bytes, next) <- bWri op (ope `minusPtr` op)
                let op' = op `plusPtr` bytes
                case next of
                    -- Builder finished: keep the partially filled buffer for
                    -- the next builder and signal completion.
                    BB.Done -> do
                        writeIORef refBuf $ return $ updateEndOfSlice buf op'
                        return S.empty
                    -- Writer needs at least @minSize@ more bytes.
                    BB.More minSize bWri' -> do
                        let buf' = updateEndOfSlice buf op'
                            {-# INLINE cont #-}
                            cont mbs = do
                                -- Sequencing the computation of the next
                                -- buffer here ensures that the reference to
                                -- the filled buffer is lost as soon as
                                -- possible.
                                ioBuf' <- nextBuf minSize buf'
                                writeIORef refBuf ioBuf'
                                writeIORef refWri $ Left bWri'
                                case mbs of
                                    Just bs | not $ S.null bs -> return bs
                                    -- Nothing to hand out yet: keep running.
                                    _ -> popper refBuf refWri
                        cont $ unsafeFreezeNonEmptyBuffer buf'
                    -- Writer wants the bytestring @bs@ inserted directly.
                    BB.Chunk bs bWri' -> do
                        let buf' = updateEndOfSlice buf op'
                        let yieldBS = do
                                nextBuf 1 buf' >>= writeIORef refBuf
                                writeIORef refWri $ Left bWri'
                                if S.null bs
                                    then popper refBuf refWri
                                    else return bs
                        case unsafeFreezeNonEmptyBuffer buf' of
                            Nothing -> yieldBS
                            -- Hand out the buffered bytes first; the direct
                            -- chunk follows on the next pop.
                            Just bs' -> do
                                writeIORef refWri $ Right yieldBS
                                return bs'
            Right action -> action

-- | A buffer @Buffer fpbuf p0 op ope@ describes a buffer with the underlying
-- byte array @fpbuf..ope@, the currently written slice @p0..op@ and the free
-- space @op..ope@.
--
-- All four fields are strict and unpacked.
--
-- @since 1.3.0
data Buffer = Buffer {-# UNPACK #-} !(ForeignPtr Word8) -- underlying pinned array
                     {-# UNPACK #-} !(Ptr Word8)        -- beginning of slice
                     {-# UNPACK #-} !(Ptr Word8)        -- next free byte
                     {-# UNPACK #-} !(Ptr Word8)        -- first byte after buffer

-- | Convert the buffer to a bytestring. This operation is unsafe in the sense
-- that created bytestring shares the underlying byte array with the buffer.
-- Hence, depending on the later use of this buffer (e.g., if it gets reset and
-- filled again) referential transparency may be lost.
--
-- The result covers the written slice @p0..op@: its offset is the distance
-- of @p0@ from the start of the underlying array and its length is
-- @op - p0@.
--
-- @since 1.3.0
--
{-# INLINE unsafeFreezeBuffer #-}
unsafeFreezeBuffer :: Buffer -> S.ByteString
unsafeFreezeBuffer (Buffer fpbuf p0 op _) =
    PS fpbuf (p0 `minusPtr` unsafeForeignPtrToPtr fpbuf) (op `minusPtr` p0)

-- | Convert a buffer to a non-empty bytestring. See 'unsafeFreezeBuffer' for
-- the explanation of why this operation may be unsafe.
--
-- Returns @Nothing@ when the written slice has size zero or less.
--
-- @since 1.3.0
--
{-# INLINE unsafeFreezeNonEmptyBuffer #-}
unsafeFreezeNonEmptyBuffer :: Buffer -> Maybe S.ByteString
unsafeFreezeNonEmptyBuffer buf
  | sliceSize buf <= 0 = Nothing
  | otherwise          = Just $ unsafeFreezeBuffer buf

-- | Update the end of slice pointer.
--
-- The underlying array, the start of the slice and the end of the buffer
-- are kept as they are.
--
-- @since 1.3.0
--
{-# INLINE updateEndOfSlice #-}
updateEndOfSlice :: Buffer    -- Old buffer
                 -> Ptr Word8 -- New end of slice
                 -> Buffer    -- Updated buffer
updateEndOfSlice (Buffer fpbuf p0 _ ope) op' = Buffer fpbuf p0 op' ope

-- | The size of the written slice in the buffer.
--
-- Computed as the distance from the beginning of the slice to the next
-- free byte.
--
-- @since 1.3.0
--
sliceSize :: Buffer -> Int
sliceSize (Buffer _ p0 op _) = op `minusPtr` p0

-- | A buffer allocation strategy @(buf0, nextBuf)@ specifies the initial
-- buffer to use and how to compute a new buffer @nextBuf minSize buf@ with at
-- least size @minSize@ from a filled buffer @buf@. The double nesting of the
-- @IO@ monad helps to ensure that the reference to the filled buffer @buf@ is
-- lost as soon as possible, but the new buffer doesn't have to be allocated
-- too early.
--
-- The outer @IO@ action of @nextBuf@ is run once the old buffer is full; the
-- inner one is run when the new buffer is actually needed.
--
-- @since 1.3.0
type BufferAllocStrategy = (IO Buffer, Int -> Buffer -> IO (IO Buffer))

-- | Safe default: allocate new buffers of default chunk size.
--
-- This is 'allNewBuffersStrategy' applied to 'defaultChunkSize'.
--
-- @since 1.3.0
defaultStrategy :: BufferAllocStrategy
defaultStrategy = allNewBuffersStrategy defaultChunkSize

-- | The simplest buffer allocation strategy: whenever a buffer is requested,
-- allocate a new one that is big enough for the next build step to execute.
--
-- Each replacement buffer has the larger of the requested size and the
-- minimal buffer size; the filled buffer passed to the strategy is ignored.
--
-- NOTE that this allocation strategy may spill quite some memory upon direct
-- insertion of a bytestring by the builder. That's no problem for garbage
-- collection, but it may lead to unreasonably high memory consumption in
-- special circumstances.
--
-- @since 1.3.0
allNewBuffersStrategy :: Int                 -- Minimal buffer size.
                      -> BufferAllocStrategy
allNewBuffersStrategy bufSize =
    ( allocBuffer bufSize
    , \reqSize _ -> return (allocBuffer (max reqSize bufSize)) )

-- | An unsafe, but possibly more efficient buffer allocation strategy:
-- reuse the buffer, if it is big enough for the next build step to execute.
--
-- When the filled buffer's underlying array is smaller than the requested
-- size, a fresh buffer of exactly the requested size is allocated instead.
--
-- @since 1.3.0
reuseBufferStrategy :: IO Buffer
                    -> BufferAllocStrategy
reuseBufferStrategy buf0 =
    (buf0, tryReuseBuffer)
  where
    -- Reset and hand back the same buffer when it is large enough;
    -- otherwise defer to a fresh allocation.
    tryReuseBuffer reqSize buf
      | bufferSize buf >= reqSize = return $ return (reuseBuffer buf)
      | otherwise                 = return $ allocBuffer reqSize

-- | The size of the whole byte array underlying the buffer.
--
-- Computed as the distance in bytes from the address of the underlying
-- foreign pointer to the end-of-buffer pointer.
--
-- @since 1.3.0
--
bufferSize :: Buffer -> Int
bufferSize (Buffer fpbuf _ _ ope) =
    ope `minusPtr` unsafeForeignPtrToPtr fpbuf

-- | @allocBuffer size@ allocates a new buffer of size @size@.
--
-- The start of the slice and the next free byte both point at the beginning
-- of the fresh allocation; the end-of-buffer pointer lies @size@ bytes after
-- it.
--
-- @since 1.3.0
--
{-# INLINE allocBuffer #-}
allocBuffer :: Int -> IO Buffer
allocBuffer size = do
    fpbuf <- mallocByteString size
    -- Force the base pointer so the buffer does not hold on to a thunk.
    let !pbuf = unsafeForeignPtrToPtr fpbuf
    return $! Buffer fpbuf pbuf pbuf (pbuf `plusPtr` size)

-- | Resets the beginning of the next slice and the next free byte such that
-- the whole buffer can be filled again.
--
-- Both pointers are set to the address of the underlying foreign pointer;
-- the end-of-buffer pointer is kept as is.
--
-- @since 1.3.0
--
{-# INLINE reuseBuffer #-}
reuseBuffer :: Buffer -> Buffer
reuseBuffer (Buffer fpbuf _ _ ope) = Buffer fpbuf p0 p0 ope
  where
    p0 = unsafeForeignPtrToPtr fpbuf

-- | Generally speaking, yielding values from inside a Conduit requires
-- some allocation for constructors. This can introduce an overhead,
-- similar to the overhead needed to represent a list of values instead of
-- a vector. This overhead is even more severe when talking about unboxed
-- values.
--
-- This combinator allows you to overcome this overhead, and efficiently
-- fill up vectors. It takes two parameters. The first is the size of each
-- mutable vector to be allocated. The second is a function. The function
-- takes an argument which will yield the next value into a mutable
-- vector.
--
-- Completed vectors are yielded downstream whenever the inner conduit
-- awaits more input; once the inner conduit finishes, any remaining
-- completed vectors and the partially filled final vector (truncated to the
-- elements actually written) are yielded as well.
--
-- Under the surface, this function uses a number of tricks to get high
-- performance. For more information on both usage and implementation,
-- please see:
-- <https://www.fpcomplete.com/user/snoyberg/library-documentation/vectorbuilder>
--
-- @since 1.3.0
vectorBuilder :: (PrimMonad m, PrimMonad n, V.Vector v e, PrimState m ~ PrimState n)
              => Int -- ^ size
              -> ((e -> n ()) -> ConduitT i Void m r)
              -> ConduitT i (v e) m r
vectorBuilder size inner = do
    -- Shared state: write index, current mutable vector, and a difference
    -- list of already completed (frozen) vectors.
    ref <- do
        mv <- VM.new size
        newMutVar $! S 0 mv id
    -- Flush completed vectors each time the inner conduit awaits input.
    res <- onAwait (yieldS ref) (inner (addE ref))
    -- Collect whatever is left once the inner conduit has finished.
    vs <- do
        S idx mv front <- readMutVar ref
        end <-
            if idx == 0
                then return []
                else do
                    v <- V.unsafeFreeze mv
                    -- Only the first idx slots have been written.
                    return [V.unsafeTake idx v]
        return $ front end
    Prelude.mapM_ yield vs
    return res
{-# INLINE vectorBuilder #-}

-- Internal state shared between 'vectorBuilder', 'yieldS' and 'addE'.
data S s v e = S
    -- Index of the next slot to write in the current mutable vector.
    {-# UNPACK #-} !Int -- index
    -- The mutable vector currently being filled.
    !(V.Mutable v s e)
    -- Difference list of completed (frozen) vectors not yet yielded.
    ([v e] -> [v e])

-- Run the given sink, executing @callback@ every time the sink asks for more
-- input, before the request itself is passed on. Because the sink's output
-- type is 'Void', the only output of the resulting conduit is whatever
-- @callback@ produces.
onAwait :: Monad m
        => ConduitT i o m ()
        -> ConduitT i Void m r
        -> ConduitT i o m r
onAwait (ConduitT callback) (ConduitT sink0) = ConduitT $ \rest -> let
    go (Done r) = rest r
    -- A sink cannot produce output: its output type is uninhabited.
    go (HaveOutput _ o) = absurd o
    -- Run the callback first, then forward the input request.
    go (NeedInput f g) = callback $ \() -> NeedInput (go . f) (go . g)
    go (PipeM mp) = PipeM (liftM go mp)
    go (Leftover f i) = Leftover (go f) i
    in go (sink0 Done)
{-# INLINE onAwait #-}

-- Yield every completed vector accumulated in the shared state, then reset
-- the accumulator to empty. The write index and the vector currently being
-- filled are left untouched.
yieldS :: PrimMonad m
       => MutVar (PrimState m) (S (PrimState m) v e)
       -> ConduitT i (v e) m ()
yieldS ref = do
    S idx mv front <- readMutVar ref
    Prelude.mapM_ yield (front [])
    writeMutVar ref $! S idx mv id
{-# INLINE yieldS #-}

-- Write one element into the current mutable vector at the stored index.
-- When that fills the vector, freeze it, append it to the list of completed
-- vectors, and start a fresh mutable vector of the same length.
addE :: (PrimMonad m, V.Vector v e)
     => MutVar (PrimState m) (S (PrimState m) v e)
     -> e
     -> m ()
addE ref e = do
    S idx mv front <- readMutVar ref
    VM.write mv idx e
    let idx' = succ idx
        size = VM.length mv
    if idx' >= size
        then do
            -- The vector is full: hand it over and replace it.
            v <- V.unsafeFreeze mv
            let front' = front . (v:)
            mv' <- VM.new size
            writeMutVar ref $! S 0 mv' front'
        else writeMutVar ref $! S idx' mv front
{-# INLINE addE #-}

-- | Consume a source with a strict accumulator, in a way piecewise defined by
-- a controlling stream. The latter will be evaluated until it terminates.
--
-- For every value @a@ awaited from the controlling stream, the sink
-- @f a s@ is run against the remainder of the source, and its result becomes
-- the new accumulator. The final accumulator is returned once the
-- controlling stream is exhausted.
--
-- >>> let f a s = liftM (:s) $ mapC (*a) =$ CL.take a
-- >>> reverse $ runIdentity $ yieldMany [0..3] $$ mapAccumS f [] (yieldMany [1..])
-- [[],[1],[4,6],[12,15,18]] :: [[Int]]
mapAccumS
  :: Monad m
  => (a -> s -> ConduitT b Void m s)
  -> s
  -> ConduitT () b m ()
  -> ConduitT a Void m s
mapAccumS f s xs = do
    (_, u) <- loop (sealConduitT xs, s)
    return u
    where loop r@(ys, !t) = await >>= maybe (return r) go
              where go a  = lift (ys $$++ f a t) >>= loop
{-# INLINE mapAccumS #-}

-- | Run a consuming conduit repeatedly, only stopping when there is no more
-- data available from upstream.
--
-- Before each run the next input is peeked at; the given conduit is only
-- run when a value is available.
--
-- @since 1.3.0
peekForever :: Monad m => ConduitT i o m () -> ConduitT i o m ()
peekForever inner =
    loop
  where
    loop = do
        mx <- peek
        case mx of
            Nothing -> return ()
            Just _ -> inner >> loop

-- | Run a consuming conduit repeatedly, only stopping when there is no more
-- data available from upstream.
--
-- In contrast to 'peekForever', this function will ignore empty
-- chunks of data. So for example, if a stream of data contains an
-- empty @ByteString@, it is still treated as empty, and the consuming
-- function is not called.
--
-- @since 1.3.0
peekForeverE :: (Monad m, MonoFoldable i)
             => ConduitT i o m ()
             -> ConduitT i o m ()
peekForeverE inner =
    loop
  where
    loop = do
        mx <- peekE
        case mx of
            Nothing -> return ()
            Just _ -> inner >> loop