-- |
-- Module      : Streamly.Internal.Data.Stream.StreamD.Transform
-- Copyright   : (c) 2018 Composewell Technologies
--               (c) Roman Leshchinskiy 2008-2010
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- "Streamly.Internal.Data.Pipe" might ultimately replace this module.

-- A few functions in this module have been adapted from the vector package
-- (c) Roman Leshchinskiy. See the notes in specific combinators.

module Streamly.Internal.Data.Stream.StreamD.Transform
    (
    -- * Piping
    -- | Pass through a 'Pipe'.
      transform

    -- * Mapping
    -- | Stateless one-to-one maps.
    , map
    , mapM
    , sequence

    -- * Mapping Effects
    , tap
    , tapOffsetEvery
    , tapRate
    , pollCounts

    -- * Folding
    , foldrS
    , foldrT
    , foldlS
    , foldlT

    -- * Scanning By 'Fold'
    , postscanOnce -- XXX rename to postscan
    , scanOnce     -- XXX rename to scan

    -- * Scanning
    -- | Left scans. Stateful, mostly one-to-one maps.
    , scanlM'
    , scanlMAfter'
    , scanl'
    , scanlM
    , scanl
    , scanl1M'
    , scanl1'
    , scanl1M
    , scanl1

    , prescanl'
    , prescanlM'

    , postscanl
    , postscanlM
    , postscanl'
    , postscanlM'
    , postscanlMAfter'

    , postscanlx'
    , postscanlMx'
    , scanlMx'
    , scanlx'

    -- * Filtering
    -- | Produce a subset of the stream.
    , filter
    , filterM
    , deleteBy
    , uniq

    -- * Trimming
    -- | Produce a subset of the stream trimmed at ends.
    , take
    , takeByTime
    , takeWhile
    , takeWhileM
    , drop
    , dropByTime
    , dropWhile
    , dropWhileM

    -- * Inserting Elements
    -- | Produce a superset of the stream.
    , insertBy
    , intersperse
    , intersperseM
    , intersperseSuffix
    , intersperseSuffixBySpan

    -- * Inserting Side Effects
    , intersperseM_
    , intersperseSuffix_

    -- * Reordering
    -- | Produce strictly the same set but reordered.
    , reverse
    -- , reverse'

    -- * Position Indexing
    , indexed
    , indexedR

    -- * Searching
    , findIndices
    , slicesBy

    -- * Rolling map
    -- | Map using the previous element.
    , rollingMap
    , rollingMapM

    -- * Maybe Streams
    , mapMaybe
    , mapMaybeM
    )
where

#include "inline.hs"

import Control.Concurrent (killThread, threadDelay)
import Control.Exception (AsyncException)
import Control.Monad (void, when)
import Control.Monad.Catch (MonadCatch, throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.IORef (newIORef, mkWeakIORef)
import Data.Maybe (fromJust, isJust)
import GHC.Types (SPEC(..))
import qualified Control.Monad.Catch as MC

import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Control.ForkLifted (fork, forkManaged)
import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Pipe.Type (Pipe(..), PipeState(..))
import Streamly.Internal.Data.SVar.Type (defState, adaptState)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units
       (TimeUnit64, toRelTime64, diffAbsTime64)

import qualified Streamly.Internal.Data.Fold.Type as FL
import qualified Streamly.Internal.Data.IORef.Prim as Prim
import qualified Streamly.Internal.Data.Pipe.Type as Pipe

import Prelude hiding
       ( drop, dropWhile, filter, map, mapM, reverse
       , scanl, scanl1, sequence, take, takeWhile)

import Streamly.Internal.Data.Stream.StreamD.Type

------------------------------------------------------------------------------
-- Piping
------------------------------------------------------------------------------

-- | Run a stream through a 'Pipe', producing the stream of the pipe's
-- outputs.
--
-- The state of the resulting stream is a pair of the pipe state and the
-- source stream state. While the pipe is in the 'Consume' state the source
-- stream is stepped and every element it yields is handed to the pipe's
-- consume step. While the pipe is in the 'Produce' state only the pipe's
-- produce step is run and the source stream state is carried along untouched.
-- The resulting stream stops when the source stops while the pipe is
-- consuming.
{-# INLINE_NORMAL transform #-}
transform :: Monad m => Pipe m a b -> Stream m a -> Stream m b
transform (Pipe pstep1 pstep2 pstate) (Stream step state) =
    Stream step' (Consume pstate, state)

  where

    {-# INLINE_LATE step' #-}

    -- Consuming: pull one step from the source and feed any element to the
    -- pipe. The pipe state is forced before stepping.
    step' gst (Consume pst, st) = pst `seq` do
        r <- step (adaptState gst) st
        case r of
            Yield x s -> do
                res <- pstep1 pst x
                case res of
                    Pipe.Yield b pst' -> return $ Yield b (pst', s)
                    Pipe.Continue pst' -> return $ Skip (pst', s)
            Skip s -> return $ Skip (Consume pst, s)
            Stop   -> return Stop

    -- Producing: the pipe emits output without consuming any input, so the
    -- source state @st@ is passed through unchanged.
    step' _ (Produce pst, st) = pst `seq` do
        res <- pstep2 pst
        case res of
            Pipe.Yield b pst' -> return $ Yield b (pst', st)
            Pipe.Continue pst' -> return $ Skip (pst', st)

------------------------------------------------------------------------------
-- Transformation Folds
------------------------------------------------------------------------------

