-- |Monadic Iteratees:
-- incremental input parsers, processors and transformers
--
-- This module provides many basic iteratees from which more complicated
-- iteratees can be built.  In general these iteratees parallel those in
-- @Data.List@, with some additions.

module Bio.Iteratee.List (
  -- * Iteratees
  -- ** Iteratee Utilities
  isFinished
  ,stream2list
  ,stream2stream
  -- ** Basic Iteratees
  ,dropWhileStream
  ,dropStream
  ,headStream
  ,tryHead
  ,lastStream
  ,heads
  ,peekStream
  ,roll
  ,lengthStream
  ,chunkLength
  ,takeFromChunk
  -- ** Nested iteratee combinators
  ,breakStream
  ,breakE
  ,takeStream
  ,takeUpTo
  ,takeWhileE
  ,mapStream
  ,concatMapStream
  ,concatMapStreamM
  ,mapMaybeStream
  ,filterStream
  ,filterStreamM
  ,groupStreamBy
  ,groupStreamOn
  ,mergeStreams
  ,mergeByChunks
  -- ** Folds
  ,foldStream
  -- * Enumerators
  -- ** Basic enumerators
  ,enumPureNChunk
  -- ** Enumerator Combinators
  ,enumWith
  ,zipStreams
  ,zipStreams3
  ,zipStreams4
  ,zipStreams5
  ,sequenceStreams_
  ,countConsumed
  -- ** Monadic functions
  ,mapStreamM
  ,mapStreamM_
  ,foldStreamM
  -- * Re-exported modules
  ,module Bio.Iteratee.Iteratee
)
where

import Bio.Iteratee.Iteratee
import Bio.Prelude
import Control.Monad.Trans.Class (MonadTrans(..))
-- import qualified Data.ByteString as B

-- Useful combinators for implementing iteratees and enumerators

