{-# LANGUAGE CPP #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE ViewPatterns #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE TypeFamilies #-}
-- | These are stream fusion versions of some of the functions in
-- "Data.Conduit.Combinators".  Many functions don't have stream
-- versions here because instead they have @RULES@ which inline a
-- definition that fuses.
module Data.Conduit.Combinators.Stream
  ( yieldManyS
  , repeatMS
  , repeatWhileMS
  , foldl1S
  , allS
  , anyS
  , sinkLazyS
  , sinkVectorS
  , sinkVectorNS
  , sinkLazyBuilderS
  , lastS
  , lastES
  , findS
  , concatMapS
  , concatMapMS
  , concatS
  , scanlS
  , scanlMS
  , mapAccumWhileS
  , mapAccumWhileMS
  , intersperseS
  , slidingWindowS
  , filterMS
  , splitOnUnboundedES
  , initReplicateS
  , initRepeatS
  )
  where

-- BEGIN IMPORTS

import           Control.Monad (liftM)
import           Control.Monad.Primitive (PrimMonad)
import qualified Data.ByteString.Lazy as BL
import           Data.ByteString.Builder (Builder, toLazyByteString)
import           Data.Conduit.Internal.Fusion
import           Data.Conduit.Internal.List.Stream (foldS)
import           Data.Maybe (isNothing, isJust)
import           Data.MonoTraversable
#if ! MIN_VERSION_base(4,8,0)
import           Data.Monoid (Monoid (..))
#endif
import qualified Data.NonNull as NonNull
import qualified Data.Sequences as Seq
import qualified Data.Vector.Generic as V
import qualified Data.Vector.Generic.Mutable as VM
import           Prelude

#if MIN_VERSION_mono_traversable(1,0,0)
import           Data.Sequences (LazySequence (..))
#else
import           Data.Sequences.Lazy
#endif

-- END IMPORTS

yieldManyS :: (Monad m, MonoFoldable mono)
            => mono
            -> StreamProducer m (Element mono)
yieldManyS :: mono -> StreamProducer m (Element mono)
yieldManyS mono
mono Stream m i ()
_ =
    ([Element mono] -> m (Step [Element mono] (Element mono) ()))
-> m [Element mono] -> Stream m (Element mono) ()
forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream (Step [Element mono] (Element mono) ()
-> m (Step [Element mono] (Element mono) ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step [Element mono] (Element mono) ()
 -> m (Step [Element mono] (Element mono) ()))
-> ([Element mono] -> Step [Element mono] (Element mono) ())
-> [Element mono]
-> m (Step [Element mono] (Element mono) ())
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Element mono] -> Step [Element mono] (Element mono) ()
forall o. [o] -> Step [o] o ()
step) ([Element mono] -> m [Element mono]
forall (m :: * -> *) a. Monad m => a -> m a
return (mono -> [Element mono]
forall mono. MonoFoldable mono => mono -> [Element mono]
otoList mono
mono))
  where
    step :: [o] -> Step [o] o ()