-- | Left fold of a stream into a computation in a monad transformer @s@
-- stacked over the stream's monad @m@.
--
-- The source stream is stepped in the base monad (via 'lift', using
-- 'defState'); each yielded element is combined with the accumulated
-- computation using the supplied step function. The accumulated computation
-- is returned as the result once the stream stops.
{-# INLINE_NORMAL foldlT #-}
foldlT :: (Monad m, Monad (s m), MonadTrans s)
    => (s m b -> a -> s m b) -> s m b -> Stream m a -> s m b
foldlT fstep begin (Stream step state) = go SPEC begin state
  where
    -- The first argument is the 'SPEC' marker from "GHC.Types"; it is forced
    -- by the bang pattern and otherwise unused.
    go !_ acc st = do
        r <- lift $ step defState st
        case r of
            Yield x s -> go SPEC (fstep acc x) s
            Skip s -> go SPEC acc s
            Stop   -> acc

-- | Left fold of a stream where the accumulator is itself a stream.
--
-- The resulting stream works in two phases. In the first phase (@Left@
-- state) the source is consumed completely, without yielding anything, while
-- the accumulator stream is rebuilt with the step function for every element.
-- In the second phase (@Right@ state), entered when the source stops, the
-- final accumulator stream is run and its elements are yielded.
--
-- Note, this is going to have horrible performance, because of the nature of
-- the stream type (i.e. direct stream vs CPS). It is only for reference; it
-- is likely to be practically unusable.
{-# INLINE_NORMAL foldlS #-}
foldlS :: Monad m
    => (Stream m b -> a -> Stream m b) -> Stream m b -> Stream m a -> Stream m b
foldlS fstep begin (Stream step state) = Stream step' (Left (state, begin))
  where
    -- Phase 1: drain the source, folding each element into the accumulator.
    step' gst (Left (st, acc)) = do
        r <- step (adaptState gst) st
        return $ case r of
            Yield x s -> Skip (Left (s, fstep acc x))
            Skip s -> Skip (Left (s, acc))
            Stop   -> Skip (Right acc)

    -- Phase 2: replay the accumulated stream as the output.
    step' gst (Right (Stream stp stt)) = do
        r <- stp (adaptState gst) stt
        return $ case r of
            Yield x s -> Yield x (Right (Stream stp s))
            Skip s -> Skip (Right (Stream stp s))
            Stop   -> Stop

------------------------------------------------------------------------------
-- Transformation by Mapping
------------------------------------------------------------------------------

-- | Turn a stream of monadic actions into a stream of their results.
--
-- Each action yielded by the source is executed in the stream's monad at the
-- point where it is yielded, and its result is yielded in its place.
{-# INLINE_NORMAL sequence #-}
sequence :: Monad m => Stream m (m a) -> Stream m a
sequence (Stream step state) = Stream step' state
  where
    {-# INLINE_LATE step' #-}
    step' gst st = do
         r <- step (adaptState gst) st
         case r of
             Yield x s -> x >>= \a -> return (Yield a s)
             Skip s    -> return $ Skip s
             Stop      -> return Stop

------------------------------------------------------------------------------
-- Mapping side effects
------------------------------------------------------------------------------

-- | State of the stream generated by 'tap'.
--
-- @fs@ is the state of the tapping fold and @st@ the state of the source
-- stream. The parameter @a@ is phantom: no constructor mentions it.
data TapState fs st a
    = TapInit
    -- ^ Nothing has run yet; the fold still has to be initialized.
    | Tapping !fs st
    -- ^ The fold is live: its (strict) state and the source stream state.
    | TapDone st
    -- ^ The fold has finished; only the source stream state is tracked.

-- XXX Multiple yield points

-- | Tap the stream with a 'Fold': every element of the source is fed to the
-- fold and also yielded unchanged in the output.
--
-- The fold is initialized on the first step of the output stream. If the
-- fold terminates (either at initialization or after consuming an element)
-- its result is discarded and the remainder of the source is passed through
-- without being tapped. If the source ends while the fold is still running,
-- the fold's extract action is executed and its result is discarded.
{-# INLINE tap #-}
tap :: Monad m => Fold m a b -> Stream m a -> Stream m a
tap (Fold fstep initial extract) (Stream step state) = Stream step' TapInit

    where

    -- Initialize the fold; no element is produced by this step.
    step' _ TapInit = do
        res <- initial
        return
            $ Skip
            $ case res of
                  FL.Partial s -> Tapping s state
                  FL.Done _ -> TapDone state
    -- The fold is running: feed it each element before yielding the element.
    step' gst (Tapping acc st) = do
        r <- step gst st
        case r of
            Yield x s -> do
                res <- fstep acc x
                return
                    $ Yield x
                    $ case res of
                          FL.Partial fs -> Tapping fs s
                          FL.Done _ -> TapDone s
            Skip s -> return $ Skip (Tapping acc s)
            Stop -> do
                -- Run the extract action for its effects only.
                void $ extract acc
                return Stop
    -- The fold is finished: plain pass-through of the source.
    step' gst (TapDone st) = do
        r <- step gst st
        return
            $ case r of
                  Yield x s -> Yield x (TapDone s)
                  Skip s -> Skip (TapDone s)
                  Stop -> Stop

-- | State of the stream generated by 'tapOffsetEvery'.
--
-- @fs@ is the state of the tapping fold and @s@ the state of the source
-- stream. The parameter @a@ is phantom: no constructor mentions it.
data TapOffState fs s a
    = TapOffInit
    -- ^ Nothing has run yet; the fold still has to be initialized.
    | TapOffTapping !fs s Int
    -- ^ The fold is live: its (strict) state, the source stream state and
    -- the number of elements to let pass before the next one is tapped.
    | TapOffDone s
    -- ^ The fold has finished; only the source stream state is tracked.

-- XXX Multiple yield points

-- | @tapOffsetEvery offset n fold stream@ yields every element of the source
-- unchanged and additionally feeds a subset of the elements to the fold.
--
-- A countdown is started at @offset \`mod\` n@. For each element yielded by
-- the source: if the countdown is @<= 0@ the element is fed to the fold and
-- the countdown is reset to @n - 1@, otherwise the countdown is decremented.
-- So, for a positive @n@, the element at position @offset \`mod\` n@ and every
-- @n@th element after it are tapped.
--
-- @n@ must be non-zero because the initial countdown is computed with 'mod'.
--
-- If the fold terminates its result is discarded and the rest of the source
-- is passed through untapped. If the source ends while the fold is still
-- running, the fold's extract action is executed and its result discarded.
{-# INLINE_NORMAL tapOffsetEvery #-}
tapOffsetEvery :: Monad m
    => Int -> Int -> Fold m a b -> Stream m a -> Stream m a
tapOffsetEvery offset n (Fold fstep initial extract) (Stream step state) =
    Stream step' TapOffInit

    where

    {-# INLINE_LATE step' #-}
    -- Initialize the fold; no element is produced by this step.
    step' _ TapOffInit = do
        res <- initial
        return
            $ Skip
            $ case res of
                  FL.Partial s -> TapOffTapping s state (offset `mod` n)
                  FL.Done _ -> TapOffDone state
    step' gst (TapOffTapping acc st count) = do
        r <- step gst st
        case r of
            Yield x s -> do
                next <-
                    if count <= 0
                    then do
                        -- Countdown expired: tap this element and restart
                        -- the countdown.
                        res <- fstep acc x
                        return
                            $ case res of
                                  FL.Partial sres ->
                                    TapOffTapping sres s (n - 1)
                                  FL.Done _ -> TapOffDone s
                    else return $ TapOffTapping acc s (count - 1)
                return $ Yield x next
            Skip s -> return $ Skip (TapOffTapping acc s count)
            Stop -> do
                -- Run the extract action for its effects only.
                void $ extract acc
                return Stop
    -- The fold is finished: plain pass-through of the source.
    step' gst (TapOffDone st) = do
        r <- step gst st
        return
            $ case r of
                  Yield x s -> Yield x (TapOffDone s)
                  Skip s -> Skip (TapOffDone s)
                  Stop -> Stop

-- | @pollCounts predicate transf fld stream@ passes the source stream through
-- unchanged while counting the elements that satisfy @predicate@.
--
-- On the first step a counter (initially 0) is created and a thread is
-- started with 'forkManaged'. That thread runs @fld@ over the stream obtained
-- by applying @transf@ to @Prim.toStreamD countVar@ (presumably a stream of
-- reads of the counter -- confirm against "Streamly.Internal.Data.IORef.Prim")
-- and discards the fold's result. Each source element satisfying the
-- predicate increments the counter. When the source stops, the thread is
-- killed with 'killThread'.
{-# INLINE_NORMAL pollCounts #-}
pollCounts
    :: MonadAsync m
    => (a -> Bool)
    -> (Stream m Int -> Stream m Int)
    -> Fold m Int b
    -> Stream m a
    -> Stream m a
pollCounts predicate transf fld (Stream step state) = Stream step' Nothing
  where

    {-# INLINE_LATE step' #-}
    -- First step: set up the counter and the polling thread.
    step' _ Nothing = do
        -- As long as we are using an "Int" for counts lockfree reads from
        -- Var should work correctly on both 32-bit and 64-bit machines.
        -- However, an Int on a 32-bit machine may overflow quickly.
        countVar <- liftIO $ Prim.newIORef (0 :: Int)
        tid <- forkManaged
            $ void $ fold fld
            $ transf $ Prim.toStreamD countVar
        return $ Skip (Just (countVar, tid, state))

    -- Subsequent steps: pass elements through, counting the matching ones.
    step' gst (Just (countVar, tid, st)) = do
        r <- step gst st
        case r of
            Yield x s -> do
                when (predicate x) $ liftIO $ Prim.modifyIORef' countVar (+ 1)
                return $ Yield x (Just (countVar, tid, s))
            Skip s -> return $ Skip (Just (countVar, tid, s))
            Stop -> do
                liftIO $ killThread tid
                return Stop

-- | @tapRate samplingRate action stream@ passes the source stream through
-- unchanged while a background thread periodically reports how many elements
-- went by.
--
-- Every element yielded by the source increments a counter. A thread started
-- with 'fork' on the first step repeatedly sleeps, reads the counter and
-- calls @action@ with the increase since the previous reading; the result of
-- @action@ is discarded.
--
-- Despite its name, @samplingRate@ is used as the interval between samples,
-- in seconds: it is multiplied by 1000000 and passed to 'threadDelay'.
--
-- The thread is killed when the source stops. In addition, a finalizer that
-- kills the thread is attached (via 'mkWeakIORef') to an 'Data.IORef.IORef'
-- carried in the stream state. If the thread receives an 'AsyncException',
-- it reports the final increase through @action@ before rethrowing.
{-# INLINE_NORMAL tapRate #-}
tapRate ::
       (MonadAsync m, MonadCatch m)
    => Double
    -> (Int -> m b)
    -> Stream m a
    -> Stream m a
tapRate samplingRate action (Stream step state) = Stream step' Nothing
  where
    -- Sampling loop run in the forked thread. @prev@ is the counter value
    -- seen by the previous sample.
    {-# NOINLINE loop #-}
    loop countVar prev = do
        i <-
            MC.catch
                (do liftIO $ threadDelay (round $ samplingRate * 1000000)
                    i <- liftIO $ Prim.readIORef countVar
                    let !diff = i - prev
                    void $ action diff
                    return i)
                -- On an asynchronous exception, report whatever was counted
                -- since the last sample and then propagate the exception.
                (\(e :: AsyncException) -> do
                     i <- liftIO $ Prim.readIORef countVar
                     let !diff = i - prev
                     void $ action diff
                     throwM (MC.toException e))
        loop countVar i

    {-# INLINE_LATE step' #-}
    -- First step: create the counter, start the sampling thread and register
    -- the finalizer that kills it.
    step' _ Nothing = do
        countVar <- liftIO $ Prim.newIORef 0
        tid <- fork $ loop countVar 0
        ref <- liftIO $ newIORef ()
        _ <- liftIO $ mkWeakIORef ref (killThread tid)
        return $ Skip (Just (countVar, tid, state, ref))

    -- Subsequent steps: pass elements through, counting each one.
    step' gst (Just (countVar, tid, st, ref)) = do
        r <- step gst st
        case r of
            Yield x s -> do
                liftIO $ Prim.modifyIORef' countVar (+ 1)
                return $ Yield x (Just (countVar, tid, s, ref))
            Skip s -> return $ Skip (Just (countVar, tid, s, ref))
            Stop -> do
                liftIO $ killThread tid
                return Stop

------------------------------------------------------------------------------
-- Scanning with a Fold
------------------------------------------------------------------------------

-- | State of the streams generated by 'postscanOnce' and 'scanOnce'.
--
-- @s@ is the state of the source stream and @f@ the state of the fold.
data ScanState s f
    = ScanInit s
    -- ^ The fold still has to be initialized; holds the source stream state.
    | ScanDo s !f
    -- ^ Scanning: the source stream state and the (strict) fold state.
    | ScanDone
    -- ^ The fold has terminated; the scan produces nothing further.

-- | Postscan a stream using a 'Fold': for every element of the source, the
-- element is fed to the fold and the fold's extracted result is yielded.
--
-- Nothing is yielded for the initial state of the fold, unless the fold
-- terminates at initialization, in which case its result is the only output.
-- When the fold terminates after consuming an element, its final result is
-- yielded and the scan ends; the rest of the source is not consumed. The
-- scan also ends when the source stops.
{-# INLINE_NORMAL postscanOnce #-}
postscanOnce :: Monad m => FL.Fold m a b -> Stream m a -> Stream m b
postscanOnce (FL.Fold fstep initial extract) (Stream sstep state) =
    Stream step (ScanInit state)

    where

    {-# INLINE_LATE step #-}
    -- Initialize the fold.
    step _ (ScanInit st) = do
        res <- initial
        return
            $ case res of
                  FL.Partial fs -> Skip $ ScanDo st fs
                  FL.Done b -> Yield b ScanDone
    -- Feed the next source element to the fold and yield the running result.
    step gst (ScanDo st fs) = do
        res <- sstep (adaptState gst) st
        case res of
            Yield x s -> do
                r <- fstep fs x
                case r of
                    FL.Partial fs1 -> do
                        -- The extracted value is forced before it is yielded.
                        !b <- extract fs1
                        return $ Yield b $ ScanDo s fs1
                    FL.Done b -> return $ Yield b ScanDone
            Skip s -> return $ Skip $ ScanDo s fs
            Stop -> return Stop
    step _ ScanDone = return Stop

-- | Scan a stream using a 'Fold'. The first output is the result extracted
-- from the fold's initial state; after that, for every element of the
-- source, the element is fed to the fold and the fold's extracted result is
-- yielded.
--
-- If the fold terminates at initialization, its result is the only output.
-- When the fold terminates after consuming an element, its final result is
-- yielded and the scan ends; the rest of the source is not consumed. The
-- scan also ends when the source stops.
{-# INLINE scanOnce #-}
scanOnce :: Monad m
    => FL.Fold m a b -> Stream m a -> Stream m b
scanOnce (FL.Fold fstep initial extract) (Stream sstep state) =
    Stream step (ScanInit state)

    where

    {-# INLINE_LATE step #-}
    -- Initialize the fold and yield the result of its initial state.
    step _ (ScanInit st) = do
        res <- initial
        case res of
            FL.Partial fs -> do
                -- The extracted value is forced before it is yielded.
                !b <- extract fs
                return $ Yield b $ ScanDo st fs
            FL.Done b -> return $ Yield b ScanDone
    -- Feed the next source element to the fold and yield the running result.
    step gst (ScanDo st fs) = do
        res <- sstep (adaptState gst) st
        case res of
            Yield x s -> do
                r <- fstep fs x
                case r of
                    FL.Partial fs1 -> do
                        !b <- extract fs1
                        return $ Yield b $ ScanDo s fs1
                    FL.Done b -> return $ Yield b ScanDone
            Skip s -> return $ Skip $ ScanDo s fs
            Stop -> return Stop
    step _ ScanDone = return Stop

------------------------------------------------------------------------------
-- Scanning - Prescans
------------------------------------------------------------------------------

-- Adapted from the vector package.
--
-- XXX Is a prescan useful, discarding the last step does not sound useful?  I
-- am not sure about the utility of this function, so this is implemented but
-- not exposed. We can expose it if someone provides good reasons why this is
-- useful.
--
-- XXX We have to execute the stream one step ahead to know that we are at the
-- last step.  The vector implementation of prescan executes the last fold step
-- but does not yield the result. This means we have executed the effect but
-- discarded value. This does not sound right. In this implementation we are
-- not executing the last fold step.
-- Monadic prescan. For every input element the accumulator as it was
-- *before* folding that element is yielded; the fold step for the element is
-- only stored in the state as a pending action and is run when the next
-- element arrives. As a consequence:
--
-- * @mz@ is run when the first input element arrives (never for an empty
--   input).
-- * the fold step for the last input element is never executed.
{-# INLINE_NORMAL prescanlM' #-}
prescanlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b
prescanlM' f mz (Stream step state) = Stream step' (state, mz)
  where
    -- State: (source state, action producing the current accumulator).
    {-# INLINE_LATE step' #-}
    step' gst (st, prev) = do
        r <- step (adaptState gst) st
        case r of
            Yield x s -> do
                -- Run the pending step, emit its result and defer the step
                -- for the current element.
                acc <- prev
                return $ Yield acc (s, f acc x)
            Skip s -> return $ Skip (s, prev)
            Stop   -> return Stop

-- Pure version of 'prescanlM'': the step function and the initial value are
-- lifted with 'return'.
{-# INLINE prescanl' #-}
prescanl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b
prescanl' f z = prescanlM' (\a b -> return (f a b)) (return z)

------------------------------------------------------------------------------
-- Monolithic postscans (postscan followed by a map)
------------------------------------------------------------------------------

-- The performance of a modular postscan followed by a map seems to be
-- equivalent to this monolithic scan followed by map therefore we may not need
-- this implementation. We just have it for performance comparison and in case
-- modular version does not perform well in some situation.
--
-- Monolithic postscan with an extraction function. For each input element the
-- accumulator is advanced with @fstep@ and @done@ applied to the new
-- accumulator is yielded. Nothing is yielded for the initial accumulator.
--
-- The accumulator is carried in the state as a monadic action: initially
-- @begin@ (run when the first element arrives), afterwards @return y@.
{-# INLINE_NORMAL postscanlMx' #-}
postscanlMx' :: Monad m
    => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b
postscanlMx' fstep begin done (Stream step state) =
    Stream step' (state, begin)
  where
    {-# INLINE_LATE step' #-}
    step' gst (st, acc) = do
        r <- step (adaptState gst) st
        case r of
            Yield x s -> do
                old <- acc
                y <- fstep old x
                v <- done y
                -- Force both the output and the new accumulator to WHNF
                -- before yielding.
                v `seq` y `seq` return (Yield v (s, return y))
            Skip s -> return $ Skip (s, acc)
            Stop   -> return Stop

-- Pure version of 'postscanlMx'': step, initial value and extraction function
-- are lifted with 'return'.
{-# INLINE_NORMAL postscanlx' #-}
postscanlx' :: Monad m
    => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> Stream m b
postscanlx' fstep begin done =
    postscanlMx' (\b a -> return (fstep b a)) (return begin) (return . done)

-- XXX do we need consM strict to evaluate the begin value?
-- Scan with an extraction function: the extracted initial accumulator
-- (forced to WHNF before @done@ is applied) is prepended to the output of
-- 'postscanlMx''.
--
-- NOTE(review): @begin@ is used both for the head element and as the initial
-- accumulator action of 'postscanlMx'', so its effect appears to be executed
-- twice when the input is non-empty - confirm whether this is intended.
{-# INLINE scanlMx' #-}
scanlMx' :: Monad m
    => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b
scanlMx' fstep begin done s =
    (begin >>= \x -> x `seq` done x) `consM` postscanlMx' fstep begin done s

-- Pure version of 'scanlMx'': step, initial value and extraction function are
-- lifted with 'return'.
{-# INLINE scanlx' #-}
scanlx' :: Monad m
    => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> Stream m b
scanlx' fstep begin done =
    scanlMx' (\b a -> return (fstep b a)) (return begin) (return . done)

------------------------------------------------------------------------------
-- postscans
------------------------------------------------------------------------------

-- Adapted from the vector package.
-- Strict monadic postscan. The first step runs @begin@, forces its result and
-- emits nothing (Skip); every input element then yields the new, forced
-- accumulator. Note that @begin@ is run on the first step even if the input
-- turns out to be empty.
{-# INLINE_NORMAL postscanlM' #-}
postscanlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b
postscanlM' fstep begin (Stream step state) =
    Stream step' Nothing
  where
    -- State: Nothing before initialization, Just (source state, accumulator)
    -- afterwards.
    {-# INLINE_LATE step' #-}
    step' _ Nothing = do
        !x <- begin
        return $ Skip (Just (state, x))

    step' gst (Just (st, acc)) =  do
        r <- step (adaptState gst) st
        case r of
            Yield x s -> do
                !y <- fstep acc x
                return $ Yield y (Just (s, y))
            Skip s -> return $ Skip (Just (s, acc))
            Stop   -> return Stop

-- Pure version of 'postscanlM'': the step function and the seed are lifted
-- with 'return'.
{-# INLINE_NORMAL postscanl' #-}
postscanl' :: Monad m => (a -> b -> a) -> a -> Stream m b -> Stream m a
postscanl' f seed = postscanlM' (\a b -> return (f a b)) (return seed)

-- We can possibly have the "done" function as a Maybe to provide an option to
-- emit or not emit the accumulator when the stream stops.
--
-- TBD: use a single Yield point
--
-- Postscan that emits one extra element when the input ends: on Stop the
-- current accumulator is passed to @done@ and the result is yielded, after
-- which the stream stops.
--
-- The accumulator is carried as a monadic action: initially @initial@ (run
-- when the first element arrives, or at Stop if there were no elements),
-- afterwards @return y@.
{-# INLINE_NORMAL postscanlMAfter' #-}
postscanlMAfter' :: Monad m
    => (b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b
postscanlMAfter' fstep initial done (Stream step1 state1) =
    Stream step (Just (state1, initial))

    where

    -- State: Just (source state, accumulator action) while scanning, Nothing
    -- once the final element has been emitted.
    {-# INLINE_LATE step #-}
    step gst (Just (st, acc)) = do
        r <- step1 (adaptState gst) st
        case r of
            Yield x s -> do
                old <- acc
                y <- fstep old x
                y `seq` return (Yield y (Just (s, return y)))
            Skip s -> return $ Skip $ Just (s, acc)
            Stop -> acc >>= done >>= \res -> return (Yield res Nothing)
    step _ Nothing = return Stop

-- Monadic postscan that does not force the accumulator (compare
-- 'postscanlM''). The first step runs @begin@ and emits nothing (Skip); every
-- input element then yields the new accumulator.
{-# INLINE_NORMAL postscanlM #-}
postscanlM :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b
postscanlM fstep begin (Stream step state) = Stream step' Nothing
  where
    -- State: Nothing before initialization, Just (source state, accumulator)
    -- afterwards.
    {-# INLINE_LATE step' #-}
    step' _ Nothing = do
        r <- begin
        return $ Skip (Just (state, r))

    step' gst (Just (st, acc)) = do
        r <- step (adaptState gst) st
        case r of
            Yield x s -> do
                y <- fstep acc x
                return (Yield y (Just (s, y)))
            Skip s -> return $ Skip (Just (s, acc))
            Stop   -> return Stop

-- Pure version of 'postscanlM': the step function and the seed are lifted
-- with 'return'.
{-# INLINE_NORMAL postscanl #-}
postscanl :: Monad m => (a -> b -> a) -> a -> Stream m b -> Stream m a
postscanl f seed = postscanlM (\a b -> return (f a b)) (return seed)

-- Strict monadic left scan. The first step runs @begin@, forces its result
-- and yields it; every input element then yields the new, forced accumulator.
{-# INLINE_NORMAL scanlM' #-}
scanlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b
scanlM' fstep begin (Stream step state) = Stream step' Nothing
  where
    -- State: Nothing before initialization, Just (source state, accumulator)
    -- afterwards.
    {-# INLINE_LATE step' #-}
    step' _ Nothing = do
        !x <- begin
        return $ Yield x (Just (state, x))
    step' gst (Just (st, acc)) =  do
        r <- step (adaptState gst) st
        case r of
            Yield x s -> do
                !y <- fstep acc x
                return $ Yield y (Just (s, y))
            Skip s -> return $ Skip (Just (s, acc))
            Stop   -> return Stop

-- Scan that emits the initial accumulator (forced to WHNF) followed by the
-- output of 'postscanlMAfter''.
--
-- NOTE(review): @initial@ is used both for the head element and as the
-- initial accumulator action of 'postscanlMAfter'', so its effect appears to
-- be executed twice - confirm whether this is intended.
{-# INLINE scanlMAfter' #-}
scanlMAfter' :: Monad m
    => (b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b
scanlMAfter' fstep initial done s =
    (initial >>= \x -> x `seq` return x) `consM`
        postscanlMAfter' fstep initial done s

-- Pure version of 'scanlM'': the step function and the seed are lifted with
-- 'return'.
{-# INLINE scanl' #-}
scanl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b
scanl' f seed = scanlM' (\a b -> return (f a b)) (return seed)

-- Monadic left scan that does not force the accumulator (compare 'scanlM'').
-- The first step runs @begin@ and yields its result; every input element then
-- yields the new accumulator.
{-# INLINE_NORMAL scanlM #-}
scanlM :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b
scanlM fstep begin (Stream step state) = Stream step' Nothing
  where
    -- State: Nothing before initialization, Just (source state, accumulator)
    -- afterwards.
    {-# INLINE_LATE step' #-}
    step' _ Nothing = do
        x <- begin
        return $ Yield x (Just (state, x))
    step' gst (Just (st, acc)) = do
        r <- step (adaptState gst) st
        case r of
            Yield x s -> do
                y <- fstep acc x
                return $ Yield y (Just (s, y))
            Skip s -> return $ Skip (Just (s, acc))
            Stop   -> return Stop

-- Pure version of 'scanlM': the step function and the seed are lifted with
-- 'return'.
{-# INLINE scanl #-}
scanl :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b
scanl f seed = scanlM (\a b -> return (f a b)) (return seed)

-- Adapted from the vector package
-- Monadic left scan without a seed: the first input element is yielded as is
-- and becomes the initial accumulator; each later element yields
-- @fstep acc y@. An empty input produces an empty output. The accumulator is
-- not forced (compare 'scanl1M'').
{-# INLINE_NORMAL scanl1M #-}
scanl1M :: Monad m => (a -> a -> m a) -> Stream m a -> Stream m a
scanl1M fstep (Stream step state) = Stream step' (state, Nothing)
  where
    -- State: (source state, accumulator if the first element has been seen).
    {-# INLINE_LATE step' #-}
    step' gst (st, Nothing) = do
        r <- step gst st
        case r of
            Yield x s -> return $ Yield x (s, Just x)
            Skip s -> return $ Skip (s, Nothing)
            Stop   -> return Stop

    step' gst (st, Just acc) = do
        r <- step gst st
        case r of
            Yield y s -> do
                z <- fstep acc y
                return $ Yield z (s, Just z)
            Skip s -> return $ Skip (s, Just acc)
            Stop   -> return Stop

-- Pure version of 'scanl1M': the step function is lifted with 'return'.
{-# INLINE scanl1 #-}
scanl1 :: Monad m => (a -> a -> a) -> Stream m a -> Stream m a
scanl1 f = scanl1M (\x y -> return (f x y))

-- Adapted from the vector package
-- Strict variant of 'scanl1M': the first element, the accumulator read from
-- the state and every new accumulator are forced to WHNF with 'seq'.
{-# INLINE_NORMAL scanl1M' #-}
scanl1M' :: Monad m => (a -> a -> m a) -> Stream m a -> Stream m a
scanl1M' fstep (Stream step state) = Stream step' (state, Nothing)
  where
    -- State: (source state, accumulator if the first element has been seen).
    {-# INLINE_LATE step' #-}
    step' gst (st, Nothing) = do
        r <- step gst st
        case r of
            Yield x s -> x `seq` return $ Yield x (s, Just x)
            Skip s -> return $ Skip (s, Nothing)
            Stop   -> return Stop

    step' gst (st, Just acc) = acc `seq` do
        r <- step gst st
        case r of
            Yield y s -> do
                z <- fstep acc y
                z `seq` return $ Yield z (s, Just z)
            Skip s -> return $ Skip (s, Just acc)
            Stop   -> return Stop

-- Pure version of 'scanl1M'': the step function is lifted with 'return'.
{-# INLINE scanl1' #-}
scanl1' :: Monad m => (a -> a -> a) -> Stream m a -> Stream m a
scanl1' f = scanl1M' (\x y -> return (f x y))

-------------------------------------------------------------------------------
-- Filtering
-------------------------------------------------------------------------------

-- Adapted from the vector package
-- Keep only the elements for which the monadic predicate returns 'True'.
-- Rejected elements become a Skip; the source state is threaded unchanged.
{-# INLINE_NORMAL filterM #-}
filterM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
filterM f (Stream step state) = Stream step' state
  where
    {-# INLINE_LATE step' #-}
    step' gst st = do
        r <- step gst st
        case r of
            Yield x s -> do
                b <- f x
                return $ if b
                         then Yield x s
                         else Skip s
            Skip s -> return $ Skip s
            Stop   -> return Stop

-- Pure version of 'filterM': the predicate is lifted with 'return'.
{-# INLINE filter #-}
filter :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
filter f = filterM (return . f)

-- Adapted from the vector package
-- Drop every element that is equal ('==') to the most recently emitted
-- element, i.e. collapse runs of consecutive equal elements into their first
-- element.
{-# INLINE_NORMAL uniq #-}
uniq :: (Eq a, Monad m) => Stream m a -> Stream m a
uniq (Stream step state) = Stream step' (Nothing, state)
  where
    -- State: (last emitted element if any, source state).
    {-# INLINE_LATE step' #-}
    step' gst (Nothing, st) = do
        r <- step gst st
        case r of
            Yield x s -> return $ Yield x (Just x, s)
            Skip  s   -> return $ Skip  (Nothing, s)
            Stop      -> return Stop
    step' gst (Just x, st)  = do
         r <- step gst st
         case r of
             Yield y s | x == y   -> return $ Skip (Just x, s)
                       | otherwise -> return $ Yield y (Just y, s)
             Skip  s   -> return $ Skip (Just x, s)
             Stop      -> return Stop

-- Remove the first element @y@ of the stream for which @eq x y@ holds; all
-- other elements, including later matches, are passed through unchanged.
{-# INLINE_NORMAL deleteBy #-}
deleteBy :: Monad m => (a -> a -> Bool) -> a -> Stream m a -> Stream m a
deleteBy eq x (Stream step state) = Stream step' (state, False)
  where
    -- State: (source state, whether an element has already been deleted).
    {-# INLINE_LATE step' #-}
    step' gst (st, False) = do
        r <- step gst st
        case r of
            Yield y s -> return $
                if eq x y then Skip (s, True) else Yield y (s, False)
            Skip s -> return $ Skip (s, False)
            Stop   -> return Stop

    -- After the deletion the stream is passed through without comparisons.
    step' gst (st, True) = do
        r <- step gst st
        case r of
            Yield y s -> return $ Yield y (s, True)
            Skip s -> return $ Skip (s, True)
            Stop   -> return Stop

------------------------------------------------------------------------------
-- Trimming
------------------------------------------------------------------------------

-- XXX using getTime in the loop can be pretty expensive especially for
-- computations where iterations are lightweight. We have the following
-- options:
--
-- 1) Run a timeout thread updating a flag asynchronously and check that
-- flag here, that way we can have a cheap termination check.
--
-- 2) Use COARSE clock to get time with lower resolution but more efficiently.
--
-- 3) Use rdtscp/rdtsc to get time directly from the processor, compute the
-- termination value of rdtsc in the beginning and then in each iteration just
-- get rdtsc and check if we should terminate.
--
-- State machine used by 'takeByTime'. @st@ is the state of the source stream
-- and @s@ is the timestamp recorded when the stream was started.
data TakeByTime st s
    = TakeByTimeInit st -- start time not recorded yet
    | TakeByTimeCheck st s -- compare the elapsed time against the limit
    | TakeByTimeYield st s -- run one step of the source stream

-- Take elements from the stream until more than @duration@ has elapsed on the
-- monotonic clock since the first step.
--
-- * A zero duration stops immediately without running the source.
-- * Otherwise the start time is recorded, and after every source step (Yield
--   or Skip) the clock is read again; the stream stops once the elapsed time
--   is strictly greater than the limit. The check happens after a source
--   step, so the first source step is always run.
{-# INLINE_NORMAL takeByTime #-}
takeByTime :: (MonadIO m, TimeUnit64 t) => t -> Stream m a -> Stream m a
takeByTime duration (Stream step1 state1) = Stream step (TakeByTimeInit state1)
    where

    lim = toRelTime64 duration

    {-# INLINE_LATE step #-}
    step _ (TakeByTimeInit _) | lim == 0 = return Stop
    step _ (TakeByTimeInit st) = do
        t0 <- liftIO $ getTime Monotonic
        return $ Skip (TakeByTimeYield st t0)
    step _ (TakeByTimeCheck st t0) = do
        t <- liftIO $ getTime Monotonic
        return $
            if diffAbsTime64 t t0 > lim
            then Stop
            else Skip (TakeByTimeYield st t0)
    step gst (TakeByTimeYield st t0) = do
        r <- step1 gst st
        return $ case r of
             Yield x s -> Yield x (TakeByTimeCheck s t0)
             Skip s -> Skip (TakeByTimeCheck s t0)
             Stop -> Stop

-- State machine used by 'dropByTime'. @st@ is the state of the source stream,
-- @s@ is the timestamp recorded when the stream was first stepped and @x@ is
-- the element type.
--
-- * 'DropByTimeInit': the start time has not been recorded yet.
-- * 'DropByTimeGen': still inside the drop window; step the source.
-- * 'DropByTimeCheck': an element was produced; check the clock to decide
--   whether to discard it or emit it.
-- * 'DropByTimeYield': the drop window is over; pass everything through.
data DropByTime st s x
    = DropByTimeInit st
    | DropByTimeGen st s
    | DropByTimeCheck st s x
    | DropByTimeYield st

-- | @dropByTime duration stream@ discards the elements that @stream@ produces
-- while no more than @duration@ has elapsed on the monotonic clock, measured
-- from the first time the resulting stream is stepped. The first element
-- produced after the limit has been exceeded, and every element after it, is
-- passed through; the clock is not read again from that point on.
{-# INLINE_NORMAL dropByTime #-}
dropByTime :: (MonadIO m, TimeUnit64 t) => t -> Stream m a -> Stream m a
dropByTime duration (Stream step1 state1) = Stream step (DropByTimeInit state1)
    where

    -- The time limit converted once, outside the stepping loop.
    lim = toRelTime64 duration

    {-# INLINE_LATE step #-}
    -- Record the start time lazily, on the first step of the stream.
    step _ (DropByTimeInit st) = do
        t0 <- liftIO $ getTime Monotonic
        return $ Skip (DropByTimeGen st t0)
    -- Pull from the source; a produced element is held until the clock has
    -- been checked.
    step gst (DropByTimeGen st t0) = do
        r <- step1 gst st
        return $ case r of
             Yield x s -> Skip (DropByTimeCheck s t0 x)
             Skip s -> Skip (DropByTimeGen s t0)
             Stop -> Stop
    -- Discard the held element while still within the limit, otherwise emit
    -- it and switch to plain pass-through.
    step _ (DropByTimeCheck st t0 x) = do
        t <- liftIO $ getTime Monotonic
        if diffAbsTime64 t t0 <= lim
        then return $ Skip $ DropByTimeGen st t0
        else return $ Yield x $ DropByTimeYield st
    -- Pass-through phase: mirror the source step for step.
    step gst (DropByTimeYield st) = do
        r <- step1 gst st
        return $ case r of
             Yield x s -> Yield x (DropByTimeYield s)
             Skip s -> Skip (DropByTimeYield s)
             Stop -> Stop

-- Adapted from the vector package

-- | @drop n stream@ discards the first @n@ elements produced by @stream@ and
-- passes the rest through. If @n@ is zero or negative nothing is discarded.
--
-- The state pairs the source state with @Just remaining@ while elements are
-- still being discarded and with @Nothing@ once the stream is in pass-through
-- mode.
{-# INLINE_NORMAL drop #-}
drop :: Monad m => Int -> Stream m a -> Stream m a
drop n (Stream step state) = Stream step' (state, Just n)
  where
    {-# INLINE_LATE step' #-}
    step' gst (st, Just i)
      | i > 0 = do
          r <- step gst st
          return $
            case r of
              -- Only a produced element counts towards the drop count.
              Yield _ s -> Skip (s, Just (i - 1))
              Skip s    -> Skip (s, Just i)
              Stop      -> Stop
      -- Done dropping: switch to pass-through without stepping the source.
      | otherwise = return $ Skip (st, Nothing)

    step' gst (st, Nothing) = do
      r <- step gst st
      return $
        case r of
          Yield x s -> Yield x (s, Nothing)
          Skip  s   -> Skip (s, Nothing)
          Stop      -> Stop

-- Adapted from the vector package
--
-- State machine used by 'dropWhileM'. @s@ is the state of the source stream
-- and @a@ is the element type.
--
-- * 'DropWhileDrop': the predicate has held for every element so far; keep
--   testing.
-- * 'DropWhileYield': an element is waiting to be emitted.
-- * 'DropWhileNext': dropping is over; pass elements through untested.
data DropWhileState s a
    = DropWhileDrop s
    | DropWhileYield a s
    | DropWhileNext s

-- | @dropWhileM f stream@ discards the leading elements of @stream@ for which
-- the monadic predicate @f@ returns 'True'. The first element for which it
-- returns 'False', and every element after it, is passed through. The
-- predicate is not run again once it has returned 'False'.
{-# INLINE_NORMAL dropWhileM #-}
dropWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
dropWhileM f (Stream step state) = Stream step' (DropWhileDrop state)
  where
    {-# INLINE_LATE step' #-}
    -- Dropping phase: test each produced element.
    step' gst (DropWhileDrop st) = do
        r <- step gst st
        case r of
            Yield x s -> do
                b <- f x
                if b
                then return $ Skip (DropWhileDrop s)
                else return $ Skip (DropWhileYield x s)
            Skip s -> return $ Skip (DropWhileDrop s)
            Stop -> return Stop

    -- Pass-through phase: every element is routed via 'DropWhileYield'.
    step' gst (DropWhileNext st) =  do
        r <- step gst st
        case r of
            Yield x s -> return $ Skip (DropWhileYield x s)
            Skip s    -> return $ Skip (DropWhileNext s)
            Stop      -> return Stop

    -- Emit the pending element and continue in pass-through mode.
    step' _ (DropWhileYield x st) = return $ Yield x (DropWhileNext st)

-- | Pure-predicate version of 'dropWhileM': discards the leading elements
-- that satisfy @f@ and passes the remainder of the stream through.
{-# INLINE dropWhile #-}
dropWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
dropWhile f = dropWhileM (return . f)

------------------------------------------------------------------------------
-- Inserting Elements
------------------------------------------------------------------------------

-- | @insertBy cmp a stream@ emits @a@ immediately before the first element
-- @x@ of @stream@ for which @cmp a x@ is not 'GT'. If there is no such
-- element, @a@ is emitted after the last element of the stream.
--
-- The state is @(sourceState, inserted, pending)@: @inserted@ records whether
-- @a@ has already been emitted and @pending@ holds a source element that has
-- been pulled but not yet emitted. After the insertion each source element is
-- therefore emitted one step late: the next element is pulled from the source
-- before the pending one is yielded.
{-# INLINE_NORMAL insertBy #-}
insertBy :: Monad m => (a -> a -> Ordering) -> a -> Stream m a -> Stream m a
insertBy cmp a (Stream step state) = Stream step' (state, False, Nothing)
  where
    {-# INLINE_LATE step' #-}
    -- Not inserted yet: pass elements through until the insertion point.
    step' gst (st, False, _) = do
        r <- step gst st
        case r of
            Yield x s -> case cmp a x of
                GT -> return $ Yield x (s, False, Nothing)
                -- Insertion point found: emit @a@ now and keep @x@ pending.
                _  -> return $ Yield a (s, True, Just x)
            Skip s -> return $ Skip (s, False, Nothing)
            -- Source ended without an insertion point: append @a@.
            Stop   -> return $ Yield a (st, True, Nothing)

    -- Inserted and nothing pending: the source has already stopped.
    step' _ (_, True, Nothing) = return Stop

    -- Inserted with a pending element: pull the next one, emit the pending.
    step' gst (st, True, Just prev) = do
        r <- step gst st
        case r of
            Yield x s -> return $ Yield prev (s, True, Just x)
            Skip s    -> return $ Skip (s, True, Just prev)
            Stop      -> return $ Yield prev (st, True, Nothing)

-- State machine used by 'intersperseM'. @x@ is the element type and @s@ is
-- the state of the source stream.
--
-- * 'FirstYield': no element has been pulled from the source yet.
-- * 'InterspersingYield': an element has been emitted; the next element
--   pulled from the source will be preceded by a separator.
-- * 'YieldAndCarry': the carried element is waiting to be emitted.
data LoopState x s = FirstYield s
                   | InterspersingYield s
                   | YieldAndCarry x s

-- | @intersperseM m stream@ emits the result of running @m@ between every two
-- consecutive elements of @stream@.
--
-- The action @m@ is run only after the next element has actually been pulled
-- from the source, so no separator is produced before the first element or
-- after the last one.
{-# INLINE_NORMAL intersperseM #-}
intersperseM :: Monad m => m a -> Stream m a -> Stream m a
intersperseM m (Stream step state) = Stream step' (FirstYield state)
  where
    {-# INLINE_LATE step' #-}
    -- Before the first element: no separator is needed.
    step' gst (FirstYield st) = do
        r <- step gst st
        return $
            case r of
                Yield x s -> Skip (YieldAndCarry x s)
                Skip s -> Skip (FirstYield s)
                Stop -> Stop

    -- Between elements: emit the separator first and carry the element.
    step' gst (InterspersingYield st) = do
        r <- step gst st
        case r of
            Yield x s -> do
                a <- m
                return $ Yield a (YieldAndCarry x s)
            Skip s -> return $ Skip $ InterspersingYield s
            Stop -> return Stop

    -- Emit the carried element.
    step' _ (YieldAndCarry x st) = return $ Yield x (InterspersingYield st)

-- | Pure version of 'intersperseM': emits the value @a@ between every two
-- consecutive elements of the stream.
{-# INLINE intersperse #-}
intersperse :: Monad m => a -> Stream m a -> Stream m a
intersperse a = intersperseM (return a)

-- | @intersperseM_ m stream@ runs the action @m@, discarding its result,
-- between every two consecutive elements of @stream@. The elements themselves
-- are passed through unchanged.
--
-- The state is @Left (effect, sourceState)@ when the source should be
-- stepped, where @effect@ is run just before the next element is emitted, and
-- @Right sourceState@ right after an element has been emitted. The initial
-- effect is @pure ()@, so @m@ is not run before the first element; because
-- the effect is run only once a further element has been obtained, it is not
-- run after the last element either.
{-# INLINE_NORMAL intersperseM_ #-}
intersperseM_ :: Monad m => m b -> Stream m a -> Stream m a
intersperseM_ m (Stream step1 state1) = Stream step (Left (pure (), state1))
  where
    {-# INLINE_LATE step #-}
    step gst (Left (eff, st)) = do
        r <- step1 gst st
        case r of
            Yield x s -> eff >> return (Yield x (Right s))
            Skip s -> return $ Skip (Left (eff, s))
            Stop -> return Stop

    -- An element was just emitted: arm the effect for the next one.
    step _ (Right st) = return $ Skip $ Left (void m, st)

-- State machine used by 'intersperseSuffix'. @s@ is the state of the source
-- stream and @a@ is the element type.
--
-- * 'SuffixElem': pull the next element from the source.
-- * 'SuffixSuffix': an element has been emitted; run the suffix action.
-- * 'SuffixYield': emit the stored value, then continue in the stored state.
data SuffixState s a
    = SuffixElem s
    | SuffixSuffix s
    | SuffixYield a (SuffixState s a)

-- | @intersperseSuffix action stream@ emits, after every element of @stream@,
-- the result of running @action@. An empty stream stays empty.
{-# INLINE_NORMAL intersperseSuffix #-}
intersperseSuffix :: forall m a. Monad m => m a -> Stream m a -> Stream m a
intersperseSuffix action (Stream step state) = Stream step' (SuffixElem state)
    where
    {-# INLINE_LATE step' #-}
    -- Pull an element and schedule the suffix to follow it.
    step' gst (SuffixElem st) = do
        r <- step gst st
        return $ case r of
            Yield x s -> Skip (SuffixYield x (SuffixSuffix s))
            Skip s -> Skip (SuffixElem s)
            Stop -> Stop

    -- Run the action and schedule its result for emission.
    step' _ (SuffixSuffix st) = do
        action >>= \r -> return $ Skip (SuffixYield r (SuffixElem st))

    -- Emit a scheduled value and continue in the stored state.
    step' _ (SuffixYield x next) = return $ Yield x next

-- | @intersperseSuffix_ m stream@ passes the elements of @stream@ through
-- unchanged and runs the action @m@, discarding its result, after every
-- element. The action is run when the stream is next stepped after the
-- element has been emitted.
--
-- The state is @Left sourceState@ when the source should be stepped and
-- @Right sourceState@ when the action is due.
{-# INLINE_NORMAL intersperseSuffix_ #-}
intersperseSuffix_ :: Monad m => m b -> Stream m a -> Stream m a
intersperseSuffix_ m (Stream step1 state1) = Stream step (Left state1)
  where
    {-# INLINE_LATE step #-}
    step gst (Left st) = do
        r <- step1 gst st
        case r of
            Yield x s -> return $ Yield x (Right s)
            Skip s -> return $ Skip $ Left s
            Stop -> return Stop

    step _ (Right st) = m >> return (Skip (Left st))

-- State machine used by 'intersperseSuffixBySpan'. @s@ is the state of the
-- source stream and @a@ is the element type.
--
-- * 'SuffixSpanElem': pull the next element; the 'Int' is the number of
--   elements still to be emitted in the current span.
-- * 'SuffixSpanSuffix': the span is complete; run the suffix action.
-- * 'SuffixSpanYield': emit the stored value, then continue in the stored
--   state.
-- * 'SuffixSpanLast': the source ended in the middle of a span; run the
--   suffix action one final time.
-- * 'SuffixSpanStop': terminate the stream.
data SuffixSpanState s a
    = SuffixSpanElem s Int
    | SuffixSpanSuffix s
    | SuffixSpanYield a (SuffixSpanState s a)
    | SuffixSpanLast
    | SuffixSpanStop

-- | @intersperseSuffixBySpan n action stream@ emits the result of running
-- @action@ after every @n@ elements of @stream@.
--
-- If the source ends in the middle of a span (at least one but fewer than @n@
-- elements since the last suffix) one final suffix is emitted. If it ends
-- exactly on a span boundary, or is empty, no extra suffix is emitted.
--
-- Note: when @n <= 0@ the source is never stepped and the result is an
-- endless stream of @action@ results.
{-# INLINE_NORMAL intersperseSuffixBySpan #-}
intersperseSuffixBySpan :: forall m a. Monad m
    => Int -> m a -> Stream m a -> Stream m a
intersperseSuffixBySpan n action (Stream step state) =
    Stream step' (SuffixSpanElem state n)
    where
    {-# INLINE_LATE step' #-}
    -- Inside a span: pull an element and count it.
    step' gst (SuffixSpanElem st i) | i > 0 = do
        r <- step gst st
        return $ case r of
            Yield x s -> Skip (SuffixSpanYield x (SuffixSpanElem s (i - 1)))
            Skip s -> Skip (SuffixSpanElem s i)
            -- @i == n@ means no element was emitted since the last suffix.
            Stop -> if i == n then Stop else Skip SuffixSpanLast
    -- Span exhausted: time for the suffix.
    step' _ (SuffixSpanElem st _) = return $ Skip (SuffixSpanSuffix st)

    -- Run the action, emit its result and start a fresh span.
    step' _ (SuffixSpanSuffix st) = do
        action >>= \r -> return $ Skip (SuffixSpanYield r (SuffixSpanElem st n))

    -- Source ended mid-span: emit one last suffix, then stop.
    step' _ SuffixSpanLast = do
        action >>= \r -> return $ Skip (SuffixSpanYield r SuffixSpanStop)

    -- Emit a scheduled value and continue in the stored state.
    step' _ (SuffixSpanYield x next) = return $ Yield x next

    step' _ SuffixSpanStop = return Stop

------------------------------------------------------------------------------
-- Reordering
------------------------------------------------------------------------------

-- We can implement reverse as:
--
-- > reverse = foldlS (flip cons) nil
--
-- However, this implementation is unusable because of the horrible performance
-- of cons. So we just convert it to a list first and then stream from the
-- list.
--
-- XXX Maybe we can use an Array instead of a list here?
-- | Emit the elements of the stream in reverse order.
--
-- On the first step the whole input stream is folded into a list (which is
-- built in reverse order by consing); subsequent steps emit the elements of
-- that list one at a time. The state is 'Nothing' until the input has been
-- consumed and @Just remaining@ afterwards.
{-# INLINE_NORMAL reverse #-}
reverse :: Monad m => Stream m a -> Stream m a
reverse m = Stream step Nothing
    where
    {-# INLINE_LATE step #-}
    -- Consume the entire input, accumulating it back to front.
    step _ Nothing = do
        xs <- foldl' (flip (:)) [] m
        return $ Skip (Just xs)
    step _ (Just (x:xs)) = return $ Yield x (Just xs)
    step _ (Just []) = return Stop

-- Much faster reverse for Storables
{-
{-# INLINE_NORMAL reverse' #-}
reverse' :: forall m a. (MonadIO m, Storable a) => Stream m a -> Stream m a
-- This commented implementation copies the whole stream into one single array
-- and then streams from that array, this has exactly the same performance as
-- the chunked code in IsStream.Common.reverse' .  Though this could be problematic due to
-- unbounded large allocations. However, if we use an idiomatic implementation
-- of arraysOf instead of the custom implementation then the chunked code
-- becomes worse by 6 times. Need to investigate if that can be improved.
import Foreign.ForeignPtr (touchForeignPtr)
import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)
import Foreign.Ptr (Ptr, plusPtr)
import Streamly.Internal.Data.Array.Foreign.Mut.Type (sizeOfElem)
reverse' m = Stream step Nothing
    where
    {-# INLINE_LATE step #-}
    step _ Nothing = do
        arr <- A.fromStreamD m
        let p = A.aEnd arr `plusPtr` negate (sizeOfElem (undefined :: a))
        return $ Skip $ Just (A.aStart arr, p)

    step _ (Just (start, p)) | p < unsafeForeignPtrToPtr start = return Stop

    step _ (Just (start, p)) = do
        let !x = A.unsafeInlineIO $ do
                    r <- peek p
                    touchForeignPtr start
                    return r
            next = p `plusPtr` negate (sizeOfElem (undefined :: a))
        return $ Yield x (Just (start, next))
-}

------------------------------------------------------------------------------
-- Position Indexing
------------------------------------------------------------------------------

-- Adapted from the vector package
--
-- Pair each element of the stream with its position, counting up from 0.
-- The counter advances only when the underlying stream yields an element; a
-- 'Skip' from the underlying stream leaves it unchanged.
{-# INLINE_NORMAL indexed #-}
indexed :: Monad m => Stream m a -> Stream m (Int, a)
indexed (Stream step state) = Stream step' (state, 0)
  where
    {-# INLINE_LATE step' #-}
    -- The index is forced on every step so that it does not accumulate as a
    -- chain of unevaluated additions.
    step' gst (st, i) = i `seq` do
         r <- step (adaptState gst) st
         case r of
             Yield x s -> return $ Yield (i, x) (s, i + 1)
             Skip    s -> return $ Skip (s, i)
             Stop      -> return Stop

-- Adapted from the vector package
--
-- Pair each element of the stream with a decreasing counter. The first
-- element is paired with the supplied starting value @m@, and the counter is
-- decremented by 1 after every yielded element. A 'Skip' from the underlying
-- stream leaves the counter unchanged.
{-# INLINE_NORMAL indexedR #-}
indexedR :: Monad m => Int -> Stream m a -> Stream m (Int, a)
indexedR m (Stream step state) = Stream step' (state, m)
  where
    {-# INLINE_LATE step' #-}
    -- The counter is forced on every step so that it does not accumulate as
    -- a chain of unevaluated subtractions.
    step' gst (st, i) = i `seq` do
         r <- step (adaptState gst) st
         case r of
             Yield x s -> let i' = i - 1
                          in return $ Yield (i, x) (s, i')
             Skip    s -> return $ Skip (s, i)
             Stop      -> return Stop

------------------------------------------------------------------------------
-- Searching
------------------------------------------------------------------------------

-- Produce the positions (counting from 0) of the elements that satisfy the
-- predicate @p@. The position counter advances for every element yielded by
-- the underlying stream, whether or not it satisfies the predicate; elements
-- that do not match are turned into a 'Skip'.
{-# INLINE_NORMAL findIndices #-}
findIndices :: Monad m => (a -> Bool) -> Stream m a -> Stream m Int
findIndices p (Stream step state) = Stream step' (state, 0)
  where
    {-# INLINE_LATE step' #-}
    -- The index is forced on every step to keep it evaluated.
    step' gst (st, i) = i `seq` do
      r <- step (adaptState gst) st
      return $ case r of
          Yield x s -> if p x then Yield i (s, i + 1) else Skip (s, i + 1)
          Skip s -> Skip (s, i)
          Stop   -> Stop

-- Split the stream into slices terminated by elements satisfying the
-- predicate @p@, and describe each slice as an @(index, length)@ pair
-- instead of producing its elements.
--
-- The state is @Just (st, i, len)@ where @st@ is the underlying stream state,
-- @i@ is the start index of the slice being accumulated and @len@ is the
-- number of elements seen in it so far. An element satisfying @p@ closes the
-- current slice and is counted as part of it (hence @len + 1@); the next
-- slice then starts at @i + len + 1@. When the input ends, a non-empty
-- pending slice is emitted and the state becomes 'Nothing', which stops the
-- stream on the following step.
{-# INLINE_NORMAL slicesBy #-}
slicesBy :: Monad m => (a -> Bool) -> Stream m a -> Stream m (Int, Int)
slicesBy p (Stream step1 state1) = Stream step (Just (state1, 0, 0))

    where

    {-# INLINE_LATE step #-}
    -- Both counters are forced on every step to keep them evaluated.
    step gst (Just (st, i, len)) = i `seq` len `seq` do
      r <- step1 (adaptState gst) st
      return
        $ case r of
              Yield x s ->
                if p x
                then Yield (i, len + 1) (Just (s, i + len + 1, 0))
                else Skip (Just (s, i, len + 1))
              Skip s -> Skip (Just (s, i, len))
              -- Flush the trailing slice, if any, before stopping.
              Stop -> if len == 0 then Stop else Yield (i, len) Nothing
    step _ Nothing = return Stop

------------------------------------------------------------------------------
-- Rolling map
------------------------------------------------------------------------------

-- State of the rolling map: 'RollingMapInit' carries only the underlying
-- stream state (no element has been seen yet), 'RollingMapGo' carries the
-- underlying stream state together with the most recently seen element.
data RollingMapState s a = RollingMapInit s | RollingMapGo s a

-- Apply a monadic function to every pair of successive elements of a stream.
-- The function is called as @f current previous@, i.e. the newly arrived
-- element is the first argument and the element before it is the second.
--
-- The first element of the input only primes the state and produces no
-- output, so an empty or single-element input results in an empty output.
-- Each result is evaluated to weak head normal form before it is yielded.
{-# INLINE rollingMapM #-}
rollingMapM :: Monad m => (a -> a -> m b) -> Stream m a -> Stream m b
rollingMapM f (Stream step1 state1) = Stream step (RollingMapInit state1)
    where
    -- No element seen yet: remember the first one without emitting anything.
    step gst (RollingMapInit st) = do
        r <- step1 (adaptState gst) st
        return $ case r of
            Yield x s -> Skip $ RollingMapGo s x
            Skip s -> Skip $ RollingMapInit s
            Stop   -> Stop

    -- A previous element x1 is available: combine it with the next one.
    step gst (RollingMapGo s1 x1) = do
        r <- step1 (adaptState gst) s1
        case r of
            Yield x s -> do
                !res <- f x x1
                return $ Yield res $ RollingMapGo s x
            Skip s -> return $ Skip $ RollingMapGo s x1
            Stop   -> return Stop

-- Pure variant of 'rollingMapM': apply a function to every pair of
-- successive elements of a stream. The function receives its arguments in
-- the same order in which 'rollingMapM' passes them.
{-# INLINE rollingMap #-}
rollingMap :: Monad m => (a -> a -> b) -> Stream m a -> Stream m b
rollingMap f = rollingMapM (\x y -> return $ f x y)

------------------------------------------------------------------------------
-- Maybe Streams
------------------------------------------------------------------------------

-- Map a function returning 'Maybe' over the stream, dropping the elements
-- for which it returns 'Nothing' and unwrapping the 'Just' results.
--
-- 'fromJust' cannot fail here because it is applied only to the values that
-- have passed the @filter isJust@ stage.
--
-- XXX Will this always fuse properly?
{-# INLINE_NORMAL mapMaybe #-}
mapMaybe :: Monad m => (a -> Maybe b) -> Stream m a -> Stream m b
mapMaybe f = fmap fromJust . filter isJust . map f

-- Monadic variant of 'mapMaybe': run the action on every element, drop the
-- elements for which it returns 'Nothing' and unwrap the 'Just' results.
--
-- 'fromJust' cannot fail here because it is applied only to the values that
-- have passed the @filter isJust@ stage.
{-# INLINE_NORMAL mapMaybeM #-}
mapMaybeM :: Monad m => (a -> m (Maybe b)) -> Stream m a -> Stream m b
mapMaybeM f = fmap fromJust . filter isJust . mapM f