module Data.Conduit.Combinators
(
yieldMany
, unfold
, enumFromTo
, iterate
, repeat
, replicate
, sourceLazy
, repeatM
, repeatWhileM
, replicateM
, sourceFile
, sourceHandle
, sourceIOHandle
, stdin
, drop
, dropE
, dropWhile
, dropWhileE
, fold
, foldE
, foldl
, foldlE
, foldMap
, foldMapE
, all
, allE
, any
, anyE
, and
, andE
, or
, orE
, elem
, elemE
, notElem
, notElemE
, sinkLazy
, sinkList
, sinkVector
, sinkBuilder
, sinkLazyBuilder
, sinkNull
, awaitNonNull
, headE
, peek
, peekE
, last
, lastE
, length
, lengthE
, maximum
, maximumE
, minimum
, minimumE
, null
, nullE
, sum
, sumE
, product
, productE
, find
, mapM_
, mapM_E
, foldM
, foldME
, foldMapM
, foldMapME
, sinkFile
, sinkHandle
, sinkIOHandle
, print
, stdout
, stderr
, map
, mapE
, omapE
, concatMap
, concatMapE
, take
, takeE
, takeWhile
, takeWhileE
, takeExactly
, takeExactlyE
, concat
, filter
, filterE
, mapWhile
, conduitVector
, scanl
, concatMapAccum
, intersperse
, mapM
, mapME
, omapME
, concatMapM
, filterM
, filterME
, iterM
, scanlM
, concatMapAccumM
, encodeUtf8
, decodeUtf8
, line
, lineAscii
, unlines
, unlinesAscii
, linesUnbounded
, linesUnboundedAscii
) where
import Data.Builder
import qualified Data.NonNull as NonNull
import qualified Data.Traversable
import Control.Applicative ((<$>))
import Control.Category (Category (..))
import Control.Monad (unless, when, (>=>), liftM, forever)
import Control.Monad.Base (MonadBase (liftBase))
import Control.Monad.IO.Class (MonadIO (..))
import Control.Monad.Primitive (PrimMonad)
import Control.Monad.Trans.Class (lift)
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.IOData
import Data.Monoid (Monoid (..))
import Data.MonoTraversable
import qualified Data.Sequences as Seq
import Data.Sequences.Lazy
import qualified Data.Vector.Generic as V
import qualified Data.Vector.Generic.Mutable as VM
import qualified Filesystem as F
import Filesystem.Path (FilePath)
import Prelude (Bool (..), Eq (..), Int,
Maybe (..), Monad (..), Num (..),
Ord (..), fromIntegral, maybe,
($), Functor (..), Enum, seq, Show, Char)
import Data.Word (Word8)
import qualified Prelude
import System.IO (Handle)
import qualified System.IO as SIO
import qualified Data.Textual.Encoding as DTE
import qualified Data.Conduit.Text as CT
import Data.ByteString (ByteString)
import Data.Text (Text)
yieldMany :: (Monad m, MonoFoldable mono)
=> mono
-> Producer m (Element mono)
yieldMany = ofoldMap yield
unfold :: Monad m
=> (b -> Maybe (a, b))
-> b
-> Producer m a
unfold = CL.unfold
enumFromTo :: (Monad m, Enum a, Eq a) => a -> a -> Producer m a
enumFromTo = CL.enumFromTo
iterate :: Monad m => (a -> a) -> a -> Producer m a
iterate = CL.iterate
repeat :: Monad m => a -> Producer m a
repeat = iterate id
replicate :: Monad m
=> Int
-> a
-> Producer m a
replicate count0 a =
loop count0
where
loop count = if count <= 0
then return ()
else yield a >> loop (count 1)
sourceLazy :: (Monad m, LazySequence lazy strict)
=> lazy
-> Producer m strict
sourceLazy = yieldMany . toChunks
repeatM :: Monad m
=> m a
-> Producer m a
repeatM m = forever $ lift m >>= yield
repeatWhileM :: Monad m
=> m a
-> (a -> Bool)
-> Producer m a
repeatWhileM m f =
loop
where
loop = do
x <- lift m
when (f x) $ yield x >> loop
replicateM :: Monad m
=> Int
-> m a
-> Producer m a
replicateM count0 m =
loop count0
where
loop count = if count <= 0
then return ()
else lift m >>= yield >> loop (count 1)
sourceFile :: (MonadResource m, IOData a) => FilePath -> Producer m a
sourceFile fp = sourceIOHandle (F.openFile fp SIO.ReadMode)
sourceHandle :: (MonadIO m, IOData a) => Handle -> Producer m a
sourceHandle h =
loop
where
loop = do
x <- liftIO (hGetChunk h)
if onull x
then return ()
else yield x >> loop
sourceIOHandle :: (MonadResource m, IOData a) => SIO.IO Handle -> Producer m a
sourceIOHandle alloc = bracketP alloc SIO.hClose sourceHandle
stdin :: (MonadIO m, IOData a) => Producer m a
stdin = sourceHandle SIO.stdin
drop :: Monad m
=> Int
-> Consumer a m ()
drop =
loop
where
loop i | i <= 0 = return ()
loop count = await >>= maybe (return ()) (\_ -> loop (count 1))
dropE :: (Monad m, Seq.IsSequence seq)
=> Seq.Index seq
-> Consumer seq m ()
dropE =
loop
where
loop i = if i <= 0
then return ()
else await >>= maybe (return ()) (go i)
go i seq = do
unless (onull y) $ leftover y
loop i'
where
(x, y) = Seq.splitAt i seq
i' = i fromIntegral (olength x)
dropWhile :: Monad m
=> (a -> Bool)
-> Consumer a m ()
dropWhile f =
loop
where
loop = await >>= maybe (return ()) go
go x = if f x then loop else leftover x
dropWhileE :: (Monad m, Seq.IsSequence seq)
=> (Element seq -> Bool)
-> Consumer seq m ()
dropWhileE f =
loop
where
loop = await >>= maybe (return ()) go
go seq =
if onull x then loop else leftover x
where
x = Seq.dropWhile f seq
fold :: (Monad m, Monoid a)
=> Consumer a m a
fold = CL.foldMap id
foldE :: (Monad m, MonoFoldable mono, Monoid (Element mono))
=> Consumer mono m (Element mono)
foldE = CL.fold (\accum mono -> accum `mappend` ofoldMap id mono) mempty
foldl :: Monad m => (a -> b -> a) -> a -> Consumer b m a
foldl = CL.fold
foldlE :: (Monad m, MonoFoldable mono)
=> (a -> Element mono -> a)
-> a
-> Consumer mono m a
foldlE f = CL.fold (ofoldl' f)
foldMap :: (Monad m, Monoid b)
=> (a -> b)
-> Consumer a m b
foldMap = CL.foldMap
foldMapE :: (Monad m, MonoFoldable mono, Monoid w)
=> (Element mono -> w)
-> Consumer mono m w
foldMapE = CL.foldMap . ofoldMap
all :: Monad m
=> (a -> Bool)
-> Consumer a m Bool
all f =
loop
where
loop = await >>= maybe (return True) go
go x = if f x then loop else return False
allE :: (Monad m, MonoFoldable mono)
=> (Element mono -> Bool)
-> Consumer mono m Bool
allE = all . oall
any :: Monad m
=> (a -> Bool)
-> Consumer a m Bool
any f =
loop
where
loop = await >>= maybe (return False) go
go x = if f x then return True else loop
anyE :: (Monad m, MonoFoldable mono)
=> (Element mono -> Bool)
-> Consumer mono m Bool
anyE = any . oany
and :: Monad m => Consumer Bool m Bool
and = all id
andE :: (Monad m, MonoFoldable mono, Element mono ~ Bool)
=> Consumer mono m Bool
andE = allE id
or :: Monad m => Consumer Bool m Bool
or = any id
orE :: (Monad m, MonoFoldable mono, Element mono ~ Bool)
=> Consumer mono m Bool
orE = anyE id
elem :: (Monad m, Eq a) => a -> Consumer a m Bool
elem x = any (== x)
elemE :: (Monad m, Seq.EqSequence seq)
=> Element seq
-> Consumer seq m Bool
elemE = any . Seq.elem
notElem :: (Monad m, Eq a) => a -> Consumer a m Bool
notElem x = all (/= x)
notElemE :: (Monad m, Seq.EqSequence seq)
=> Element seq
-> Consumer seq m Bool
notElemE = all . Seq.notElem
sinkLazy :: (Monad m, LazySequence lazy strict)
=> Consumer strict m lazy
sinkLazy = (fromChunks . ($ [])) <$> CL.fold (\front next -> front . (next:)) id
sinkList :: Monad m => Consumer a m [a]
sinkList = CL.consume
sinkVector :: (MonadBase base m, V.Vector v a, PrimMonad base)
=> Int
-> Consumer a m (v a)
sinkVector maxSize = do
mv <- liftBase $ VM.new maxSize
let go i | i >= maxSize = liftBase $ V.unsafeFreeze mv
go i = do
mx <- await
case mx of
Nothing -> V.slice 0 i <$> liftBase (V.unsafeFreeze mv)
Just x -> do
liftBase $ VM.write mv i x
go (i + 1)
go 0
sinkBuilder :: (Monad m, Monoid builder, ToBuilder a builder)
=> Consumer a m builder
sinkBuilder = foldMap toBuilder
sinkLazyBuilder :: (Monad m, Monoid builder, ToBuilder a builder, Builder builder lazy)
=> Consumer a m lazy
sinkLazyBuilder = fmap builderToLazy sinkBuilder
sinkNull :: Monad m => Consumer a m ()
sinkNull = CL.sinkNull
awaitNonNull :: (Monad m, NonNull.NonNull b, a ~ NonNull.Nullable b) => Consumer a m (Maybe b)
awaitNonNull =
go
where
go = await >>= maybe (return Nothing) go'
go' = maybe go (return . Just) . NonNull.fromNullable
headE :: (Monad m, Seq.IsSequence seq) => Consumer seq m (Maybe (Element seq))
headE =
loop
where
loop = await >>= maybe (return Nothing) go
go x =
case Seq.uncons x of
Nothing -> loop
Just (y, z) -> do
unless (onull z) $ leftover z
return $ Just y
peek :: Monad m => Consumer a m (Maybe a)
peek = CL.peek
peekE :: (Monad m, MonoFoldable mono) => Consumer mono m (Maybe (Element mono))
peekE =
loop
where
loop = await >>= maybe (return Nothing) go
go x =
case headMay x of
Nothing -> loop
Just y -> do
leftover x
return $ Just y
last :: Monad m => Consumer a m (Maybe a)
last =
await >>= maybe (return Nothing) loop
where
loop prev = await >>= maybe (return $ Just prev) loop
lastE :: (Monad m, Seq.IsSequence seq) => Consumer seq m (Maybe (Element seq))
lastE =
awaitNonNull >>= maybe (return Nothing) (loop . NonNull.last . NonNull.asNotEmpty)
where
loop prev = awaitNonNull >>= maybe (return $ Just prev) (loop . NonNull.last . NonNull.asNotEmpty)
length :: (Monad m, Num len) => Consumer a m len
length = foldl (\x _ -> x + 1) 0
lengthE :: (Monad m, Num len, MonoFoldable mono) => Consumer mono m len
lengthE = foldl (\x y -> x + fromIntegral (olength y)) 0
maximum :: (Monad m, Ord a) => Consumer a m (Maybe a)
maximum =
await >>= maybe (return Nothing) loop
where
loop prev = await >>= maybe (return $ Just prev) (loop . max prev)
maximumE :: (Monad m, Seq.OrdSequence seq) => Consumer seq m (Maybe (Element seq))
maximumE =
start
where
start = await >>= maybe (return Nothing) start'
start' x =
case NonNull.fromNullable x of
Nothing -> start
Just y -> loop $ NonNull.maximum $ NonNull.asNotEmpty y
loop prev = await >>= maybe (return $ Just prev) (loop . ofoldl' max prev)
minimum :: (Monad m, Ord a) => Consumer a m (Maybe a)
minimum =
await >>= maybe (return Nothing) loop
where
loop prev = await >>= maybe (return $ Just prev) (loop . min prev)
minimumE :: (Monad m, Seq.OrdSequence seq) => Consumer seq m (Maybe (Element seq))
minimumE =
start
where
start = await >>= maybe (return Nothing) start'
start' x =
case NonNull.fromNullable x of
Nothing -> start
Just y -> loop $ NonNull.minimum $ NonNull.asNotEmpty y
loop prev = await >>= maybe (return $ Just prev) (loop . ofoldl' min prev)
null :: Monad m => Consumer a m Bool
null = (maybe True (\_ -> False)) `fmap` peek
nullE :: (Monad m, MonoFoldable mono)
=> Consumer mono m Bool
nullE =
go
where
go = await >>= maybe (return True) go'
go' x = if onull x then go else leftover x >> return False
sum :: (Monad m, Num a) => Consumer a m a
sum = foldl (+) 0
sumE :: (Monad m, MonoFoldable mono, Num (Element mono)) => Consumer mono m (Element mono)
sumE = foldlE (+) 0
product :: (Monad m, Num a) => Consumer a m a
product = foldl (*) 1
productE :: (Monad m, MonoFoldable mono, Num (Element mono)) => Consumer mono m (Element mono)
productE = foldlE (*) 1
find :: Monad m => (a -> Bool) -> Consumer a m (Maybe a)
find f =
loop
where
loop = await >>= maybe (return Nothing) go
go x = if f x then return (Just x) else loop
mapM_ :: Monad m => (a -> m ()) -> Consumer a m ()
mapM_ = CL.mapM_
mapM_E :: (Monad m, MonoFoldable mono) => (Element mono -> m ()) -> Consumer mono m ()
mapM_E = CL.mapM_ . omapM_
foldM :: Monad m => (a -> b -> m a) -> a -> Consumer b m a
foldM = CL.foldM
foldME :: (Monad m, MonoFoldable mono)
=> (a -> Element mono -> m a)
-> a
-> Consumer mono m a
foldME f = foldM (ofoldlM f)
foldMapM :: (Monad m, Monoid w) => (a -> m w) -> Consumer a m w
foldMapM = CL.foldMapM
foldMapME :: (Monad m, MonoFoldable mono, Monoid w)
=> (Element mono -> m w)
-> Consumer mono m w
foldMapME f =
CL.foldM go mempty
where
go = ofoldlM (\accum e -> mappend accum `liftM` f e)
sinkFile :: (MonadResource m, IOData a) => FilePath -> Consumer a m ()
sinkFile fp = sinkIOHandle (F.openFile fp SIO.WriteMode)
print :: (Show a, MonadIO m) => Consumer a m ()
print = mapM_ (liftIO . Prelude.print)
stdout :: (MonadIO m, IOData a) => Consumer a m ()
stdout = sinkHandle SIO.stdout
stderr :: (MonadIO m, IOData a) => Consumer a m ()
stderr = sinkHandle SIO.stderr
sinkHandle :: (MonadIO m, IOData a) => Handle -> Consumer a m ()
sinkHandle = CL.mapM_ . hPut
sinkIOHandle :: (MonadResource m, IOData a) => SIO.IO Handle -> Consumer a m ()
sinkIOHandle alloc = bracketP alloc SIO.hClose sinkHandle
map :: Monad m => (a -> b) -> Conduit a m b
map = CL.map
mapE :: (Monad m, Functor f) => (a -> b) -> Conduit (f a) m (f b)
mapE = CL.map . fmap
omapE :: (Monad m, MonoFunctor mono) => (Element mono -> Element mono) -> Conduit mono m mono
omapE = CL.map . omap
concatMap :: (Monad m, MonoFoldable mono)
=> (a -> mono)
-> Conduit a m (Element mono)
concatMap f = awaitForever (yieldMany . f)
concatMapE :: (Monad m, MonoFoldable mono, Monoid w)
=> (Element mono -> w)
-> Conduit mono m w
concatMapE = CL.map . ofoldMap
take :: Monad m => Int -> Conduit a m a
take =
loop
where
loop count = if count <= 0
then return ()
else await >>= maybe (return ()) (\i -> yield i >> loop (count 1))
takeE :: (Monad m, Seq.IsSequence seq)
=> Seq.Index seq
-> Conduit seq m seq
takeE =
loop
where
loop i = if i <= 0
then return ()
else await >>= maybe (return ()) (go i)
go i seq = do
unless (onull x) $ yield x
unless (onull y) $ leftover y
loop i'
where
(x, y) = Seq.splitAt i seq
i' = i fromIntegral (olength x)
takeWhile :: Monad m
=> (a -> Bool)
-> Conduit a m a
takeWhile f =
loop
where
loop = await >>= maybe (return ()) go
go x = if f x
then yield x >> loop
else leftover x
takeWhileE :: (Monad m, Seq.IsSequence seq)
=> (Element seq -> Bool)
-> Conduit seq m seq
takeWhileE f =
loop
where
loop = await >>= maybe (return ()) go
go seq = do
unless (onull x) $ yield x
if onull y
then loop
else leftover y
where
(x, y) = Seq.span f seq
takeExactly :: Monad m
=> Int
-> ConduitM a b m r
-> ConduitM a b m r
takeExactly count inner = take count =$= do
r <- inner
CL.sinkNull
return r
takeExactlyE :: (Monad m, Seq.IsSequence a)
=> Seq.Index a
-> ConduitM a b m r
-> ConduitM a b m r
takeExactlyE count inner = takeE count =$= do
r <- inner
CL.sinkNull
return r
concat :: (Monad m, MonoFoldable mono)
=> Conduit mono m (Element mono)
concat = awaitForever yieldMany
filter :: Monad m => (a -> Bool) -> Conduit a m a
filter = CL.filter
filterE :: (Seq.IsSequence seq, Monad m) => (Element seq -> Bool) -> Conduit seq m seq
filterE = CL.map . Seq.filter
mapWhile :: Monad m => (a -> Maybe b) -> Conduit a m b
mapWhile f =
loop
where
loop = await >>= maybe (return ()) go
go x =
case f x of
Just y -> yield y >> loop
Nothing -> leftover x
conduitVector :: (MonadBase base m, V.Vector v a, PrimMonad base)
=> Int
-> Conduit a m (v a)
conduitVector size =
loop
where
loop = do
v <- sinkVector size
unless (V.null v) $ do
yield v
loop
scanl :: Monad m => (a -> b -> a) -> a -> Conduit b m a
scanl f =
loop
where
loop seed =
await >>= maybe (yield seed) go
where
go b = do
let seed' = f seed b
seed' `seq` yield seed
loop seed'
concatMapAccum :: Monad m => (a -> accum -> (accum, [b])) -> accum -> Conduit a m b
concatMapAccum = CL.concatMapAccum
intersperse :: Monad m => a -> Conduit a m a
intersperse x =
await >>= omapM_ go
where
go y = yield y >> concatMap (\z -> [x, z])
mapM :: Monad m => (a -> m b) -> Conduit a m b
mapM = CL.mapM
mapME :: (Monad m, Data.Traversable.Traversable f) => (a -> m b) -> Conduit (f a) m (f b)
mapME = CL.mapM . Data.Traversable.mapM
omapME :: (Monad m, MonoTraversable mono)
=> (Element mono -> m (Element mono))
-> Conduit mono m mono
omapME = CL.mapM . omapM
concatMapM :: (Monad m, MonoFoldable mono)
=> (a -> m mono)
-> Conduit a m (Element mono)
concatMapM f = awaitForever (lift . f >=> yieldMany)
filterM :: Monad m
=> (a -> m Bool)
-> Conduit a m a
filterM f =
awaitForever go
where
go x = do
b <- lift $ f x
when b $ yield x
filterME :: (Monad m, Seq.IsSequence seq) => (Element seq -> m Bool) -> Conduit seq m seq
filterME = CL.mapM . Seq.filterM
iterM :: Monad m => (a -> m ()) -> Conduit a m a
iterM = CL.iterM
scanlM :: Monad m => (a -> b -> m a) -> a -> Conduit b m a
scanlM f =
loop
where
loop seed =
await >>= maybe (yield seed) go
where
go b = do
seed' <- lift $ f seed b
seed' `seq` yield seed
loop seed'
concatMapAccumM :: Monad m => (a -> accum -> m (accum, [b])) -> accum -> Conduit a m b
concatMapAccumM = CL.concatMapAccumM
encodeUtf8 :: (Monad m, DTE.Utf8 text binary) => Conduit text m binary
encodeUtf8 = map DTE.encodeUtf8
decodeUtf8 :: MonadThrow m => Conduit ByteString m Text
decodeUtf8 = CT.decode CT.utf8
line :: (Monad m, Seq.IsSequence seq, Element seq ~ Char)
=> ConduitM seq o m r
-> ConduitM seq o m r
line inner = do
loop =$= do
x <- inner
sinkNull
return x
where
loop = await >>= omapM_ go
go t =
if onull y
then yield x >> loop
else do
unless (onull x) $ yield x
let y' = Seq.drop 1 y
unless (onull y') $ leftover y'
where
(x, y) = Seq.break (== '\n') t
lineAscii :: (Monad m, Seq.IsSequence seq, Element seq ~ Word8)
=> ConduitM seq o m r
-> ConduitM seq o m r
lineAscii inner =
loop =$= do
x <- inner
sinkNull
return x
where
loop = await >>= omapM_ go
go t =
if onull y
then yield x >> loop
else do
unless (onull x) $ yield x
let y' = Seq.drop 1 y
unless (onull y') $ leftover y'
where
(x, y) = Seq.break (== 10) t
unlines :: (Monad m, Seq.IsSequence seq, Element seq ~ Char) => Conduit seq m seq
unlines = concatMap (:[Seq.singleton '\n'])
unlinesAscii :: (Monad m, Seq.IsSequence seq, Element seq ~ Word8) => Conduit seq m seq
unlinesAscii = concatMap (:[Seq.singleton 10])
linesUnbounded :: (Monad m, Seq.IsSequence seq, Element seq ~ Char)
=> Conduit seq m seq
linesUnbounded =
start
where
start = await >>= maybe (return ()) loop
loop t =
if onull y
then do
mt <- await
case mt of
Nothing -> unless (onull t) $ yield t
Just t' -> loop (t `mappend` t')
else yield x >> loop (Seq.drop 1 y)
where
(x, y) = Seq.break (== '\n') t
linesUnboundedAscii :: (Monad m, Seq.IsSequence seq, Element seq ~ Word8)
=> Conduit seq m seq
linesUnboundedAscii =
start
where
start = await >>= maybe (return ()) loop
loop t =
if onull y
then do
mt <- await
case mt of
Nothing -> unless (onull t) $ yield t
Just t' -> loop (t `mappend` t')
else yield x >> loop (Seq.drop 1 y)
where
(x, y) = Seq.break (== 10) t