step [] = () -> Step [o] o ()
forall s o r. r -> Step s o r
Stop ()
    step (o
x:[o]
xs) = [o] -> o -> Step [o] o ()
forall s o r. s -> o -> Step s o r
Emit [o]
xs o
x
{-# INLINE yieldManyS #-}

repeatMS :: Monad m
         => m a
         -> StreamProducer m a
repeatMS :: m a -> StreamProducer m a
repeatMS m a
m Stream m i ()
_ =
    (() -> m (Step () a ())) -> m () -> Stream m a ()
forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream () -> m (Step () a ())
step (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
  where
    step :: () -> m (Step () a ())
step ()
_ = (a -> Step () a ()) -> m a -> m (Step () a ())
forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (() -> a -> Step () a ()
forall s o r. s -> o -> Step s o r
Emit ()) m a
m
{-# INLINE repeatMS #-}

repeatWhileMS :: Monad m
              => m a
              -> (a -> Bool)
              -> StreamProducer m a
repeatWhileMS :: m a -> (a -> Bool) -> StreamProducer m a
repeatWhileMS m a
m a -> Bool
f Stream m i ()
_ =
    (() -> m (Step () a ())) -> m () -> Stream m a ()
forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream () -> m (Step () a ())
step (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
  where
    step :: () -> m (Step () a ())
step ()
_ = do
        a
x <- m a
m
        Step () a () -> m (Step () a ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step () a () -> m (Step () a ()))
-> Step () a () -> m (Step () a ())
forall a b. (a -> b) -> a -> b
$ if a -> Bool
f a
x
            then () -> a -> Step () a ()
forall s o r. s -> o -> Step s o r
Emit () a
x
            else () -> Step () a ()
forall s o r. r -> Step s o r
Stop ()
{-# INLINE repeatWhileMS #-}

foldl1S :: Monad m
        => (a -> a -> a)
        -> StreamConsumer a m (Maybe a)
foldl1S :: (a -> a -> a) -> StreamConsumer a m (Maybe a)
foldl1S a -> a -> a
f (Stream s -> m (Step s a ())
step m s
ms0) =
    ((Maybe a, s) -> m (Step (Maybe a, s) o (Maybe a)))
-> m (Maybe a, s) -> Stream m o (Maybe a)
forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream (Maybe a, s) -> m (Step (Maybe a, s) o (Maybe a))
step' ((s -> (Maybe a, s)) -> m s -> m (Maybe a, s)
forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (Maybe a
forall a. Maybe a
Nothing, ) m s
ms0)
  where
    step' :: (Maybe a, s) -> m (Step (Maybe a, s) o (Maybe a))
step' (Maybe a
mprev, s
s) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        Step (Maybe a, s) o (Maybe a) -> m (Step (Maybe a, s) o (Maybe a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe a, s) o (Maybe a)
 -> m (Step (Maybe a, s) o (Maybe a)))
-> Step (Maybe a, s) o (Maybe a)
-> m (Step (Maybe a, s) o (Maybe a))
forall a b. (a -> b) -> a -> b
$ case Step s a ()
res of
            Stop () -> Maybe a -> Step (Maybe a, s) o (Maybe a)
forall s o r. r -> Step s o r
Stop Maybe a
mprev
            Skip s
s' -> (Maybe a, s) -> Step (Maybe a, s) o (Maybe a)
forall s o r. s -> Step s o r
Skip (Maybe a
mprev, s
s')
            Emit s
s' a
a -> (Maybe a, s) -> Step (Maybe a, s) o (Maybe a)
forall s o r. s -> Step s o r
Skip (a -> Maybe a
forall a. a -> Maybe a
Just (a -> Maybe a) -> a -> Maybe a
forall a b. (a -> b) -> a -> b
$ a -> (a -> a) -> Maybe a -> a
forall b a. b -> (a -> b) -> Maybe a -> b
maybe a
a (a -> a -> a
`f` a
a) Maybe a
mprev, s
s')
{-# INLINE foldl1S #-}

allS :: Monad m
     => (a -> Bool)
     -> StreamConsumer a m Bool
allS :: (a -> Bool) -> StreamConsumer a m Bool
allS a -> Bool
f = (Maybe a -> Bool)
-> StreamConduitT a o m (Maybe a) -> StreamConduitT a o m Bool
forall (m :: * -> *) a b i o.
Monad m =>
(a -> b) -> StreamConduitT i o m a -> StreamConduitT i o m b
fmapS Maybe a -> Bool
forall a. Maybe a -> Bool
isNothing ((a -> Bool) -> StreamConsumer a m (Maybe a)
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> StreamConsumer a m (Maybe a)
findS (Bool -> Bool
Prelude.not (Bool -> Bool) -> (a -> Bool) -> a -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Bool
f))
{-# INLINE allS #-}

anyS :: Monad m
     => (a -> Bool)
     -> StreamConsumer a m Bool
anyS :: (a -> Bool) -> StreamConsumer a m Bool
anyS a -> Bool
f = (Maybe a -> Bool)
-> StreamConduitT a o m (Maybe a) -> StreamConduitT a o m Bool
forall (m :: * -> *) a b i o.
Monad m =>
(a -> b) -> StreamConduitT i o m a -> StreamConduitT i o m b
fmapS Maybe a -> Bool
forall a. Maybe a -> Bool
isJust ((a -> Bool) -> StreamConsumer a m (Maybe a)
forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> StreamConsumer a m (Maybe a)
findS a -> Bool
f)
{-# INLINE anyS #-}

--TODO: use a definition like
-- fmapS (fromChunks . ($ [])) <$> CL.fold (\front next -> front . (next:)) id

sinkLazyS :: (Monad m, LazySequence lazy strict)
          => StreamConsumer strict m lazy
sinkLazyS :: StreamConsumer strict m lazy
sinkLazyS = (([strict] -> [strict]) -> lazy)
-> StreamConduitT strict o m ([strict] -> [strict])
-> StreamConduitT strict o m lazy
forall (m :: * -> *) a b i o.
Monad m =>
(a -> b) -> StreamConduitT i o m a -> StreamConduitT i o m b
fmapS ([strict] -> lazy
forall lazy strict. LazySequence lazy strict => [strict] -> lazy
fromChunks ([strict] -> lazy)
-> (([strict] -> [strict]) -> [strict])
-> ([strict] -> [strict])
-> lazy
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (([strict] -> [strict]) -> [strict] -> [strict]
forall a b. (a -> b) -> a -> b
$ [])) (StreamConduitT strict o m ([strict] -> [strict])
 -> StreamConduitT strict o m lazy)
-> StreamConduitT strict o m ([strict] -> [strict])
-> StreamConduitT strict o m lazy
forall a b. (a -> b) -> a -> b
$ (([strict] -> [strict]) -> strict -> [strict] -> [strict])
-> ([strict] -> [strict])
-> StreamConsumer strict m ([strict] -> [strict])
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> StreamConsumer a m b
foldS (\[strict] -> [strict]
front strict
next -> [strict] -> [strict]
front ([strict] -> [strict])
-> ([strict] -> [strict]) -> [strict] -> [strict]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (strict
nextstrict -> [strict] -> [strict]
forall a. a -> [a] -> [a]
:)) [strict] -> [strict]
forall a. a -> a
id
{-# INLINE sinkLazyS #-}

sinkVectorS :: (V.Vector v a, PrimMonad m)
            => StreamConsumer a m (v a)
sinkVectorS :: StreamConsumer a m (v a)
sinkVectorS (Stream s -> m (Step s a ())
step m s
ms0) = do
    ((Int, Int, Mutable v (PrimState m) a, s)
 -> m (Step (Int, Int, Mutable v (PrimState m) a, s) o (v a)))
-> m (Int, Int, Mutable v (PrimState m) a, s) -> Stream m o (v a)
forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream (Int, Int, Mutable v (PrimState m) a, s)
-> m (Step (Int, Int, Mutable v (PrimState m) a, s) o (v a))
step' (m (Int, Int, Mutable v (PrimState m) a, s) -> Stream m o (v a))
-> m (Int, Int, Mutable v (PrimState m) a, s) -> Stream m o (v a)
forall a b. (a -> b) -> a -> b
$ do
        s
s0 <- m s
ms0
        Mutable v (PrimState m) a
mv0 <- Int -> m (Mutable v (PrimState m) a)
forall (m :: * -> *) (v :: * -> * -> *) a.
(PrimMonad m, MVector v a) =>
Int -> m (v (PrimState m) a)
VM.new Int
initSize
        (Int, Int, Mutable v (PrimState m) a, s)
-> m (Int, Int, Mutable v (PrimState m) a, s)
forall (m :: * -> *) a. Monad m => a -> m a
return (Int
initSize, Int
0, Mutable v (PrimState m) a
mv0, s
s0)
  where
    initSize :: Int
initSize = Int
10
    step' :: (Int, Int, Mutable v (PrimState m) a, s)
-> m (Step (Int, Int, Mutable v (PrimState m) a, s) o (v a))
step' (Int
maxSize, Int
i, Mutable v (PrimState m) a
mv, s
s) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        case Step s a ()
res of
            Stop () -> (v a -> Step (Int, Int, Mutable v (PrimState m) a, s) o (v a))
-> m (v a)
-> m (Step (Int, Int, Mutable v (PrimState m) a, s) o (v a))
forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (v a -> Step (Int, Int, Mutable v (PrimState m) a, s) o (v a)
forall s o r. r -> Step s o r
Stop (v a -> Step (Int, Int, Mutable v (PrimState m) a, s) o (v a))
-> (v a -> v a)
-> v a
-> Step (Int, Int, Mutable v (PrimState m) a, s) o (v a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> Int -> v a -> v a
forall (v :: * -> *) a. Vector v a => Int -> Int -> v a -> v a
V.slice Int
0 Int
i) (m (v a)
 -> m (Step (Int, Int, Mutable v (PrimState m) a, s) o (v a)))
-> m (v a)
-> m (Step (Int, Int, Mutable v (PrimState m) a, s) o (v a))
forall a b. (a -> b) -> a -> b
$ Mutable v (PrimState m) a -> m (v a)
forall (m :: * -> *) (v :: * -> *) a.
(PrimMonad m, Vector v a) =>
Mutable v (PrimState m) a -> m (v a)
V.unsafeFreeze Mutable v (PrimState m) a
mv
            Skip s
s' -> Step (Int, Int, Mutable v (PrimState m) a, s) o (v a)
-> m (Step (Int, Int, Mutable v (PrimState m) a, s) o (v a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Int, Int, Mutable v (PrimState m) a, s) o (v a)
 -> m (Step (Int, Int, Mutable v (PrimState m) a, s) o (v a)))
-> Step (Int, Int, Mutable v (PrimState m) a, s) o (v a)
-> m (Step (Int, Int, Mutable v (PrimState m) a, s) o (v a))
forall a b. (a -> b) -> a -> b
$ (Int, Int, Mutable v (PrimState m) a, s)
-> Step (Int, Int, Mutable v (PrimState m) a, s) o (v a)
forall s o r. s -> Step s o r
Skip (Int
maxSize, Int
i, Mutable v (PrimState m) a
mv, s
s')
            Emit s
s' a
x -> do
                Mutable v (PrimState m) a -> Int -> a -> m ()
forall (m :: * -> *) (v :: * -> * -> *) a.
(PrimMonad m, MVector v a) =>
v (PrimState m) a -> Int -> a -> m ()
VM.write Mutable v (PrimState m) a
mv Int
i a
x
                let i' :: Int
i' = Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
                if Int
i' Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
maxSize
                    then do
                        let newMax :: Int
newMax = Int
maxSize Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
2
                        Mutable v (PrimState m) a
mv' <- Mutable v (PrimState m) a -> Int -> m (Mutable v (PrimState m) a)
forall (m :: * -> *) (v :: * -> * -> *) a.
(PrimMonad m, MVector v a) =>
v (PrimState m) a -> Int -> m (v (PrimState m) a)
VM.grow Mutable v (PrimState m) a
mv Int
maxSize
                        Step (Int, Int, Mutable v (PrimState m) a, s) o (v a)
-> m (Step (Int, Int, Mutable v (PrimState m) a, s) o (v a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Int, Int, Mutable v (PrimState m) a, s) o (v a)
 -> m (Step (Int, Int, Mutable v (PrimState m) a, s) o (v a)))
-> Step (Int, Int, Mutable v (PrimState m) a, s) o (v a)
-> m (Step (Int, Int, Mutable v (PrimState m) a, s) o (v a))
forall a b. (a -> b) -> a -> b
$ (Int, Int, Mutable v (PrimState m) a, s)
-> Step (Int, Int, Mutable v (PrimState m) a, s) o (v a)
forall s o r. s -> Step s o r
Skip (Int
newMax, Int
i', Mutable v (PrimState m) a
mv', s
s')
                    else Step (Int, Int, Mutable v (PrimState m) a, s) o (v a)
-> m (Step (Int, Int, Mutable v (PrimState m) a, s) o (v a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Int, Int, Mutable v (PrimState m) a, s) o (v a)
 -> m (Step (Int, Int, Mutable v (PrimState m) a, s) o (v a)))
-> Step (Int, Int, Mutable v (PrimState m) a, s) o (v a)
-> m (Step (Int, Int, Mutable v (PrimState m) a, s) o (v a))
forall a b. (a -> b) -> a -> b
$ (Int, Int, Mutable v (PrimState m) a, s)
-> Step (Int, Int, Mutable v (PrimState m) a, s) o (v a)
forall s o r. s -> Step s o r
Skip (Int
maxSize, Int
i', Mutable v (PrimState m) a
mv, s
s')
{-# INLINE sinkVectorS #-}

sinkVectorNS :: (V.Vector v a, PrimMonad m)
             => Int -- ^ maximum allowed size
             -> StreamConsumer a m (v a)
sinkVectorNS :: Int -> StreamConsumer a m (v a)
sinkVectorNS Int
maxSize (Stream s -> m (Step s a ())
step m s
ms0) = do
    ((Int, Mutable v (PrimState m) a, s)
 -> m (Step (Int, Mutable v (PrimState m) a, s) o (v a)))
-> m (Int, Mutable v (PrimState m) a, s) -> Stream m o (v a)
forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream (Int, Mutable v (PrimState m) a, s)
-> m (Step (Int, Mutable v (PrimState m) a, s) o (v a))
step' (m (Int, Mutable v (PrimState m) a, s) -> Stream m o (v a))
-> m (Int, Mutable v (PrimState m) a, s) -> Stream m o (v a)
forall a b. (a -> b) -> a -> b
$ do
        s
s0 <- m s
ms0
        Mutable v (PrimState m) a
mv0 <- Int -> m (Mutable v (PrimState m) a)
forall (m :: * -> *) (v :: * -> * -> *) a.
(PrimMonad m, MVector v a) =>
Int -> m (v (PrimState m) a)
VM.new Int
maxSize
        (Int, Mutable v (PrimState m) a, s)
-> m (Int, Mutable v (PrimState m) a, s)
forall (m :: * -> *) a. Monad m => a -> m a
return (Int
0, Mutable v (PrimState m) a
mv0, s
s0)
  where
    step' :: (Int, Mutable v (PrimState m) a, s)
-> m (Step (Int, Mutable v (PrimState m) a, s) o (v a))
step' (Int
i, Mutable v (PrimState m) a
mv, s
_) | Int
i Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
maxSize = (v a -> Step (Int, Mutable v (PrimState m) a, s) o (v a))
-> m (v a) -> m (Step (Int, Mutable v (PrimState m) a, s) o (v a))
forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM v a -> Step (Int, Mutable v (PrimState m) a, s) o (v a)
forall s o r. r -> Step s o r
Stop (m (v a) -> m (Step (Int, Mutable v (PrimState m) a, s) o (v a)))
-> m (v a) -> m (Step (Int, Mutable v (PrimState m) a, s) o (v a))
forall a b. (a -> b) -> a -> b
$ Mutable v (PrimState m) a -> m (v a)
forall (m :: * -> *) (v :: * -> *) a.
(PrimMonad m, Vector v a) =>
Mutable v (PrimState m) a -> m (v a)
V.unsafeFreeze Mutable v (PrimState m) a
mv
    step' (Int
i, Mutable v (PrimState m) a
mv, s
s) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        case Step s a ()
res of
            Stop () -> (v a -> Step (Int, Mutable v (PrimState m) a, s) o (v a))
-> m (v a) -> m (Step (Int, Mutable v (PrimState m) a, s) o (v a))
forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (v a -> Step (Int, Mutable v (PrimState m) a, s) o (v a)
forall s o r. r -> Step s o r
Stop (v a -> Step (Int, Mutable v (PrimState m) a, s) o (v a))
-> (v a -> v a)
-> v a
-> Step (Int, Mutable v (PrimState m) a, s) o (v a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> Int -> v a -> v a
forall (v :: * -> *) a. Vector v a => Int -> Int -> v a -> v a
V.slice Int
0 Int
i) (m (v a) -> m (Step (Int, Mutable v (PrimState m) a, s) o (v a)))
-> m (v a) -> m (Step (Int, Mutable v (PrimState m) a, s) o (v a))
forall a b. (a -> b) -> a -> b
$ Mutable v (PrimState m) a -> m (v a)
forall (m :: * -> *) (v :: * -> *) a.
(PrimMonad m, Vector v a) =>
Mutable v (PrimState m) a -> m (v a)
V.unsafeFreeze Mutable v (PrimState m) a
mv
            Skip s
s' -> Step (Int, Mutable v (PrimState m) a, s) o (v a)
-> m (Step (Int, Mutable v (PrimState m) a, s) o (v a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Int, Mutable v (PrimState m) a, s) o (v a)
 -> m (Step (Int, Mutable v (PrimState m) a, s) o (v a)))
-> Step (Int, Mutable v (PrimState m) a, s) o (v a)
-> m (Step (Int, Mutable v (PrimState m) a, s) o (v a))
forall a b. (a -> b) -> a -> b
$ (Int, Mutable v (PrimState m) a, s)
-> Step (Int, Mutable v (PrimState m) a, s) o (v a)
forall s o r. s -> Step s o r
Skip (Int
i, Mutable v (PrimState m) a
mv, s
s')
            Emit s
s' a
x -> do
                Mutable v (PrimState m) a -> Int -> a -> m ()
forall (m :: * -> *) (v :: * -> * -> *) a.
(PrimMonad m, MVector v a) =>
v (PrimState m) a -> Int -> a -> m ()
VM.write Mutable v (PrimState m) a
mv Int
i a
x
                let i' :: Int
i' = Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
                Step (Int, Mutable v (PrimState m) a, s) o (v a)
-> m (Step (Int, Mutable v (PrimState m) a, s) o (v a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Int, Mutable v (PrimState m) a, s) o (v a)
 -> m (Step (Int, Mutable v (PrimState m) a, s) o (v a)))
-> Step (Int, Mutable v (PrimState m) a, s) o (v a)
-> m (Step (Int, Mutable v (PrimState m) a, s) o (v a))
forall a b. (a -> b) -> a -> b
$ (Int, Mutable v (PrimState m) a, s)
-> Step (Int, Mutable v (PrimState m) a, s) o (v a)
forall s o r. s -> Step s o r
Skip (Int
i', Mutable v (PrimState m) a
mv, s
s')
{-# INLINE sinkVectorNS #-}

sinkLazyBuilderS :: Monad m => StreamConsumer Builder m BL.ByteString
sinkLazyBuilderS :: StreamConsumer Builder m ByteString
sinkLazyBuilderS = (Builder -> ByteString)
-> StreamConduitT Builder o m Builder
-> StreamConduitT Builder o m ByteString
forall (m :: * -> *) a b i o.
Monad m =>
(a -> b) -> StreamConduitT i o m a -> StreamConduitT i o m b
fmapS Builder -> ByteString
toLazyByteString ((Builder -> Builder -> Builder)
-> Builder -> StreamConsumer Builder m Builder
forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> StreamConsumer a m b
foldS Builder -> Builder -> Builder
forall a. Monoid a => a -> a -> a
mappend Builder
forall a. Monoid a => a
mempty)
{-# INLINE sinkLazyBuilderS #-}

lastS :: Monad m
      => StreamConsumer a m (Maybe a)
lastS :: StreamConsumer a m (Maybe a)
lastS (Stream s -> m (Step s a ())
step m s
ms0) =
    ((Maybe a, s) -> m (Step (Maybe a, s) o (Maybe a)))
-> m (Maybe a, s) -> Stream m o (Maybe a)
forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream (Maybe a, s) -> m (Step (Maybe a, s) o (Maybe a))
step' ((s -> (Maybe a, s)) -> m s -> m (Maybe a, s)
forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (Maybe a
forall a. Maybe a
Nothing,) m s
ms0)
  where
    step' :: (Maybe a, s) -> m (Step (Maybe a, s) o (Maybe a))
step' (Maybe a
mlast, s
s) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        Step (Maybe a, s) o (Maybe a) -> m (Step (Maybe a, s) o (Maybe a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe a, s) o (Maybe a)
 -> m (Step (Maybe a, s) o (Maybe a)))
-> Step (Maybe a, s) o (Maybe a)
-> m (Step (Maybe a, s) o (Maybe a))
forall a b. (a -> b) -> a -> b
$ case Step s a ()
res of
            Stop () -> Maybe a -> Step (Maybe a, s) o (Maybe a)
forall s o r. r -> Step s o r
Stop Maybe a
mlast
            Skip s
s' -> (Maybe a, s) -> Step (Maybe a, s) o (Maybe a)
forall s o r. s -> Step s o r
Skip (Maybe a
mlast, s
s')
            Emit s
s' a
x -> (Maybe a, s) -> Step (Maybe a, s) o (Maybe a)
forall s o r. s -> Step s o r
Skip (a -> Maybe a
forall a. a -> Maybe a
Just a
x, s
s')
{-# INLINE lastS #-}

lastES :: (Monad m, Seq.IsSequence seq)
       => StreamConsumer seq m (Maybe (Element seq))
lastES :: StreamConsumer seq m (Maybe (Element seq))
lastES (Stream s -> m (Step s seq ())
step m s
ms0) =
    ((Maybe (NonNull seq), s)
 -> m (Step (Maybe (NonNull seq), s) o (Maybe (Element seq))))
-> m (Maybe (NonNull seq), s) -> Stream m o (Maybe (Element seq))
forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream (Maybe (NonNull seq), s)
-> m (Step (Maybe (NonNull seq), s) o (Maybe (Element seq)))
step' ((s -> (Maybe (NonNull seq), s))
-> m s -> m (Maybe (NonNull seq), s)
forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (Maybe (NonNull seq)
forall a. Maybe a
Nothing, ) m s
ms0)
  where
    step' :: (Maybe (NonNull seq), s)
-> m (Step (Maybe (NonNull seq), s) o (Maybe (Element seq)))
step' (Maybe (NonNull seq)
mlast, s
s) = do
        Step s seq ()
res <- s -> m (Step s seq ())
step s
s
        Step (Maybe (NonNull seq), s) o (Maybe (Element seq))
-> m (Step (Maybe (NonNull seq), s) o (Maybe (Element seq)))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (NonNull seq), s) o (Maybe (Element seq))
 -> m (Step (Maybe (NonNull seq), s) o (Maybe (Element seq))))
-> Step (Maybe (NonNull seq), s) o (Maybe (Element seq))
-> m (Step (Maybe (NonNull seq), s) o (Maybe (Element seq)))
forall a b. (a -> b) -> a -> b
$ case Step s seq ()
res of
            Stop () -> Maybe (Element seq)
-> Step (Maybe (NonNull seq), s) o (Maybe (Element seq))
forall s o r. r -> Step s o r
Stop ((NonNull seq -> Element seq)
-> Maybe (NonNull seq) -> Maybe (Element seq)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap NonNull seq -> Element seq
forall mono. MonoFoldable mono => NonNull mono -> Element mono
NonNull.last Maybe (NonNull seq)
mlast)
            Skip s
s' -> (Maybe (NonNull seq), s)
-> Step (Maybe (NonNull seq), s) o (Maybe (Element seq))
forall s o r. s -> Step s o r
Skip (Maybe (NonNull seq)
mlast, s
s')
            Emit s
s' (seq -> Maybe (NonNull seq)
forall mono. MonoFoldable mono => mono -> Maybe (NonNull mono)
NonNull.fromNullable -> mlast' :: Maybe (NonNull seq)
mlast'@(Just NonNull seq
_)) -> (Maybe (NonNull seq), s)
-> Step (Maybe (NonNull seq), s) o (Maybe (Element seq))
forall s o r. s -> Step s o r
Skip (Maybe (NonNull seq)
mlast', s
s')
            Emit s
s' seq
_ -> (Maybe (NonNull seq), s)
-> Step (Maybe (NonNull seq), s) o (Maybe (Element seq))
forall s o r. s -> Step s o r
Skip (Maybe (NonNull seq)
mlast, s
s')
{-# INLINE lastES #-}

findS :: Monad m
      => (a -> Bool) -> StreamConsumer a m (Maybe a)
findS :: (a -> Bool) -> StreamConsumer a m (Maybe a)
findS a -> Bool
f (Stream s -> m (Step s a ())
step m s
ms0) =
    (s -> m (Step s o (Maybe a))) -> m s -> Stream m o (Maybe a)
forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream s -> m (Step s o (Maybe a))
step' m s
ms0
  where
    step' :: s -> m (Step s o (Maybe a))
step' s
s = do
      Step s a ()
res <- s -> m (Step s a ())
step s
s
      Step s o (Maybe a) -> m (Step s o (Maybe a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s o (Maybe a) -> m (Step s o (Maybe a)))
-> Step s o (Maybe a) -> m (Step s o (Maybe a))
forall a b. (a -> b) -> a -> b
$ case Step s a ()
res of
          Stop () -> Maybe a -> Step s o (Maybe a)
forall s o r. r -> Step s o r
Stop Maybe a
forall a. Maybe a
Nothing
          Skip s
s' -> s -> Step s o (Maybe a)
forall s o r. s -> Step s o r
Skip s
s'
          Emit s
s' a
x ->
              if a -> Bool
f a
x
                  then Maybe a -> Step s o (Maybe a)
forall s o r. r -> Step s o r
Stop (a -> Maybe a
forall a. a -> Maybe a
Just a
x)
                  else s -> Step s o (Maybe a)
forall s o r. s -> Step s o r
Skip s
s'
{-# INLINE findS #-}

concatMapS :: (Monad m, MonoFoldable mono)
           => (a -> mono)
           -> StreamConduit a m (Element mono)
concatMapS :: (a -> mono) -> StreamConduit a m (Element mono)
concatMapS a -> mono
f (Stream s -> m (Step s a ())
step m s
ms0) =
    (([Element mono], s)
 -> m (Step ([Element mono], s) (Element mono) ()))
-> m ([Element mono], s) -> Stream m (Element mono) ()
forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream ([Element mono], s)
-> m (Step ([Element mono], s) (Element mono) ())
step' ((s -> ([Element mono], s)) -> m s -> m ([Element mono], s)
forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM ([], ) m s
ms0)
  where
    step' :: ([Element mono], s)
-> m (Step ([Element mono], s) (Element mono) ())
step' ([], s
s) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        Step ([Element mono], s) (Element mono) ()
-> m (Step ([Element mono], s) (Element mono) ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step ([Element mono], s) (Element mono) ()
 -> m (Step ([Element mono], s) (Element mono) ()))
-> Step ([Element mono], s) (Element mono) ()
-> m (Step ([Element mono], s) (Element mono) ())
forall a b. (a -> b) -> a -> b
$ case Step s a ()
res of
            Stop () -> () -> Step ([Element mono], s) (Element mono) ()
forall s o r. r -> Step s o r
Stop ()
            Skip s
s' -> ([Element mono], s) -> Step ([Element mono], s) (Element mono) ()
forall s o r. s -> Step s o r
Skip ([], s
s')
            Emit s
s' a
x -> ([Element mono], s) -> Step ([Element mono], s) (Element mono) ()
forall s o r. s -> Step s o r
Skip (mono -> [Element mono]
forall mono. MonoFoldable mono => mono -> [Element mono]
otoList (a -> mono
f a
x), s
s')
    step' ((Element mono
x:[Element mono]
xs), s
s) = Step ([Element mono], s) (Element mono) ()
-> m (Step ([Element mono], s) (Element mono) ())
forall (m :: * -> *) a. Monad m => a -> m a
return (([Element mono], s)
-> Element mono -> Step ([Element mono], s) (Element mono) ()
forall s o r. s -> o -> Step s o r
Emit ([Element mono]
xs, s
s) Element mono
x)
{-# INLINE concatMapS #-}

concatMapMS :: (Monad m, MonoFoldable mono)
             => (a -> m mono)
             -> StreamConduit a m (Element mono)
concatMapMS :: (a -> m mono) -> StreamConduit a m (Element mono)
concatMapMS a -> m mono
f (Stream s -> m (Step s a ())
step m s
ms0) =
    (([Element mono], s)
 -> m (Step ([Element mono], s) (Element mono) ()))
-> m ([Element mono], s) -> Stream m (Element mono) ()
forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream ([Element mono], s)
-> m (Step ([Element mono], s) (Element mono) ())
step' ((s -> ([Element mono], s)) -> m s -> m ([Element mono], s)
forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM ([], ) m s
ms0)
  where
    step' :: ([Element mono], s)
-> m (Step ([Element mono], s) (Element mono) ())
step' ([], s
s) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        case Step s a ()
res of
            Stop () -> Step ([Element mono], s) (Element mono) ()
-> m (Step ([Element mono], s) (Element mono) ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step ([Element mono], s) (Element mono) ()
 -> m (Step ([Element mono], s) (Element mono) ()))
-> Step ([Element mono], s) (Element mono) ()
-> m (Step ([Element mono], s) (Element mono) ())
forall a b. (a -> b) -> a -> b
$ () -> Step ([Element mono], s) (Element mono) ()
forall s o r. r -> Step s o r
Stop ()
            Skip s
s' -> Step ([Element mono], s) (Element mono) ()
-> m (Step ([Element mono], s) (Element mono) ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step ([Element mono], s) (Element mono) ()
 -> m (Step ([Element mono], s) (Element mono) ()))
-> Step ([Element mono], s) (Element mono) ()
-> m (Step ([Element mono], s) (Element mono) ())
forall a b. (a -> b) -> a -> b
$ ([Element mono], s) -> Step ([Element mono], s) (Element mono) ()
forall s o r. s -> Step s o r
Skip ([], s
s')
            Emit s
s' a
x -> do
                mono
o <- a -> m mono
f a
x
                Step ([Element mono], s) (Element mono) ()
-> m (Step ([Element mono], s) (Element mono) ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step ([Element mono], s) (Element mono) ()
 -> m (Step ([Element mono], s) (Element mono) ()))
-> Step ([Element mono], s) (Element mono) ()
-> m (Step ([Element mono], s) (Element mono) ())
forall a b. (a -> b) -> a -> b
$ ([Element mono], s) -> Step ([Element mono], s) (Element mono) ()
forall s o r. s -> Step s o r
Skip (mono -> [Element mono]
forall mono. MonoFoldable mono => mono -> [Element mono]
otoList mono
o, s
s')
    step' ((Element mono
x:[Element mono]
xs), s
s) = Step ([Element mono], s) (Element mono) ()
-> m (Step ([Element mono], s) (Element mono) ())
forall (m :: * -> *) a. Monad m => a -> m a
return (([Element mono], s)
-> Element mono -> Step ([Element mono], s) (Element mono) ()
forall s o r. s -> o -> Step s o r
Emit ([Element mono]
xs, s
s) Element mono
x)
{-# INLINE concatMapMS #-}

concatS :: (Monad m, MonoFoldable mono)
         => StreamConduit mono m (Element mono)
concatS :: StreamConduit mono m (Element mono)
concatS = (mono -> mono) -> StreamConduit mono m (Element mono)
forall (m :: * -> *) mono a.
(Monad m, MonoFoldable mono) =>
(a -> mono) -> StreamConduit a m (Element mono)
concatMapS mono -> mono
forall a. a -> a
id
{-# INLINE concatS #-}

data ScanState a s
    = ScanEnded
    | ScanContinues a s

scanlS :: Monad m => (a -> b -> a) -> a -> StreamConduit b m a
scanlS :: (a -> b -> a) -> a -> StreamConduit b m a
scanlS a -> b -> a
f a
seed0 (Stream s -> m (Step s b ())
step m s
ms0) =
    (ScanState a s -> m (Step (ScanState a s) a ()))
-> m (ScanState a s) -> Stream m a ()
forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream ScanState a s -> m (Step (ScanState a s) a ())
step' ((s -> ScanState a s) -> m s -> m (ScanState a s)
forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (a -> s -> ScanState a s
forall a s. a -> s -> ScanState a s
ScanContinues a
seed0) m s
ms0)
  where
    step' :: ScanState a s -> m (Step (ScanState a s) a ())
step' ScanState a s
ScanEnded = Step (ScanState a s) a () -> m (Step (ScanState a s) a ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (ScanState a s) a () -> m (Step (ScanState a s) a ()))
-> Step (ScanState a s) a () -> m (Step (ScanState a s) a ())
forall a b. (a -> b) -> a -> b
$ () -> Step (ScanState a s) a ()
forall s o r. r -> Step s o r
Stop ()
    step' (ScanContinues a
seed s
s) = do
        Step s b ()
res <- s -> m (Step s b ())
step s
s
        Step (ScanState a s) a () -> m (Step (ScanState a s) a ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (ScanState a s) a () -> m (Step (ScanState a s) a ()))
-> Step (ScanState a s) a () -> m (Step (ScanState a s) a ())
forall a b. (a -> b) -> a -> b
$ case Step s b ()
res of
            Stop () -> ScanState a s -> a -> Step (ScanState a s) a ()
forall s o r. s -> o -> Step s o r
Emit ScanState a s
forall a s. ScanState a s
ScanEnded a
seed
            Skip s
s' -> ScanState a s -> Step (ScanState a s) a ()
forall s o r. s -> Step s o r
Skip (a -> s -> ScanState a s
forall a s. a -> s -> ScanState a s
ScanContinues a
seed s
s')
            Emit s
s' b
x -> ScanState a s -> a -> Step (ScanState a s) a ()
forall s o r. s -> o -> Step s o r
Emit (a -> s -> ScanState a s
forall a s. a -> s -> ScanState a s
ScanContinues a
seed' s
s') a
seed
              where
                !seed' :: a
seed' = a -> b -> a
f a
seed b
x
{-# INLINE scanlS #-}

scanlMS :: Monad m => (a -> b -> m a) -> a -> StreamConduit b m a
scanlMS :: (a -> b -> m a) -> a -> StreamConduit b m a
scanlMS a -> b -> m a
f a
seed0 (Stream s -> m (Step s b ())
step m s
ms0) =
    (ScanState a s -> m (Step (ScanState a s) a ()))
-> m (ScanState a s) -> Stream m a ()
forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream ScanState a s -> m (Step (ScanState a s) a ())
step' ((s -> ScanState a s) -> m s -> m (ScanState a s)
forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (a -> s -> ScanState a s
forall a s. a -> s -> ScanState a s
ScanContinues a
seed0) m s
ms0)
  where
    step' :: ScanState a s -> m (Step (ScanState a s) a ())
step' ScanState a s
ScanEnded = Step (ScanState a s) a () -> m (Step (ScanState a s) a ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (ScanState a s) a () -> m (Step (ScanState a s) a ()))
-> Step (ScanState a s) a () -> m (Step (ScanState a s) a ())
forall a b. (a -> b) -> a -> b
$ () -> Step (ScanState a s) a ()
forall s o r. r -> Step s o r
Stop ()
    step' (ScanContinues a
seed s
s) = do
        Step s b ()
res <- s -> m (Step s b ())
step s
s
        case Step s b ()
res of
            Stop () -> Step (ScanState a s) a () -> m (Step (ScanState a s) a ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (ScanState a s) a () -> m (Step (ScanState a s) a ()))
-> Step (ScanState a s) a () -> m (Step (ScanState a s) a ())
forall a b. (a -> b) -> a -> b
$ ScanState a s -> a -> Step (ScanState a s) a ()
forall s o r. s -> o -> Step s o r
Emit ScanState a s
forall a s. ScanState a s
ScanEnded a
seed
            Skip s
s' -> Step (ScanState a s) a () -> m (Step (ScanState a s) a ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (ScanState a s) a () -> m (Step (ScanState a s) a ()))
-> Step (ScanState a s) a () -> m (Step (ScanState a s) a ())
forall a b. (a -> b) -> a -> b
$ ScanState a s -> Step (ScanState a s) a ()
forall s o r. s -> Step s o r
Skip (a -> s -> ScanState a s
forall a s. a -> s -> ScanState a s
ScanContinues a
seed s
s')
            Emit s
s' b
x -> do
                !a
seed' <- a -> b -> m a
f a
seed b
x
                Step (ScanState a s) a () -> m (Step (ScanState a s) a ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (ScanState a s) a () -> m (Step (ScanState a s) a ()))
-> Step (ScanState a s) a () -> m (Step (ScanState a s) a ())
forall a b. (a -> b) -> a -> b
$ ScanState a s -> a -> Step (ScanState a s) a ()
forall s o r. s -> o -> Step s o r
Emit (a -> s -> ScanState a s
forall a s. a -> s -> ScanState a s
ScanContinues a
seed' s
s') a
seed
{-# INLINE scanlMS #-}

mapAccumWhileS :: Monad m =>
    (a -> s -> Either s (s, b)) -> s -> StreamConduitT a b m s
mapAccumWhileS :: (a -> s -> Either s (s, b)) -> s -> StreamConduitT a b m s
mapAccumWhileS a -> s -> Either s (s, b)
f s
initial (Stream s -> m (Step s a ())
step m s
ms0) =
    ((s, s) -> m (Step (s, s) b s)) -> m (s, s) -> Stream m b s
forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream (s, s) -> m (Step (s, s) b s)
step' ((s -> (s, s)) -> m s -> m (s, s)
forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (s
initial, ) m s
ms0)
  where
    step' :: (s, s) -> m (Step (s, s) b s)
step' (!s
accum, s
s) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        Step (s, s) b s -> m (Step (s, s) b s)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (s, s) b s -> m (Step (s, s) b s))
-> Step (s, s) b s -> m (Step (s, s) b s)
forall a b. (a -> b) -> a -> b
$ case Step s a ()
res of
            Stop () -> s -> Step (s, s) b s
forall s o r. r -> Step s o r
Stop s
accum
            Skip s
s' -> (s, s) -> Step (s, s) b s
forall s o r. s -> Step s o r
Skip (s
accum, s
s')
            Emit s
s' a
x -> case a -> s -> Either s (s, b)
f a
x s
accum of
                Right (!s
accum', b
r) -> (s, s) -> b -> Step (s, s) b s
forall s o r. s -> o -> Step s o r
Emit (s
accum', s
s') b
r
                Left   !s
accum'     -> s -> Step (s, s) b s
forall s o r. r -> Step s o r
Stop s
accum'
{-# INLINE mapAccumWhileS #-}

mapAccumWhileMS :: Monad m =>
    (a -> s -> m (Either s (s, b))) -> s -> StreamConduitT a b m s
mapAccumWhileMS :: (a -> s -> m (Either s (s, b))) -> s -> StreamConduitT a b m s
mapAccumWhileMS a -> s -> m (Either s (s, b))
f s
initial (Stream s -> m (Step s a ())
step m s
ms0) =
    ((s, s) -> m (Step (s, s) b s)) -> m (s, s) -> Stream m b s
forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream (s, s) -> m (Step (s, s) b s)
step' ((s -> (s, s)) -> m s -> m (s, s)
forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (s
initial, ) m s
ms0)
  where
    step' :: (s, s) -> m (Step (s, s) b s)
step' (!s
accum, s
s) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        case Step s a ()
res of
            Stop () -> Step (s, s) b s -> m (Step (s, s) b s)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (s, s) b s -> m (Step (s, s) b s))
-> Step (s, s) b s -> m (Step (s, s) b s)
forall a b. (a -> b) -> a -> b
$ s -> Step (s, s) b s
forall s o r. r -> Step s o r
Stop s
accum
            Skip s
s' -> Step (s, s) b s -> m (Step (s, s) b s)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (s, s) b s -> m (Step (s, s) b s))
-> Step (s, s) b s -> m (Step (s, s) b s)
forall a b. (a -> b) -> a -> b
$ (s, s) -> Step (s, s) b s
forall s o r. s -> Step s o r
Skip (s
accum, s
s')
            Emit s
s' a
x -> do
                Either s (s, b)
lr <- a -> s -> m (Either s (s, b))
f a
x s
accum
                Step (s, s) b s -> m (Step (s, s) b s)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (s, s) b s -> m (Step (s, s) b s))
-> Step (s, s) b s -> m (Step (s, s) b s)
forall a b. (a -> b) -> a -> b
$ case Either s (s, b)
lr of
                    Right (!s
accum', b
r) -> (s, s) -> b -> Step (s, s) b s
forall s o r. s -> o -> Step s o r
Emit (s
accum', s
s') b
r
                    Left   !s
accum'     -> s -> Step (s, s) b s
forall s o r. r -> Step s o r
Stop s
accum'
{-# INLINE mapAccumWhileMS #-}

data IntersperseState a s
    = IFirstValue s
    | IGotValue s a
    | IEmitValue s a

intersperseS :: Monad m => a -> StreamConduit a m a
intersperseS :: a -> StreamConduit a m a
intersperseS a
sep (Stream s -> m (Step s a ())
step m s
ms0) =
    (IntersperseState a s -> m (Step (IntersperseState a s) a ()))
-> m (IntersperseState a s) -> Stream m a ()
forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream IntersperseState a s -> m (Step (IntersperseState a s) a ())
step' ((s -> IntersperseState a s) -> m s -> m (IntersperseState a s)
forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM s -> IntersperseState a s
forall a s. s -> IntersperseState a s
IFirstValue m s
ms0)
  where
    step' :: IntersperseState a s -> m (Step (IntersperseState a s) a ())
step' (IFirstValue s
s) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        Step (IntersperseState a s) a ()
-> m (Step (IntersperseState a s) a ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (IntersperseState a s) a ()
 -> m (Step (IntersperseState a s) a ()))
-> Step (IntersperseState a s) a ()
-> m (Step (IntersperseState a s) a ())
forall a b. (a -> b) -> a -> b
$ case Step s a ()
res of
            Stop () -> () -> Step (IntersperseState a s) a ()
forall s o r. r -> Step s o r
Stop ()
            Skip s
s' -> IntersperseState a s -> Step (IntersperseState a s) a ()
forall s o r. s -> Step s o r
Skip (s -> IntersperseState a s
forall a s. s -> IntersperseState a s
IFirstValue s
s')
            Emit s
s' a
x -> IntersperseState a s -> a -> Step (IntersperseState a s) a ()
forall s o r. s -> o -> Step s o r
Emit (s -> a -> IntersperseState a s
forall a s. s -> a -> IntersperseState a s
IGotValue s
s' a
x) a
x
    -- Emit the separator once we know it's not the end of the list.
    step' (IGotValue s
s a
x) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        Step (IntersperseState a s) a ()
-> m (Step (IntersperseState a s) a ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (IntersperseState a s) a ()
 -> m (Step (IntersperseState a s) a ()))
-> Step (IntersperseState a s) a ()
-> m (Step (IntersperseState a s) a ())
forall a b. (a -> b) -> a -> b
$ case Step s a ()
res of
            Stop () -> () -> Step (IntersperseState a s) a ()
forall s o r. r -> Step s o r
Stop ()
            Skip s
s' -> IntersperseState a s -> Step (IntersperseState a s) a ()
forall s o r. s -> Step s o r
Skip (s -> a -> IntersperseState a s
forall a s. s -> a -> IntersperseState a s
IGotValue s
s' a
x)
            Emit s
s' a
x' -> IntersperseState a s -> a -> Step (IntersperseState a s) a ()
forall s o r. s -> o -> Step s o r
Emit (s -> a -> IntersperseState a s
forall a s. s -> a -> IntersperseState a s
IEmitValue s
s' a
x') a
sep
    -- We emitted a separator, now emit the value that comes after.
    step' (IEmitValue s
s a
x) = Step (IntersperseState a s) a ()
-> m (Step (IntersperseState a s) a ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (IntersperseState a s) a ()
 -> m (Step (IntersperseState a s) a ()))
-> Step (IntersperseState a s) a ()
-> m (Step (IntersperseState a s) a ())
forall a b. (a -> b) -> a -> b
$ IntersperseState a s -> a -> Step (IntersperseState a s) a ()
forall s o r. s -> o -> Step s o r
Emit (s -> a -> IntersperseState a s
forall a s. s -> a -> IntersperseState a s
IGotValue s
s a
x) a
x
{-# INLINE intersperseS #-}

data SlidingWindowState seq s
    = SWInitial Int seq s
    | SWSliding seq s
    | SWEarlyExit

slidingWindowS :: (Monad m, Seq.IsSequence seq, Element seq ~ a) => Int -> StreamConduit a m seq
slidingWindowS :: Int -> StreamConduit a m seq
slidingWindowS Int
sz (Stream s -> m (Step s a ())
step m s
ms0) =
    (SlidingWindowState seq s
 -> m (Step (SlidingWindowState seq s) seq ()))
-> m (SlidingWindowState seq s) -> Stream m seq ()
forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream SlidingWindowState seq s
-> m (Step (SlidingWindowState seq s) seq ())
step' ((s -> SlidingWindowState seq s)
-> m s -> m (SlidingWindowState seq s)
forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (Int -> seq -> s -> SlidingWindowState seq s
forall seq s. Int -> seq -> s -> SlidingWindowState seq s
SWInitial (Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
1 Int
sz) seq
forall a. Monoid a => a
mempty) m s
ms0)
  where
    step' :: SlidingWindowState seq s
-> m (Step (SlidingWindowState seq s) seq ())
step' (SWInitial Int
n seq
st s
s) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        Step (SlidingWindowState seq s) seq ()
-> m (Step (SlidingWindowState seq s) seq ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (SlidingWindowState seq s) seq ()
 -> m (Step (SlidingWindowState seq s) seq ()))
-> Step (SlidingWindowState seq s) seq ()
-> m (Step (SlidingWindowState seq s) seq ())
forall a b. (a -> b) -> a -> b
$ case Step s a ()
res of
            Stop () -> SlidingWindowState seq s
-> seq -> Step (SlidingWindowState seq s) seq ()
forall s o r. s -> o -> Step s o r
Emit SlidingWindowState seq s
forall seq s. SlidingWindowState seq s
SWEarlyExit seq
st
            Skip s
s' -> SlidingWindowState seq s -> Step (SlidingWindowState seq s) seq ()
forall s o r. s -> Step s o r
Skip (Int -> seq -> s -> SlidingWindowState seq s
forall seq s. Int -> seq -> s -> SlidingWindowState seq s
SWInitial Int
n seq
st s
s')
            Emit s
s' a
x ->
                if Int
n Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
1
                    then SlidingWindowState seq s
-> seq -> Step (SlidingWindowState seq s) seq ()
forall s o r. s -> o -> Step s o r
Emit (seq -> s -> SlidingWindowState seq s
forall seq s. seq -> s -> SlidingWindowState seq s
SWSliding (seq -> seq
forall seq. IsSequence seq => seq -> seq
Seq.unsafeTail seq
st') s
s') seq
st'
                    else SlidingWindowState seq s -> Step (SlidingWindowState seq s) seq ()
forall s o r. s -> Step s o r
Skip (Int -> seq -> s -> SlidingWindowState seq s
forall seq s. Int -> seq -> s -> SlidingWindowState seq s
SWInitial (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1) seq
st' s
s')
              where
                st' :: seq
st' = seq -> Element seq -> seq
forall seq. SemiSequence seq => seq -> Element seq -> seq
Seq.snoc seq
st a
Element seq
x
    -- After collecting the initial window, each upstream element
    -- causes an additional window to be yielded.
    step' (SWSliding seq
st s
s) = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        Step (SlidingWindowState seq s) seq ()
-> m (Step (SlidingWindowState seq s) seq ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (SlidingWindowState seq s) seq ()
 -> m (Step (SlidingWindowState seq s) seq ()))
-> Step (SlidingWindowState seq s) seq ()
-> m (Step (SlidingWindowState seq s) seq ())
forall a b. (a -> b) -> a -> b
$ case Step s a ()
res of
            Stop () -> () -> Step (SlidingWindowState seq s) seq ()
forall s o r. r -> Step s o r
Stop ()
            Skip s
s' -> SlidingWindowState seq s -> Step (SlidingWindowState seq s) seq ()
forall s o r. s -> Step s o r
Skip (seq -> s -> SlidingWindowState seq s
forall seq s. seq -> s -> SlidingWindowState seq s
SWSliding seq
st s
s')
            Emit s
s' a
x -> SlidingWindowState seq s
-> seq -> Step (SlidingWindowState seq s) seq ()
forall s o r. s -> o -> Step s o r
Emit (seq -> s -> SlidingWindowState seq s
forall seq s. seq -> s -> SlidingWindowState seq s
SWSliding (seq -> seq
forall seq. IsSequence seq => seq -> seq
Seq.unsafeTail seq
st') s
s') seq
st'
              where
                st' :: seq
st' = seq -> Element seq -> seq
forall seq. SemiSequence seq => seq -> Element seq -> seq
Seq.snoc seq
st a
Element seq
x
    step' SlidingWindowState seq s
SWEarlyExit = Step (SlidingWindowState seq s) seq ()
-> m (Step (SlidingWindowState seq s) seq ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (SlidingWindowState seq s) seq ()
 -> m (Step (SlidingWindowState seq s) seq ()))
-> Step (SlidingWindowState seq s) seq ()
-> m (Step (SlidingWindowState seq s) seq ())
forall a b. (a -> b) -> a -> b
$ () -> Step (SlidingWindowState seq s) seq ()
forall s o r. r -> Step s o r
Stop ()

{-# INLINE slidingWindowS #-}

filterMS :: Monad m
         => (a -> m Bool)
         -> StreamConduit a m a
filterMS :: (a -> m Bool) -> StreamConduit a m a
filterMS a -> m Bool
f (Stream s -> m (Step s a ())
step m s
ms0) = do
    (s -> m (Step s a ())) -> m s -> Stream m a ()
forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream s -> m (Step s a ())
step' m s
ms0
  where
    step' :: s -> m (Step s a ())
step' s
s = do
        Step s a ()
res <- s -> m (Step s a ())
step s
s
        case Step s a ()
res of
            Stop () -> Step s a () -> m (Step s a ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s a () -> m (Step s a ())) -> Step s a () -> m (Step s a ())
forall a b. (a -> b) -> a -> b
$ () -> Step s a ()
forall s o r. r -> Step s o r
Stop ()
            Skip s
s' -> Step s a () -> m (Step s a ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s a () -> m (Step s a ())) -> Step s a () -> m (Step s a ())
forall a b. (a -> b) -> a -> b
$ s -> Step s a ()
forall s o r. s -> Step s o r
Skip s
s'
            Emit s
s' a
x -> do
                Bool
r <- a -> m Bool
f a
x
                Step s a () -> m (Step s a ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step s a () -> m (Step s a ())) -> Step s a () -> m (Step s a ())
forall a b. (a -> b) -> a -> b
$
                    if Bool
r
                        then s -> a -> Step s a ()
forall s o r. s -> o -> Step s o r
Emit s
s' a
x
                        else s -> Step s a ()
forall s o r. s -> Step s o r
Skip s
s'
{-# INLINE filterMS #-}

data SplitState seq s
    = SplitDone
    -- When no element of seq passes the predicate.  This allows
    -- 'splitOnUnboundedES' to not run 'Seq.break' multiple times due
    -- to 'Skip's being sent by the upstream.
    | SplitNoSep seq s
    | SplitState seq s

splitOnUnboundedES :: (Monad m, Seq.IsSequence seq)
                   => (Element seq -> Bool) -> StreamConduit seq m seq
splitOnUnboundedES :: (Element seq -> Bool) -> StreamConduit seq m seq
splitOnUnboundedES Element seq -> Bool
f (Stream s -> m (Step s seq ())
step m s
ms0) =
    (SplitState seq s -> m (Step (SplitState seq s) seq ()))
-> m (SplitState seq s) -> Stream m seq ()
forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream SplitState seq s -> m (Step (SplitState seq s) seq ())
step' ((s -> SplitState seq s) -> m s -> m (SplitState seq s)
forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (seq -> s -> SplitState seq s
forall seq s. seq -> s -> SplitState seq s
SplitState seq
forall a. Monoid a => a
mempty) m s
ms0)
  where
    step' :: SplitState seq s -> m (Step (SplitState seq s) seq ())
step' SplitState seq s
SplitDone = Step (SplitState seq s) seq ()
-> m (Step (SplitState seq s) seq ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (SplitState seq s) seq ()
 -> m (Step (SplitState seq s) seq ()))
-> Step (SplitState seq s) seq ()
-> m (Step (SplitState seq s) seq ())
forall a b. (a -> b) -> a -> b
$ () -> Step (SplitState seq s) seq ()
forall s o r. r -> Step s o r
Stop ()
    step' (SplitNoSep seq
t s
s) = do
        Step s seq ()
res <- s -> m (Step s seq ())
step s
s
        Step (SplitState seq s) seq ()
-> m (Step (SplitState seq s) seq ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (SplitState seq s) seq ()
 -> m (Step (SplitState seq s) seq ()))
-> Step (SplitState seq s) seq ()
-> m (Step (SplitState seq s) seq ())
forall a b. (a -> b) -> a -> b
$ case Step s seq ()
res of
            Stop () | Bool -> Bool
not (seq -> Bool
forall mono. MonoFoldable mono => mono -> Bool
onull seq
t) -> SplitState seq s -> seq -> Step (SplitState seq s) seq ()
forall s o r. s -> o -> Step s o r
Emit SplitState seq s
forall seq s. SplitState seq s
SplitDone seq
t
                    | Bool
otherwise -> () -> Step (SplitState seq s) seq ()
forall s o r. r -> Step s o r
Stop ()
            Skip s
s' -> SplitState seq s -> Step (SplitState seq s) seq ()
forall s o r. s -> Step s o r
Skip (seq -> s -> SplitState seq s
forall seq s. seq -> s -> SplitState seq s
SplitNoSep seq
t s
s')
            Emit s
s' seq
t' -> SplitState seq s -> Step (SplitState seq s) seq ()
forall s o r. s -> Step s o r
Skip (seq -> s -> SplitState seq s
forall seq s. seq -> s -> SplitState seq s
SplitState (seq
t seq -> seq -> seq
forall a. Monoid a => a -> a -> a
`mappend` seq
t') s
s')
    step' (SplitState seq
t s
s) = do
        if seq -> Bool
forall mono. MonoFoldable mono => mono -> Bool
onull seq
y
            then do
                Step s seq ()
res <- s -> m (Step s seq ())
step s
s
                Step (SplitState seq s) seq ()
-> m (Step (SplitState seq s) seq ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (SplitState seq s) seq ()
 -> m (Step (SplitState seq s) seq ()))
-> Step (SplitState seq s) seq ()
-> m (Step (SplitState seq s) seq ())
forall a b. (a -> b) -> a -> b
$ case Step s seq ()
res of
                    Stop () | Bool -> Bool
not (seq -> Bool
forall mono. MonoFoldable mono => mono -> Bool
onull seq
t) -> SplitState seq s -> seq -> Step (SplitState seq s) seq ()
forall s o r. s -> o -> Step s o r
Emit SplitState seq s
forall seq s. SplitState seq s
SplitDone seq
t
                            | Bool
otherwise -> () -> Step (SplitState seq s) seq ()
forall s o r. r -> Step s o r
Stop ()
                    Skip s
s' -> SplitState seq s -> Step (SplitState seq s) seq ()
forall s o r. s -> Step s o r
Skip (seq -> s -> SplitState seq s
forall seq s. seq -> s -> SplitState seq s
SplitNoSep seq
t s
s')
                    Emit s
s' seq
t' -> SplitState seq s -> Step (SplitState seq s) seq ()
forall s o r. s -> Step s o r
Skip (seq -> s -> SplitState seq s
forall seq s. seq -> s -> SplitState seq s
SplitState (seq
t seq -> seq -> seq
forall a. Monoid a => a -> a -> a
`mappend` seq
t') s
s')
            else Step (SplitState seq s) seq ()
-> m (Step (SplitState seq s) seq ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (SplitState seq s) seq ()
 -> m (Step (SplitState seq s) seq ()))
-> Step (SplitState seq s) seq ()
-> m (Step (SplitState seq s) seq ())
forall a b. (a -> b) -> a -> b
$ SplitState seq s -> seq -> Step (SplitState seq s) seq ()
forall s o r. s -> o -> Step s o r
Emit (seq -> s -> SplitState seq s
forall seq s. seq -> s -> SplitState seq s
SplitState (Index seq -> seq -> seq
forall seq. IsSequence seq => Index seq -> seq -> seq
Seq.drop Index seq
1 seq
y) s
s) seq
x
      where
        (seq
x, seq
y) = (Element seq -> Bool) -> seq -> (seq, seq)
forall seq.
IsSequence seq =>
(Element seq -> Bool) -> seq -> (seq, seq)
Seq.break Element seq -> Bool
f seq
t
{-# INLINE splitOnUnboundedES #-}

-- | Streaming versions of @Data.Conduit.Combinators.Internal.initReplicate@
initReplicateS :: Monad m => m seed -> (seed -> m a) -> Int -> StreamProducer m a
initReplicateS :: m seed -> (seed -> m a) -> Int -> StreamProducer m a
initReplicateS m seed
mseed seed -> m a
f Int
cnt Stream m i ()
_ =
    ((Int, seed) -> m (Step (Int, seed) a ()))
-> m (Int, seed) -> Stream m a ()
forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream (Int, seed) -> m (Step (Int, seed) a ())
step ((seed -> (Int, seed)) -> m seed -> m (Int, seed)
forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (Int
cnt, ) m seed
mseed)
  where
    step :: (Int, seed) -> m (Step (Int, seed) a ())
step (Int
ix, seed
_) | Int
ix Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0 = Step (Int, seed) a () -> m (Step (Int, seed) a ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Int, seed) a () -> m (Step (Int, seed) a ()))
-> Step (Int, seed) a () -> m (Step (Int, seed) a ())
forall a b. (a -> b) -> a -> b
$ () -> Step (Int, seed) a ()
forall s o r. r -> Step s o r
Stop ()
    step (Int
ix, seed
seed) = do
        a
x <- seed -> m a
f seed
seed
        Step (Int, seed) a () -> m (Step (Int, seed) a ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Int, seed) a () -> m (Step (Int, seed) a ()))
-> Step (Int, seed) a () -> m (Step (Int, seed) a ())
forall a b. (a -> b) -> a -> b
$ (Int, seed) -> a -> Step (Int, seed) a ()
forall s o r. s -> o -> Step s o r
Emit (Int
ix Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1, seed
seed) a
x
{-# INLINE initReplicateS #-}

-- | Streaming versions of @Data.Conduit.Combinators.Internal.initRepeat@
initRepeatS :: Monad m => m seed -> (seed -> m a) -> StreamProducer m a
initRepeatS :: m seed -> (seed -> m a) -> StreamProducer m a
initRepeatS m seed
mseed seed -> m a
f Stream m i ()
_ =
    (seed -> m (Step seed a ())) -> m seed -> Stream m a ()
forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream seed -> m (Step seed a ())
step m seed
mseed
  where
    step :: seed -> m (Step seed a ())
step seed
seed = do
        a
x <- seed -> m a
f seed
seed
        Step seed a () -> m (Step seed a ())
forall (m :: * -> *) a. Monad m => a -> m a
return (Step seed a () -> m (Step seed a ()))
-> Step seed a () -> m (Step seed a ())
forall a b. (a -> b) -> a -> b
$ seed -> a -> Step seed a ()
forall s o r. s -> o -> Step s o r
Emit seed
seed a
x
{-# INLINE initRepeatS #-}

-- | Utility function
fmapS :: Monad m
      => (a -> b)
      -> StreamConduitT i o m a
      -> StreamConduitT i o m b
fmapS :: (a -> b) -> StreamConduitT i o m a -> StreamConduitT i o m b
fmapS a -> b
f StreamConduitT i o m a
s Stream m i ()
inp =
    case StreamConduitT i o m a
s Stream m i ()
inp of
        Stream s -> m (Step s o a)
step m s
ms0 -> (s -> m (Step s o b)) -> m s -> Stream m o b
forall (m :: * -> *) o r s.
(s -> m (Step s o r)) -> m s -> Stream m o r
Stream ((m (Step s o a) -> m (Step s o b))
-> (s -> m (Step s o a)) -> s -> m (Step s o b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ((Step s o a -> Step s o b) -> m (Step s o a) -> m (Step s o b)
forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM ((a -> b) -> Step s o a -> Step s o b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> b
f)) s -> m (Step s o a)
step) m s
ms0
{-# INLINE fmapS #-}