{-# LANGUAGE CPP #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MultiParamTypeClasses     #-}
{-# LANGUAGE NoImplicitPrelude         #-}
{-# LANGUAGE NoMonomorphismRestriction #-}
{-# LANGUAGE BangPatterns #-}
module Data.Conduit.Combinators
    ( 
      
      yieldMany
    , unfold
    , enumFromTo
    , iterate
    , repeat
    , replicate
    , sourceLazy
      
    , repeatM
    , repeatWhileM
    , replicateM
      
    , sourceFile
    , sourceFileBS
    , sourceHandle
    , sourceHandleUnsafe
    , sourceIOHandle
    , stdin
    , withSourceFile
      
    , sourceDirectory
    , sourceDirectoryDeep
      
      
    , drop
    , dropE
    , dropWhile
    , dropWhileE
    , fold
    , foldE
    , foldl
    , foldl1
    , foldlE
    , foldMap
    , foldMapE
    , all
    , allE
    , any
    , anyE
    , and
    , andE
    , or
    , orE
    , asum
    , elem
    , elemE
    , notElem
    , notElemE
    , sinkLazy
    , sinkList
    , sinkVector
    , sinkVectorN
    , sinkLazyBuilder
    , sinkNull
    , awaitNonNull
    , head
    , headDef
    , headE
    , peek
    , peekE
    , last
    , lastDef
    , lastE
    , length
    , lengthE
    , lengthIf
    , lengthIfE
    , maximum
    , maximumE
    , minimum
    , minimumE
    , null
    , nullE
    , sum
    , sumE
    , product
    , productE
    , find
      
    , mapM_
    , mapM_E
    , foldM
    , foldME
    , foldMapM
    , foldMapME
      
    , sinkFile
    , sinkFileCautious
    , sinkTempFile
    , sinkSystemTempFile
    , sinkFileBS
    , sinkHandle
    , sinkIOHandle
    , print
    , stdout
    , stderr
    , withSinkFile
    , withSinkFileBuilder
    , withSinkFileCautious
    , sinkHandleBuilder
    , sinkHandleFlush
      
      
    , map
    , mapE
    , omapE
    , concatMap
    , concatMapE
    , take
    , takeE
    , takeWhile
    , takeWhileE
    , takeExactly
    , takeExactlyE
    , concat
    , filter
    , filterE
    , mapWhile
    , conduitVector
    , scanl
    , mapAccumWhile
    , concatMapAccum
    , intersperse
    , slidingWindow
    , chunksOfE
    , chunksOfExactlyE
      
    , mapM
    , mapME
    , omapME
    , concatMapM
    , filterM
    , filterME
    , iterM
    , scanlM
    , mapAccumWhileM
    , concatMapAccumM
      
    , encodeUtf8
    , decodeUtf8
    , decodeUtf8Lenient
    , line
    , lineAscii
    , unlines
    , unlinesAscii
    , takeExactlyUntilE
    , linesUnbounded
    , linesUnboundedAscii
    , splitOnUnboundedE
      
    , builderToByteString
    , unsafeBuilderToByteString
    , builderToByteStringWith
    , builderToByteStringFlush
    , builderToByteStringWithFlush
    , BufferAllocStrategy
    , allNewBuffersStrategy
    , reuseBufferStrategy
      
    , vectorBuilder
    , mapAccumS
    , peekForever
    , peekForeverE
    ) where
import           Data.ByteString.Builder     (Builder, toLazyByteString, hPutBuilder)
import qualified Data.ByteString.Builder.Internal as BB (flush)
import qualified Data.ByteString.Builder.Extra as BB (runBuilder, Next(Done, More, Chunk))
import qualified Data.NonNull as NonNull
import qualified Data.Traversable
import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as BL
import           Data.ByteString.Lazy.Internal (defaultChunkSize)
import           Control.Applicative         (Alternative(..), (<$>))
import           Control.Exception           (catch, throwIO, finally, bracket, try, evaluate)
import           Control.Category            (Category (..))
import           Control.Monad               (unless, when, (>=>), liftM, forever)
import           Control.Monad.IO.Unlift     (MonadIO (..), MonadUnliftIO, withRunInIO)
import           Control.Monad.Primitive     (PrimMonad, PrimState, unsafePrimToPrim)
import           Control.Monad.Trans.Class   (lift)
import           Control.Monad.Trans.Resource (MonadResource, MonadThrow, allocate, throwM)
import           Data.Conduit
import           Data.Conduit.Internal       (ConduitT (..), Pipe (..))
import qualified Data.Conduit.List           as CL
import           Data.IORef
import           Data.Maybe                  (fromMaybe, isNothing, isJust)
import           Data.Monoid                 (Monoid (..))
import           Data.MonoTraversable
import qualified Data.Sequences              as Seq
import qualified Data.Vector.Generic         as V
import qualified Data.Vector.Generic.Mutable as VM
import           Data.Void                   (absurd)
import           Prelude                     (Bool (..), Eq (..), Int,
                                              Maybe (..), Either (..), Monad (..), Num (..),
                                              Ord (..), fromIntegral, maybe, either,
                                              ($), Functor (..), Enum, seq, Show, Char,
                                              otherwise, Either (..), not,
                                              ($!), succ, FilePath, IO, String)
import Data.Word (Word8)
import qualified Prelude
import qualified System.IO                   as IO
import           System.IO.Error             (isDoesNotExistError)
import           System.IO.Unsafe            (unsafePerformIO)
import Data.ByteString (ByteString)
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as TE
import qualified Data.Text.Encoding.Error as TEE
import Data.Conduit.Combinators.Stream
import Data.Conduit.Internal.Fusion
import           Data.Primitive.MutVar       (MutVar, newMutVar, readMutVar,
                                              writeMutVar)
import qualified Data.Streaming.FileRead as FR
import qualified Data.Streaming.Filesystem as F
import           GHC.ForeignPtr (mallocPlainForeignPtrBytes, unsafeForeignPtrToPtr)
import           Foreign.ForeignPtr (touchForeignPtr, ForeignPtr)
import           Foreign.Ptr (Ptr, plusPtr, minusPtr)
import           Data.ByteString.Internal (ByteString (PS), mallocByteString)
import           System.FilePath ((</>), (<.>), takeDirectory, takeFileName)
import           System.Directory (renameFile, getTemporaryDirectory, removeFile)
import qualified Data.Sequences as DTE
import           Data.Sequences (LazySequence (..))
#include "fusion-macros.h"
-- | Yield every element of a 'MonoFoldable' container downstream, in the
-- order in which the container is folded.
yieldMany, yieldManyC :: (Monad m, MonoFoldable mono)
                      => mono
                      -> ConduitT i (Element mono) m ()
yieldManyC mono = ofoldMap yield mono
{-# INLINE yieldManyC #-}
STREAMING(yieldMany, yieldManyC, yieldManyS, x)
-- | Generate a stream from a seed by repeatedly applying a step function.
-- Both arguments are forwarded to 'CL.unfold', which defines the exact
-- semantics (presumably the stream ends when the step returns @Nothing@).
unfold :: Monad m
       => (b -> Maybe (a, b))
       -> b
       -> ConduitT i a m ()
INLINE_RULE(unfold, f x, CL.unfold f x)
-- | Stream the values of an enumeration between two bounds. Forwards to
-- 'CL.enumFromTo'. NOTE(review): bounds are presumably inclusive - confirm.
enumFromTo :: (Monad m, Enum a, Ord a) => a -> a -> ConduitT i a m ()
INLINE_RULE(enumFromTo, f t, CL.enumFromTo f t)
-- | Stream built from a function and a starting value by 'CL.iterate'
-- (presumably the unbounded sequence of repeated applications).
iterate :: Monad m => (a -> a) -> a -> ConduitT i a m ()
INLINE_RULE(iterate, f t, CL.iterate f t)
-- | Stream obtained by iterating the identity function on the given value,
-- so every element produced is that same value.
repeat :: Monad m => a -> ConduitT i a m ()
INLINE_RULE(repeat, x, iterate id x)
-- | Stream a value a given number of times. Forwards the count and the
-- value to 'CL.replicate'.
replicate :: Monad m
          => Int
          -> a
          -> ConduitT i a m ()
INLINE_RULE(replicate, n x, CL.replicate n x)
-- | Stream the strict chunks of a lazy sequence: the value is split with
-- 'toChunks' and the resulting chunks are emitted with 'yieldMany'.
sourceLazy :: (Monad m, LazySequence lazy strict)
           => lazy
           -> ConduitT i strict m ()
INLINE_RULE(sourceLazy, x, yieldMany (toChunks x))
-- | Run the given monadic action over and over, yielding each result.
-- The stream never ends on its own.
repeatM, repeatMC :: Monad m
                  => m a
                  -> ConduitT i a m ()
repeatMC action = forever step
  where
    step = do
        x <- lift action
        yield x
{-# INLINE repeatMC #-}
STREAMING(repeatM, repeatMC, repeatMS, m)
-- | Run the monadic action repeatedly, yielding each result for as long as
-- the predicate holds. The first result that fails the predicate ends the
-- stream and is not yielded.
repeatWhileM, repeatWhileMC :: Monad m
                            => m a
                            -> (a -> Bool)
                            -> ConduitT i a m ()
repeatWhileMC action keepGoing =
    go
  where
    go = lift action >>= \x ->
        if keepGoing x
            then yield x >> go
            else return ()
STREAMING(repeatWhileM, repeatWhileMC, repeatWhileMS, m f)
-- | Run a monadic action a given number of times and stream the results.
-- Forwards the count and the action to 'CL.replicateM'.
replicateM :: Monad m
           => Int
           -> m a
           -> ConduitT i a m ()
INLINE_RULE(replicateM, n m, CL.replicateM n m)
-- | Stream the contents of a file as strict 'S.ByteString' chunks.
--
-- The file is opened and closed through 'bracketP', and reading stops at
-- the first empty chunk returned by 'FR.readChunk'.
sourceFile :: MonadResource m
           => FilePath
           -> ConduitT i S.ByteString m ()
sourceFile fp =
    bracketP (FR.openFile fp) FR.closeFile pump
  where
    pump rh = liftIO (FR.readChunk rh) >>= \chunk ->
        if S.null chunk
            then return ()
            else yield chunk >> pump rh
-- | Stream the contents of a 'IO.Handle' as strict chunks of at most
-- 'defaultChunkSize' bytes. The handle is not closed; reading stops at the
-- first empty read.
sourceHandle :: MonadIO m
             => IO.Handle
             -> ConduitT i S.ByteString m ()
sourceHandle h =
    go
  where
    go = liftIO (S.hGetSome h defaultChunkSize) >>= \chunk ->
        unless (S.null chunk) $ do
            yield chunk
            go
-- | Stream the contents of a 'IO.Handle' through a single reusable buffer.
--
-- One buffer of 'defaultChunkSize' bytes is allocated up front, and every
-- chunk yielded is a 'ByteString' view onto that same buffer. A chunk is
-- therefore overwritten by the next read: downstream must finish with (or
-- copy) each chunk before requesting more input.
sourceHandleUnsafe :: MonadIO m => IO.Handle -> ConduitT i ByteString m ()
sourceHandleUnsafe handle = do
    fptr <- liftIO $ mallocPlainForeignPtrBytes defaultChunkSize
    -- Raw pointer into the buffer; the touchForeignPtr call at the end keeps
    -- the buffer alive while this pointer is in use.
    let ptr = unsafeForeignPtrToPtr fptr
        loop = do
            count <- liftIO $ IO.hGetBuf handle ptr defaultChunkSize
            -- A non-positive count ends the stream.
            when (count > 0) $ do
                -- Wrap the shared buffer without copying.
                yield (PS fptr 0 count)
                loop
    loop
    liftIO $ touchForeignPtr fptr
-- | Acquire a 'IO.Handle' with the given action, stream its contents with
-- 'sourceHandle', and close it with 'IO.hClose' (all through 'bracketP').
sourceIOHandle :: MonadResource m
               => IO IO.Handle
               -> ConduitT i S.ByteString m ()
sourceIOHandle alloc =
    bracketP alloc release consume
  where
    release = IO.hClose
    consume h = sourceHandle h
-- | Synonym for 'sourceFile' with the chunk type spelled as 'ByteString'.
sourceFileBS :: MonadResource m => FilePath -> ConduitT i ByteString m ()
sourceFileBS fp = sourceFile fp
{-# INLINE sourceFileBS #-}
-- | Source that reads from 'IO.stdin' by way of 'sourceHandle'.
stdin :: MonadIO m => ConduitT i ByteString m ()
INLINE_RULE0(stdin, sourceHandle IO.stdin)
-- | Write all incoming chunks to the given file, which is opened in binary
-- 'IO.WriteMode' and managed by 'sinkIOHandle'.
sinkFile :: MonadResource m
         => FilePath
         -> ConduitT S.ByteString o m ()
sinkFile fp =
    sinkIOHandle openForWrite
  where
    openForWrite = IO.openBinaryFile fp IO.WriteMode
-- | Write the stream to a temporary file next to the destination and only
-- rename it onto the destination once all input has been written.
--
-- Acquisition and cleanup of the temporary file are done by
-- 'cautiousAcquire' and 'cautiousCleanup' under 'bracketP'.
sinkFileCautious
  :: MonadResource m
  => FilePath
  -> ConduitM S.ByteString o m ()
sinkFileCautious fp =
    bracketP (cautiousAcquire fp) cautiousCleanup writeThenCommit
  where
    writeThenCommit (tmpPath, tmpHandle) = do
        sinkHandle tmpHandle
        -- Close before renaming so the data is flushed to the temp file.
        liftIO (IO.hClose tmpHandle)
        liftIO (renameFile tmpPath fp)
-- | Continuation-style variant of 'sinkFileCautious': the inner action is
-- given a sink writing to a temporary file, and that file is renamed onto
-- the destination only after the inner action returns normally.
withSinkFileCautious
  :: (MonadUnliftIO m, MonadIO n)
  => FilePath
  -> (ConduitM S.ByteString o n () -> m a)
  -> m a
withSinkFileCautious fp inner =
  withRunInIO $ \run ->
    bracket (cautiousAcquire fp) cautiousCleanup $ \(tmpPath, tmpHandle) -> do
      result <- run (inner (sinkHandle tmpHandle))
      IO.hClose tmpHandle
      renameFile tmpPath fp
      return result
-- | Open a binary temporary file in the directory of the destination path,
-- using the destination file name with a @tmp@ extension as the template.
-- Returns the temporary path together with its open handle.
cautiousAcquire :: FilePath -> IO (FilePath, IO.Handle)
cautiousAcquire fp =
    IO.openBinaryTempFile dir template
  where
    dir = takeDirectory fp
    template = takeFileName fp <.> "tmp"
-- | Release the resources obtained by 'cautiousAcquire': close the handle
-- and delete the temporary file.
--
-- The removal runs under 'finally', so the temporary file is deleted even
-- when closing the handle throws. A missing file is not an error (it will
-- be missing after a successful rename); any other removal error is
-- rethrown.
cautiousCleanup :: (FilePath, IO.Handle) -> IO ()
cautiousCleanup (tmpFP, h) =
  IO.hClose h `finally` removeTmp
  where
    removeTmp = removeFile tmpFP `Control.Exception.catch` \e ->
      if isDoesNotExistError e
        then return ()
        else throwIO e
-- | Write the stream to a freshly created temporary file and return its
-- path.
--
-- The file is registered with 'allocate'; the registered release action
-- closes the handle and removes the file (ignoring a missing file).
sinkTempFile :: MonadResource m
             => FilePath -- ^ directory in which to create the file
             -> String -- ^ file name template
             -> ConduitM ByteString o m FilePath
sinkTempFile tmpdir pattern = do
    (_releaseKey, (tmpPath, tmpHandle)) <- allocate acquire release
    sinkHandle tmpHandle
    liftIO (IO.hClose tmpHandle)
    return tmpPath
  where
    acquire = IO.openBinaryTempFile tmpdir pattern
    release (path, handle) = IO.hClose handle `finally` removeIfPresent path
    removeIfPresent path =
        removeFile path `Control.Exception.catch` \e ->
            if isDoesNotExistError e
                then return ()
                else throwIO e
-- | Like 'sinkTempFile', but creates the file in the directory reported by
-- 'getTemporaryDirectory'.
sinkSystemTempFile
    :: MonadResource m
    => String -- ^ file name template
    -> ConduitM ByteString o m FilePath
sinkSystemTempFile pattern =
    liftIO getTemporaryDirectory >>= \dir -> sinkTempFile dir pattern
-- | Write every incoming chunk to the given 'IO.Handle' with 'S.hPut'.
-- The handle is neither flushed nor closed here.
sinkHandle :: MonadIO m
           => IO.Handle
           -> ConduitT S.ByteString o m ()
sinkHandle h = awaitForever $ \bs -> liftIO (S.hPut h bs)
-- | Write every incoming 'Builder' to the given 'IO.Handle' with
-- 'hPutBuilder'.
sinkHandleBuilder :: MonadIO m => IO.Handle -> ConduitM Builder o m ()
sinkHandleBuilder h = awaitForever $ \builder -> liftIO (hPutBuilder h builder)
-- | Write a stream of 'Flush' values to a 'IO.Handle': a 'Chunk' is written
-- with 'S.hPut' and a 'Flush' triggers 'IO.hFlush'.
sinkHandleFlush :: MonadIO m
                => IO.Handle
                -> ConduitM (Flush S.ByteString) o m ()
sinkHandleFlush h =
    awaitForever (liftIO . handleOne)
  where
    handleOne (Chunk bs) = S.hPut h bs
    handleOne Flush = IO.hFlush h
-- | Acquire a 'IO.Handle' with the given action, write the stream to it
-- with 'sinkHandle', and close it with 'IO.hClose' (all through 'bracketP').
sinkIOHandle :: MonadResource m
             => IO IO.Handle
             -> ConduitT S.ByteString o m ()
sinkIOHandle alloc =
    bracketP alloc release consume
  where
    release = IO.hClose
    consume h = sinkHandle h
-- | Open a file in binary 'IO.ReadMode' and hand a source reading from it
-- to the inner action. 'IO.withBinaryFile' owns the handle, so the source
-- is only usable while the inner action is running.
withSourceFile
  :: (MonadUnliftIO m, MonadIO n)
  => FilePath
  -> (ConduitM i ByteString n () -> m a)
  -> m a
withSourceFile fp inner =
  withRunInIO $ \run ->
    IO.withBinaryFile fp IO.ReadMode $ \h ->
      run (inner (sourceHandle h))
-- | Open a file in binary 'IO.WriteMode' and hand a sink writing to it to
-- the inner action. 'IO.withBinaryFile' owns the handle, so the sink is
-- only usable while the inner action is running.
withSinkFile
  :: (MonadUnliftIO m, MonadIO n)
  => FilePath
  -> (ConduitM ByteString o n () -> m a)
  -> m a
withSinkFile fp inner =
  withRunInIO $ \run ->
    IO.withBinaryFile fp IO.WriteMode $ \h ->
      run (inner (sinkHandle h))
-- | Like 'withSinkFile', but the sink consumes 'Builder' values, each of
-- which is written to the handle with 'hPutBuilder'.
withSinkFileBuilder
  :: (MonadUnliftIO m, MonadIO n)
  => FilePath
  -> (ConduitM Builder o n () -> m a)
  -> m a
withSinkFileBuilder fp inner =
  withRunInIO $ \run ->
    IO.withBinaryFile fp IO.WriteMode $ \h ->
      let builderSink = CL.mapM_ (\builder -> liftIO (hPutBuilder h builder))
       in run (inner builderSink)
-- | Stream the entries of a directory, each prefixed with the directory
-- path. Entries come from 'F.readDirStream'; the directory stream is
-- opened and closed through 'bracketP'. Subdirectories are not descended.
sourceDirectory :: MonadResource m => FilePath -> ConduitT i FilePath m ()
sourceDirectory dir =
    bracketP (F.openDirStream dir) F.closeDirStream listEntries
  where
    listEntries ds = liftIO (F.readDirStream ds) >>= maybe (return ()) emit
      where
        emit name = do
            yield (dir </> name)
            listEntries ds
-- | Recursively stream the files below a directory.
--
-- Regular files and symlinks to files are yielded. Directories are
-- descended into; symlinked directories only when the flag is set. Entries
-- of any other type are skipped.
sourceDirectoryDeep :: MonadResource m
                    => Bool -- ^ follow directory symlinks
                    -> FilePath -- ^ root directory
                    -> ConduitT i FilePath m ()
sourceDirectoryDeep followSymlinks =
    walk
  where
    walk :: MonadResource m => FilePath -> ConduitT i FilePath m ()
    walk dir = sourceDirectory dir .| awaitForever visit

    visit :: MonadResource m => FilePath -> ConduitT i FilePath m ()
    visit path = liftIO (F.getFileType path) >>= dispatch
      where
        dispatch F.FTFile = yield path
        dispatch F.FTFileSym = yield path
        dispatch F.FTDirectory = walk path
        dispatch F.FTDirectorySym
            | followSymlinks = walk path
            | otherwise = return ()
        dispatch F.FTOther = return ()
-- | Discard values from the front of the stream. Forwards the count to
-- 'CL.drop'; produces no output.
drop :: Monad m
     => Int
     -> ConduitT a o m ()
INLINE_RULE(drop, n, CL.drop n)
-- | Discard the given number of elements from a chunked stream.
--
-- Chunks are split with 'Seq.splitAt'; the unconsumed part of the last
-- chunk touched is pushed back with 'leftover'.
dropE :: (Monad m, Seq.IsSequence seq)
      => Seq.Index seq
      -> ConduitT seq o m ()
dropE =
    go
  where
    go remaining
        | remaining <= 0 = return ()
        | otherwise = await >>= maybe (return ()) (consume remaining)
    consume remaining chunk = do
        unless (onull rest) $ leftover rest
        go (remaining - fromIntegral (olength dropped))
      where
        (dropped, rest) = Seq.splitAt remaining chunk
{-# INLINEABLE dropE #-}
-- | Discard values while the predicate holds. The first value that fails
-- the predicate is pushed back with 'leftover'.
dropWhile :: Monad m
          => (a -> Bool)
          -> ConduitT a o m ()
dropWhile p =
    go
  where
    go = do
        mx <- await
        case mx of
            Nothing -> return ()
            Just x
                | p x -> go
                | otherwise -> leftover x
{-# INLINE dropWhile #-}
-- | Discard elements of a chunked stream while the predicate holds. The
-- remainder of the first chunk containing a non-matching element is pushed
-- back with 'leftover'.
dropWhileE :: (Monad m, Seq.IsSequence seq)
           => (Element seq -> Bool)
           -> ConduitT seq o m ()
dropWhileE p =
    go
  where
    go = await >>= maybe (return ()) trim
    trim chunk
        | onull remainder = go
        | otherwise = leftover remainder
      where
        remainder = Seq.dropWhile p chunk
{-# INLINE dropWhileE #-}
-- | Monoidally combine all values in the stream ('CL.foldMap' applied to
-- the identity function).
fold :: (Monad m, Monoid a)
     => ConduitT a o m a
INLINE_RULE0(fold, CL.foldMap id)
-- | Monoidally combine every element of every incoming chunk. Each chunk
-- is collapsed with @ofoldMap id@ and appended to an accumulator that
-- starts at 'mempty'.
foldE :: (Monad m, MonoFoldable mono, Monoid (Element mono))
      => ConduitT mono o m (Element mono)
INLINE_RULE0(foldE, CL.fold (\accum mono -> accum `mappend` ofoldMap id mono) mempty)
-- | Left fold over the stream. Forwards to 'CL.fold', which determines how
-- strictly the accumulator is evaluated.
foldl :: Monad m => (a -> b -> a) -> a -> ConduitT b o m a
INLINE_RULE(foldl, f x, CL.fold f x)
-- | Left fold over the elements of a chunked stream. Each chunk is folded
-- with 'ofoldlPrime' and the per-chunk results are threaded by 'CL.fold'.
foldlE :: (Monad m, MonoFoldable mono)
       => (a -> Element mono -> a)
       -> a
       -> ConduitT mono o m a
INLINE_RULE(foldlE, f x, CL.fold (ofoldlPrime f) x)
-- | Prime-free name for the strict left fold from "Data.MonoTraversable".
-- NOTE(review): presumably exists so that macro arguments need not contain
-- an identifier with a trailing prime - confirm.
ofoldlPrime :: MonoFoldable mono => (a -> Element mono -> a) -> a -> mono -> a
ofoldlPrime step acc mono = ofoldl' step acc mono
-- | Map each value to a monoid and combine the results. Forwards to
-- 'CL.foldMap'.
foldMap :: (Monad m, Monoid b)
        => (a -> b)
        -> ConduitT a o m b
INLINE_RULE(foldMap, f, CL.foldMap f)
-- | Map each element of each chunk to a monoid and combine the results
-- ('CL.foldMap' over the per-chunk 'ofoldMap').
foldMapE :: (Monad m, MonoFoldable mono, Monoid w)
         => (Element mono -> w)
         -> ConduitT mono o m w
INLINE_RULE(foldMapE, f, CL.foldMap (ofoldMap f))
-- | Left fold without a starting value: the first value seeds the
-- accumulator, which is forced at every step. Returns @Nothing@ for an
-- empty stream.
foldl1, foldl1C :: Monad m => (a -> a -> a) -> ConduitT a o m (Maybe a)
foldl1C step = do
    mfirst <- await
    case mfirst of
        Nothing -> return Nothing
        Just first -> go first
  where
    go !acc = do
        mnext <- await
        case mnext of
            Nothing -> return (Just acc)
            Just next -> go (step acc next)
STREAMING(foldl1, foldl1C, foldl1S, f)
-- | Left fold over the elements of a chunked stream without a starting
-- value. The accumulator starts as @Nothing@ and is updated per chunk by
-- 'foldMaybeNull', so the result is @Nothing@ when no element was seen.
foldl1E :: (Monad m, MonoFoldable mono, a ~ Element mono)
        => (a -> a -> a)
        -> ConduitT mono o m (Maybe a)
INLINE_RULE(foldl1E, f, foldl (foldMaybeNull f) Nothing)
-- | Fold one chunk into an optional accumulator.
--
-- A null chunk leaves the accumulator untouched. A non-null chunk is
-- folded strictly into the existing accumulator, or folded on its own when
-- there is no accumulator yet.
foldMaybeNull :: (MonoFoldable mono, e ~ Element mono)
              => (e -> e -> e)
              -> Maybe e
              -> mono
              -> Maybe e
foldMaybeNull f macc mono =
    case NonNull.fromNullable mono of
        Nothing -> macc
        Just nn ->
            case macc of
                Just acc -> Just (ofoldl' f acc nn)
                Nothing -> Just (NonNull.ofoldl1' f nn)
{-# INLINE foldMaybeNull #-}
-- | Check that every value satisfies the predicate. Implemented by
-- searching for a counterexample with 'find', so consumption stops at the
-- first value that fails.
all, allC :: Monad m
          => (a -> Bool)
          -> ConduitT a o m Bool
allC p = isNothing <$> find (\x -> not (p x))
{-# INLINE allC #-}
STREAMING(all, allC, allS, f)
-- | Check that every element of every chunk satisfies the predicate
-- ('all' over the per-chunk 'oall').
allE :: (Monad m, MonoFoldable mono)
     => (Element mono -> Bool)
     -> ConduitT mono o m Bool
INLINE_RULE(allE, f, all (oall f))
-- | Check whether some value satisfies the predicate. Implemented with
-- 'find', so consumption stops at the first match.
any, anyC :: Monad m
          => (a -> Bool)
          -> ConduitT a o m Bool
anyC p = isJust <$> find p
{-# INLINE anyC #-}
STREAMING(any, anyC, anyS, f)
-- | Check whether some element of some chunk satisfies the predicate
-- ('any' over the per-chunk 'oany').
anyE :: (Monad m, MonoFoldable mono)
     => (Element mono -> Bool)
     -> ConduitT mono o m Bool
INLINE_RULE(anyE, f, any (oany f))
-- | Check that every boolean in the stream is @True@ ('all' with the
-- identity predicate).
and :: Monad m => ConduitT Bool o m Bool
INLINE_RULE0(and, all id)
-- | Check that every boolean element of every chunk is @True@ ('allE' with
-- the identity predicate).
andE :: (Monad m, MonoFoldable mono, Element mono ~ Bool)
     => ConduitT mono o m Bool
INLINE_RULE0(andE, allE id)
-- | Check whether some boolean in the stream is @True@ ('any' with the
-- identity predicate).
or :: Monad m => ConduitT Bool o m Bool
INLINE_RULE0(or, any id)
-- | Check whether some boolean element of some chunk is @True@ ('anyE'
-- with the identity predicate).
orE :: (Monad m, MonoFoldable mono, Element mono ~ Bool)
    => ConduitT mono o m Bool
INLINE_RULE0(orE, anyE id)
-- | Combine all values in the stream with the 'Alternative' choice
-- operator, starting from 'empty'. Built on 'foldl', so the whole stream
-- is consumed.
asum :: (Monad m, Alternative f)
     => ConduitT (f a) o m (f a)
INLINE_RULE0(asum, foldl (<|>) empty)
-- | Check whether some value in the stream equals the given value ('any'
-- with an equality test).
elem :: (Monad m, Eq a) => a -> ConduitT a o m Bool
INLINE_RULE(elem, x, any (== x))
-- | Check whether some chunk contains the given element ('any' over the
-- per-chunk 'oelem').
elemE :: (Monad m, Seq.IsSequence seq, Eq (Element seq))
      => Element seq
      -> ConduitT seq o m Bool
INLINE_RULE(elemE, f, any (oelem f))
-- | Check that no value in the stream equals the given value ('all' with
-- an inequality test).
notElem :: (Monad m, Eq a) => a -> ConduitT a o m Bool
INLINE_RULE(notElem, x, all (/= x))
-- | Check that no chunk contains the given element ('all' over the
-- per-chunk 'onotElem').
notElemE :: (Monad m, Seq.IsSequence seq, Eq (Element seq))
         => Element seq
         -> ConduitT seq o m Bool
INLINE_RULE(notElemE, x, all (onotElem x))
-- | Collect all strict chunks of the stream into a single lazy sequence.
--
-- Chunks are accumulated as a difference list (a function that prepends
-- the chunks seen so far) and turned into the lazy value with
-- 'fromChunks' at the end, so the whole stream is consumed first.
sinkLazy, sinkLazyC :: (Monad m, LazySequence lazy strict)
                    => ConduitT strict o m lazy
sinkLazyC = do
    build <- CL.fold snoc id
    return (fromChunks (build []))
  where
    snoc front chunk = \rest -> front (chunk : rest)
{-# INLINE sinkLazyC #-}
STREAMING0(sinkLazy, sinkLazyC, sinkLazyS)
-- | Collect all values of the stream into a list (this is 'CL.consume').
sinkList :: Monad m => ConduitT a o m [a]
INLINE_RULE0(sinkList, CL.consume)
-- | Collect all values of the stream into a vector.
--
-- Values are written into a mutable vector that starts with room for 10
-- elements and is enlarged whenever it is full. The result is the frozen
-- vector, sliced to the number of values actually written.
sinkVector, sinkVectorC :: (V.Vector v a, PrimMonad m)
                        => ConduitT a o m (v a)
sinkVectorC = do
    let initSize = 10
    mv0 <- VM.new initSize
    -- go capacity nextIndex vector
    let go maxSize i mv | i >= maxSize = do
            -- Full: grow by the current capacity, which doubles it.
            let newMax = maxSize * 2
            mv' <- VM.grow mv maxSize
            go newMax i mv'
        go maxSize i mv = do
            mx <- await
            case mx of
                -- End of input: freeze in place and drop the unused tail.
                Nothing -> V.slice 0 i <$> V.unsafeFreeze mv
                Just x -> do
                    VM.write mv i x
                    go maxSize (i + 1) mv
    go initSize 0 mv0
{-# INLINEABLE sinkVectorC #-}
STREAMING0(sinkVector, sinkVectorC, sinkVectorS)
-- | Collect at most the given number of values into a vector.
--
-- A mutable vector of exactly that size is allocated up front. Consumption
-- stops once it is full; if the input ends earlier, the result is sliced
-- to the number of values actually written.
sinkVectorN, sinkVectorNC :: (V.Vector v a, PrimMonad m)
                          => Int -- ^ maximum number of values to collect
                          -> ConduitT a o m (v a)
sinkVectorNC maxSize = do
    mv <- VM.new maxSize
    -- Full: return the whole vector without awaiting further input.
    let go i | i >= maxSize = V.unsafeFreeze mv
        go i = do
            mx <- await
            case mx of
                -- End of input: keep only the filled prefix.
                Nothing -> V.slice 0 i <$> V.unsafeFreeze mv
                Just x -> do
                    VM.write mv i x
                    go (i + 1)
    go 0
{-# INLINEABLE sinkVectorNC #-}
STREAMING(sinkVectorN, sinkVectorNC, sinkVectorNS, maxSize)
-- | Concatenate all 'Builder' values of the stream with 'fold' and render
-- the result as a lazy 'BL.ByteString'.
sinkLazyBuilder, sinkLazyBuilderC :: Monad m => ConduitT Builder o m BL.ByteString
sinkLazyBuilderC = toLazyByteString <$> fold
{-# INLINE sinkLazyBuilderC #-}
STREAMING0(sinkLazyBuilder, sinkLazyBuilderC, sinkLazyBuilderS)
-- | Consume and discard the stream (this is 'CL.sinkNull').
sinkNull :: Monad m => ConduitT a o m ()
INLINE_RULE0(sinkNull, CL.sinkNull)
-- | Await the next chunk that is not null, skipping any null chunks.
-- Returns @Nothing@ when the input ends first.
awaitNonNull :: (Monad m, MonoFoldable a) => ConduitT a o m (Maybe (NonNull.NonNull a))
awaitNonNull =
    go
  where
    go = do
        mx <- await
        case mx of
            Nothing -> return Nothing
            Just x ->
                case NonNull.fromNullable x of
                    Nothing -> go
                    Just nn -> return (Just nn)
{-# INLINE awaitNonNull #-}
-- | Take the first value from the stream, if there is one (this is
-- 'CL.head').
head :: Monad m => ConduitT a o m (Maybe a)
head = CL.head
-- | Like 'head', but returns the given default when 'head' has no value.
headDef :: Monad m => a -> ConduitT a o m a
headDef a = do
    mx <- head
    return (fromMaybe a mx)
-- | Take the first element from a chunked stream. Null chunks are skipped,
-- and the non-null rest of the chunk that supplied the element is pushed
-- back with 'leftover'.
headE :: (Monad m, Seq.IsSequence seq) => ConduitT seq o m (Maybe (Element seq))
headE =
    go
  where
    go = await >>= maybe (return Nothing) split
    split chunk = maybe go emit (Seq.uncons chunk)
    emit (x, rest) = do
        unless (onull rest) $ leftover rest
        return (Just x)
{-# INLINE headE #-}
-- | Look at the next value of the stream (this is 'CL.peek').
peek :: Monad m => ConduitT a o m (Maybe a)
peek = CL.peek
{-# INLINE peek #-}
-- | Look at the next element of a chunked stream without consuming it.
-- Null chunks are consumed and skipped; the chunk that supplies the element
-- is pushed back whole with 'leftover'.
peekE :: (Monad m, MonoFoldable mono) => ConduitT mono o m (Maybe (Element mono))
peekE =
    go
  where
    go = do
        mchunk <- await
        case mchunk of
            Nothing -> return Nothing
            Just chunk -> maybe go (putBack chunk) (headMay chunk)
    putBack chunk x = leftover chunk >> return (Just x)
{-# INLINE peekE #-}
-- | Consume the whole stream and return its final value, or @Nothing@ when
-- the stream is empty.
last, lastC :: Monad m => ConduitT a o m (Maybe a)
lastC = do
    mfirst <- await
    case mfirst of
        Nothing -> return Nothing
        Just x -> go x
  where
    go prev = do
        mnext <- await
        case mnext of
            Nothing -> return (Just prev)
            Just next -> go next
STREAMING0(last, lastC, lastS)
-- | Like 'last', but returns the given default when 'last' has no value.
lastDef :: Monad m => a -> ConduitT a o m a
lastDef a = do
    mx <- last
    return (fromMaybe a mx)
-- | Consume a chunked stream and return the last element of its last
-- non-null chunk, or @Nothing@ when no non-null chunk arrives.
lastE, lastEC :: (Monad m, Seq.IsSequence seq) => ConduitT seq o m (Maybe (Element seq))
lastEC =
    awaitNonNull >>= maybe (return Nothing) (go . NonNull.last)
  where
    go prev = do
        mnn <- awaitNonNull
        case mnn of
            Nothing -> return (Just prev)
            Just nn -> go (NonNull.last nn)
STREAMING0(lastE, lastEC, lastES)
-- | Count the values in the stream: a 'foldl' that adds one per value,
-- starting from zero.
length :: (Monad m, Num len) => ConduitT a o m len
INLINE_RULE0(length, foldl (\x _ -> x + 1) 0)
-- | Count the elements of a chunked stream by summing the 'olength' of
-- every chunk.
lengthE :: (Monad m, Num len, MonoFoldable mono) => ConduitT mono o m len
INLINE_RULE0(lengthE, foldl (\x y -> x + fromIntegral (olength y)) 0)
-- | Count the values in the stream that satisfy the predicate.
lengthIf :: (Monad m, Num len) => (a -> Bool) -> ConduitT a o m len
INLINE_RULE(lengthIf, f, foldl (\cnt a -> if f a then (cnt + 1) else cnt) 0)
-- | Count the elements of a chunked stream that satisfy the predicate
-- (element-wise fold via 'foldlE').
lengthIfE :: (Monad m, Num len, MonoFoldable mono)
          => (Element mono -> Bool) -> ConduitT mono o m len
INLINE_RULE(lengthIfE, f, foldlE (\cnt a -> if f a then (cnt + 1) else cnt) 0)
-- | Largest value in the stream ('foldl1' with 'max'); @Nothing@ for an
-- empty stream.
maximum :: (Monad m, Ord a) => ConduitT a o m (Maybe a)
INLINE_RULE0(maximum, foldl1 max)
-- | Largest element in a chunked stream ('foldl1E' with 'max').
maximumE :: (Monad m, Seq.IsSequence seq, Ord (Element seq)) => ConduitT seq o m (Maybe (Element seq))
INLINE_RULE0(maximumE, foldl1E max)
-- | Smallest value in the stream ('foldl1' with 'min'); @Nothing@ for an
-- empty stream.
minimum :: (Monad m, Ord a) => ConduitT a o m (Maybe a)
INLINE_RULE0(minimum, foldl1 min)
-- | Smallest element in a chunked stream ('foldl1E' with 'min').
minimumE :: (Monad m, Seq.IsSequence seq, Ord (Element seq)) => ConduitT seq o m (Maybe (Element seq))
INLINE_RULE0(minimumE, foldl1E min)
-- | @True@ when 'peek' finds no value, i.e. the stream has no more input.
null :: Monad m => ConduitT a o m Bool
null = isNothing <$> peek
{-# INLINE null #-}
-- | @True@ when a chunked stream holds no more elements. Null chunks are
-- consumed and skipped; the first non-null chunk is pushed back with
-- 'leftover' before returning @False@.
nullE :: (Monad m, MonoFoldable mono)
      => ConduitT mono o m Bool
nullE =
    go
  where
    go = do
        mchunk <- await
        case mchunk of
            Nothing -> return True
            Just chunk
                | onull chunk -> go
                | otherwise -> leftover chunk >> return False
{-# INLINE nullE #-}
-- | Sum of all values in the stream ('foldl' with addition from zero).
sum :: (Monad m, Num a) => ConduitT a o m a
INLINE_RULE0(sum, foldl (+) 0)
-- | Sum of all elements in a chunked stream ('foldlE' with addition from
-- zero).
sumE :: (Monad m, MonoFoldable mono, Num (Element mono)) => ConduitT mono o m (Element mono)
INLINE_RULE0(sumE, foldlE (+) 0)
-- | Product of all values in the stream ('foldl' with multiplication from
-- one).
product :: (Monad m, Num a) => ConduitT a o m a
INLINE_RULE0(product, foldl (*) 1)
-- | Product of all elements in a chunked stream ('foldlE' with
-- multiplication from one).
productE :: (Monad m, MonoFoldable mono, Num (Element mono)) => ConduitT mono o m (Element mono)
INLINE_RULE0(productE, foldlE (*) 1)
-- | Return the first value that satisfies the predicate. Values before the
-- match are consumed and dropped, and nothing after it is consumed.
find, findC :: Monad m => (a -> Bool) -> ConduitT a o m (Maybe a)
findC p =
    go
  where
    go = do
        mx <- await
        case mx of
            Nothing -> return Nothing
            Just x
                | p x -> return (Just x)
                | otherwise -> go
{-# INLINE findC #-}
STREAMING(find, findC, findS, f)
-- | Run a monadic action on every value of the stream, discarding results.
-- Forwards to 'CL.mapM_'.
mapM_ :: Monad m => (a -> m ()) -> ConduitT a o m ()
INLINE_RULE(mapM_, f, CL.mapM_ f)
-- | Run a monadic action on every element of every chunk ('CL.mapM_' over
-- the per-chunk 'omapM_').
mapM_E :: (Monad m, MonoFoldable mono) => (Element mono -> m ()) -> ConduitT mono o m ()
INLINE_RULE(mapM_E, f, CL.mapM_ (omapM_ f))
-- | Monadic left fold over the stream. Forwards to 'CL.foldM'.
foldM :: Monad m => (a -> b -> m a) -> a -> ConduitT b o m a
INLINE_RULE(foldM, f x, CL.foldM f x)
-- | Monadic left fold over the elements of a chunked stream ('foldM' over
-- the per-chunk 'ofoldlM').
foldME :: (Monad m, MonoFoldable mono)
       => (a -> Element mono -> m a)
       -> a
       -> ConduitT mono o m a
INLINE_RULE(foldME, f x, foldM (ofoldlM f) x)
-- | Map every value to a monoid with a monadic function and combine the
-- results. Forwards to 'CL.foldMapM'.
foldMapM :: (Monad m, Monoid w) => (a -> m w) -> ConduitT a o m w
INLINE_RULE(foldMapM, f, CL.foldMapM f)
-- | Map every element of every chunk to a monoid with a monadic function
-- and combine the results. Each mapped value is appended to the right of
-- the accumulator, which starts at 'mempty'.
foldMapME :: (Monad m, MonoFoldable mono, Monoid w)
          => (Element mono -> m w)
          -> ConduitT mono o m w
INLINE_RULE(foldMapME, f, CL.foldM (ofoldlM (\accum e -> mappend accum `liftM` f e)) mempty)
-- | Synonym for 'sinkFile' with the chunk type spelled as 'ByteString'.
sinkFileBS :: MonadResource m => FilePath -> ConduitT ByteString o m ()
sinkFileBS fp = sinkFile fp
{-# INLINE sinkFileBS #-}
-- | Print every value of the stream with 'Prelude.print'.
print :: (Show a, MonadIO m) => ConduitT a o m ()
INLINE_RULE0(print, mapM_ (liftIO . Prelude.print))
-- | Sink that writes to 'IO.stdout' by way of 'sinkHandle'.
stdout :: MonadIO m => ConduitT ByteString o m ()
INLINE_RULE0(stdout, sinkHandle IO.stdout)
-- | Sink that writes to 'IO.stderr' by way of 'sinkHandle'.
stderr :: MonadIO m => ConduitT ByteString o m ()
INLINE_RULE0(stderr, sinkHandle IO.stderr)
-- | Apply a function to every value of the stream. Forwards to 'CL.map'.
map :: Monad m => (a -> b) -> ConduitT a b m ()
INLINE_RULE(map, f, CL.map f)
-- | Apply a function to every element of every chunk, for chunks that are
-- 'Functor's ('CL.map' over 'fmap').
mapE :: (Monad m, Functor f) => (a -> b) -> ConduitT (f a) (f b) m ()
INLINE_RULE(mapE, f, CL.map (fmap f))
-- | Apply a function to every element of every chunk, for monomorphic
-- containers ('CL.map' over 'omap').
omapE :: (Monad m, MonoFunctor mono) => (Element mono -> Element mono) -> ConduitT mono mono m ()
INLINE_RULE(omapE, f, CL.map (omap f))
-- | Map every value to a container and yield the elements of each
-- resulting container with 'yieldMany'.
concatMap, concatMapC :: (Monad m, MonoFoldable mono)
                      => (a -> mono)
                      -> ConduitT a (Element mono) m ()
concatMapC f = awaitForever $ \x -> yieldMany (f x)
{-# INLINE concatMapC #-}
STREAMING(concatMap, concatMapC, concatMapS, f)
-- | Replace every chunk by the monoidal combination of its mapped elements
-- ('CL.map' over 'ofoldMap'); one output value per input chunk.
concatMapE :: (Monad m, MonoFoldable mono, Monoid w)
           => (Element mono -> w)
           -> ConduitT mono w m ()
INLINE_RULE(concatMapE, f, CL.map (ofoldMap f))
-- | Pass through a limited number of values. Forwards the count to
-- 'CL.isolate'.
take :: Monad m => Int -> ConduitT a a m ()
INLINE_RULE(take, n, CL.isolate n)
-- | Pass through the given number of elements of a chunked stream.
--
-- Chunks are split with 'Seq.splitAt': the wanted part is yielded and the
-- non-null remainder is pushed back with 'leftover'.
takeE :: (Monad m, Seq.IsSequence seq)
      => Seq.Index seq
      -> ConduitT seq seq m ()
takeE =
    go
  where
    go remaining
        | remaining <= 0 = return ()
        | otherwise = await >>= maybe (return ()) (emit remaining)
    emit remaining chunk = do
        unless (onull wanted) $ yield wanted
        unless (onull extra) $ leftover extra
        go (remaining - fromIntegral (olength wanted))
      where
        (wanted, extra) = Seq.splitAt remaining chunk
{-# INLINEABLE takeE #-}
-- | Pass values through while the predicate holds. The first value that
-- fails the predicate is pushed back with 'leftover'.
takeWhile :: Monad m
          => (a -> Bool)
          -> ConduitT a a m ()
takeWhile p =
    go
  where
    go = do
        mx <- await
        case mx of
            Nothing -> return ()
            Just x
                | p x -> yield x >> go
                | otherwise -> leftover x
{-# INLINE takeWhile #-}
-- | Pass elements of a chunked stream through while the predicate holds.
--
-- Each chunk is split with 'Seq.span': the matching prefix is yielded, and
-- a non-null remainder is pushed back with 'leftover', ending the conduit.
takeWhileE :: (Monad m, Seq.IsSequence seq)
           => (Element seq -> Bool)
           -> ConduitT seq seq m ()
takeWhileE p =
    go
  where
    go = await >>= maybe (return ()) emit
    emit chunk = do
        unless (onull matching) $ yield matching
        if onull rest
            then go
            else leftover rest
      where
        (matching, rest) = Seq.span p chunk
{-# INLINE takeWhileE #-}
-- | Run the inner conduit on the input isolated by 'take', then drain
-- whatever part of that isolated input the inner conduit did not consume.
takeExactly :: Monad m
            => Int
            -> ConduitT a b m r
            -> ConduitT a b m r
takeExactly count inner =
    take count .| drained
  where
    drained = do
        result <- inner
        CL.sinkNull
        return result
-- | Chunked variant of 'takeExactly': run the inner conduit on the input
-- isolated by 'takeE', then drain whatever it did not consume.
takeExactlyE :: (Monad m, Seq.IsSequence a)
             => Seq.Index a
             -> ConduitT a b m r
             -> ConduitT a b m r
takeExactlyE count inner =
    takeE count .| drained
  where
    drained = do
        result <- inner
        CL.sinkNull
        return result
{-# INLINE takeExactlyE #-}
-- | Flatten a stream of containers: the elements of each incoming
-- container are emitted with 'yieldMany'.
concat, concatC :: (Monad m, MonoFoldable mono)
                => ConduitT mono (Element mono) m ()
concatC = awaitForever $ \mono -> yieldMany mono
STREAMING0(concat, concatC, concatS)
-- | Keep only the values that satisfy the predicate. Forwards to
-- 'CL.filter'.
filter :: Monad m => (a -> Bool) -> ConduitT a a m ()
INLINE_RULE(filter, f, CL.filter f)
-- | Filter the elements inside every chunk ('CL.map' over 'Seq.filter').
-- Every input chunk produces one output chunk, even when nothing in it
-- survives.
filterE :: (Seq.IsSequence seq, Monad m) => (Element seq -> Bool) -> ConduitT seq seq m ()
INLINE_RULE(filterE, f, CL.map (Seq.filter f))
-- | Map values for as long as the function returns @Just@. The first input
-- mapped to @Nothing@ is pushed back with 'leftover' and the conduit stops.
mapWhile :: Monad m => (a -> Maybe b) -> ConduitT a b m ()
mapWhile f =
    go
  where
    go = await >>= maybe (return ()) step
    step x = maybe (leftover x) (\y -> yield y >> go) (f x)
{-# INLINE mapWhile #-}
-- | Group the stream into vectors of at most the given size, using
-- 'sinkVectorN' for each group. Stops at the first empty vector.
conduitVector :: (V.Vector v a, PrimMonad m)
              => Int -- ^ maximum size of each vector
              -> ConduitT a (v a) m ()
conduitVector size =
    go
  where
    go = sinkVectorN size >>= \v ->
        if V.null v
            then return ()
            else yield v >> go
{-# INLINE conduitVector #-}
-- | Left scan: emits the running accumulator, beginning with the initial
-- value and ending with the final one.
--
-- On each input, the new accumulator is forced before the previous one is
-- yielded.
scanl, scanlC :: Monad m => (a -> b -> a) -> a -> ConduitT b a m ()
scanlC f =
    go
  where
    go acc = do
        mb <- await
        case mb of
            Nothing -> yield acc
            Just b -> do
                let acc' = f acc b
                acc' `seq` yield acc
                go acc'
STREAMING(scanl, scanlC, scanlS, f x)
-- | Map with an accumulator until the function asks to stop.
--
-- A @Right (s, b)@ result yields @b@ and continues with state @s@. A
-- @Left s@ result stops and returns @s@ (forced). When the input ends, the
-- current state is returned.
mapAccumWhile, mapAccumWhileC :: Monad m => (a -> s -> Either s (s, b)) -> s -> ConduitT a b m s
mapAccumWhileC f =
    go
  where
    go !s = do
        ma <- await
        case ma of
            Nothing -> return s
            Just a ->
                case f a s of
                    Left s' -> return $! s'
                    Right (s', b) -> yield b >> go s'
{-# INLINE mapAccumWhileC #-}
STREAMING(mapAccumWhile, mapAccumWhileC, mapAccumWhileS, f s)
-- | Map with an accumulator, where every input may produce several outputs
-- (this is 'CL.concatMapAccum').
concatMapAccum :: Monad m => (a -> accum -> (accum, [b])) -> accum -> ConduitT a b m ()
INLINE_RULE0(concatMapAccum, CL.concatMapAccum)
-- | Insert the given value between consecutive values of the stream. The
-- first value is passed through as is; every later value is preceded by
-- the separator.
intersperse, intersperseC :: Monad m => a -> ConduitT a a m ()
intersperseC sep = do
    mfirst <- await
    case mfirst of
        Nothing -> return ()
        Just first -> do
            yield first
            concatMap (\next -> [sep, next])
STREAMING(intersperse, intersperseC, intersperseS, x)
-- | Emit a sliding window over the stream.
--
-- The window size is clamped to at least 1. The first window is yielded
-- once that many values have been collected; after that, every new value
-- is appended and the oldest one dropped, yielding one window per input.
-- If the input ends before the first window is full, the partial window
-- collected so far is yielded instead.
slidingWindow, slidingWindowC :: (Monad m, Seq.IsSequence seq, Element seq ~ a) => Int -> ConduitT a seq m ()
slidingWindowC sz = go (max 1 sz) mempty
    -- goContinue receives the previous window minus its oldest element.
    where goContinue st = await >>=
                          maybe (return ())
                                (\x -> do
                                   let st' = Seq.snoc st x
                                   yield st' >> goContinue (Seq.unsafeTail st')
                                )
          -- go counts down the number of values still needed to fill the
          -- first window.
          go 0 st = yield st >> goContinue (Seq.unsafeTail st)
          go !n st = CL.head >>= \m ->
                     case m of
                       Nothing -> yield st
                       Just x -> go (n-1) (Seq.snoc st x)
STREAMING(slidingWindow, slidingWindowC, slidingWindowS, sz)
-- | Split the input into chunks of the given size. Runs
-- 'chunksOfExactlyE' and then yields the next input chunk, if any, so that
-- a short remainder is emitted as the final chunk.
chunksOfE :: (Monad m, Seq.IsSequence seq) => Seq.Index seq -> ConduitT seq seq m ()
chunksOfE chunkSize = do
    chunksOfExactlyE chunkSize
    mrest <- await
    case mrest of
        Nothing -> return ()
        Just rest -> yield rest
-- | Split the input into chunks of exactly @chunkSize@ elements.
--
-- Input that does not fill a complete final chunk is never yielded: it is
-- pushed back with 'leftover', so the caller can still 'await' it.
--
-- NOTE(review): @chunkSize@ is assumed to be positive; with a non-positive
-- size the @otherwise@ branch of @start@ appears unable to make progress.
chunksOfExactlyE :: (Monad m, Seq.IsSequence seq) => Seq.Index seq -> ConduitT seq seq m ()
chunksOfExactlyE chunkSize = await >>= maybe (return ()) start
    where
        -- start works on data that has been taken from upstream but not yet
        -- emitted.
        start b
            -- Nothing buffered: begin again with the next input chunk. This
            -- recurses into this function, not 'chunksOfE', because the
            -- latter also yields a short trailing chunk.
            | onull b = chunksOfExactlyE chunkSize
            | Seq.lengthIndex b < chunkSize = continue (Seq.lengthIndex b) [b]
            | otherwise = let (first,rest) = Seq.splitAt chunkSize b in
                            yield first >> start rest
        -- continue gathers input chunks (newest first) until enough
        -- elements are buffered for one full output chunk.
        continue !sofar bs = do
            next <- await
            case next of
                -- Input ended early: hand the partial data back.
                Nothing -> leftover (mconcat $ Prelude.reverse bs)
                Just next' ->
                    let !sofar' = Seq.lengthIndex next' + sofar
                        bs' = next':bs
                    in if sofar' < chunkSize
                            then continue sofar' bs'
                            else start (mconcat (Prelude.reverse bs'))
-- | Apply a monadic function to every value of the stream. Forwards to
-- 'CL.mapM'.
mapM :: Monad m => (a -> m b) -> ConduitT a b m ()
INLINE_RULE(mapM, f, CL.mapM f)
-- | Apply a monadic function to every element of every chunk, for
-- 'Data.Traversable.Traversable' chunks.
mapME :: (Monad m, Data.Traversable.Traversable f) => (a -> m b) -> ConduitT (f a) (f b) m ()
INLINE_RULE(mapME, f, CL.mapM (Data.Traversable.mapM f))
-- | Apply a monadic function to every element of every chunk, for
-- monomorphic containers ('CL.mapM' over 'omapM').
omapME :: (Monad m, MonoTraversable mono)
       => (Element mono -> m (Element mono))
       -> ConduitT mono mono m ()
INLINE_RULE(omapME, f, CL.mapM (omapM f))
-- | Map every value to a container with a monadic function and yield the
-- elements of each resulting container with 'yieldMany'.
concatMapM, concatMapMC :: (Monad m, MonoFoldable mono)
                        => (a -> m mono)
                        -> ConduitT a (Element mono) m ()
concatMapMC f = awaitForever $ \x -> lift (f x) >>= yieldMany
STREAMING(concatMapM, concatMapMC, concatMapMS, f)
-- | Keep only the values for which the monadic predicate returns @True@.
filterM, filterMC :: Monad m
                  => (a -> m Bool)
                  -> ConduitT a a m ()
filterMC p =
    awaitForever $ \x -> do
        keep <- lift (p x)
        if keep then yield x else return ()
STREAMING(filterM, filterMC, filterMS, f)
-- | Filter the elements inside every chunk with a monadic predicate
-- ('CL.mapM' over 'Seq.filterM'); one output chunk per input chunk.
filterME :: (Monad m, Seq.IsSequence seq) => (Element seq -> m Bool) -> ConduitT seq seq m ()
INLINE_RULE(filterME, f, CL.mapM (Seq.filterM f))
-- | Run a monadic action on the values of the stream; input and output types
-- are the same. Thin wrapper around @CL.iterM@, which presumably forwards
-- each value unchanged after running the action on it.
iterM :: Monad m => (a -> m ()) -> ConduitT a a m ()
INLINE_RULE(iterM, f, CL.iterM f)
-- | Monadic left scan.
--
-- For every input the current accumulator is yielded and then replaced by the
-- result of the step function; once the input is exhausted the final
-- accumulator is yielded as well. The new accumulator is forced to weak head
-- normal form before the old one is emitted.
scanlM, scanlMC :: Monad m => (a -> b -> m a) -> a -> ConduitT b a m ()
scanlMC f =
    go
  where
    go acc = do
        mb <- await
        case mb of
            Nothing -> yield acc
            Just b -> do
                acc' <- lift (f acc b)
                acc' `seq` yield acc
                go acc'
STREAMING(scanlM, scanlMC, scanlMS, f x)
-- | Monadic map with an accumulator that can stop early.
--
-- The step function receives the next input and the current state. A 'Right'
-- result carries the new state and a value to yield, and processing goes on;
-- a 'Left' result stops the conduit and its (forced) payload becomes the
-- final state. If the input runs out first, the current state is returned.
mapAccumWhileM, mapAccumWhileMC :: Monad m => (a -> s -> m (Either s (s, b))) -> s -> ConduitT a b m s
mapAccumWhileMC f =
    step
  where
    step !st = do
        ma <- await
        case ma of
            Nothing -> return st
            Just a -> do
                outcome <- lift (f a st)
                case outcome of
                    Left final -> return $! final
                    Right (st', b) -> yield b >> step st'
{-# INLINE mapAccumWhileMC #-}
STREAMING(mapAccumWhileM, mapAccumWhileMC, mapAccumWhileMS, f s)
-- | Monadic map with an accumulator where each step may produce several
-- outputs: the step function takes the input and the current accumulator and
-- returns the new accumulator together with a list of values.
--
-- Thin wrapper around @CL.concatMapAccumM@.
concatMapAccumM :: Monad m => (a -> accum -> m (accum, [b])) -> accum -> ConduitT a b m ()
INLINE_RULE(concatMapAccumM, f x, CL.concatMapAccumM f x)
-- | Encode a stream of text as UTF-8 by applying @DTE.encodeUtf8@ to each
-- chunk independently (using this module's conduit 'map').
encodeUtf8 :: (Monad m, DTE.Utf8 text binary) => ConduitT text binary m ()
INLINE_RULE0(encodeUtf8, map DTE.encodeUtf8)
-- | Incrementally decode a stream of UTF-8 bytes into 'Text'.
--
-- The continuation returned by the streaming decoder is threaded from one
-- input chunk to the next, so a code point may span chunk boundaries. When a
-- chunk fails to decode, that chunk is pushed back with 'leftover' and the
-- 'TEE.UnicodeException' is rethrown through 'throwM'.
--
-- NOTE(review): at end of input, bytes the decoder still holds as undecoded
-- are not reported as an error; one U+FFFD character per byte is yielded
-- instead, exactly as 'decodeUtf8Lenient' does. Confirm this is the intended
-- behaviour for the strict decoder.
decodeUtf8 :: MonadThrow m => ConduitT ByteString Text m ()
decodeUtf8 =
    loop TE.streamDecodeUtf8
  where
    loop parse =
        await >>= maybe done go
      where
        -- Run one decoding step and capture an exception raised while
        -- evaluating its result as a Left value.
        parse' = unsafePerformIO . try . evaluate . parse
        -- End of input: feed an empty chunk to obtain what is still pending.
        done =
          case parse' mempty of
            Left e -> throwM (e :: TEE.UnicodeException)
            Right (TE.Some t bs _) -> do
              unless (T.null t) (yield t)
              unless (S.null bs) (yield $ T.replicate (S.length bs) (T.singleton '\xFFFD'))
        go bs = do
          case parse' bs of
            Left e -> do
              -- Give the offending chunk back before failing.
              leftover bs
              throwM (e :: TEE.UnicodeException)
            Right (TE.Some t _ next) -> do
              unless (T.null t) (yield t)
              loop next
-- | Incrementally decode a stream of UTF-8 bytes into 'Text' using
-- @TEE.lenientDecode@ as the error handler, so this conduit never throws.
--
-- The decoder continuation is carried across chunks. At end of input, every
-- byte the decoder still holds as undecoded is rendered as one U+FFFD
-- character.
decodeUtf8Lenient :: Monad m => ConduitT ByteString Text m ()
decodeUtf8Lenient =
    step (TE.streamDecodeUtf8With TEE.lenientDecode)
  where
    step decoder = do
        mbs <- await
        case mbs of
            Nothing -> do
                -- Flush: an empty chunk reveals what is still pending.
                let TE.Some t rest _ = decoder mempty
                emit t
                unless (S.null rest) $
                    yield (T.replicate (S.length rest) (T.singleton '\xFFFD'))
            Just bs -> do
                let TE.Some t _ decoder' = decoder bs
                emit t
                step decoder'
    -- Yield decoded text, skipping empty results.
    emit t = unless (T.null t) (yield t)
-- | Run the given conduit on a single line of character input.
--
-- Defined through 'takeExactlyUntilE' with a newline test: the inner conduit
-- sees the input up to, but not including, the first newline character. The
-- newline itself is dropped and whatever follows it in the same chunk is
-- pushed back as a leftover.
line :: (Monad m, Seq.IsSequence seq, Element seq ~ Char)
     => ConduitT seq o m r
     -> ConduitT seq o m r
line = takeExactlyUntilE (== '\n')
{-# INLINE line #-}
-- | Byte-oriented variant of 'line': the line terminator is the byte 10
-- (ASCII line feed). Defined through 'takeExactlyUntilE'.
lineAscii :: (Monad m, Seq.IsSequence seq, Element seq ~ Word8)
          => ConduitT seq o m r
          -> ConduitT seq o m r
lineAscii = takeExactlyUntilE (== 10)
{-# INLINE lineAscii #-}
-- | Run the inner conduit on the chunked input that precedes the first
-- element satisfying the predicate.
--
-- The feeding side forwards chunks until it finds a matching element. The
-- part before the match is forwarded (if non-empty), the matching element is
-- dropped, and the rest of that chunk is pushed back as a leftover. After the
-- inner conduit finishes, 'sinkNull' runs on whatever the feeder still
-- produces, so the input up to the match is consumed in every case.
takeExactlyUntilE :: (Monad m, Seq.IsSequence seq)
                  => (Element seq -> Bool)
                  -> ConduitT seq o m r
                  -> ConduitT seq o m r
takeExactlyUntilE f inner =
    feed .| do
        result <- inner
        sinkNull
        return result
  where
    feed = await >>= omapM_ split
    split chunk
        -- No match in this chunk: forward it whole and keep reading.
        | onull after = yield before >> feed
        | otherwise = do
            unless (onull before) $ yield before
            let remainder = Seq.drop 1 after
            unless (onull remainder) $ leftover remainder
      where
        (before, after) = Seq.break f chunk
{-# INLINE takeExactlyUntilE #-}
-- | Follow every input chunk with a separate chunk holding a single newline
-- character: each input @x@ is expanded to the two-element list
-- @[x, Seq.singleton newline]@ and passed through this module's conduit
-- 'concatMap'.
unlines :: (Monad m, Seq.IsSequence seq, Element seq ~ Char) => ConduitT seq seq m ()
INLINE_RULE0(unlines, concatMap (:[Seq.singleton '\n']))
-- | Byte-oriented variant of 'unlines': every input chunk is followed by a
-- separate chunk holding the single byte 10 (ASCII line feed).
unlinesAscii :: (Monad m, Seq.IsSequence seq, Element seq ~ Word8) => ConduitT seq seq m ()
INLINE_RULE0(unlinesAscii, concatMap (:[Seq.singleton 10]))
-- | Split a chunked stream at every element satisfying the predicate.
--
-- Separator elements are dropped. Pieces between separators are yielded as
-- single chunks, even when they span several input chunks; to do that the
-- pending input is accumulated without any size limit. The trailing piece
-- after the last separator is yielded only when it is non-empty.
splitOnUnboundedE, splitOnUnboundedEC :: (Monad m, Seq.IsSequence seq) => (Element seq -> Bool) -> ConduitT seq seq m ()
splitOnUnboundedEC f =
    await >>= maybe (return ()) (go id)
  where
    -- @pending@ is a difference list of chunks belonging to the current,
    -- still unterminated piece.
    go pending chunk
        | onull rest = do
            next <- await
            case next of
                Nothing ->
                    let final = mconcat (pending [chunk])
                    in unless (onull final) (yield final)
                Just chunk' -> go (pending . (chunk:)) chunk'
        | otherwise = do
            yield (mconcat (pending [piece]))
            go id (Seq.drop 1 rest)
      where
        (piece, rest) = Seq.break f chunk
STREAMING(splitOnUnboundedE, splitOnUnboundedEC, splitOnUnboundedES, f)
-- | Split a chunked character stream into lines, one output chunk per line,
-- with the newline characters removed.
--
-- Defined as 'splitOnUnboundedE' with a newline test, so a line of any length
-- is accumulated in memory before it is yielded.
linesUnbounded :: (Monad m, Seq.IsSequence seq, Element seq ~ Char)
               => ConduitT seq seq m ()
INLINE_RULE0(linesUnbounded, splitOnUnboundedE (== '\n'))
-- | Byte-oriented variant of 'linesUnbounded': the stream is split at every
-- byte equal to 10 (ASCII line feed) using 'splitOnUnboundedE'.
linesUnboundedAscii :: (Monad m, Seq.IsSequence seq, Element seq ~ Word8)
                    => ConduitT seq seq m ()
INLINE_RULE0(linesUnboundedAscii, splitOnUnboundedE (== 10))
-- | Convert a stream of builders into a stream of strict bytestrings, using
-- 'defaultStrategy' for buffer allocation.
builderToByteString :: PrimMonad m => ConduitT Builder S.ByteString m ()
builderToByteString = builderToByteStringWith defaultStrategy
{-# INLINE builderToByteString #-}
-- | Flush-aware variant of 'builderToByteString': input and output are
-- wrapped in 'Flush'. Uses 'defaultStrategy' for buffer allocation.
builderToByteStringFlush :: PrimMonad m
                         => ConduitT (Flush Builder) (Flush S.ByteString) m ()
builderToByteStringFlush = builderToByteStringWithFlush defaultStrategy
{-# INLINE builderToByteStringFlush #-}
-- | Like 'builderToByteString' but with a buffer-reusing strategy
-- ('reuseBufferStrategy' over one buffer of @defaultChunkSize@ bytes).
--
-- Yielded bytestrings share memory with the buffer ('unsafeFreezeBuffer'
-- does not copy) and 'reuseBuffer' rewinds the same memory for later writes,
-- so a yielded chunk is presumably only valid until the next one is
-- requested -- hence the name.
unsafeBuilderToByteString :: PrimMonad m
                          => ConduitT Builder S.ByteString m ()
unsafeBuilderToByteString =
  builderToByteStringWith (reuseBufferStrategy (allocBuffer defaultChunkSize))
{-# INLINE unsafeBuilderToByteString #-}
-- | Convert a stream of builders into a stream of strict bytestrings using
-- the given buffer allocation strategy.
--
-- Implemented on top of 'bbhelper': every awaited builder is wrapped in
-- 'Chunk', and on the output side only 'Chunk' payloads are yielded while
-- 'Flush' markers are ignored.
builderToByteStringWith :: PrimMonad m
                        => BufferAllocStrategy
                        -> ConduitT Builder S.ByteString m ()
builderToByteStringWith =
    bbhelper (liftM (fmap Chunk) await) emit
  where
    emit item =
        case item of
            Chunk bs -> yield bs
            Flush -> return ()
{-# INLINE builderToByteStringWith #-}
-- | Flush-aware conversion of builders to strict bytestrings using the given
-- buffer allocation strategy. This is 'bbhelper' wired directly to 'await'
-- and 'yield'.
builderToByteStringWithFlush
    :: PrimMonad m
    => BufferAllocStrategy
    -> ConduitT (Flush Builder) (Flush S.ByteString) m ()
builderToByteStringWithFlush = bbhelper await yield
{-# INLINE builderToByteStringWithFlush #-}
-- | Shared driver behind the builder-to-bytestring conduits.
--
-- * @await'@ fetches the next input; @Nothing@ means end of input.
-- * @yield'@ emits one output item.
-- * @strat@ is the allocation strategy given to 'newByteStringBuilderRecv'.
--
-- For every input, the builder (or @BB.flush@ for a 'Flush') is pushed into
-- the receiver and the resulting popper is drained until it returns an empty
-- bytestring. A 'Flush' input is forwarded downstream after its bytes. At end
-- of input the bytes still held in the buffer, if any, are emitted as a last
-- chunk.
bbhelper
  :: PrimMonad m
  => m (Maybe (Flush Builder))
  -> (Flush S.ByteString -> m ())
  -> BufferAllocStrategy
  -> m ()
bbhelper await' yield' strat = do
    -- The receiver lives in IO; it is lifted into @m@ with unsafePrimToPrim.
    (recv, finish) <- unsafePrimToPrim $ newByteStringBuilderRecv strat
    let loop = await' >>= maybe finish' cont
        -- End of input: emit whatever is left in the buffer.
        finish' = do
            mbs <- unsafePrimToPrim finish
            maybe (return ()) (yield' . Chunk) mbs
        cont fbuilder = do
            let builder =
                    case fbuilder of
                        Flush -> BB.flush
                        Chunk b -> b
            popper <- unsafePrimToPrim $ recv builder
            -- Drain the popper; an empty bytestring signals completion.
            let cont' = do
                    bs <- unsafePrimToPrim popper
                    unless (S.null bs) $ do
                        yield' (Chunk bs)
                        cont'
            cont'
            -- Forward the flush marker only after its bytes went out.
            case fbuilder of
                Flush -> yield' Flush
                Chunk _ -> return ()
            loop
    loop
{-# INLINE bbhelper #-}
-- | Action producing the next piece of output for a pushed builder; callers
-- in this module run it repeatedly until it returns an empty bytestring.
type BuilderPopper = IO S.ByteString
-- | Push a builder into the receiver, obtaining the popper for its output.
type BuilderRecv = Builder -> IO BuilderPopper
-- | Retrieve the bytes still sitting in the current buffer, if there are any.
type BuilderFinish = IO (Maybe S.ByteString)
-- | Create a stateful builder receiver for the given allocation strategy.
--
-- The result is a pair: a function that accepts a builder and returns a
-- popper for its output, and a finishing action that returns the bytes left
-- in the current buffer. State is kept in two references: @refBuf@ holds the
-- action producing the current buffer, and each pushed builder gets its own
-- @refWri@ holding either the next write step ('Left') or a deferred action
-- to run on the next pop ('Right').
newByteStringBuilderRecv :: BufferAllocStrategy -> IO (BuilderRecv, BuilderFinish)
newByteStringBuilderRecv (ioBufInit, nextBuf) = do
    refBuf <- newIORef ioBufInit
    return (push refBuf, finish refBuf)
  where
    -- Freeze whatever the current buffer contains; Nothing when it is empty.
    finish refBuf = do
        ioBuf <- readIORef refBuf
        buf <- ioBuf
        return $ unsafeFreezeNonEmptyBuffer buf
    push refBuf builder = do
        refWri <- newIORef $ Left $ BB.runBuilder builder
        return $ popper refBuf refWri
    popper refBuf refWri = do
        ioBuf <- readIORef refBuf
        ebWri <- readIORef refWri
        case ebWri of
            Left bWri -> do
                !buf@(Buffer _ _ op ope) <- ioBuf
                -- Let the builder write into the free space [op, ope).
                (bytes, next) <- bWri op (ope `minusPtr` op)
                let op' = op `plusPtr` bytes
                case next of
                    -- Builder finished: keep the partly filled buffer for the
                    -- next builder and signal completion with an empty result.
                    BB.Done -> do
                        writeIORef refBuf $ return $ updateEndOfSlice buf op'
                        return S.empty
                    -- Builder needs at least minSize more bytes of space.
                    BB.More minSize bWri' -> do
                        let buf' = updateEndOfSlice buf op'
                            {-# INLINE cont #-}
                            cont mbs = do
                                -- Obtain the next buffer from the strategy and
                                -- record it, together with the next write step,
                                -- before any bytes are handed to the caller.
                                ioBuf' <- nextBuf minSize buf'
                                writeIORef refBuf ioBuf'
                                writeIORef refWri $ Left bWri'
                                case mbs of
                                    Just bs | not $ S.null bs -> return bs
                                    -- Nothing to hand out yet: keep writing.
                                    _ -> popper refBuf refWri
                        cont $ unsafeFreezeNonEmptyBuffer buf'
                    -- Builder wants the bytestring bs inserted directly.
                    BB.Chunk bs bWri' -> do
                        let buf' = updateEndOfSlice buf op'
                        let yieldBS = do
                                nextBuf 1 buf' >>= writeIORef refBuf
                                writeIORef refWri $ Left bWri'
                                if S.null bs
                                    then popper refBuf refWri
                                    else return bs
                        case unsafeFreezeNonEmptyBuffer buf' of
                            Nothing -> yieldBS
                            -- Buffered bytes go out first; the direct chunk
                            -- is deferred to the next pop.
                            Just bs' -> do
                                writeIORef refWri $ Right yieldBS
                                return bs'
            Right action -> action
-- | A write buffer: the owning foreign pointer plus three cursors into its
-- memory.
data Buffer = Buffer {-# UNPACK #-} !(ForeignPtr Word8) -- ^ owner of the underlying memory
                     {-# UNPACK #-} !(Ptr Word8)        -- ^ start of the current slice
                     {-# UNPACK #-} !(Ptr Word8)        -- ^ end of the written data, i.e. next write position
                     {-# UNPACK #-} !(Ptr Word8)        -- ^ end of the buffer
-- View the written slice of a buffer as a strict bytestring without copying:
-- the result shares the buffer memory, so later writes into that region
-- would be visible through it.
{-# INLINE unsafeFreezeBuffer #-}
unsafeFreezeBuffer :: Buffer -> S.ByteString
unsafeFreezeBuffer (Buffer fp sliceStart sliceEnd _) = PS fp offset len
  where
    offset = sliceStart `minusPtr` unsafeForeignPtrToPtr fp
    len = sliceEnd `minusPtr` sliceStart
-- Like unsafeFreezeBuffer, but yields Nothing when the written slice is
-- empty (its size is zero or negative).
{-# INLINE unsafeFreezeNonEmptyBuffer #-}
unsafeFreezeNonEmptyBuffer :: Buffer -> Maybe S.ByteString
unsafeFreezeNonEmptyBuffer buf =
    if sliceSize buf <= 0
        then Nothing
        else Just (unsafeFreezeBuffer buf)
-- Replace the write position of a buffer, leaving owner, slice start and
-- buffer end untouched.
{-# INLINE updateEndOfSlice #-}
updateEndOfSlice :: Buffer    -- ^ buffer to update
                 -> Ptr Word8 -- ^ new end of the written slice
                 -> Buffer    -- ^ buffer with the new write position
updateEndOfSlice (Buffer fpbuf p0 _ ope) op' = Buffer fpbuf p0 op' ope
-- | Number of bytes between the slice start and the write position.
sliceSize :: Buffer -> Int
sliceSize (Buffer _ p0 op _) = op `minusPtr` p0
-- | A buffer allocation strategy: the action creating the first buffer,
-- paired with a function that, given a requested size and the buffer just
-- used, returns the action creating the next buffer.
type BufferAllocStrategy = (IO Buffer, Int -> Buffer -> IO (IO Buffer))
-- | The default strategy: 'allNewBuffersStrategy' with buffers of
-- @defaultChunkSize@ bytes.
defaultStrategy :: BufferAllocStrategy
defaultStrategy = allNewBuffersStrategy defaultChunkSize
-- | Strategy that allocates a fresh buffer every time. The first buffer has
-- the given size; each later one is as large as the requested size or the
-- given size, whichever is greater. The previously used buffer is ignored.
allNewBuffersStrategy :: Int                 -- ^ size of the buffers to allocate
                      -> BufferAllocStrategy
allNewBuffersStrategy bufSize = (allocBuffer bufSize, allocNext)
  where
    allocNext reqSize _ = return (allocBuffer (max reqSize bufSize))
-- | Strategy that keeps reusing the previous buffer whenever its total
-- capacity covers the requested size, and allocates a new buffer of exactly
-- the requested size otherwise. The argument creates the initial buffer.
reuseBufferStrategy :: IO Buffer
                    -> BufferAllocStrategy
reuseBufferStrategy buf0 = (buf0, nextBuffer)
  where
    nextBuffer reqSize buf =
        return $
            if bufferSize buf >= reqSize
                then return (reuseBuffer buf)
                else allocBuffer reqSize
-- | Total capacity of a buffer: the distance from the start of the memory
-- owned by its foreign pointer to the end-of-buffer pointer.
bufferSize :: Buffer -> Int
bufferSize (Buffer fpbuf _ _ ope) =
    ope `minusPtr` unsafeForeignPtrToPtr fpbuf
-- Allocate a new, empty buffer of the given size in bytes. Slice start and
-- write position both point at the beginning of the fresh memory; the
-- end-of-buffer pointer lies size bytes after it.
{-# INLINE allocBuffer #-}
allocBuffer :: Int -> IO Buffer
allocBuffer size = do
    fpbuf <- mallocByteString size
    let !pbuf = unsafeForeignPtrToPtr fpbuf
    return $! Buffer fpbuf pbuf pbuf (pbuf `plusPtr` size)
-- Rewind a buffer for reuse: slice start and write position are reset to the
-- beginning of the owned memory, while the owner and the end-of-buffer
-- pointer are kept. Nothing is copied or cleared.
{-# INLINE reuseBuffer #-}
reuseBuffer :: Buffer -> Buffer
reuseBuffer (Buffer fpbuf _ _ ope) =
    let base = unsafeForeignPtrToPtr fpbuf
    in Buffer fpbuf base base ope
-- | Build a stream of vectors from individually added elements.
--
-- The inner sink receives a function for adding one element. Elements are
-- written into a mutable vector of the given size; each time it fills up it
-- is frozen and queued (see 'addE'). Queued vectors are yielded whenever the
-- inner sink is about to await more input (see 'onAwait' and 'yieldS'), and
-- once more after it finishes, followed by the partly filled last vector if
-- it holds any elements. The result of the inner sink is returned.
vectorBuilder :: (PrimMonad m, PrimMonad n, V.Vector v e, PrimState m ~ PrimState n)
              => Int -- ^ size of each yielded vector (the last one may be shorter)
              -> ((e -> n ()) -> ConduitT i Void m r)
              -> ConduitT i (v e) m r
vectorBuilder size inner = do
    -- Shared state: write index 0, a fresh vector, no queued vectors.
    ref <- do
        mv <- VM.new size
        newMutVar $! S 0 mv id
    res <- onAwait (yieldS ref) (inner (addE ref))
    -- Collect what is still queued plus the partial vector, if non-empty.
    vs <- do
        S idx mv front <- readMutVar ref
        end <-
            if idx == 0
                then return []
                else do
                    -- The mutable vector is not written to after this point.
                    v <- V.unsafeFreeze mv
                    return [V.unsafeTake idx v]
        return $ front end
    Prelude.mapM_ yield vs
    return res
{-# INLINE vectorBuilder #-}
-- | State shared between 'vectorBuilder', 'yieldS' and 'addE'.
data S s v e = S
    {-# UNPACK #-} !Int -- ^ index of the next free slot in the mutable vector
    !(V.Mutable v s e) -- ^ vector currently being filled
    ([v e] -> [v e]) -- ^ difference list of completed vectors not yet yielded
-- | Run a sink, executing the given callback conduit every time the sink is
-- about to ask for input.
--
-- The sink cannot produce output of its own (its output type is 'Void'), so
-- all output of the combined conduit comes from the callback. Works directly
-- on the pipe constructors underneath 'ConduitT'.
onAwait :: Monad m
        => ConduitT i o m ()
        -> ConduitT i Void m r
        -> ConduitT i o m r
onAwait (ConduitT callback) (ConduitT sink0) = ConduitT $ \rest -> let
    go (Done r) = rest r
    -- Unreachable: a value of type Void cannot exist.
    go (HaveOutput _ o) = absurd o
    -- Before the input request is issued, run the callback to completion.
    go (NeedInput f g) = callback $ \() -> NeedInput (go . f) (go . g)
    go (PipeM mp) = PipeM (liftM go mp)
    go (Leftover f i) = Leftover (go f) i
    in go (sink0 Done)
{-# INLINE onAwait #-}
-- | Yield every completed vector queued in the shared state, then store the
-- state back with an empty queue. The write index and the vector being
-- filled are left as they were.
yieldS :: PrimMonad m
       => MutVar (PrimState m) (S (PrimState m) v e)
       -> ConduitT i (v e) m ()
yieldS ref = do
    S idx mv front <- readMutVar ref
    Prelude.mapM_ yield (front [])
    writeMutVar ref $! S idx mv id
{-# INLINE yieldS #-}
-- | Append one element to the vector being filled in the shared state.
--
-- The element is written at the current index. When that fills the vector,
-- it is frozen and appended to the queue of completed vectors, and a fresh
-- mutable vector of the same length takes its place with the index reset to
-- zero. Otherwise only the index is advanced.
addE :: (PrimMonad m, V.Vector v e)
     => MutVar (PrimState m) (S (PrimState m) v e)
     -> e
     -> m ()
addE ref e = do
    S idx mv front <- readMutVar ref
    VM.write mv idx e
    let capacity = VM.length mv
        next = succ idx
    if next < capacity
        then writeMutVar ref $! S next mv front
        else do
            -- The full vector is never written to again after freezing.
            full <- V.unsafeFreeze mv
            fresh <- VM.new capacity
            writeMutVar ref $! S 0 fresh (front . (full:))
{-# INLINE addE #-}
-- | Fold over the input while consuming a second source piece by piece.
--
-- For every awaited input value, a sink is built from that value and the
-- current state; the sink is connected to the (sealed) source with @$$++@,
-- which returns the not yet consumed rest of the source along with the new
-- state. When the input is exhausted the final state is returned. The state
-- is forced at every step.
mapAccumS
  :: Monad m
  => (a -> s -> ConduitT b Void m s)
  -> s
  -> ConduitT () b m ()
  -> ConduitT a Void m s
mapAccumS f s xs = go (sealConduitT xs) s
  where
    go src !acc = do
        ma <- await
        case ma of
            Nothing -> return acc
            Just a -> do
                (src', acc') <- lift (src $$++ f a acc)
                go src' acc'
{-# INLINE mapAccumS #-}
-- | Run the inner conduit over and over for as long as the stream has more
-- input, checking with 'peek' before every round.
--
-- Note that an inner conduit that consumes nothing would presumably make
-- this loop forever once input is available.
peekForever :: Monad m => ConduitT i o m () -> ConduitT i o m ()
peekForever inner =
    go
  where
    go = peek >>= maybe (return ()) (\_ -> inner >> go)
-- | Chunked variant of 'peekForever': the inner conduit is run repeatedly
-- for as long as 'peekE' still finds an element in the chunked stream.
--
-- Note that an inner conduit that consumes nothing would presumably make
-- this loop forever once input is available.
peekForeverE :: (Monad m, MonoFoldable i)
             => ConduitT i o m ()
             -> ConduitT i o m ()
peekForeverE inner =
    go
  where
    go = peekE >>= maybe (return ()) (\_ -> inner >> go)