-- | Report whether the stream has reached 'EOF'.  Empty chunks are
-- skipped; a non-empty chunk or the 'EOF' marker is handed back to the
-- stream untouched.
isFinished :: Nullable s => Iteratee s m Bool
isFinished = liftI probe
  where
    probe str = case str of
      EOF _                -> idone True str
      Chunk xs | nullC xs  -> liftI probe
               | otherwise -> idone False str
{-# INLINE isFinished #-}

-- ------------------------------------------------------------------------
-- Primitive iteratees

-- |Read a stream to the end and return all of its elements as a list.
-- All data from the stream is returned *strictly*.
stream2list :: Monad m => Iteratee [el] m [el]
stream2list = concat `liftM` getChunks
{-# INLINE stream2list #-}

-- |Read a stream to the end and return all of its elements as a stream.
-- This iteratee returns all data from the stream *strictly*.
stream2stream :: (Monad m, Nullable s, Monoid s) => Iteratee s m s
stream2stream = mconcat `liftM` getChunks
{-# INLINE stream2stream #-}

-- ------------------------------------------------------------------------
-- Parser combinators

-- |Read the next element of the stream and return it.  If the stream
-- is terminated instead, a (recoverable) error is raised.
--
-- The analogue of @List.head@
--
-- Because @head@ can raise an error, it shouldn't be used when constructing
-- iteratees for @convStream@.  Use @tryHead@ instead.
headStream :: Iteratee [el] m el
headStream = liftI next
  where
    next str = case str of
      Chunk []       -> icont next Nothing
      Chunk (x : xs) -> idone x (Chunk xs)
      _              -> icont next (Just (setEOF str))
{-# INLINE headStream #-}

-- | Like @headStream@, but yields @Nothing@ instead of raising an error
-- when the stream is terminated.
tryHead :: Iteratee [el] m (Maybe el)
tryHead = liftI next
  where
    next str = case str of
      Chunk []       -> liftI next
      Chunk (x : xs) -> idone (Just x) (Chunk xs)
      _              -> idone Nothing str
{-# INLINE tryHead #-}

-- |Read the stream to its end and return the last element seen.
-- Raise a (recoverable) error if the stream ends before any element
-- arrived.
--
-- The analogue of @List.last@
lastStream :: Iteratee [el] m el
lastStream = liftI (go Nothing)
  where
    -- the first argument remembers the most recent element, if any
    go seen (Chunk xs)
      | nullC xs  = liftI (go seen)
      | otherwise = liftI (go (Just (last xs)))
    go Nothing  s@(EOF _) = icont (go Nothing) (Just (setEOF s))
    go (Just x) s@(EOF _) = idone x s
{-# INLINE lastStream #-}

-- |Given a sequence of characters, attempt to match them against
-- the characters on the stream.  Return the count of how many
-- characters matched.  The matched characters are removed from the
-- stream.
-- For example, if the stream contains 'abd', then (heads 'abc')
-- will remove the characters 'ab' and return 2.
heads :: (Monad m, Eq el) => [el] -> Iteratee [el] m Int
heads st
  | nullC st  = return 0              -- nothing to match: consume no input
  | otherwise = liftI (step 0 st)
  where
    -- @step cnt pat stream@: @cnt@ elements have matched so far and @pat@
    -- is the part of the pattern that still has to be matched.
    --
    -- The exhausted-pattern case is tried first: once the whole pattern
    -- has matched we finish immediately, even when the current chunk ran
    -- out at the same moment, instead of asking for one more chunk.
    step cnt [] stream = idone cnt stream
    step cnt pat (Chunk []) = liftI (step cnt pat)
    step cnt (y:ys) s@(Chunk (x:xs))
      | y == x    = step (succ cnt) ys (Chunk xs)
      | otherwise = idone cnt s       -- mismatch: leave the rest on the stream
    step cnt _ stream = idone cnt stream  -- end of stream
{-# INLINE heads #-}

-- |Look ahead at the next element of the stream, without removing
-- it from the stream.
-- Return @Just c@ if successful, return @Nothing@ if the stream is
-- terminated by 'EOF'.
peekStream :: Iteratee [el] m (Maybe el)
peekStream = liftI step
  where
    step (Chunk [])      = liftI step          -- skip empty chunks
    step s@(Chunk (x:_)) = idone (Just x) s    -- the chunk is handed back whole
    step stream          = idone Nothing stream
{-# INLINE peekStream #-}

-- | Return a chunk of @t@ elements length while consuming @d@ elements
--   from the stream.  Useful for creating a 'rolling average' with
--  'convStream'.
roll
  :: Monad m
  => Int  -- ^ length of chunk (t)
  -> Int  -- ^ amount to consume (d)
  -> Iteratee [el] m [[el]]
roll t d | t > d  = liftI step
  where
    -- windows overlap here (t > d), so only @d@ elements are dropped and
    -- the remainder stays on the stream for the next window
    step (Chunk vec)
      | length vec >= t =
          idone [take t vec] (Chunk $ drop d vec)
      | null vec        = liftI step
      | otherwise       = liftI (step' vec)
    step stream         = idone empty stream
    -- not enough data yet: prepend what has been collected to the next chunk
    step' v1 (Chunk vec) = step . Chunk $ v1 `mappend` vec
    step' v1 stream      = idone [v1] stream
roll t d = do r <- joinI (takeStream t stream2stream)
              dropStream (d-t)
              return [r]
  -- d is >= t, so this version works
{-# INLINE roll #-}

-- |Drop n elements of the stream, if there are that many.
--
-- The analogue of @List.drop@
dropStream :: Int -> Iteratee [el] m ()
dropStream 0  = idone () (Chunk emptyP)
dropStream n' = liftI (step n')
  where
    step n (Chunk str)
      | length str < n = liftI (step (n - length str))  -- whole chunk dropped
      | otherwise      = idone () (Chunk (drop n str))
    step _ stream      = idone () stream
{-# INLINE dropStream #-}

-- |Skip all elements while the predicate is true.
--
-- The analogue of @List.dropWhile@
dropWhileStream :: (el -> Bool) -> Iteratee [el] m ()
dropWhileStream p = liftI skip
  where
    skip (Chunk str) = case dropWhile p str of
      []   -> liftI skip                 -- chunk fully skipped, keep going
      kept -> idone () (Chunk kept)
    skip str = idone () str
{-# INLINE dropWhileStream #-}

-- | Count the elements remaining in the stream.
--
-- The whole stream is consumed in the process.
--
-- The analogue of @List.length@
lengthStream :: Num a => Iteratee [el] m a
lengthStream = liftI (count 0)
  where
    count !acc (Chunk xs) = liftI (count (acc + fromIntegral (length xs)))
    count !acc str        = idone acc str
{-# INLINE lengthStream #-}

-- | Length of the current chunk, or @Nothing@ at 'EOF'.
--
-- No input is consumed; the stream is handed back unchanged.
chunkLength :: Iteratee [el] m (Maybe Int)
chunkLength = liftI look
  where
    look str = case str of
      Chunk xs -> idone (Just (length xs)) str
      _        -> idone Nothing str
{-# INLINE chunkLength #-}

-- | Take @n@ elements from the current chunk, or the whole chunk if
-- @n@ is greater.
takeFromChunk :: Int -> Iteratee [el] m [el]
takeFromChunk n
  | n <= 0    = idone emptyP (Chunk emptyP)
  | otherwise = liftI grab
  where
    grab (Chunk xs) = idone front (Chunk back)
      where
        (front, back) = splitAt n xs
    grab str = idone emptyP str
{-# INLINE takeFromChunk #-}

-- |Takes an element predicate and returns the (possibly empty) prefix of
-- the stream.  None of the characters in the string satisfy the character
-- predicate.
-- If the stream is not terminated, the first character of the remaining stream
-- satisfies the predicate.
--
-- N.B. 'breakE' should be used in preference to @breakStream@.
-- @breakStream@ will retain all data until the predicate is met, which may
-- result in a space leak.
--
-- The analogue of @List.break@
breakStream :: (el -> Bool) -> Iteratee [el] m [el]
breakStream cpred = icont (collect mempty) Nothing
  where
    -- the first argument accumulates the prefix seen so far
    collect acc (Chunk [])  = icont (collect acc) Nothing
    collect acc (Chunk str) = case break cpred str of
      (_,    [])   -> icont (collect (acc `mappend` str)) Nothing
      (keep, rest) -> idone (acc `mappend` keep) (Chunk rest)
    collect acc str = idone acc str
{-# INLINE breakStream #-}

-- ---------------------------------------------------
-- The converters show a different way of composing two iteratees:
-- `vertical' rather than `horizontal'

-- |Takes an element predicate and an iteratee, running the iteratee
-- on all elements of the stream until the predicate is met.
--
-- the following rule relates @break@ to @breakE@
-- @break@ pred === @joinI@ (@breakE@ pred stream2stream)
--
-- @breakE@ should be used in preference to @break@ whenever possible.
breakE :: (el -> Bool) -> Enumeratee [el] [el] m a
breakE cpred = eneeCheckIfDonePass (icont . feed)
  where
    feed k (Chunk []) = liftI (feed k)
    feed k (Chunk s)  = case break cpred s of
      -- no element satisfied the predicate: pass the chunk on and continue
      (keep, [])   -> eneeCheckIfDonePass (icont . feed) (k (Chunk keep))
      -- predicate met: pass the prefix on, leave the rest on the outer stream
      (keep, rest) -> idone (k (Chunk keep)) (Chunk rest)
    feed k str = idone (liftI k) str
{-# INLINE breakE #-}

-- |Read n elements from a stream and apply the given iteratee to the
-- stream of the read elements. Unless the stream is terminated early, we
-- read exactly n elements, even if the iteratee has accepted fewer.
--
-- The analogue of @List.take@
takeStream ::
  Monad m
  => Int   -- ^ number of elements to consume
  -> Enumeratee [el] [el] m a
takeStream n' iter
  | n' <= 0   = return iter
  | otherwise = Iteratee $ \od oc -> runIter iter (on_done od oc) (on_cont od oc)
  where
    -- Inner iteratee already finished: still drop the @n'@ elements so that
    -- exactly that many are removed from the outer stream.
    on_done od oc x _        = runIter (dropStream n' >> return (return x)) od oc
    -- Inner iteratee wants input.  @n'@ is strictly positive here (the
    -- @n' <= 0@ case was handled by the guard above), so we always step.
    on_cont od oc k Nothing  = runIter (liftI (step n' k)) od oc
    -- Inner iteratee is in an error state: drop the elements, then rethrow.
    on_cont od oc _ (Just e) = runIter (dropStream n' >> throwErr e) od oc

    -- @step n k@ feeds at most @n@ further elements to the continuation @k@.
    step n k (Chunk str)
      | null str        = liftI (step n k)
      | length str <= n = takeStream (n - length str) $ k (Chunk str)
      | otherwise       = idone (k (Chunk s1)) (Chunk s2)
      where (s1, s2) = splitAt n str
    step _n k stream    = idone (liftI k) stream
{-# INLINE takeStream #-}

-- |Read n elements from a stream and apply the given iteratee to the
-- stream of the read elements. If the given iteratee accepted fewer
-- elements, we stop.
-- This is the variation of 'takeStream' with the early termination
-- of processing of the outer stream once the processing of the inner stream
-- finished early.
--
-- Iteratees composed with 'takeUpTo' will consume only enough elements to
-- reach a done state.  Any remaining data will be available in the outer
-- stream.
--
-- > > let iter = do
-- >            h <- joinI $ takeUpTo 5 headStream
-- >            t <- stream2list
-- >            return (h,t)
-- >
-- > > enumPureNChunk [1..10::Int] 3 iter >>= run >>= print
-- > (1,[2,3,4,5,6,7,8,9,10])
-- >
-- > > enumPureNChunk [1..10::Int] 7 iter >>= run >>= print
-- > (1,[2,3,4,5,6,7,8,9,10])
--
-- in each case, 'headStream' consumes only one element, returning the
-- remaining 4 elements to the outer stream
takeUpTo :: Monad m => Int -> Enumeratee [el] [el] m a
takeUpTo i iter
  | i <= 0    = idone iter (Chunk emptyP)
  | otherwise = Iteratee $ \od oc ->
      runIter iter (onDone od oc) (onCont od oc)
  where
    -- Inner iteratee already finished: return it together with its leftover.
    onDone od oc x str      = runIter (idone (return x) str) od oc
    -- Inner iteratee wants input.  @i@ is strictly positive here (the
    -- @i <= 0@ case was handled by the guard above), so we always step.
    onCont od oc k Nothing  = runIter (liftI (step i k)) od oc
    -- Inner iteratee is in an error state: rethrow in the outer iteratee.
    onCont od oc _ (Just e) = runIter (throwErr e) od oc

    -- @step n k@ feeds at most @n@ further elements to the continuation @k@.
    step n k (Chunk str)
      | null str       = liftI (step n k)
      | length str < n = takeUpTo (n - length str) $ k (Chunk str)
      | otherwise      =
         -- check to see if the inner iteratee has completed, and if so,
         -- grab any remaining stream to put it in the outer iteratee.
         -- the outer iteratee is always complete at this stage, although
         -- the inner may not be.
         let (s1, s2) = splitAt n str
         in Iteratee $ \od' _ -> do
              res <- runIter (k (Chunk s1)) (\a s  -> return $ Left  (a, s))
                                            (\k' e -> return $ Right (k',e))
              case res of
                Left (a,Chunk s1') -> od' (return a)
                                          (Chunk $ s1' ++ s2)
                Left  (a,s')       -> od' (idone a s') (Chunk s2)
                Right (k',e)       -> od' (icont k' e) (Chunk s2)
    step _ k stream    = idone (liftI k) stream
{-# INLINE takeUpTo #-}

-- |Takes an element predicate and an iteratee, running the iteratee
-- on all elements of the stream while the predicate is met.
--
-- This is preferred to @takeWhile@.
takeWhileE :: (el -> Bool) -> Enumeratee [el] [el] m a
takeWhileE = breakE . (not .)
{-# INLINEABLE takeWhileE #-}

-- | Map a function over an 'Iteratee'.
-- This one is reimplemented and differs from the one in
-- "Data.Iteratee.ListLike" in so far that it doesn't pass on an 'EOF'
-- received in the input, which is the expected behavior.
mapStream :: (el -> el') -> Enumeratee [el] [el'] m a
mapStream = mapChunks . map
{-# INLINE mapStream #-}

-- | Apply a function to the elements of a stream, concatenate the
-- results into a stream.  No giant intermediate list is produced.
concatMapStream :: Monoid t => (a -> t) -> Enumeratee [a] t m r
concatMapStream = mapChunks . foldMap
{-# INLINE concatMapStream #-}

-- | Apply a monadic function to the elements of a stream, concatenate
-- the results into a stream.  No giant intermediate list is produced.
concatMapStreamM :: Monad m => (a -> m t) -> Enumeratee [a] t m r
concatMapStreamM f = eneeCheckIfDone (liftI . go)
  where
    go k (EOF mx)       = idone (liftI k) (EOF mx)
    go k (Chunk [])     = liftI (go k)
    -- one element at a time: run @f@, pass its result on as a chunk, then
    -- continue with the rest of the input chunk unless the inner iteratee
    -- is done
    go k (Chunk (x:xs)) = f x `mBind`
                          eneeCheckIfDone (flip go (Chunk xs)) . k . Chunk
{-# INLINE concatMapStreamM #-}

-- | Apply a function to every element of the stream and pass on only the
-- @Just@ results.
mapMaybeStream :: (a -> Maybe b) -> Enumeratee [a] [b] m r
mapMaybeStream = mapChunks . mapMaybe
{-# INLINE mapMaybeStream #-}

-- |Creates an 'enumeratee' with only elements from the stream that
-- satisfy the predicate function.  The outer stream is completely consumed.
--
-- The analogue of @List.filter@
filterStream :: (el -> Bool) -> Enumeratee [el] [el] m a
filterStream p = mapChunks (filter p)
{-# INLINE filterStream #-}

-- | Apply a monadic filter predicate to an 'Iteratee'.  Elements that
-- satisfy the predicate are passed on in their original order.
filterStreamM :: Monad m => (a -> m Bool) -> Enumeratee [a] [a] m r
filterStreamM k = mapChunksM (go id)
  where
    -- @acc@ is a difference list of the elements kept so far; new elements
    -- are composed on the right so that input order is preserved.
    go acc [   ] = return $! acc empty
    go acc (h:t) = do p <- k h
                      let acc' = if p then acc . (h :) else acc
                      go acc' t
{-# INLINE filterStreamM #-}

-- | Grouping on 'Iteratee's.
-- @groupStreamOn proj inner outer@ executes
-- @inner (proj e)@, where @e@ is the first input element, to obtain an
-- 'Iteratee' @i@, then passes elements @e@ to @i@ as long as @proj e@
-- produces the same result.  If @proj e@ changes or the input ends, the
-- pair of @proj e@ and the result of @run i@ is passed to @outer@.  At
-- end of input, the resulting @outer@ is returned.
groupStreamOn :: (Monad m, Eq t1)
              => (e -> t1)
              -> (t1 -> m (Iteratee [e] m t2))
              -> Enumeratee [e] [(t1, t2)] m a
groupStreamOn proj inner = eneeCheckIfDonePass (icont . start)
  where
    -- Begin a new group: its key comes from the first available element.
    start outer str = case str of
      EOF mx        -> idone (liftI outer) (EOF mx)
      Chunk []      -> liftI (start outer)
      Chunk (h : _) -> do
        let key = proj h
        it <- lift (inner key)
        feed key it outer str

    -- We want to feed a 'Chunk' to the inner 'Iteratee', which might be
    -- finished.  In that case, we would want to abort, but we cannot,
    -- since the outer iteration is still going on.  So instead we
    -- discard data we would have fed to the inner 'Iteratee'.  (Use of
    -- 'enumPure1Chunk' is not appropriate, it would accumulate the
    -- data, just to have it discarded by the 'run' that eventually
    -- happens.)
    feed key it outer (Chunk as)
      | null as = liftI (feed key it outer)
      | (grp@(_ : _), rest) <- span ((key ==) . proj) as =
          let onDone a _       = idoneM a (EOF Nothing)
              onCont k Nothing = return (k (Chunk grp))
              onCont k err     = icontM k err
          in do it' <- lift (runIter it onDone onCont)
                feed key it' outer (Chunk rest)

    -- The group has ended (key changed or end of input): finish the inner
    -- iteratee and hand the keyed result to the outer one.
    feed key it outer str = do
      res <- lift (run it)
      eneeCheckIfDone (`start` str) (outer (Chunk [(key, res)]))

-- | Grouping on 'Iteratee's.  @groupStreamBy cmp inner outer@ executes
-- @inner@ to obtain an 'Iteratee' @i@, then passes elements @e@ to @i@
-- as long as @cmp e0 e@, where @e0@ is some preceding element, is
-- true.  Else, the result of @run i@ is passed to @outer@ and
-- 'groupStreamBy' restarts.  At end of input, the resulting @outer@ is
-- returned.
groupStreamBy :: Monad m
              => (t -> t -> Bool)
              -> m (Iteratee [t] m t2)
              -> Enumeratee [t] [t2] m a
groupStreamBy cmp inner = eneeCheckIfDonePass (icont . step)
  where
    -- Begin a new group with the first available element as reference.
    -- NOTE(review): the reference element is itself still part of the chunk
    -- that is compared against it, so @cmp@ presumably has to be reflexive
    -- for the group to make progress -- confirm against callers.
    step outer   (EOF   mx)    = idone (liftI outer) $ EOF mx
    step outer   (Chunk [ ])   = liftI $ step outer
    step outer c@(Chunk (h:_)) = lift inner >>= \i -> step' h i outer c

    -- Feed the longest matching prefix @l@ to the inner iteratee (data is
    -- discarded if it is already done), then continue with the first
    -- element of @l@ as the new reference.  The pattern guard only
    -- succeeds for a non-empty prefix.
    step' c it outer (Chunk as)
        | null as = liftI $ step' c it outer
        | (l@(l0:_), r) <- span (cmp c) as =
            let od a    _str = idoneM a $ EOF Nothing
                oc k Nothing = return $ k (Chunk l)
                oc k       m = icontM k m
            in lift (runIter it od oc) >>= \it' -> step' l0 it' outer (Chunk r)

    -- The group has ended (comparison failed or end of input): finish the
    -- inner iteratee and hand its result to the outer one.
    step' _ it outer str =
        lift (run it) >>= \b -> eneeCheckIfDone (`step` str) . outer $ Chunk [b]

-- | @mergeStreams@ offers another way to nest iteratees: as a monad stack.
-- This allows for the possibility of interleaving data from multiple
-- streams.
--
-- > -- print each element from a stream of lines.
-- > logger :: (MonadIO m) => Iteratee [ByteString] m ()
-- > logger = mapStreamM_ (liftIO . putStrLn . B.unpack)
-- >
-- > -- combine alternating lines from two sources
-- > -- To see how this was derived, follow the types from
-- > -- 'ileaveLines logger' and work outwards.
-- > run =<< enumFile 10 "file1" (joinI $ enumLinesBS $
-- >           ( enumFile 10 "file2" . joinI . enumLinesBS $ joinI
-- >                 (ileaveLines logger)) >>= run)
-- >
-- > ileaveLines :: (Functor m, Monad m)
-- >   => Enumeratee [ByteString] [ByteString] (Iteratee [ByteString] m)
-- >        [ByteString]
-- > ileaveLines = mergeStreams (\l1 l2 ->
-- >    [B.pack "f1:\n\t" ,l1 ,B.pack "f2:\n\t" ,l2 ])
--
mergeStreams :: Monad m
             => (el1 -> el2 -> b)
             -> Enumeratee [el2] b (Iteratee [el1] m) a
mergeStreams f = convStream $ liftM2 f (lift headStream) headStream
{-# INLINE mergeStreams #-}

-- | A version of mergeStreams which operates on chunks instead of
-- elements.
--
-- mergeByChunks offers more control than 'mergeStreams'.
-- 'mergeStreams' terminates when the first stream terminates, however
-- mergeByChunks will continue until both streams are exhausted.
--
-- 'mergeByChunks' guarantees that both chunks passed to the merge
-- function will have the same number of elements, although that number
-- may vary between calls.
mergeByChunks ::
  Monad m
  => ([el1] -> [el2] -> c3)  -- ^ merge function
  -> ([el1] -> c3)           -- ^ applied to chunks of the outer stream once
                             --   the inner stream has ended
  -> ([el2] -> c3)           -- ^ applied to chunks of the inner stream once
                             --   the outer stream has ended
  -> Enumeratee [el2] c3 (Iteratee [el1] m) a
mergeByChunks f f1 f2 = unfoldConvStream iter (0 :: Int)
  where
    -- The state selects the mode: 1 = only the outer stream is left,
    -- 2 = only the inner stream is left, anything else = merge both.
    iter 1 = (\x -> (1,f1 x)) `liftM` lift getChunk
    iter 2 = (\x -> (2,f2 x)) `liftM` getChunk
    iter _ = do
      ml1 <- lift chunkLength
      ml2 <- chunkLength
      case (ml1, ml2) of
        (Just l1, Just l2) -> do
          -- take equally many elements from both current chunks
          let tval = min l1 l2
          c1 <- lift $ takeFromChunk tval
          c2 <- takeFromChunk tval
          return (0, f c1 c2)
        (Just _, Nothing) -> iter 1
        (Nothing, _)      -> iter 2
{-# INLINE mergeByChunks #-}

-- ------------------------------------------------------------------------
-- Folds

-- | Left-associative fold that is strict in the accumulator.
-- This function should be used in preference to 'foldl' whenever possible.
--
-- The analogue of @List.foldl'@.
foldStream :: (a -> el -> a) -> a -> Iteratee [el] m a
foldStream f i = liftI (step i)
  where
    step acc (Chunk xs)
      | null xs   = liftI (step acc)
      | otherwise = liftI (step $! foldl' f acc xs)
    step acc stream = idone acc stream
{-# INLINE foldStream #-}

-- ------------------------------------------------------------------------
-- Zips

-- |Enumerate two iteratees over a single stream simultaneously.
--
-- Compare to @List.zip@.
zipStreams
  :: Monad m
  => Iteratee [el] m a
  -> Iteratee [el] m b
  -> Iteratee [el] m (a, b)
zipStreams x0 y0 = do
    -- need to check if both iteratees are initially finished.  If so,
    -- we don't want to push a chunk which will be dropped
    (a', x') <- lift $ runIter x0 od oc
    (b', y') <- lift $ runIter y0 od oc
    case checkDone a' b' of
      Just (Right (a,b,s))  -> idone (a,b) s  -- 's' may be EOF, needs to stay
      Just (Left (Left a))  -> liftM ((,) a) y'
      Just (Left (Right b)) -> liftM (flip (,) b) x'
      Nothing               -> liftI (step x' y')
  where
    step x y (Chunk xs) | nullC xs = liftI (step x y)
    step x y (Chunk xs) = do
      -- push the same chunk into both iteratees and see which have finished
      (a', x') <- lift $ (\i -> runIter i od oc) =<< enumPure1Chunk xs x
      (b', y') <- lift $ (\i -> runIter i od oc) =<< enumPure1Chunk xs y
      case checkDone a' b' of
        Just (Right (a,b,s))  -> idone (a,b) s
        Just (Left (Left a))  -> liftM ((,) a) y'
        Just (Left (Right b)) -> liftM (flip (,) b) x'
        Nothing               -> liftI (step x' y')
    step x y (EOF err) = joinIM $ case err of
      Nothing -> (liftM2.liftM2) (,) (enumEof x) (enumEof y)
      Just e  -> (liftM2.liftM2) (,) (enumErr e x) (enumErr e y)

    -- Step an iteratee once: report its result (if finished) together with
    -- an equivalent iteratee that can be continued later.
    od a s = return (Just (a, s), idone a s)
    oc k e = return (Nothing    , icont k e)

    checkDone r1 r2 = case (r1, r2) of
      (Just (a, s1), Just (b,s2)) -> Just $ Right (a, b, shorter s1 s2)
      (Just (a, _), Nothing)      -> Just . Left $ Left a
      (Nothing, Just (b, _))      -> Just . Left $ Right b
      (Nothing, Nothing)          -> Nothing

    -- the shorter leftover belongs to the iteratee that consumed more
    shorter c1@(Chunk xs) c2@(Chunk ys)
      | length xs < length ys = c1
      | otherwise             = c2
    shorter e@(EOF _)  _         = e
    shorter _          e@(EOF _) = e
{-# INLINE zipStreams #-}

-- | Enumerate three iteratees over a single stream simultaneously.
zipStreams3
  :: Monad m
  => Iteratee [el] m a -> Iteratee [el] m b
  -> Iteratee [el] m c -> Iteratee [el] m (a, b, c)
zipStreams3 a b c = zipStreams a (zipStreams b c) >>=
  \(r1, (r2, r3)) -> return (r1, r2, r3)
{-# INLINE zipStreams3 #-}

-- | Enumerate four iteratees over a single stream simultaneously.
zipStreams4
  :: Monad m
  => Iteratee [el] m a -> Iteratee [el] m b
  -> Iteratee [el] m c -> Iteratee [el] m d
  -> Iteratee [el] m (a, b, c, d)
zipStreams4 a b c d = zipStreams a (zipStreams3 b c d) >>=
  \(r1, (r2, r3, r4)) -> return (r1, r2, r3, r4)
{-# INLINE zipStreams4 #-}

-- | Enumerate five iteratees over a single stream simultaneously.
zipStreams5
  :: Monad m
  => Iteratee [el] m a -> Iteratee [el] m b
  -> Iteratee [el] m c -> Iteratee [el] m d
  -> Iteratee [el] m e -> Iteratee [el] m (a, b, c, d, e)
zipStreams5 a b c d e = zipStreams a (zipStreams4 b c d e) >>=
  \(r1, (r2, r3, r4, r5)) -> return (r1, r2, r3, r4, r5)
{-# INLINE zipStreams5 #-}

-- | Enumerate over two iteratees in parallel as long as the first iteratee
-- is still consuming input.  The second iteratee will be terminated with EOF
-- when the first iteratee has completed.  An example use is to determine
-- how many elements an iteratee has consumed:
--
-- > snd <$> enumWith (dropWhileStream (<5)) lengthStream
--
-- Compare to @zipStreams@
enumWith
  :: Monad m
  => Iteratee [el] m a
  -> Iteratee [el] m b
  -> Iteratee [el] m (a, b)
enumWith i1 i2 = do
    -- as with zipStreams, first check to see if the initial iteratee is complete,
    -- otherwise data would be dropped.
    -- running the second iteratee as well to prevent a monadic effect mismatch
    -- although I think that would be highly unlikely to happen in common
    -- code
    (a', x') <- lift $ runIter i1 od oc
    (_,  y') <- lift $ runIter i2 od oc
    case a' of
      -- finish the already-stepped second iteratee @y'@; running the
      -- original @i2@ again would repeat its initial monadic effects
      Just (a, s) -> flip idone s =<< lift (liftM ((,) a) $ run y')
      Nothing     -> go x' y'
  where
    -- Step an iteratee once: report its result (if finished) together with
    -- an equivalent iteratee that can be continued later.
    od a s = return (Just (a, s), idone a s)
    oc k e = return (Nothing    , icont k e)

    -- the part of chunk @xs@ that was consumed, given the leftover stream
    getUsed xs (Chunk ys) = take (length xs - length ys) xs
    getUsed xs (EOF _)    = xs

    go x y = liftI step
      where
        step (Chunk xs) | nullC xs = liftI step
        step (Chunk xs) = do
          (a', x') <- lift $ (\i -> runIter i od oc) =<< enumPure1Chunk xs x
          case a' of
            Just (a, s) -> do
              -- the first iteratee is done: the second one only gets to
              -- see what the first one consumed, then it is terminated
              b <- lift $ run =<< enumPure1Chunk (getUsed xs s) y
              idone (a, b) s
            Nothing     -> lift (enumPure1Chunk xs y) >>= go x'
        step (EOF err) = joinIM $ case err of
          Nothing -> (liftM2.liftM2) (,) (enumEof x) (enumEof y)
          Just e  -> (liftM2.liftM2) (,) (enumErr e x) (enumErr e y)
{-# INLINE enumWith #-}

-- |Enumerate a list of iteratees over a single stream simultaneously
-- and discard the results. This is a different behavior than Prelude's
-- sequence_ which runs iteratees in the list one after the other.
-- An empty list of iteratees finishes immediately without consuming input.
--
-- Compare to @Prelude.sequence_@.
sequenceStreams_
  :: Monad m
  => [Iteratee [el] m a]
  -> Iteratee [el] m ()
sequenceStreams_ = self
  where
    -- With no iteratees there is nothing to feed; finishing right away also
    -- keeps 'remainingStream' from being applied to an empty list.
    self [] = idone () (Chunk emptyP)
    self is = liftI step
      where
        step (Chunk xs) | null xs = liftI step
        step s@(Chunk _) = do
          -- give a chunk to each iteratee
          is'  <- lift $ mapM (enumChunk s) is
          -- filter done iteratees
          (done, notDone) <- lift $ partition fst `liftM` mapM enumCheckIfDone is'
          if null notDone
            then idone () <=< remainingStream $ map snd done
            else self $ map snd notDone
        step s@(EOF _) = do
          s' <- remainingStream <=< lift $ mapM (enumChunk s) is
          case s' of
            EOF (Just e) -> throwErr e
            _            -> idone () s'

    -- returns the unconsumed part of the stream; "sequenceStreams_ is" consumes as
    -- much of the stream as the iteratee in is that consumes the most; e.g.
    -- sequenceStreams_ [I.head, I.last] consumes whole stream
    -- (only ever applied to a non-empty list, hence the use of 'foldl1')
    remainingStream
      :: Monad m
      => [Iteratee [el] m a] -> Iteratee [el] m (Stream [el])
    remainingStream is = lift $
      return . foldl1 shorter <=< mapM (\i -> runIter i od oc) $ is
      where
        od _ s = return s
        oc _ e = return $ case e of
          Nothing -> mempty
          _       -> EOF e

        -- return the shorter one of two streams; errors are propagated with the
        -- priority given to the "left"
        shorter c1@(Chunk xs) c2@(Chunk ys)
          | length xs < length ys = c1
          | otherwise             = c2
        shorter (EOF e1 ) (EOF e2 ) = EOF (e1 `mplus` e2)
        shorter e@(EOF _) _         = e
        shorter _         e@(EOF _) = e

-- |Transform an iteratee into one that keeps track of how much data it
-- consumes.
countConsumed :: (Monad m, Integral n) => Iteratee [el] m a -> Iteratee [el] m (a, n)
countConsumed i = go 0 (const i) (Chunk emptyP)
  where
    -- @go n f str@: @n@ elements were handed over before @str@, and @f@ is
    -- the continuation that receives @str@.
    go !n f str@(EOF _)   = flip (,) n `liftM` f str
    go !n f str@(Chunk c) = Iteratee rI
      where
        newLen = n + fromIntegral (length c)
        rI od oc = runIter (f str) onDone onCont
          where
            -- whatever is left over in the chunk was not consumed
            onDone a str'@(Chunk c') =
                od (a, newLen - fromIntegral (length c')) str'
            onDone a str'@(EOF _)    = od (a, n) str'
            onCont f' = oc (go newLen f')
{-# INLINE countConsumed #-}

-- ------------------------------------------------------------------------
-- Enumerators

-- |The pure n-chunk enumerator
-- It passes a given stream of elements to the iteratee in @n@-sized chunks.
-- Calls 'error' if the list is non-empty and @n@ is not positive.
enumPureNChunk :: Monad m => [el] -> Int -> Enumerator [el] m a
enumPureNChunk str n iter
  | null str  = return iter
  | n > 0     = enum' str iter
  | otherwise = error $ "enumPureNChunk called with n==" ++ show n
  where
    enum' str' iter'
      | null str' = return iter'
      | otherwise = let (s1, s2) = splitAt n str'
                        on_cont k Nothing = enum' s2 . k $ Chunk s1
                        -- iteratee in an error state: stop feeding
                        on_cont k e       = return $ icont k e
                    in runIter iter' idoneM on_cont
{-# INLINE enumPureNChunk #-}

-- ------------------------------------------------------------------------
-- Monadic functions

-- | Maps a monadic function over the elements of the stream and ignores
-- the result.
mapStreamM_ :: Monad m => (el -> m b) -> Iteratee [el] m ()
mapStreamM_ f = mapChunksM_ (mapM_ f)
{-# INLINE mapStreamM_ #-}

-- | Run a monadic function on every element of the stream and pass the
-- results on to the inner 'Iteratee'.
mapStreamM :: Monad m => (el -> m el') -> Enumeratee [el] [el'] m a
mapStreamM f = mapChunksM (mapM f)
{-# INLINE mapStreamM #-}

-- | Monadic left fold over all elements of the stream.
foldStreamM :: Monad m => (b -> a -> m b) -> b -> Iteratee [a] m b
foldStreamM f = foldChunksM (foldM f)
{-# INLINE foldStreamM #-}