module Conduit.Simple
( Source(..), Conduit, Sink
, runSource, lowerSource, source, conduit, conduitWith, sink
, returnC, close, skip, awaitForever
, yieldMany, sourceList
, unfoldC
, enumFromToC
, iterateC
, repeatC
, replicateC
, sourceLazy
, repeatMC
, repeatWhileMC
, replicateMC
, sourceHandle
, sourceFile
, sourceIOHandle
, stdinC
, initRepeat
, initReplicate
, sourceRandom
, sourceRandomN
, sourceRandomGen
, sourceRandomNGen
, sourceDirectory
, sourceDirectoryDeep
, dropC
, dropCE
, dropWhileC
, dropWhileCE
, foldC
, foldCE
, foldlC
, foldlCE
, foldMapC
, foldMapCE
, allC
, allCE
, anyC
, anyCE
, andC
, andCE
, orC
, orCE
, elemC
, elemCE
, notElemC
, notElemCE
, sinkLazy
, sinkList
, sinkVector
, sinkBuilder
, sinkLazyBuilder
, sinkNull
, awaitNonNull
, headCE
, lastC
, lastCE
, lengthC
, lengthCE
, lengthIfC
, lengthIfCE
, maximumC
, maximumCE
, minimumC
, minimumCE
, sumC
, sumCE
, productC
, productCE
, findC
, mapM_C
, mapM_CE
, foldMC
, foldMCE
, foldMapMC
, foldMapMCE
, sinkFile
, sinkHandle
, sinkIOHandle
, printC
, stdoutC
, stderrC
, mapC
, mapCE
, omapCE
, concatMapC
, concatMapCE
, takeC
, takeCE
, takeWhileC
, takeWhileCE
, takeExactlyC
, takeExactlyCE
, concatC
, filterC
, filterCE
, mapWhileC
, conduitVector
, scanlC
, concatMapAccumC
, intersperseC
, encodeBase64C
, decodeBase64C
, encodeBase64URLC
, decodeBase64URLC
, encodeBase16C
, decodeBase16C
, mapMC
, mapMCE
, omapMCE
, concatMapMC
, filterMC
, filterMCE
, iterMC
, scanlMC
, concatMapAccumMC
, encodeUtf8C
, decodeUtf8C
, lineC
, lineAsciiC
, unlinesC
, unlinesAsciiC
, linesUnboundedC_
, linesUnboundedC, linesC
, linesUnboundedAsciiC, linesAsciiC
, sourceMaybeMVar
, sourceMaybeTMVar
, asyncC
, sourceTChan
, sourceTQueue
, sourceTBQueue
, untilMC
, whileMC
, zipSinks
, ($=), (=$), (=$=), ($$)
, sequenceSources
) where
import Conduit.Simple.Compat
import Conduit.Simple.Core
import Control.Applicative ((<$>))
import Control.Concurrent.Async.Lifted
import Control.Concurrent.Lifted hiding (yield)
import Control.Concurrent.STM
import Control.Exception.Lifted (bracket)
import Control.Monad.Base (MonadBase(..))
import Control.Monad.Catch (MonadThrow)
import Control.Monad.Cont
import Control.Monad.Primitive
import Control.Monad.Trans.Control
import Control.Monad.Trans.Either (EitherT(..), left)
import Data.Builder (Builder(builderToLazy), ToBuilder(..))
import Data.ByteString (ByteString)
import Data.IOData (IOData(hGetChunk, hPut))
import Data.List (unfoldr)
import Data.MonoTraversable
import Data.NonNull as NonNull (NonNull, fromNullable)
import Data.Semigroup (Any(..), All(..), Monoid(..), Semigroup((<>)))
import Data.Sequences as Seq (SemiSequence(..), singleton,
IsSequence(break, drop, dropWhile,
fromList, splitAt))
import Data.Sequences (LazySequence(fromChunks, toChunks))
import qualified Data.Streaming.Filesystem as F
import Data.Text (Text)
import Data.Text.Encoding (encodeUtf8)
import Data.Traversable (Traversable)
import qualified Data.Vector.Generic as V
import qualified Data.Vector.Generic.Mutable as VM
import Data.Word (Word8)
import System.FilePath ((</>))
import System.IO (stdout, stdin, stderr, openFile, hClose,
Handle, IOMode(ReadMode, WriteMode))
import System.Random.MWC as MWC (Gen, Variate(uniform),
createSystemRandom)
yieldMany :: (Monad m, MonoFoldable mono) => mono -> Source m (Element mono)
yieldMany xs = source $ \z yield -> ofoldlM yield z xs
sourceList :: Monad m => [a] -> Source m a
sourceList xs = source $ \z yield -> foldM yield z xs
unfoldC :: forall m a b. Monad m => (b -> Maybe (a, b)) -> b -> Source m a
unfoldC = (sourceList .) . Data.List.unfoldr
enumFromToC :: forall m a. (Monad m, Enum a, Eq a) => a -> a -> Source m a
enumFromToC = (sourceList .) . enumFromTo
iterateC :: forall m a. Monad m => (a -> a) -> a -> Source m a
iterateC = (sourceList .) . iterate
repeatC :: forall m a. Monad m => a -> Source m a
repeatC = sourceList . Prelude.repeat
replicateC :: forall m a. Monad m => Int -> a -> Source m a
replicateC = (sourceList .) . Prelude.replicate
sourceLazy :: (Monad m, LazySequence lazy strict) => lazy -> Source m strict
sourceLazy = sourceList . toChunks
repeatMC :: forall m a. Monad m => m a -> Source m a
repeatMC x = source go
where
go :: r -> (r -> a -> EitherT r m r) -> EitherT r m r
go z yield = loop z
where
loop r = loop =<< yield r =<< lift x
repeatWhileMC :: forall m a. Monad m => m a -> (a -> Bool) -> Source m a
repeatWhileMC m f = source go
where
go :: r -> (r -> a -> EitherT r m r) -> EitherT r m r
go z yield = loop z
where
loop r = do
x <- lift m
if f x
then loop =<< yield r x
else return r
replicateMC :: forall m a. Monad m => Int -> m a -> Source m a
replicateMC n m = source $ go n
where
go :: Int -> r -> (r -> a -> EitherT r m r) -> EitherT r m r
go i z yield = loop i z
where
loop n' r | n' > 0 = loop (n' 1) =<< yield r =<< lift m
loop _ r = return r
sourceHandle :: forall m a. (MonadIO m, MonoFoldable a, IOData a) => Handle -> Source m a
sourceHandle h = source go
where
go :: r -> (r -> a -> EitherT r m r) -> EitherT r m r
go z yield = loop z
where
loop y = do
x <- liftIO $ hGetChunk h
if onull x
then return y
else loop =<< yield y x
sourceFile :: (MonadBaseControl IO m, MonadIO m, MonoFoldable a, IOData a)
=> FilePath -> Source m a
sourceFile path = source $ \z yield ->
liftBaseOp (bracket (openFile path ReadMode) hClose)
(\h -> runSource (sourceHandle h) z yield)
sourceIOHandle :: (MonadBaseControl IO m, MonadIO m, MonoFoldable a, IOData a)
=> IO Handle -> Source m a
sourceIOHandle f = source $ \z yield ->
liftBaseOp (bracket f hClose) $ \h ->
runSource (sourceHandle h) z yield
stdinC :: (MonadBaseControl IO m, MonadIO m, MonoFoldable a, IOData a) => Source m a
stdinC = sourceHandle stdin
initRepeat :: Monad m => m seed -> (seed -> m a) -> Source m a
initRepeat mseed f = source $ \z yield ->
lift mseed >>= \seed -> runSource (repeatMC (f seed)) z yield
initReplicate :: Monad m => m seed -> (seed -> m a) -> Int -> Source m a
initReplicate mseed f n = source $ \z yield ->
lift mseed >>= \seed -> runSource (replicateMC n (f seed)) z yield
sourceRandom :: (Variate a, MonadIO m) => Source m a
sourceRandom =
initRepeat (liftIO MWC.createSystemRandom) (liftIO . MWC.uniform)
sourceRandomN :: (Variate a, MonadIO m) => Int -> Source m a
sourceRandomN =
initReplicate (liftIO MWC.createSystemRandom) (liftIO . MWC.uniform)
sourceRandomGen :: (Variate a, MonadBase base m, PrimMonad base)
=> Gen (PrimState base) -> Source m a
sourceRandomGen gen = initRepeat (return gen) (liftBase . MWC.uniform)
sourceRandomNGen :: (Variate a, MonadBase base m, PrimMonad base)
=> Gen (PrimState base) -> Int -> Source m a
sourceRandomNGen gen = initReplicate (return gen) (liftBase . MWC.uniform)
sourceDirectory :: forall m. MonadBaseControl IO m
=> FilePath -> Source m FilePath
sourceDirectory dir = source $ \z yield ->
liftBaseOp (bracket (F.openDirStream dir) F.closeDirStream)
(go z yield)
where
go :: r -> (r -> FilePath -> EitherT r m r) -> F.DirStream -> EitherT r m r
go z yield ds = loop z
where
loop r = do
mfp <- liftBase $ F.readDirStream ds
case mfp of
Nothing -> return r
Just fp -> loop =<< yield r (dir </> fp)
sourceDirectoryDeep :: forall m. MonadBaseControl IO m
=> Bool -> FilePath -> Source m FilePath
sourceDirectoryDeep followSymlinks startDir = source go
where
go :: r -> (r -> FilePath -> EitherT r m r) -> EitherT r m r
go z yield = start startDir z
where
start dir r = runSource (sourceDirectory dir) r entry
entry r fp = do
ft <- liftBase $ F.getFileType fp
case ft of
F.FTFile -> yield r fp
F.FTFileSym -> yield r fp
F.FTDirectory -> start fp r
F.FTDirectorySym
| followSymlinks -> start fp r
| otherwise -> return r
F.FTOther -> return r
dropC :: Monad m => Int -> Conduit a m a
dropC n = conduitWith n go
where
go (r, n') _ _ | n' > 0 = return (r, n' 1)
go (r, _) yield x = yield r x
dropCE :: (Monad m, IsSequence seq) => Index seq -> Conduit seq m seq
dropCE n = conduitWith n go
where
go (r, n') yield s
| onull y = return (r, n' xn)
| otherwise = yield r y
where
(x, y) = Seq.splitAt n' s
xn = n' fromIntegral (olength x)
dropWhileC :: Monad m => (a -> Bool) -> Conduit a m a
dropWhileC f = conduitWith f go
where
go (r, k) _ x | k x = return (r, k)
go (r, _) yield x = fmap (const (const False)) <$> yield r x
dropWhileCE :: (Monad m, IsSequence seq)
=> (Element seq -> Bool)
-> Conduit seq m seq
dropWhileCE f = conduitWith f go
where
go (r, k) yield s
| onull x = return (r, k)
| otherwise = fmap (const (const False)) <$> yield r s
where
x = Seq.dropWhile k s
foldC :: (Monad m, Monoid a) => Sink a m a
foldC = foldMapC id
foldCE :: (Monad m, MonoFoldable mono, Monoid (Element mono))
=> Sink mono m (Element mono)
foldCE = foldlC (\acc mono -> acc `mappend` ofoldMap id mono) mempty
foldlC :: Monad m => (a -> b -> a) -> a -> Sink b m a
foldlC f z = sink z ((return .) . f)
foldlCE :: (Monad m, MonoFoldable mono)
=> (a -> Element mono -> a) -> a -> Sink mono m a
foldlCE f = foldlC (ofoldl' f)
foldMapC :: (Monad m, Monoid b) => (a -> b) -> Sink a m b
foldMapC f = foldlC (\acc x -> acc `mappend` f x) mempty
foldMapCE :: (Monad m, MonoFoldable mono, Monoid w)
=> (Element mono -> w) -> Sink mono m w
foldMapCE = foldMapC . ofoldMap
allC :: Monad m => (a -> Bool) -> Sink a m Bool
allC f = liftM getAll `liftM` foldMapC (All . f)
allCE :: (Monad m, MonoFoldable mono)
=> (Element mono -> Bool) -> Sink mono m Bool
allCE = allC . oall
anyC :: Monad m => (a -> Bool) -> Sink a m Bool
anyC f = liftM getAny `liftM` foldMapC (Any . f)
anyCE :: (Monad m, MonoFoldable mono)
=> (Element mono -> Bool) -> Sink mono m Bool
anyCE = anyC . oany
andC :: Monad m => Sink Bool m Bool
andC = allC id
andCE :: (Monad m, MonoFoldable mono, Element mono ~ Bool)
=> Sink mono m Bool
andCE = allCE id
orC :: Monad m => Sink Bool m Bool
orC = anyC id
orCE :: (Monad m, MonoFoldable mono, Element mono ~ Bool)
=> Sink mono m Bool
orCE = anyCE id
elemC :: (Monad m, Eq a) => a -> Sink a m Bool
elemC x = anyC (== x)
elemCE :: (Monad m, MonoFoldable seq, Eq (Element seq)) => Element seq -> Sink seq m Bool
elemCE = anyC . oelem
notElemC :: (Monad m, Eq a) => a -> Sink a m Bool
notElemC x = allC (/= x)
notElemCE :: (Monad m, MonoFoldable seq, Eq (Element seq)) => Element seq -> Sink seq m Bool
notElemCE = allC . onotElem
produceList :: Monad m => ([a] -> b) -> Sink a m b
produceList f =
liftM (f . ($ [])) . sink id (\front x -> return (front . (x:)))
sinkLazy :: (Monad m, LazySequence lazy strict) => Sink strict m lazy
sinkLazy = produceList fromChunks
sinkList :: Monad m => Sink a m [a]
sinkList = produceList id
sinkVector :: (MonadBase base m, V.Vector v a, PrimMonad base)
=> Sink a m (v a)
sinkVector = undefined
sinkBuilder :: (Monad m, Monoid builder, ToBuilder a builder)
=> Sink a m builder
sinkBuilder = foldMapC toBuilder
sinkLazyBuilder :: (Monad m, Monoid builder, ToBuilder a builder,
Builder builder lazy)
=> Sink a m lazy
sinkLazyBuilder = liftM builderToLazy . foldMapC toBuilder
sinkNull :: Monad m => Sink a m ()
sinkNull _ = return ()
awaitNonNull :: (Monad m, MonoFoldable a) => Conduit a m (Maybe (NonNull a))
awaitNonNull = conduit $ \r yield x ->
maybe (return r) (yield r . Just) (NonNull.fromNullable x)
headCE :: (Monad m, IsSequence seq) => Sink seq m (Maybe (Element seq))
headCE = undefined
lastC :: Monad m => Sink a m (Maybe a)
lastC = sink Nothing (const (return . Just))
lastCE :: (Monad m, IsSequence seq) => Sink seq m (Maybe (Element seq))
lastCE = undefined
lengthC :: (Monad m, Num len) => Sink a m len
lengthC = foldlC (\x _ -> x + 1) 0
lengthCE :: (Monad m, Num len, MonoFoldable mono) => Sink mono m len
lengthCE = foldlC (\x y -> x + fromIntegral (olength y)) 0
lengthIfC :: (Monad m, Num len) => (a -> Bool) -> Sink a m len
lengthIfC f = foldlC (\cnt a -> if f a then cnt + 1 else cnt) 0
lengthIfCE :: (Monad m, Num len, MonoFoldable mono)
=> (Element mono -> Bool) -> Sink mono m len
lengthIfCE f = foldlCE (\cnt a -> if f a then cnt + 1 else cnt) 0
maximumC :: (Monad m, Ord a) => Sink a m (Maybe a)
maximumC = sink Nothing $ \r y -> return $ Just $ maybe y (max y) r
maximumCE :: (Monad m, MonoFoldable seq) => Sink seq m (Maybe (Element seq))
maximumCE = undefined
minimumC :: (Monad m, Ord a) => Sink a m (Maybe a)
minimumC = sink Nothing $ \r y -> return $ Just $ maybe y (min y) r
minimumCE :: (Monad m, MonoFoldable seq) => Sink seq m (Maybe (Element seq))
minimumCE = undefined
sumC :: (Monad m, Num a) => Sink a m a
sumC = foldlC (+) 0
sumCE :: (Monad m, MonoFoldable mono, Num (Element mono))
=> Sink mono m (Element mono)
sumCE = undefined
productC :: (Monad m, Num a) => Sink a m a
productC = foldlC (*) 1
productCE :: (Monad m, MonoFoldable mono, Num (Element mono))
=> Sink mono m (Element mono)
productCE = undefined
findC :: Monad m => (a -> Bool) -> Sink a m (Maybe a)
findC f = sink Nothing $ \r x -> if f x then left (Just x) else return r
mapM_C :: Monad m => (a -> m ()) -> Sink a m ()
mapM_C f = sink () (const $ lift . f)
mapM_CE :: (Monad m, MonoFoldable mono)
=> (Element mono -> m ()) -> Sink mono m ()
mapM_CE = undefined
foldMC :: Monad m => (a -> b -> m a) -> a -> Sink b m a
foldMC f = flip sink ((lift .) . f)
foldMCE :: (Monad m, MonoFoldable mono)
=> (a -> Element mono -> m a) -> a -> Sink mono m a
foldMCE = undefined
foldMapMC :: (Monad m, Monoid w) => (a -> m w) -> Sink a m w
foldMapMC f = foldMC (\acc x -> (acc `mappend`) `liftM` f x) mempty
foldMapMCE :: (Monad m, MonoFoldable mono, Monoid w)
=> (Element mono -> m w) -> Sink mono m w
foldMapMCE = undefined
sinkFile :: (MonadBaseControl IO m, MonadIO m, IOData a)
=> FilePath -> Sink a m ()
sinkFile fp = sinkIOHandle (liftIO $ openFile fp WriteMode)
sinkHandle :: (MonadIO m, IOData a) => Handle -> Sink a m ()
sinkHandle = mapM_C . hPut
sinkIOHandle :: (MonadBaseControl IO m, MonadIO m, IOData a)
=> IO Handle -> Sink a m ()
sinkIOHandle alloc = liftBaseOp (bracket alloc hClose) . flip sinkHandle
printC :: (Show a, MonadIO m) => Sink a m ()
printC = mapM_C (liftIO . print)
stdoutC :: (MonadIO m, IOData a) => Sink a m ()
stdoutC = sinkHandle stdout
stderrC :: (MonadIO m, IOData a) => Sink a m ()
stderrC = sinkHandle stderr
mapC :: Monad m => (a -> b) -> Conduit a m b
mapC = fmap
mapCE :: (Monad m, Functor f) => (a -> b) -> Conduit (f a) m (f b)
mapCE = undefined
omapCE :: (Monad m, MonoFunctor mono)
=> (Element mono -> Element mono) -> Conduit mono m mono
omapCE = undefined
concatMapC :: (Monad m, MonoFoldable mono)
=> (a -> mono) -> Conduit a m (Element mono)
concatMapC f = conduit $ \r yield -> ofoldlM yield r . f
concatMapCE :: (Monad m, MonoFoldable mono, Monoid w)
=> (Element mono -> w) -> Conduit mono m w
concatMapCE = undefined
takeC :: Monad m => Int -> Conduit a m a
takeC n = conduitWith n go
where
go (z', n') yield x
| n' > 1 = next
| n' > 0 = left =<< next
| otherwise = left (z', 0)
where
next = fmap pred <$> yield z' x
takeCE :: (Monad m, IsSequence seq) => Index seq -> Conduit seq m seq
takeCE = undefined
takeWhileC :: Monad m => (a -> Bool) -> Conduit a m a
takeWhileC f = conduitWith f go
where
go (z', k) yield x | k x = yield z' x
go (z', _) _ _ = left (z', const False)
takeWhileCE :: (Monad m, IsSequence seq)
=> (Element seq -> Bool) -> Conduit seq m seq
takeWhileCE = undefined
takeExactlyC :: Monad m => Int -> Conduit a m b -> Conduit a m b
takeExactlyC = undefined
takeExactlyCE :: (Monad m, IsSequence a)
=> Index a -> Conduit a m b -> Conduit a m b
takeExactlyCE = undefined
concatC :: (Monad m, MonoFoldable mono) => Conduit mono m (Element mono)
concatC = awaitForever yieldMany
filterC :: Monad m => (a -> Bool) -> Conduit a m a
filterC f = awaitForever $ \x -> if f x then return x else skip
filterCE :: (IsSequence seq, Monad m)
=> (Element seq -> Bool) -> Conduit seq m seq
filterCE = undefined
mapWhileC :: Monad m => (a -> Maybe b) -> Conduit a m b
mapWhileC f = awaitForever $ \x -> case f x of Just y -> return y; _ -> close
conduitVector :: (MonadBase base m, V.Vector v a, PrimMonad base)
=> Int -> Conduit a m (v a)
conduitVector maxSize src = source $ \z yield -> do
mv <- liftBase $ VM.new maxSize
EitherT $ do
eres <- runEitherT $ runSource src (z, 0) $ \(r, i :: Int) x -> EitherT $
if i >= maxSize
then do
v <- liftBase $ V.unsafeFreeze mv
runEitherT $ rewrap (, 0) $ yield r v
else do
liftBase $ VM.write mv i x
return $ Right (r, i + 1)
case eres of
Left (z', _) -> return $ Left z'
Right (z', i)
| i > 0 -> do
v <- V.slice 0 i <$> liftBase (V.unsafeFreeze mv)
runEitherT $ yield z' v
| otherwise -> return $ Right z'
scanlC :: Monad m => (a -> b -> a) -> a -> Conduit b m a
scanlC = undefined
concatMapAccumC :: Monad m => (a -> accum -> (accum, [b])) -> accum -> Conduit a m b
concatMapAccumC = undefined
intersperseC :: Monad m => a -> Source m a -> Source m a
intersperseC s src = source $ \z yield -> EitherT $ do
eres <- runEitherT $ runSource src (Nothing, z) $ \(my, r) x ->
case my of
Nothing -> return (Just x, r)
Just y -> do
r' <- rewrap (Nothing,) $ yield r y
rewrap (Just x,) $ yield (snd r') s
case eres of
Left (_, r) -> return $ Left r
Right (Nothing, r) -> return $ Right r
Right (Just x, r) -> runEitherT $ yield r x
encodeBase64C :: Monad m => Conduit ByteString m ByteString
encodeBase64C = undefined
decodeBase64C :: Monad m => Conduit ByteString m ByteString
decodeBase64C = undefined
encodeBase64URLC :: Monad m => Conduit ByteString m ByteString
encodeBase64URLC = undefined
decodeBase64URLC :: Monad m => Conduit ByteString m ByteString
decodeBase64URLC = undefined
encodeBase16C :: Monad m => Conduit ByteString m ByteString
encodeBase16C = undefined
decodeBase16C :: Monad m => Conduit ByteString m ByteString
decodeBase16C = undefined
mapMC :: Monad m => (a -> m b) -> Conduit a m b
mapMC f = (>>= lift . f)
mapMCE :: (Monad m, Traversable f) => (a -> m b) -> Conduit (f a) m (f b)
mapMCE = undefined
omapMCE :: (Monad m, MonoTraversable mono)
=> (Element mono -> m (Element mono)) -> Conduit mono m mono
omapMCE = undefined
concatMapMC :: (Monad m, MonoFoldable mono)
=> (a -> m mono) -> Conduit a m (Element mono)
concatMapMC f = awaitForever $ yieldMany <=< lift . f
filterMC :: Monad m => (a -> m Bool) -> Conduit a m a
filterMC f = awaitForever $ \x -> do
res <- lift $ f x
if res
then return x
else skip
filterMCE :: (Monad m, IsSequence seq)
=> (Element seq -> m Bool) -> Conduit seq m seq
filterMCE = undefined
iterMC :: Monad m => (a -> m ()) -> Conduit a m a
iterMC = undefined
scanlMC :: Monad m => (a -> b -> m a) -> a -> Conduit b m a
scanlMC = undefined
concatMapAccumMC :: Monad m
=> (a -> accum -> m (accum, [b])) -> accum -> Conduit a m b
concatMapAccumMC = undefined
encodeUtf8C :: Monad m => Conduit Text m ByteString
encodeUtf8C = mapC encodeUtf8
decodeUtf8C :: MonadThrow m => Conduit ByteString m Text
decodeUtf8C = undefined
lineC :: (Monad m, IsSequence seq, Element seq ~ Char)
=> Conduit seq m o -> Conduit seq m o
lineC = undefined
lineAsciiC :: (Monad m, IsSequence seq, Element seq ~ Word8)
=> Conduit seq m o -> Conduit seq m o
lineAsciiC = undefined
unlinesC :: (Monad m, IsSequence seq, Element seq ~ Char)
=> Conduit seq m seq
unlinesC = concatMapC (: [Seq.singleton '\n'])
unlinesAsciiC :: (Monad m, IsSequence seq, Element seq ~ Word8)
=> Conduit seq m seq
unlinesAsciiC = concatMapC (: [Seq.singleton 10])
linesUnboundedC_ :: forall m seq. (Monad m, IsSequence seq, Eq (Element seq), Semigroup seq)
=> Element seq -> Conduit seq m seq
linesUnboundedC_ sep src = source $ \z yield -> EitherT $ do
eres <- runEitherT $ runSource src (z, n) (go yield)
case eres of
Left (r, _) -> return $ Left r
Right (r, t)
| onull t -> return $ Right r
| otherwise -> runEitherT $ yield r t
where
n = Seq.fromList []
go :: (r -> seq -> EitherT r m r) -> (r, seq) -> seq
-> EitherT (r, seq) m (r, seq)
go yield = loop
where
loop (r, t') t
| onull y = return (r, t <> t')
| otherwise = do
r' <- rewrap (, n) $ yield r (t' <> x)
loop r' (Seq.drop 1 y)
where
(x, y) = Seq.break (== sep) t
linesUnboundedC :: (Monad m, IsSequence seq, Element seq ~ Char, Semigroup seq)
=> Conduit seq m seq
linesUnboundedC = linesUnboundedC_ '\n'
linesUnboundedAsciiC :: (Monad m, IsSequence seq, Element seq ~ Word8, Semigroup seq)
=> Conduit seq m seq
linesUnboundedAsciiC = linesUnboundedC_ 10
linesC :: (Monad m, IsSequence seq, Element seq ~ Char, Semigroup seq)
=> Conduit seq m seq
linesC = linesUnboundedC
linesAsciiC :: (Monad m, IsSequence seq, Element seq ~ Word8, Semigroup seq)
=> Conduit seq m seq
linesAsciiC = linesUnboundedAsciiC
sourceMaybeMVar :: forall m a. MonadIO m => MVar (Maybe a) -> Source m a
sourceMaybeMVar var = source go
where
go :: r -> (r -> a -> EitherT r m r) -> EitherT r m r
go z yield = loop z
where
loop r = do
mx <- liftIO $ takeMVar var
case mx of
Nothing -> return r
Just x -> loop =<< yield r x
sourceMaybeTMVar :: forall a. TMVar (Maybe a) -> Source STM a
sourceMaybeTMVar var = source go
where
go :: r -> (r -> a -> EitherT r STM r) -> EitherT r STM r
go z yield = loop z
where
loop r = do
mx <- lift $ takeTMVar var
case mx of
Nothing -> return r
Just x -> loop =<< yield r x
asyncC :: (MonadBaseControl IO m, Monad m)
=> (a -> m b) -> Conduit a m (Async (StM m b))
asyncC f = awaitForever $ lift . async . f
sourceSTM :: forall container a. (container a -> STM a)
-> (container a -> STM Bool)
-> container a
-> Source STM a
sourceSTM getter tester chan = source go
where
go :: r -> (r -> a -> EitherT r STM r) -> EitherT r STM r
go z yield = loop z
where
loop r = do
x <- lift $ getter chan
r' <- yield r x
mt <- lift $ tester chan
if mt
then return r'
else loop r'
sourceTChan :: forall a. TChan a -> Source STM a
sourceTChan = sourceSTM readTChan isEmptyTChan
sourceTQueue :: forall a. TQueue a -> Source STM a
sourceTQueue = sourceSTM readTQueue isEmptyTQueue
sourceTBQueue :: forall a. TBQueue a -> Source STM a
sourceTBQueue = sourceSTM readTBQueue isEmptyTBQueue
untilMC :: forall m a. Monad m => m a -> m Bool -> Source m a
untilMC m f = source go
where
go :: r -> (r -> a -> EitherT r m r) -> EitherT r m r
go z yield = loop z
where
loop r = do
x <- lift m
r' <- yield r x
c <- lift f
if c then loop r' else return r'
whileMC :: forall m a. Monad m => m Bool -> m a -> Source m a
whileMC f m = source go
where
go :: r -> (r -> a -> EitherT r m r) -> EitherT r m r
go z yield = loop z
where
loop r = do
c <- lift f
if c
then lift m >>= yield r >>= loop
else return r
zipSinks :: forall a m r r'. (MonadBaseControl IO m, MonadIO m)
=> Sink a m r -> Sink a m r' -> Sink a m (r, r')
zipSinks sink1 sink2 src = do
x <- liftIO newEmptyMVar
y <- liftIO newEmptyMVar
withAsync (sink1 $ sourceMaybeMVar x) $ \a ->
withAsync (sink2 $ sourceMaybeMVar y) $ \b -> do
_ <- runEitherT $ runSource src () $ \() val -> do
liftIO $ putMVar x (Just val)
liftIO $ putMVar y (Just val)
liftIO $ putMVar x Nothing
liftIO $ putMVar y Nothing
waitBoth a b