module Data.Conduit.Combinators
(
yieldMany
, unfold
, enumFromTo
, iterate
, repeat
, replicate
, sourceLazy
, repeatM
, repeatWhileM
, replicateM
, sourceFile
, sourceHandle
, sourceIOHandle
, stdin
, sourceRandom
, sourceRandomN
, sourceRandomGen
, sourceRandomNGen
, sourceDirectory
, sourceDirectoryDeep
, drop
, dropE
, dropWhile
, dropWhileE
, fold
, foldE
, foldl
, foldlE
, foldMap
, foldMapE
, all
, allE
, any
, anyE
, and
, andE
, or
, orE
, elem
, elemE
, notElem
, notElemE
, sinkLazy
, sinkList
, sinkVector
, sinkVectorN
, sinkBuilder
, sinkLazyBuilder
, sinkNull
, awaitNonNull
, headE
, peek
, peekE
, last
, lastE
, length
, lengthE
, lengthIf
, lengthIfE
, maximum
, maximumE
, minimum
, minimumE
, null
, nullE
, sum
, sumE
, product
, productE
, find
, mapM_
, mapM_E
, foldM
, foldME
, foldMapM
, foldMapME
, sinkFile
, sinkHandle
, sinkIOHandle
, print
, stdout
, stderr
, map
, mapE
, omapE
, concatMap
, concatMapE
, take
, takeE
, takeWhile
, takeWhileE
, takeExactly
, takeExactlyE
, concat
, filter
, filterE
, mapWhile
, conduitVector
, scanl
, concatMapAccum
, intersperse
, slidingWindow
, encodeBase64
, decodeBase64
, encodeBase64URL
, decodeBase64URL
, encodeBase16
, decodeBase16
, mapM
, mapME
, omapME
, concatMapM
, filterM
, filterME
, iterM
, scanlM
, concatMapAccumM
, encodeUtf8
, decodeUtf8
, decodeUtf8Lenient
, line
, lineAscii
, unlines
, unlinesAscii
, linesUnbounded
, linesUnboundedAscii
, vectorBuilder
) where
import Data.Builder
import qualified Data.NonNull as NonNull
import qualified Data.Traversable
import qualified Data.ByteString as S
import qualified Data.ByteString.Base16 as B16
import qualified Data.ByteString.Base64 as B64
import qualified Data.ByteString.Base64.URL as B64U
import Control.Applicative ((<$>))
import Control.Exception (assert)
import Control.Category (Category (..))
import Control.Monad (unless, when, (>=>), liftM, forever)
import Control.Monad.Base (MonadBase (liftBase))
import Control.Monad.IO.Class (MonadIO (..))
import Control.Monad.Primitive (PrimMonad, PrimState)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Resource (MonadResource, MonadThrow)
import Data.Conduit
import Data.Conduit.Internal (ConduitM (..), Pipe (..))
import qualified Data.Conduit.List as CL
import Data.IOData
import Data.Monoid (Monoid (..))
import Data.MonoTraversable
import qualified Data.Sequences as Seq
import Data.Sequences.Lazy
import qualified Data.Vector.Generic as V
import qualified Data.Vector.Generic.Mutable as VM
import Data.Void (absurd)
import qualified Filesystem as F
import Filesystem.Path (FilePath, (</>))
import Filesystem.Path.CurrentOS (encodeString, decodeString)
import Prelude (Bool (..), Eq (..), Int,
Maybe (..), Monad (..), Num (..),
Ord (..), fromIntegral, maybe,
($), Functor (..), Enum, seq, Show, Char, (||),
mod, otherwise, Either (..),
($!), succ)
import Data.Word (Word8)
import qualified Prelude
import System.IO (Handle)
import qualified System.IO as SIO
import qualified Data.Textual.Encoding as DTE
import qualified Data.Conduit.Text as CT
import Data.ByteString (ByteString)
import Data.Text (Text)
import qualified System.Random.MWC as MWC
import Data.Conduit.Combinators.Internal
import qualified System.PosixCompat.Files as PosixC
import Data.Primitive.MutVar (MutVar, newMutVar, readMutVar,
writeMutVar)
#ifndef WINDOWS
import qualified System.Posix.Directory as Dir
#endif
#if MIN_VERSION_conduit(1,1,0)
import qualified Data.Conduit.Filesystem as CF
#endif
yieldMany :: (Monad m, MonoFoldable mono)
=> mono
-> Producer m (Element mono)
yieldMany = ofoldMap yield
unfold :: Monad m
=> (b -> Maybe (a, b))
-> b
-> Producer m a
unfold = CL.unfold
enumFromTo :: (Monad m, Enum a, Eq a) => a -> a -> Producer m a
enumFromTo = CL.enumFromTo
iterate :: Monad m => (a -> a) -> a -> Producer m a
iterate = CL.iterate
repeat :: Monad m => a -> Producer m a
repeat = iterate id
replicate :: Monad m
=> Int
-> a
-> Producer m a
replicate count0 a =
loop count0
where
loop count = if count <= 0
then return ()
else yield a >> loop (count 1)
sourceLazy :: (Monad m, LazySequence lazy strict)
=> lazy
-> Producer m strict
sourceLazy = yieldMany . toChunks
repeatM :: Monad m
=> m a
-> Producer m a
repeatM m = forever $ lift m >>= yield
repeatWhileM :: Monad m
=> m a
-> (a -> Bool)
-> Producer m a
repeatWhileM m f =
loop
where
loop = do
x <- lift m
when (f x) $ yield x >> loop
replicateM :: Monad m
=> Int
-> m a
-> Producer m a
replicateM count0 m =
loop count0
where
loop count = if count <= 0
then return ()
else lift m >>= yield >> loop (count 1)
sourceFile :: (MonadResource m, IOData a) => FilePath -> Producer m a
sourceFile fp = sourceIOHandle (F.openFile fp SIO.ReadMode)
sourceHandle :: (MonadIO m, IOData a) => Handle -> Producer m a
sourceHandle h =
loop
where
loop = do
x <- liftIO (hGetChunk h)
if onull x
then return ()
else yield x >> loop
sourceIOHandle :: (MonadResource m, IOData a) => SIO.IO Handle -> Producer m a
sourceIOHandle alloc = bracketP alloc SIO.hClose sourceHandle
stdin :: (MonadIO m, IOData a) => Producer m a
stdin = sourceHandle SIO.stdin
sourceRandom :: (MWC.Variate a, MonadIO m) => Producer m a
sourceRandom = initRepeat (liftIO MWC.createSystemRandom) (liftIO . MWC.uniform)
sourceRandomN :: (MWC.Variate a, MonadIO m)
=> Int
-> Producer m a
sourceRandomN = initReplicate (liftIO MWC.createSystemRandom) (liftIO . MWC.uniform)
sourceRandomGen :: (MWC.Variate a, MonadBase base m, PrimMonad base)
=> MWC.Gen (PrimState base)
-> Producer m a
sourceRandomGen gen = initRepeat (return gen) (liftBase . MWC.uniform)
sourceRandomNGen :: (MWC.Variate a, MonadBase base m, PrimMonad base)
=> MWC.Gen (PrimState base)
-> Int
-> Producer m a
sourceRandomNGen gen = initReplicate (return gen) (liftBase . MWC.uniform)
sourceDirectory :: MonadResource m => FilePath -> Producer m FilePath
#if MIN_VERSION_conduit(1,1,0)
sourceDirectory = mapOutput decodeString . CF.sourceDirectory . encodeString
#else
#ifdef WINDOWS
sourceDirectory = (liftIO . F.listDirectory) >=> yieldMany
#else
sourceDirectory dir =
bracketP (Dir.openDirStream $ encodeString dir) Dir.closeDirStream loop
where
loop ds = do
fp <- liftIO $ Dir.readDirStream ds
unless (Prelude.null fp) $ do
unless (fp == "." || fp == "..")
$ yield $ dir </> decodeString fp
loop ds
#endif
#endif
sourceDirectoryDeep :: MonadResource m
=> Bool
-> FilePath
-> Producer m FilePath
#if MIN_VERSION_conduit(1,1,0)
sourceDirectoryDeep follow = mapOutput decodeString . CF.sourceDirectoryDeep follow . encodeString
#else
sourceDirectoryDeep followSymlinks =
start
where
start :: MonadResource m => FilePath -> Producer m FilePath
start dir = sourceDirectory dir =$= awaitForever go
go :: MonadResource m => FilePath -> Producer m FilePath
go fp = do
isFile' <- liftIO $ F.isFile fp
if isFile'
then yield fp
else do
follow' <- liftIO $ follow fp
when follow' (start fp)
follow :: FilePath -> Prelude.IO Bool
follow p = do
let path = encodeString p
stat <- if followSymlinks
then PosixC.getFileStatus path
else PosixC.getSymbolicLinkStatus path
return (PosixC.isDirectory stat)
#endif
drop :: Monad m
=> Int
-> Consumer a m ()
drop =
loop
where
loop i | i <= 0 = return ()
loop count = await >>= maybe (return ()) (\_ -> loop (count 1))
dropE :: (Monad m, Seq.IsSequence seq)
=> Seq.Index seq
-> Consumer seq m ()
dropE =
loop
where
loop i = if i <= 0
then return ()
else await >>= maybe (return ()) (go i)
go i seq = do
unless (onull y) $ leftover y
loop i'
where
(x, y) = Seq.splitAt i seq
i' = i fromIntegral (olength x)
dropWhile :: Monad m
=> (a -> Bool)
-> Consumer a m ()
dropWhile f =
loop
where
loop = await >>= maybe (return ()) go
go x = if f x then loop else leftover x
dropWhileE :: (Monad m, Seq.IsSequence seq)
=> (Element seq -> Bool)
-> Consumer seq m ()
dropWhileE f =
loop
where
loop = await >>= maybe (return ()) go
go seq =
if onull x then loop else leftover x
where
x = Seq.dropWhile f seq
fold :: (Monad m, Monoid a)
=> Consumer a m a
fold = CL.foldMap id
foldE :: (Monad m, MonoFoldable mono, Monoid (Element mono))
=> Consumer mono m (Element mono)
foldE = CL.fold (\accum mono -> accum `mappend` ofoldMap id mono) mempty
foldl :: Monad m => (a -> b -> a) -> a -> Consumer b m a
foldl = CL.fold
foldlE :: (Monad m, MonoFoldable mono)
=> (a -> Element mono -> a)
-> a
-> Consumer mono m a
foldlE f = CL.fold (ofoldl' f)
foldMap :: (Monad m, Monoid b)
=> (a -> b)
-> Consumer a m b
foldMap = CL.foldMap
foldMapE :: (Monad m, MonoFoldable mono, Monoid w)
=> (Element mono -> w)
-> Consumer mono m w
foldMapE = CL.foldMap . ofoldMap
all :: Monad m
=> (a -> Bool)
-> Consumer a m Bool
all f =
loop
where
loop = await >>= maybe (return True) go
go x = if f x then loop else return False
allE :: (Monad m, MonoFoldable mono)
=> (Element mono -> Bool)
-> Consumer mono m Bool
allE = all . oall
any :: Monad m
=> (a -> Bool)
-> Consumer a m Bool
any f =
loop
where
loop = await >>= maybe (return False) go
go x = if f x then return True else loop
anyE :: (Monad m, MonoFoldable mono)
=> (Element mono -> Bool)
-> Consumer mono m Bool
anyE = any . oany
and :: Monad m => Consumer Bool m Bool
and = all id
andE :: (Monad m, MonoFoldable mono, Element mono ~ Bool)
=> Consumer mono m Bool
andE = allE id
or :: Monad m => Consumer Bool m Bool
or = any id
orE :: (Monad m, MonoFoldable mono, Element mono ~ Bool)
=> Consumer mono m Bool
orE = anyE id
elem :: (Monad m, Eq a) => a -> Consumer a m Bool
elem x = any (== x)
elemE :: (Monad m, Seq.EqSequence seq)
=> Element seq
-> Consumer seq m Bool
elemE = any . Seq.elem
notElem :: (Monad m, Eq a) => a -> Consumer a m Bool
notElem x = all (/= x)
notElemE :: (Monad m, Seq.EqSequence seq)
=> Element seq
-> Consumer seq m Bool
notElemE = all . Seq.notElem
sinkLazy :: (Monad m, LazySequence lazy strict)
=> Consumer strict m lazy
sinkLazy = (fromChunks . ($ [])) <$> CL.fold (\front next -> front . (next:)) id
sinkList :: Monad m => Consumer a m [a]
sinkList = CL.consume
sinkVector :: (MonadBase base m, V.Vector v a, PrimMonad base)
=> Consumer a m (v a)
sinkVector = do
let initSize = 10
mv0 <- liftBase $ VM.new initSize
let go maxSize i mv | i >= maxSize = do
let newMax = maxSize * 2
mv' <- liftBase $ VM.grow mv maxSize
go newMax i mv'
go maxSize i mv = do
mx <- await
case mx of
Nothing -> V.slice 0 i <$> liftBase (V.unsafeFreeze mv)
Just x -> do
liftBase $ VM.write mv i x
go maxSize (i + 1) mv
go initSize 0 mv0
sinkVectorN :: (MonadBase base m, V.Vector v a, PrimMonad base)
=> Int
-> Consumer a m (v a)
sinkVectorN maxSize = do
mv <- liftBase $ VM.new maxSize
let go i | i >= maxSize = liftBase $ V.unsafeFreeze mv
go i = do
mx <- await
case mx of
Nothing -> V.slice 0 i <$> liftBase (V.unsafeFreeze mv)
Just x -> do
liftBase $ VM.write mv i x
go (i + 1)
go 0
sinkBuilder :: (Monad m, Monoid builder, ToBuilder a builder)
=> Consumer a m builder
sinkBuilder = foldMap toBuilder
sinkLazyBuilder :: (Monad m, Monoid builder, ToBuilder a builder, Builder builder lazy)
=> Consumer a m lazy
sinkLazyBuilder = fmap builderToLazy sinkBuilder
sinkNull :: Monad m => Consumer a m ()
sinkNull = CL.sinkNull
awaitNonNull :: (Monad m, MonoFoldable a) => Consumer a m (Maybe (NonNull.NonNull a))
awaitNonNull =
go
where
go = await >>= maybe (return Nothing) go'
go' = maybe go (return . Just) . NonNull.fromNullable
headE :: (Monad m, Seq.IsSequence seq) => Consumer seq m (Maybe (Element seq))
headE =
loop
where
loop = await >>= maybe (return Nothing) go
go x =
case Seq.uncons x of
Nothing -> loop
Just (y, z) -> do
unless (onull z) $ leftover z
return $ Just y
peek :: Monad m => Consumer a m (Maybe a)
peek = CL.peek
peekE :: (Monad m, MonoFoldable mono) => Consumer mono m (Maybe (Element mono))
peekE =
loop
where
loop = await >>= maybe (return Nothing) go
go x =
case headMay x of
Nothing -> loop
Just y -> do
leftover x
return $ Just y
last :: Monad m => Consumer a m (Maybe a)
last =
await >>= maybe (return Nothing) loop
where
loop prev = await >>= maybe (return $ Just prev) loop
lastE :: (Monad m, Seq.IsSequence seq) => Consumer seq m (Maybe (Element seq))
lastE =
awaitNonNull >>= maybe (return Nothing) (loop . NonNull.last)
where
loop prev = awaitNonNull >>= maybe (return $ Just prev) (loop . NonNull.last)
length :: (Monad m, Num len) => Consumer a m len
length = foldl (\x _ -> x + 1) 0
lengthE :: (Monad m, Num len, MonoFoldable mono) => Consumer mono m len
lengthE = foldl (\x y -> x + fromIntegral (olength y)) 0
lengthIf :: (Monad m, Num len) => (a -> Bool) -> Consumer a m len
lengthIf f = foldl (\cnt a -> if f a then (cnt + 1) else cnt) 0
lengthIfE :: (Monad m, Num len, MonoFoldable mono)
=> (Element mono -> Bool) -> Consumer mono m len
lengthIfE f = foldlE (\cnt a -> if f a then (cnt + 1) else cnt) 0
maximum :: (Monad m, Ord a) => Consumer a m (Maybe a)
maximum =
await >>= maybe (return Nothing) loop
where
loop prev = await >>= maybe (return $ Just prev) (loop . max prev)
maximumE :: (Monad m, Seq.OrdSequence seq) => Consumer seq m (Maybe (Element seq))
maximumE =
start
where
start = await >>= maybe (return Nothing) start'
start' x =
case NonNull.fromNullable x of
Nothing -> start
Just y -> loop $ NonNull.maximum y
loop prev = await >>= maybe (return $ Just prev) (loop . ofoldl' max prev)
minimum :: (Monad m, Ord a) => Consumer a m (Maybe a)
minimum =
await >>= maybe (return Nothing) loop
where
loop prev = await >>= maybe (return $ Just prev) (loop . min prev)
minimumE :: (Monad m, Seq.OrdSequence seq) => Consumer seq m (Maybe (Element seq))
minimumE =
start
where
start = await >>= maybe (return Nothing) start'
start' x =
case NonNull.fromNullable x of
Nothing -> start
Just y -> loop $ NonNull.minimum y
loop prev = await >>= maybe (return $ Just prev) (loop . ofoldl' min prev)
null :: Monad m => Consumer a m Bool
null = (maybe True (\_ -> False)) `fmap` peek
nullE :: (Monad m, MonoFoldable mono)
=> Consumer mono m Bool
nullE =
go
where
go = await >>= maybe (return True) go'
go' x = if onull x then go else leftover x >> return False
sum :: (Monad m, Num a) => Consumer a m a
sum = foldl (+) 0
sumE :: (Monad m, MonoFoldable mono, Num (Element mono)) => Consumer mono m (Element mono)
sumE = foldlE (+) 0
product :: (Monad m, Num a) => Consumer a m a
product = foldl (*) 1
productE :: (Monad m, MonoFoldable mono, Num (Element mono)) => Consumer mono m (Element mono)
productE = foldlE (*) 1
find :: Monad m => (a -> Bool) -> Consumer a m (Maybe a)
find f =
loop
where
loop = await >>= maybe (return Nothing) go
go x = if f x then return (Just x) else loop
mapM_ :: Monad m => (a -> m ()) -> Consumer a m ()
mapM_ = CL.mapM_
mapM_E :: (Monad m, MonoFoldable mono) => (Element mono -> m ()) -> Consumer mono m ()
mapM_E = CL.mapM_ . omapM_
foldM :: Monad m => (a -> b -> m a) -> a -> Consumer b m a
foldM = CL.foldM
foldME :: (Monad m, MonoFoldable mono)
=> (a -> Element mono -> m a)
-> a
-> Consumer mono m a
foldME f = foldM (ofoldlM f)
foldMapM :: (Monad m, Monoid w) => (a -> m w) -> Consumer a m w
foldMapM = CL.foldMapM
foldMapME :: (Monad m, MonoFoldable mono, Monoid w)
=> (Element mono -> m w)
-> Consumer mono m w
foldMapME f =
CL.foldM go mempty
where
go = ofoldlM (\accum e -> mappend accum `liftM` f e)
sinkFile :: (MonadResource m, IOData a) => FilePath -> Consumer a m ()
sinkFile fp = sinkIOHandle (F.openFile fp SIO.WriteMode)
print :: (Show a, MonadIO m) => Consumer a m ()
print = mapM_ (liftIO . Prelude.print)
stdout :: (MonadIO m, IOData a) => Consumer a m ()
stdout = sinkHandle SIO.stdout
stderr :: (MonadIO m, IOData a) => Consumer a m ()
stderr = sinkHandle SIO.stderr
sinkHandle :: (MonadIO m, IOData a) => Handle -> Consumer a m ()
sinkHandle = CL.mapM_ . hPut
sinkIOHandle :: (MonadResource m, IOData a) => SIO.IO Handle -> Consumer a m ()
sinkIOHandle alloc = bracketP alloc SIO.hClose sinkHandle
map :: Monad m => (a -> b) -> Conduit a m b
map = CL.map
mapE :: (Monad m, Functor f) => (a -> b) -> Conduit (f a) m (f b)
mapE = CL.map . fmap
omapE :: (Monad m, MonoFunctor mono) => (Element mono -> Element mono) -> Conduit mono m mono
omapE = CL.map . omap
concatMap :: (Monad m, MonoFoldable mono)
=> (a -> mono)
-> Conduit a m (Element mono)
concatMap f = awaitForever (yieldMany . f)
concatMapE :: (Monad m, MonoFoldable mono, Monoid w)
=> (Element mono -> w)
-> Conduit mono m w
concatMapE = CL.map . ofoldMap
take :: Monad m => Int -> Conduit a m a
take =
loop
where
loop count = if count <= 0
then return ()
else await >>= maybe (return ()) (\i -> yield i >> loop (count 1))
takeE :: (Monad m, Seq.IsSequence seq)
=> Seq.Index seq
-> Conduit seq m seq
takeE =
loop
where
loop i = if i <= 0
then return ()
else await >>= maybe (return ()) (go i)
go i seq = do
unless (onull x) $ yield x
unless (onull y) $ leftover y
loop i'
where
(x, y) = Seq.splitAt i seq
i' = i fromIntegral (olength x)
takeWhile :: Monad m
=> (a -> Bool)
-> Conduit a m a
takeWhile f =
loop
where
loop = await >>= maybe (return ()) go
go x = if f x
then yield x >> loop
else leftover x
takeWhileE :: (Monad m, Seq.IsSequence seq)
=> (Element seq -> Bool)
-> Conduit seq m seq
takeWhileE f =
loop
where
loop = await >>= maybe (return ()) go
go seq = do
unless (onull x) $ yield x
if onull y
then loop
else leftover y
where
(x, y) = Seq.span f seq
takeExactly :: Monad m
=> Int
-> ConduitM a b m r
-> ConduitM a b m r
takeExactly count inner = take count =$= do
r <- inner
CL.sinkNull
return r
takeExactlyE :: (Monad m, Seq.IsSequence a)
=> Seq.Index a
-> ConduitM a b m r
-> ConduitM a b m r
takeExactlyE count inner = takeE count =$= do
r <- inner
CL.sinkNull
return r
concat :: (Monad m, MonoFoldable mono)
=> Conduit mono m (Element mono)
concat = awaitForever yieldMany
filter :: Monad m => (a -> Bool) -> Conduit a m a
filter = CL.filter
filterE :: (Seq.IsSequence seq, Monad m) => (Element seq -> Bool) -> Conduit seq m seq
filterE = CL.map . Seq.filter
mapWhile :: Monad m => (a -> Maybe b) -> Conduit a m b
mapWhile f =
loop
where
loop = await >>= maybe (return ()) go
go x =
case f x of
Just y -> yield y >> loop
Nothing -> leftover x
conduitVector :: (MonadBase base m, V.Vector v a, PrimMonad base)
=> Int
-> Conduit a m (v a)
conduitVector size =
loop
where
loop = do
v <- sinkVectorN size
unless (V.null v) $ do
yield v
loop
scanl :: Monad m => (a -> b -> a) -> a -> Conduit b m a
scanl f =
loop
where
loop seed =
await >>= maybe (yield seed) go
where
go b = do
let seed' = f seed b
seed' `seq` yield seed
loop seed'
concatMapAccum :: Monad m => (a -> accum -> (accum, [b])) -> accum -> Conduit a m b
concatMapAccum = CL.concatMapAccum
intersperse :: Monad m => a -> Conduit a m a
intersperse x =
await >>= omapM_ go
where
go y = yield y >> concatMap (\z -> [x, z])
slidingWindow :: (Monad m, Seq.IsSequence seq, Element seq ~ a) => Int -> Conduit a m seq
slidingWindow sz = go (if sz <= 0 then 1 else sz) mempty
where goContinue st = await >>=
maybe (return ())
(\x -> do
let st' = Seq.snoc st x
yield st' >> goContinue (Seq.unsafeTail st')
)
go 0 st = yield st >> goContinue (Seq.unsafeTail st)
go !n st = CL.head >>= \m ->
case m of
Nothing -> yield st
Just x -> go (n1) (Seq.snoc st x)
codeWith :: Monad m
=> Int
-> (ByteString -> Either e ByteString)
-> Conduit ByteString m ByteString
codeWith size f =
loop
where
loop = await >>= maybe (return ()) push
loopWith bs
| S.null bs = loop
| otherwise = await >>= maybe (finish bs) (pushWith bs)
finish bs =
case f bs of
Left _ -> leftover bs
Right x -> yield x
push bs = do
let (x, y) = S.splitAt (len (len `mod` size)) bs
if S.null x
then loopWith y
else do
case f x of
Left _ -> leftover bs
Right x' -> yield x' >> loopWith y
where
len = olength bs
pushWith bs1 bs2 | S.length bs1 + S.length bs2 < size = loopWith (S.append bs1 bs2)
pushWith bs1 bs2 = assertion1 $ assertion2 $ assertion3 $
case f bs1' of
Left _ -> leftover bs2 >> leftover bs1
Right toYield -> yield toYield >> push y
where
m = S.length bs1 `mod` size
(x, y) = S.splitAt (size m) bs2
bs1' = mappend bs1 x
assertion1 = assert $ olength bs1 < size
assertion2 = assert $ olength bs1' `mod` size == 0
assertion3 = assert $ olength bs1' > 0
encodeBase64 :: Monad m => Conduit ByteString m ByteString
encodeBase64 = codeWith 3 (Right . B64.encode)
decodeBase64 :: Monad m => Conduit ByteString m ByteString
decodeBase64 = codeWith 4 B64.decode
encodeBase64URL :: Monad m => Conduit ByteString m ByteString
encodeBase64URL = codeWith 3 (Right . B64U.encode)
decodeBase64URL :: Monad m => Conduit ByteString m ByteString
decodeBase64URL = codeWith 4 B64U.decode
encodeBase16 :: Monad m => Conduit ByteString m ByteString
encodeBase16 = map B16.encode
decodeBase16 :: Monad m => Conduit ByteString m ByteString
decodeBase16 =
codeWith 2 decode'
where
decode' x
| onull z = Right y
| otherwise = Left ()
where
(y, z) = B16.decode x
mapM :: Monad m => (a -> m b) -> Conduit a m b
mapM = CL.mapM
mapME :: (Monad m, Data.Traversable.Traversable f) => (a -> m b) -> Conduit (f a) m (f b)
mapME = CL.mapM . Data.Traversable.mapM
omapME :: (Monad m, MonoTraversable mono)
=> (Element mono -> m (Element mono))
-> Conduit mono m mono
omapME = CL.mapM . omapM
concatMapM :: (Monad m, MonoFoldable mono)
=> (a -> m mono)
-> Conduit a m (Element mono)
concatMapM f = awaitForever (lift . f >=> yieldMany)
filterM :: Monad m
=> (a -> m Bool)
-> Conduit a m a
filterM f =
awaitForever go
where
go x = do
b <- lift $ f x
when b $ yield x
filterME :: (Monad m, Seq.IsSequence seq) => (Element seq -> m Bool) -> Conduit seq m seq
filterME = CL.mapM . Seq.filterM
iterM :: Monad m => (a -> m ()) -> Conduit a m a
iterM = CL.iterM
scanlM :: Monad m => (a -> b -> m a) -> a -> Conduit b m a
scanlM f =
loop
where
loop seed =
await >>= maybe (yield seed) go
where
go b = do
seed' <- lift $ f seed b
seed' `seq` yield seed
loop seed'
concatMapAccumM :: Monad m => (a -> accum -> m (accum, [b])) -> accum -> Conduit a m b
concatMapAccumM = CL.concatMapAccumM
encodeUtf8 :: (Monad m, DTE.Utf8 text binary) => Conduit text m binary
encodeUtf8 = map DTE.encodeUtf8
decodeUtf8 :: MonadThrow m => Conduit ByteString m Text
decodeUtf8 = CT.decode CT.utf8
decodeUtf8Lenient :: MonadThrow m => Conduit ByteString m Text
decodeUtf8Lenient = CT.decodeUtf8Lenient
line :: (Monad m, Seq.IsSequence seq, Element seq ~ Char)
=> ConduitM seq o m r
-> ConduitM seq o m r
line inner = do
loop =$= do
x <- inner
sinkNull
return x
where
loop = await >>= omapM_ go
go t =
if onull y
then yield x >> loop
else do
unless (onull x) $ yield x
let y' = Seq.drop 1 y
unless (onull y') $ leftover y'
where
(x, y) = Seq.break (== '\n') t
lineAscii :: (Monad m, Seq.IsSequence seq, Element seq ~ Word8)
=> ConduitM seq o m r
-> ConduitM seq o m r
lineAscii inner =
loop =$= do
x <- inner
sinkNull
return x
where
loop = await >>= omapM_ go
go t =
if onull y
then yield x >> loop
else do
unless (onull x) $ yield x
let y' = Seq.drop 1 y
unless (onull y') $ leftover y'
where
(x, y) = Seq.break (== 10) t
unlines :: (Monad m, Seq.IsSequence seq, Element seq ~ Char) => Conduit seq m seq
unlines = concatMap (:[Seq.singleton '\n'])
unlinesAscii :: (Monad m, Seq.IsSequence seq, Element seq ~ Word8) => Conduit seq m seq
unlinesAscii = concatMap (:[Seq.singleton 10])
linesUnbounded :: (Monad m, Seq.IsSequence seq, Element seq ~ Char)
=> Conduit seq m seq
linesUnbounded =
start
where
start = await >>= maybe (return ()) loop
loop t =
if onull y
then do
mt <- await
case mt of
Nothing -> unless (onull t) $ yield t
Just t' -> loop (t `mappend` t')
else yield x >> loop (Seq.drop 1 y)
where
(x, y) = Seq.break (== '\n') t
linesUnboundedAscii :: (Monad m, Seq.IsSequence seq, Element seq ~ Word8)
=> Conduit seq m seq
linesUnboundedAscii =
start
where
start = await >>= maybe (return ()) loop
loop t =
if onull y
then do
mt <- await
case mt of
Nothing -> unless (onull t) $ yield t
Just t' -> loop (t `mappend` t')
else yield x >> loop (Seq.drop 1 y)
where
(x, y) = Seq.break (== 10) t
vectorBuilder :: (PrimMonad base, MonadBase base m, V.Vector v e, MonadBase base n)
=> Int
-> ((e -> n ()) -> Sink i m r)
-> ConduitM i (v e) m r
vectorBuilder size inner = do
ref <- liftBase $ do
mv <- VM.new size
newMutVar $! S 0 mv id
res <- onAwait (yieldS ref) (inner (liftBase . addE ref))
vs <- liftBase $ do
S idx mv front <- readMutVar ref
end <-
if idx == 0
then return []
else do
v <- V.unsafeFreeze mv
return [V.unsafeTake idx v]
return $ front end
Prelude.mapM_ yield vs
return res
data S s v e = S
!Int
!(V.Mutable v s e)
([v e] -> [v e])
onAwait :: Monad m
=> ConduitM i o m ()
-> Sink i m r
-> ConduitM i o m r
onAwait (ConduitM callback) =
ConduitM . go . unConduitM
where
go (Done r) = Done r
go (HaveOutput _ _ o) = absurd o
go (NeedInput f g) = callback >> NeedInput (go . f) (go . g)
go (PipeM mp) = PipeM (liftM go mp)
go (Leftover f i) = Leftover (go f) i
yieldS :: (PrimMonad base, MonadBase base m)
=> MutVar (PrimState base) (S (PrimState base) v e)
-> Producer m (v e)
yieldS ref = do
S idx mv front <- liftBase $ readMutVar ref
Prelude.mapM_ yield (front [])
liftBase $ writeMutVar ref $! S idx mv id
addE :: (PrimMonad m, V.Vector v e)
=> MutVar (PrimState m) (S (PrimState m) v e)
-> e
-> m ()
addE ref e = do
S idx mv front <- readMutVar ref
VM.write mv idx e
let idx' = succ idx
size = VM.length mv
if idx' >= size
then do
v <- V.unsafeFreeze mv
let front' = front . (v:)
mv' <- VM.new size
writeMutVar ref $! S 0 mv' front'
else writeMutVar ref $! S idx' mv front