{-# LANGUAGE CPP #-}
-- |
-- Module      : Streamly.Internal.Data.Stream.StreamD.Transform
-- Copyright   : (c) 2018 Composewell Technologies
--               (c) Roman Leshchinskiy 2008-2010
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- "Streamly.Internal.Data.Pipe" might ultimately replace this module.

-- A few functions in this module have been adapted from the vector package
-- (c) Roman Leshchinskiy. See the notes in specific combinators.

module Streamly.Internal.Data.Stream.StreamD.Transform
    (
    -- * Piping
    -- | Pass through a 'Pipe'.
      transform

    -- * Mapping
    -- | Stateless one-to-one maps.
    , map
    , mapM
    , sequence

    -- * Mapping Effects
    , tap
    , tapOffsetEvery
    , trace
    , trace_

    -- * Folding
    , foldrS
    , foldlS

    -- * Scanning By 'Fold'
    , postscan
    , scan
    , scanMany

    -- * Splitting
    , splitOn

    -- * Scanning
    -- | Left scans. Stateful, mostly one-to-one maps.
    , scanlM'
    , scanlMAfter'
    , scanl'
    , scanlM
    , scanl
    , scanl1M'
    , scanl1'
    , scanl1M
    , scanl1

    , prescanl'
    , prescanlM'

    , postscanl
    , postscanlM
    , postscanl'
    , postscanlM'
    , postscanlMAfter'

    , postscanlx'
    , postscanlMx'
    , scanlMx'
    , scanlx'

    -- * Filtering
    -- | Produce a subset of the stream.
    , with
    , scanMaybe
    , filter
    , filterM
    , deleteBy
    , uniqBy
    , uniq
    , prune
    , repeated

    -- * Trimming
    -- | Produce a subset of the stream trimmed at ends.
    , take
    , takeWhile
    , takeWhileM
    , takeWhileLast
    , takeWhileAround
    , drop
    , dropWhile
    , dropWhileM
    , dropLast
    , dropWhileLast
    , dropWhileAround

    -- * Inserting Elements
    -- | Produce a superset of the stream.
    , insertBy
    , intersperse
    , intersperseM
    , intersperseMWith
    , intersperseMSuffix
    , intersperseMSuffixWith

    -- * Inserting Side Effects
    , intersperseM_
    , intersperseMSuffix_
    , intersperseMPrefix_

    , delay
    , delayPre
    , delayPost

    -- * Reordering
    -- | Produce strictly the same set but reordered.
    , reverse
    , reverseUnbox
    , reassembleBy

    -- * Position Indexing
    , indexed
    , indexedR

    -- * Time Indexing
    , timestampWith
    , timestamped
    , timeIndexWith
    , timeIndexed

    -- * Searching
    , findIndices
    , elemIndices
    , slicesBy

    -- * Rolling map
    -- | Map using the previous element.
    , rollingMap
    , rollingMapM
    , rollingMap2

    -- * Maybe Streams
    , mapMaybe
    , mapMaybeM
    , catMaybes

    -- * Either Streams
    , catLefts
    , catRights
    , catEithers
    )
where

#include "inline.hs"

import Control.Concurrent (threadDelay)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Data.Either (fromLeft, isLeft, isRight, fromRight)
import Data.Functor ((<&>))
import Data.Maybe (fromJust, isJust)
import Fusion.Plugin.Types (Fuse(..))

import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Pipe.Type (Pipe(..), PipeState(..))
import Streamly.Internal.Data.SVar.Type (adaptState)
import Streamly.Internal.Data.Time.Units (AbsTime, RelTime64)
import Streamly.Internal.Data.Unboxed (Unbox)
import Streamly.Internal.System.IO (defaultChunkSize)

-- import qualified Data.List as List
import qualified Streamly.Internal.Data.Array.Type as A
import qualified Streamly.Internal.Data.Fold as FL
import qualified Streamly.Internal.Data.Pipe.Type as Pipe
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K

import Prelude hiding
       ( drop, dropWhile, filter, map, mapM, reverse
       , scanl, scanl1, sequence, take, takeWhile, zipWith)

import Streamly.Internal.Data.Stream.StreamD.Generate
    (absTimesWith, relTimesWith)
import Streamly.Internal.Data.Stream.StreamD.Type

#include "DocTestDataStream.hs"

------------------------------------------------------------------------------
-- Piping
------------------------------------------------------------------------------

-- | Use a 'Pipe' to transform a stream.
--
-- /Pre-release/
--
{-# INLINE_NORMAL transform #-}
transform :: Monad m => Pipe m a b -> Stream m a -> Stream m b
transform :: forall (m :: * -> *) a b.
Monad m =>
Pipe m a b -> Stream m a -> Stream m b
transform (Pipe s1 -> a -> m (Step (PipeState s1 s2) b)
pstep1 s2 -> m (Step (PipeState s1 s2) b)
pstep2 s1
pstate) (Stream State StreamK m a -> s -> m (Step s a)
step s
state) =
    forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream forall {m :: * -> *} {a}.
State StreamK m a
-> (PipeState s1 s2, s) -> m (Step (PipeState s1 s2, s) b)
step' (forall s1 s2. s1 -> PipeState s1 s2
Consume s1
pstate, s
state)

  where

    {-# INLINE_LATE step' #-}

    step' :: State StreamK m a
-> (PipeState s1 s2, s) -> m (Step (PipeState s1 s2, s) b)
step' State StreamK m a
gst (Consume s1
pst, s
st) = s1
pst seq :: forall a b. a -> b -> b
`seq` do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st
        case Step s a
r of
            Yield a
x s
s -> do
                Step (PipeState s1 s2) b
res <- s1 -> a -> m (Step (PipeState s1 s2) b)
pstep1 s1
pst a
x
                case Step (PipeState s1 s2) b
res of
                    Pipe.Yield b
b PipeState s1 s2
pst' -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield b
b (PipeState s1 s2
pst', s
s)
                    Pipe.Continue PipeState s1 s2
pst' -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (PipeState s1 s2
pst', s
s)
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall s1 s2. s1 -> PipeState s1 s2
Consume s1
pst, s
s)
            Step s a
Stop   -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

    step' State StreamK m a
_ (Produce s2
pst, s
st) = s2
pst seq :: forall a b. a -> b -> b
`seq` do
        Step (PipeState s1 s2) b
res <- s2 -> m (Step (PipeState s1 s2) b)
pstep2 s2
pst
        case Step (PipeState s1 s2) b
res of
            Pipe.Yield b
b PipeState s1 s2
pst' -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield b
b (PipeState s1 s2
pst', s
st)
            Pipe.Continue PipeState s1 s2
pst' -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (PipeState s1 s2
pst', s
st)

------------------------------------------------------------------------------
-- Transformation Folds
------------------------------------------------------------------------------

-- Note, this is going to have horrible performance, because of the nature of
-- the stream type (i.e. direct stream vs CPS). Its only for reference, it is
-- likely be practically unusable.
{-# INLINE_NORMAL foldlS #-}
foldlS :: Monad m
    => (Stream m b -> a -> Stream m b) -> Stream m b -> Stream m a -> Stream m b
foldlS :: forall (m :: * -> *) b a.
Monad m =>
(Stream m b -> a -> Stream m b)
-> Stream m b -> Stream m a -> Stream m b
foldlS Stream m b -> a -> Stream m b
fstep Stream m b
begin (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream forall {m :: * -> *} {a}.
State StreamK m a
-> Either (s, Stream m b) (Stream m b)
-> m (Step (Either (s, Stream m b) (Stream m b)) b)
step' (forall a b. a -> Either a b
Left (s
state, Stream m b
begin))
  where
    step' :: State StreamK m a
-> Either (s, Stream m b) (Stream m b)
-> m (Step (Either (s, Stream m b) (Stream m b)) b)
step' State StreamK m a
gst (Left (s
st, Stream m b
acc)) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Step s a
r of
            Yield a
x s
s -> forall s a. s -> Step s a
Skip (forall a b. a -> Either a b
Left (s
s, Stream m b -> a -> Stream m b
fstep Stream m b
acc a
x))
            Skip s
s -> forall s a. s -> Step s a
Skip (forall a b. a -> Either a b
Left (s
s, Stream m b
acc))
            Step s a
Stop   -> forall s a. s -> Step s a
Skip (forall a b. b -> Either a b
Right Stream m b
acc)

    step' State StreamK m a
gst (Right (Stream State StreamK m b -> s -> m (Step s b)
stp s
stt)) = do
        Step s b
r <- State StreamK m b -> s -> m (Step s b)
stp (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
stt
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Step s b
r of
            Yield b
x s
s -> forall s a. a -> s -> Step s a
Yield b
x (forall a b. b -> Either a b
Right (forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m b -> s -> m (Step s b)
stp s
s))
            Skip s
s -> forall s a. s -> Step s a
Skip (forall a b. b -> Either a b
Right (forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m b -> s -> m (Step s b)
stp s
s))
            Step s b
Stop   -> forall s a. Step s a
Stop

------------------------------------------------------------------------------
-- Transformation by Mapping
------------------------------------------------------------------------------

-- |
-- >>> sequence = Stream.mapM id
--
-- Replace the elements of a stream of monadic actions with the outputs of
-- those actions.
--
-- >>> s = Stream.fromList [putStr "a", putStr "b", putStrLn "c"]
-- >>> Stream.fold Fold.drain $ Stream.sequence s
-- abc
--
{-# INLINE_NORMAL sequence #-}
sequence :: Monad m => Stream m (m a) -> Stream m a
sequence :: forall (m :: * -> *) a. Monad m => Stream m (m a) -> Stream m a
sequence (Stream State StreamK m (m a) -> s -> m (Step s (m a))
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream forall {m :: * -> *} {a}. State StreamK m a -> s -> m (Step s a)
step' s
state
  where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> s -> m (Step s a)
step' State StreamK m a
gst s
st = do
         Step s (m a)
r <- State StreamK m (m a) -> s -> m (Step s (m a))
step (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st
         case Step s (m a)
r of
             Yield m a
x s
s -> m a
x forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \a
a -> forall (m :: * -> *) a. Monad m => a -> m a
return (forall s a. a -> s -> Step s a
Yield a
a s
s)
             Skip s
s    -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip s
s
             Step s (m a)
Stop      -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

------------------------------------------------------------------------------
-- Mapping side effects
------------------------------------------------------------------------------

data TapState fs st a
    = TapInit | Tapping !fs st | TapDone st

-- XXX Multiple yield points

-- | Tap the data flowing through a stream into a 'Fold'. For example, you may
-- add a tap to log the contents flowing through the stream. The fold is used
-- only for effects, its result is discarded.
--
-- @
--                   Fold m a b
--                       |
-- -----stream m a ---------------stream m a-----
--
-- @
--
-- >>> s = Stream.enumerateFromTo 1 2
-- >>> Stream.fold Fold.drain $ Stream.tap (Fold.drainMapM print) s
-- 1
-- 2
--
-- Compare with 'trace'.
--
{-# INLINE tap #-}
tap :: Monad m => Fold m a b -> Stream m a -> Stream m a
tap :: forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m a
tap (Fold s -> a -> m (Step s b)
fstep m (Step s b)
initial s -> m b
extract) (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream forall {a} {a}.
State StreamK m a -> TapState s s a -> m (Step (TapState s s a) a)
step' forall fs st a. TapState fs st a
TapInit

    where

    step' :: State StreamK m a -> TapState s s a -> m (Step (TapState s s a) a)
step' State StreamK m a
_ TapState s s a
TapInit = do
        Step s b
res <- m (Step s b)
initial
        forall (m :: * -> *) a. Monad m => a -> m a
return
            forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip
            forall a b. (a -> b) -> a -> b
$ case Step s b
res of
                  FL.Partial s
s -> forall fs st a. fs -> st -> TapState fs st a
Tapping s
s s
state
                  FL.Done b
_ -> forall fs st a. st -> TapState fs st a
TapDone s
state
    step' State StreamK m a
gst (Tapping s
acc s
st) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Step s a
r of
            Yield a
x s
s -> do
                Step s b
res <- s -> a -> m (Step s b)
fstep s
acc a
x
                forall (m :: * -> *) a. Monad m => a -> m a
return
                    forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
x
                    forall a b. (a -> b) -> a -> b
$ case Step s b
res of
                          FL.Partial s
fs -> forall fs st a. fs -> st -> TapState fs st a
Tapping s
fs s
s
                          FL.Done b
_ -> forall fs st a. st -> TapState fs st a
TapDone s
s
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall fs st a. fs -> st -> TapState fs st a
Tapping s
acc s
s)
            Step s a
Stop -> do
                forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ s -> m b
extract s
acc
                forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop
    step' State StreamK m a
gst (TapDone s
st) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        forall (m :: * -> *) a. Monad m => a -> m a
return
            forall a b. (a -> b) -> a -> b
$ case Step s a
r of
                  Yield a
x s
s -> forall s a. a -> s -> Step s a
Yield a
x (forall fs st a. st -> TapState fs st a
TapDone s
s)
                  Skip s
s -> forall s a. s -> Step s a
Skip (forall fs st a. st -> TapState fs st a
TapDone s
s)
                  Step s a
Stop -> forall s a. Step s a
Stop

data TapOffState fs s a
    = TapOffInit
    | TapOffTapping !fs s Int
    | TapOffDone s

-- XXX Multiple yield points
{-# INLINE_NORMAL tapOffsetEvery #-}
tapOffsetEvery :: Monad m
    => Int -> Int -> Fold m a b -> Stream m a -> Stream m a
tapOffsetEvery :: forall (m :: * -> *) a b.
Monad m =>
Int -> Int -> Fold m a b -> Stream m a -> Stream m a
tapOffsetEvery Int
offset Int
n (Fold s -> a -> m (Step s b)
fstep m (Step s b)
initial s -> m b
extract) (Stream State StreamK m a -> s -> m (Step s a)
step s
state) =
    forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream forall {a} {a}.
State StreamK m a
-> TapOffState s s a -> m (Step (TapOffState s s a) a)
step' forall fs s a. TapOffState fs s a
TapOffInit

    where

    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a
-> TapOffState s s a -> m (Step (TapOffState s s a) a)
step' State StreamK m a
_ TapOffState s s a
TapOffInit = do
        Step s b
res <- m (Step s b)
initial
        forall (m :: * -> *) a. Monad m => a -> m a
return
            forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip
            forall a b. (a -> b) -> a -> b
$ case Step s b
res of
                  FL.Partial s
s -> forall fs s a. fs -> s -> Int -> TapOffState fs s a
TapOffTapping s
s s
state (Int
offset forall a. Integral a => a -> a -> a
`mod` Int
n)
                  FL.Done b
_ -> forall fs s a. s -> TapOffState fs s a
TapOffDone s
state
    step' State StreamK m a
gst (TapOffTapping s
acc s
st Int
count) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Step s a
r of
            Yield a
x s
s -> do
                TapOffState s s a
next <-
                    if Int
count forall a. Ord a => a -> a -> Bool
<= Int
0
                    then do
                        Step s b
res <- s -> a -> m (Step s b)
fstep s
acc a
x
                        forall (m :: * -> *) a. Monad m => a -> m a
return
                            forall a b. (a -> b) -> a -> b
$ case Step s b
res of
                                  FL.Partial s
sres ->
                                    forall fs s a. fs -> s -> Int -> TapOffState fs s a
TapOffTapping s
sres s
s (Int
n forall a. Num a => a -> a -> a
- Int
1)
                                  FL.Done b
_ -> forall fs s a. s -> TapOffState fs s a
TapOffDone s
s
                    else forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall fs s a. fs -> s -> Int -> TapOffState fs s a
TapOffTapping s
acc s
s (Int
count forall a. Num a => a -> a -> a
- Int
1)
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
x TapOffState s s a
next
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall fs s a. fs -> s -> Int -> TapOffState fs s a
TapOffTapping s
acc s
s Int
count)
            Step s a
Stop -> do
                forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ s -> m b
extract s
acc
                forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop
    step' State StreamK m a
gst (TapOffDone s
st) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        forall (m :: * -> *) a. Monad m => a -> m a
return
            forall a b. (a -> b) -> a -> b
$ case Step s a
r of
                  Yield a
x s
s -> forall s a. a -> s -> Step s a
Yield a
x (forall fs s a. s -> TapOffState fs s a
TapOffDone s
s)
                  Skip s
s -> forall s a. s -> Step s a
Skip (forall fs s a. s -> TapOffState fs s a
TapOffDone s
s)
                  Step s a
Stop -> forall s a. Step s a
Stop

-- | Apply a monadic function to each element flowing through the stream and
-- discard the results.
--
-- >>> s = Stream.enumerateFromTo 1 2
-- >>> Stream.fold Fold.drain $ Stream.trace print s
-- 1
-- 2
--
-- Compare with 'tap'.
--
{-# INLINE trace #-}
trace :: Monad m => (a -> m b) -> Stream m a -> Stream m a
trace :: forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Stream m a -> Stream m a
trace a -> m b
f = forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Stream m a -> Stream m b
mapM (\a
x -> forall (f :: * -> *) a. Functor f => f a -> f ()
void (a -> m b
f a
x) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return a
x)

-- | Perform a side effect before yielding each element of the stream and
-- discard the results.
--
-- >>> s = Stream.enumerateFromTo 1 2
-- >>> Stream.fold Fold.drain $ Stream.trace_ (print "got here") s
-- "got here"
-- "got here"
--
-- Same as 'intersperseMPrefix_' but always serial.
--
-- See also: 'trace'
--
-- /Pre-release/
{-# INLINE trace_ #-}
trace_ :: Monad m => m b -> Stream m a -> Stream m a
trace_ :: forall (m :: * -> *) b a.
Monad m =>
m b -> Stream m a -> Stream m a
trace_ m b
eff = forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Stream m a -> Stream m b
mapM (\a
x -> m b
eff forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return a
x)

------------------------------------------------------------------------------
-- Scanning with a Fold
------------------------------------------------------------------------------

data ScanState s f = ScanInit s | ScanDo s !f | ScanDone

-- | Postscan a stream using the given monadic fold.
--
-- The following example extracts the input stream up to a point where the
-- running average of elements is no more than 10:
--
-- >>> import Data.Maybe (fromJust)
-- >>> let avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length)
-- >>> s = Stream.enumerateFromTo 1.0 100.0
-- >>> :{
--  Stream.fold Fold.toList
--   $ fmap (fromJust . fst)
--   $ Stream.takeWhile (\(_,x) -> x <= 10)
--   $ Stream.postscan (Fold.tee Fold.latest avg) s
-- :}
-- [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]
--
{-# INLINE_NORMAL postscan #-}
postscan :: Monad m => FL.Fold m a b -> Stream m a -> Stream m b
postscan :: forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m b
postscan (FL.Fold s -> a -> m (Step s b)
fstep m (Step s b)
initial s -> m b
extract) (Stream State StreamK m a -> s -> m (Step s a)
sstep s
state) =
    forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream forall {m :: * -> *} {a}.
State StreamK m a -> ScanState s s -> m (Step (ScanState s s) b)
step (forall s f. s -> ScanState s f
ScanInit s
state)

    where

    {-# INLINE_LATE step #-}
    step :: State StreamK m a -> ScanState s s -> m (Step (ScanState s s) b)
step State StreamK m a
_ (ScanInit s
st) = do
        Step s b
res <- m (Step s b)
initial
        forall (m :: * -> *) a. Monad m => a -> m a
return
            forall a b. (a -> b) -> a -> b
$ case Step s b
res of
                  FL.Partial s
fs -> forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ forall s f. s -> f -> ScanState s f
ScanDo s
st s
fs
                  FL.Done b
b -> forall s a. a -> s -> Step s a
Yield b
b forall s f. ScanState s f
ScanDone
    step State StreamK m a
gst (ScanDo s
st s
fs) = do
        Step s a
res <- State StreamK m a -> s -> m (Step s a)
sstep (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st
        case Step s a
res of
            Yield a
x s
s -> do
                Step s b
r <- s -> a -> m (Step s b)
fstep s
fs a
x
                case Step s b
r of
                    FL.Partial s
fs1 -> do
                        !b
b <- s -> m b
extract s
fs1
                        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield b
b forall a b. (a -> b) -> a -> b
$ forall s f. s -> f -> ScanState s f
ScanDo s
s s
fs1
                    FL.Done b
b -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield b
b forall s f. ScanState s f
ScanDone
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ forall s f. s -> f -> ScanState s f
ScanDo s
s s
fs
            Step s a
Stop -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop
    step State StreamK m a
_ ScanState s s
ScanDone = forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

{-# INLINE scanWith #-}
scanWith :: Monad m
    => Bool -> Fold m a b -> Stream m a -> Stream m b
scanWith :: forall (m :: * -> *) a b.
Monad m =>
Bool -> Fold m a b -> Stream m a -> Stream m b
scanWith Bool
restart (Fold s -> a -> m (Step s b)
fstep m (Step s b)
initial s -> m b
extract) (Stream State StreamK m a -> s -> m (Step s a)
sstep s
state) =
    forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream forall {m :: * -> *} {a}.
State StreamK m a -> ScanState s s -> m (Step (ScanState s s) b)
step (forall s f. s -> ScanState s f
ScanInit s
state)

    where

    {-# INLINE runStep #-}
    runStep :: s -> m (Step s b) -> m (Step (ScanState s s) b)
runStep s
st m (Step s b)
action = do
        Step s b
res <- m (Step s b)
action
        case Step s b
res of
            FL.Partial s
fs -> do
                !b
b <- s -> m b
extract s
fs
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield b
b forall a b. (a -> b) -> a -> b
$ forall s f. s -> f -> ScanState s f
ScanDo s
st s
fs
            FL.Done b
b ->
                let next :: ScanState s f
next = if Bool
restart then forall s f. s -> ScanState s f
ScanInit s
st else forall s f. ScanState s f
ScanDone
                 in forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield b
b forall {f}. ScanState s f
next

    {-# INLINE_LATE step #-}
    step :: State StreamK m a -> ScanState s s -> m (Step (ScanState s s) b)
step State StreamK m a
_ (ScanInit s
st) = forall {s}. s -> m (Step s b) -> m (Step (ScanState s s) b)
runStep s
st m (Step s b)
initial
    step State StreamK m a
gst (ScanDo s
st s
fs) = do
        Step s a
res <- State StreamK m a -> s -> m (Step s a)
sstep (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st
        case Step s a
res of
            Yield a
x s
s -> forall {s}. s -> m (Step s b) -> m (Step (ScanState s s) b)
runStep s
s (s -> a -> m (Step s b)
fstep s
fs a
x)
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ forall s f. s -> f -> ScanState s f
ScanDo s
s s
fs
            Step s a
Stop -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop
    step State StreamK m a
_ ScanState s s
ScanDone = forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

-- XXX It may be useful to have a version of scan where we can keep the
-- accumulator independent of the value emitted. So that we do not necessarily
-- have to keep a value in the accumulator which we are not using. We can pass
-- an extraction function that will take the accumulator and the current value
-- of the element and emit the next value in the stream. That will also make it
-- possible to modify the accumulator after using it. In fact, the step function
-- can return new accumulator and the value to be emitted. The signature would
-- be more like mapAccumL.

-- | Strict left scan. Scan a stream using the given monadic fold.
--
-- >>> s = Stream.fromList [1..10]
-- >>> Stream.fold Fold.toList $ Stream.takeWhile (< 10) $ Stream.scan Fold.sum s
-- [0,1,3,6]
--
-- See also: 'usingStateT'
--

-- EXPLANATION:
-- >>> scanl' step z = Stream.scan (Fold.foldl' step z)
--
-- Like 'map', 'scanl'' too is a one to one transformation,
-- however it adds an extra element.
--
-- >>> s = Stream.fromList [1,2,3,4]
-- >>> Stream.fold Fold.toList $ scanl' (+) 0 s
-- [0,1,3,6,10]
--
-- >>> Stream.fold Fold.toList $ scanl' (flip (:)) [] s
-- [[],[1],[2,1],[3,2,1],[4,3,2,1]]
--
-- The output of 'scanl'' is the initial value of the accumulator followed by
-- all the intermediate steps and the final result of 'foldl''.
--
-- By streaming the accumulated state after each fold step, we can share the
-- state across multiple stages of stream composition. Each stage can modify or
-- extend the state, do some processing with it and emit it for the next stage,
-- thus modularizing the stream processing. This can be useful in
-- stateful or event-driven programming.
--
-- Consider the following monolithic example, computing the sum and the product
-- of the elements in a stream in one go using a @foldl'@:
--
-- >>> foldl' step z = Stream.fold (Fold.foldl' step z)
-- >>> foldl' (\(s, p) x -> (s + x, p * x)) (0,1) s
-- (10,24)
--
-- Using @scanl'@ we can make it modular by computing the sum in the first
-- stage and passing it down to the next stage for computing the product:
--
-- >>> :{
--   foldl' (\(_, p) (s, x) -> (s, p * x)) (0,1)
--   $ scanl' (\(s, _) x -> (s + x, x)) (0,1)
--   $ Stream.fromList [1,2,3,4]
-- :}
-- (10,24)
--
-- IMPORTANT: 'scanl'' evaluates the accumulator to WHNF.  To avoid building
-- lazy expressions inside the accumulator, it is recommended that a strict
-- data structure is used for accumulator.
--
{-# INLINE_NORMAL scan #-}
scan :: Monad m
    => FL.Fold m a b -> Stream m a -> Stream m b
scan :: forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m b
scan = forall (m :: * -> *) a b.
Monad m =>
Bool -> Fold m a b -> Stream m a -> Stream m b
scanWith Bool
False

-- | Like 'scan' but restarts scanning afresh when the scanning fold
-- terminates.
--
{-# INLINE_NORMAL scanMany #-}
scanMany :: Monad m
    => FL.Fold m a b -> Stream m a -> Stream m b
scanMany :: forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m b
scanMany = forall (m :: * -> *) a b.
Monad m =>
Bool -> Fold m a b -> Stream m a -> Stream m b
scanWith Bool
True

------------------------------------------------------------------------------
-- Scanning - Prescans
------------------------------------------------------------------------------

-- Adapted from the vector package.
--
-- XXX Is a prescan useful, discarding the last step does not sound useful?  I
-- am not sure about the utility of this function, so this is implemented but
-- not exposed. We can expose it if someone provides good reasons why this is
-- useful.
--
-- XXX We have to execute the stream one step ahead to know that we are at the
-- last step.  The vector implementation of prescan executes the last fold step
-- but does not yield the result. This means we have executed the effect but
-- discarded value. This does not sound right. In this implementation we are
-- not executing the last fold step.
{-# INLINE_NORMAL prescanlM' #-}
prescanlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b
prescanlM' :: forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> Stream m a -> Stream m b
prescanlM' b -> a -> m b
f m b
mz (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream forall {m :: * -> *} {a}.
State StreamK m a -> (s, m b) -> m (Step (s, m b) b)
step' (s
state, m b
mz)
  where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> (s, m b) -> m (Step (s, m b) b)
step' State StreamK m a
gst (s
st, m b
prev) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st
        case Step s a
r of
            Yield a
x s
s -> do
                b
acc <- m b
prev
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield b
acc (s
s, b -> a -> m b
f b
acc a
x)
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (s
s, m b
prev)
            Step s a
Stop   -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

{-# INLINE prescanl' #-}
prescanl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b
prescanl' :: forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> Stream m a -> Stream m b
prescanl' b -> a -> b
f b
z = forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> Stream m a -> Stream m b
prescanlM' (\b
a a
b -> forall (m :: * -> *) a. Monad m => a -> m a
return (b -> a -> b
f b
a a
b)) (forall (m :: * -> *) a. Monad m => a -> m a
return b
z)

------------------------------------------------------------------------------
-- Monolithic postscans (postscan followed by a map)
------------------------------------------------------------------------------

-- The performance of a modular postscan followed by a map seems to be
-- equivalent to this monolithic scan followed by map therefore we may not need
-- this implementation. We just have it for performance comparison and in case
-- modular version does not perform well in some situation.
--
{-# INLINE_NORMAL postscanlMx' #-}
postscanlMx' :: Monad m
    => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b
postscanlMx' :: forall (m :: * -> *) x a b.
Monad m =>
(x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b
postscanlMx' x -> a -> m x
fstep m x
begin x -> m b
done (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = do
    forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream forall {m :: * -> *} {a}.
State StreamK m a -> (s, m x) -> m (Step (s, m x) b)
step' (s
state, m x
begin)
  where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> (s, m x) -> m (Step (s, m x) b)
step' State StreamK m a
gst (s
st, m x
acc) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st
        case Step s a
r of
            Yield a
x s
s -> do
                x
old <- m x
acc
                x
y <- x -> a -> m x
fstep x
old a
x
                b
v <- x -> m b
done x
y
                b
v seq :: forall a b. a -> b -> b
`seq` x
y seq :: forall a b. a -> b -> b
`seq` forall (m :: * -> *) a. Monad m => a -> m a
return (forall s a. a -> s -> Step s a
Yield b
v (s
s, forall (m :: * -> *) a. Monad m => a -> m a
return x
y))
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (s
s, m x
acc)
            Step s a
Stop   -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

{-# INLINE_NORMAL postscanlx' #-}
postscanlx' :: Monad m
    => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> Stream m b
postscanlx' :: forall (m :: * -> *) x a b.
Monad m =>
(x -> a -> x) -> x -> (x -> b) -> Stream m a -> Stream m b
postscanlx' x -> a -> x
fstep x
begin x -> b
done =
    forall (m :: * -> *) x a b.
Monad m =>
(x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b
postscanlMx' (\x
b a
a -> forall (m :: * -> *) a. Monad m => a -> m a
return (x -> a -> x
fstep x
b a
a)) (forall (m :: * -> *) a. Monad m => a -> m a
return x
begin) (forall (m :: * -> *) a. Monad m => a -> m a
return forall b c a. (b -> c) -> (a -> b) -> a -> c
. x -> b
done)

-- XXX do we need consM strict to evaluate the begin value?
{-# INLINE scanlMx' #-}
scanlMx' :: Monad m
    => (x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b
scanlMx' :: forall (m :: * -> *) x a b.
Monad m =>
(x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b
scanlMx' x -> a -> m x
fstep m x
begin x -> m b
done Stream m a
s =
    (m x
begin forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \x
x -> x
x seq :: forall a b. a -> b -> b
`seq` x -> m b
done x
x) forall (m :: * -> *) a.
Applicative m =>
m a -> Stream m a -> Stream m a
`consM` forall (m :: * -> *) x a b.
Monad m =>
(x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b
postscanlMx' x -> a -> m x
fstep m x
begin x -> m b
done Stream m a
s

{-# INLINE scanlx' #-}
scanlx' :: Monad m
    => (x -> a -> x) -> x -> (x -> b) -> Stream m a -> Stream m b
scanlx' :: forall (m :: * -> *) x a b.
Monad m =>
(x -> a -> x) -> x -> (x -> b) -> Stream m a -> Stream m b
scanlx' x -> a -> x
fstep x
begin x -> b
done =
    forall (m :: * -> *) x a b.
Monad m =>
(x -> a -> m x) -> m x -> (x -> m b) -> Stream m a -> Stream m b
scanlMx' (\x
b a
a -> forall (m :: * -> *) a. Monad m => a -> m a
return (x -> a -> x
fstep x
b a
a)) (forall (m :: * -> *) a. Monad m => a -> m a
return x
begin) (forall (m :: * -> *) a. Monad m => a -> m a
return forall b c a. (b -> c) -> (a -> b) -> a -> c
. x -> b
done)

------------------------------------------------------------------------------
-- postscans
------------------------------------------------------------------------------

-- Adapted from the vector package.
{-# INLINE_NORMAL postscanlM' #-}
postscanlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b
postscanlM' :: forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> Stream m a -> Stream m b
postscanlM' b -> a -> m b
fstep m b
begin (Stream State StreamK m a -> s -> m (Step s a)
step s
state) =
    forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream forall {m :: * -> *} {a}.
State StreamK m a -> Maybe (s, b) -> m (Step (Maybe (s, b)) b)
step' forall a. Maybe a
Nothing
  where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> Maybe (s, b) -> m (Step (Maybe (s, b)) b)
step' State StreamK m a
_ Maybe (s, b)
Nothing = do
        !b
x <- m b
begin
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall a. a -> Maybe a
Just (s
state, b
x))

    step' State StreamK m a
gst (Just (s
st, b
acc)) =  do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st
        case Step s a
r of
            Yield a
x s
s -> do
                !b
y <- b -> a -> m b
fstep b
acc a
x
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield b
y (forall a. a -> Maybe a
Just (s
s, b
y))
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall a. a -> Maybe a
Just (s
s, b
acc))
            Step s a
Stop   -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

{-# INLINE_NORMAL postscanl' #-}
postscanl' :: Monad m => (a -> b -> a) -> a -> Stream m b -> Stream m a
postscanl' :: forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> Stream m a -> Stream m b
postscanl' a -> b -> a
f a
seed = forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> Stream m a -> Stream m b
postscanlM' (\a
a b
b -> forall (m :: * -> *) a. Monad m => a -> m a
return (a -> b -> a
f a
a b
b)) (forall (m :: * -> *) a. Monad m => a -> m a
return a
seed)

{-# ANN type PScanAfterState Fuse #-}
data PScanAfterState m st acc =
      PScanAfterStep st (m acc)
    | PScanAfterYield acc (PScanAfterState m st acc)
    | PScanAfterStop

-- We can possibly have the "done" function as a Maybe to provide an option to
-- emit or not emit the accumulator when the stream stops.
--
-- TBD: use a single Yield point
--
{-# INLINE_NORMAL postscanlMAfter' #-}
postscanlMAfter' :: Monad m
    => (b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b
postscanlMAfter' :: forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b
postscanlMAfter' b -> a -> m b
fstep m b
initial b -> m b
done (Stream State StreamK m a -> s -> m (Step s a)
step1 s
state1) = do
    forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream forall {m :: * -> *} {a}.
State StreamK m a
-> PScanAfterState m s b -> m (Step (PScanAfterState m s b) b)
step (forall (m :: * -> *) st acc.
st -> m acc -> PScanAfterState m st acc
PScanAfterStep s
state1 m b
initial)

    where

    {-# INLINE_LATE step #-}
    step :: State StreamK m a
-> PScanAfterState m s b -> m (Step (PScanAfterState m s b) b)
step State StreamK m a
gst (PScanAfterStep s
st m b
acc) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step1 (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st
        case Step s a
r of
            Yield a
x s
s -> do
                !b
old <- m b
acc
                !b
y <- b -> a -> m b
fstep b
old a
x
                forall (m :: * -> *) a. Monad m => a -> m a
return (forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) st acc.
acc -> PScanAfterState m st acc -> PScanAfterState m st acc
PScanAfterYield b
y (forall (m :: * -> *) st acc.
st -> m acc -> PScanAfterState m st acc
PScanAfterStep s
s (forall (m :: * -> *) a. Monad m => a -> m a
return b
y)))
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) st acc.
st -> m acc -> PScanAfterState m st acc
PScanAfterStep s
s m b
acc
            -- Strictness is important for fusion
            Step s a
Stop -> do
                !b
v <- m b
acc
                !b
res <- b -> m b
done b
v
                forall (m :: * -> *) a. Monad m => a -> m a
return (forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) st acc.
acc -> PScanAfterState m st acc -> PScanAfterState m st acc
PScanAfterYield b
res forall (m :: * -> *) st acc. PScanAfterState m st acc
PScanAfterStop)
    step State StreamK m a
_ (PScanAfterYield b
acc PScanAfterState m s b
next) = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield b
acc PScanAfterState m s b
next
    step State StreamK m a
_ PScanAfterState m s b
PScanAfterStop = forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

{-# INLINE_NORMAL postscanlM #-}
postscanlM :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b
postscanlM :: forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> Stream m a -> Stream m b
postscanlM b -> a -> m b
fstep m b
begin (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream forall {m :: * -> *} {a}.
State StreamK m a -> Maybe (s, b) -> m (Step (Maybe (s, b)) b)
step' forall a. Maybe a
Nothing
  where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> Maybe (s, b) -> m (Step (Maybe (s, b)) b)
step' State StreamK m a
_ Maybe (s, b)
Nothing = do
        b
r <- m b
begin
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall a. a -> Maybe a
Just (s
state, b
r))

    step' State StreamK m a
gst (Just (s
st, b
acc)) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st
        case Step s a
r of
            Yield a
x s
s -> do
                b
y <- b -> a -> m b
fstep b
acc a
x
                forall (m :: * -> *) a. Monad m => a -> m a
return (forall s a. a -> s -> Step s a
Yield b
y (forall a. a -> Maybe a
Just (s
s, b
y)))
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall a. a -> Maybe a
Just (s
s, b
acc))
            Step s a
Stop   -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

{-# INLINE_NORMAL postscanl #-}
postscanl :: Monad m => (a -> b -> a) -> a -> Stream m b -> Stream m a
postscanl :: forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> Stream m a -> Stream m b
postscanl a -> b -> a
f a
seed = forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> Stream m a -> Stream m b
postscanlM (\a
a b
b -> forall (m :: * -> *) a. Monad m => a -> m a
return (a -> b -> a
f a
a b
b)) (forall (m :: * -> *) a. Monad m => a -> m a
return a
seed)

-- | Like 'scanl'' but with a monadic step function and a monadic seed.
--
{-# INLINE_NORMAL scanlM' #-}
scanlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b
scanlM' :: forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> Stream m a -> Stream m b
scanlM' b -> a -> m b
fstep m b
begin (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream forall {m :: * -> *} {a}.
State StreamK m a -> Maybe (s, b) -> m (Step (Maybe (s, b)) b)
step' forall a. Maybe a
Nothing
  where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> Maybe (s, b) -> m (Step (Maybe (s, b)) b)
step' State StreamK m a
_ Maybe (s, b)
Nothing = do
        !b
x <- m b
begin
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield b
x (forall a. a -> Maybe a
Just (s
state, b
x))
    step' State StreamK m a
gst (Just (s
st, b
acc)) =  do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st
        case Step s a
r of
            Yield a
x s
s -> do
                !b
y <- b -> a -> m b
fstep b
acc a
x
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield b
y (forall a. a -> Maybe a
Just (s
s, b
y))
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall a. a -> Maybe a
Just (s
s, b
acc))
            Step s a
Stop   -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

-- | @scanlMAfter' accumulate initial done stream@ is like 'scanlM'' except
-- that it provides an additional @done@ function to be applied on the
-- accumulator when the stream stops. The result of @done@ is also emitted in
-- the stream.
--
-- This function can be used to allocate a resource in the beginning of the
-- scan and release it when the stream ends or to flush the internal state of
-- the scan at the end.
--
-- /Pre-release/
--
{-# INLINE scanlMAfter' #-}
scanlMAfter' :: Monad m
    => (b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b
scanlMAfter' :: forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b
scanlMAfter' b -> a -> m b
fstep m b
initial b -> m b
done Stream m a
s =
    m b
initial forall (m :: * -> *) a.
Applicative m =>
m a -> Stream m a -> Stream m a
`consM` forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> (b -> m b) -> Stream m a -> Stream m b
postscanlMAfter' b -> a -> m b
fstep m b
initial b -> m b
done Stream m a
s

-- >>> scanl' f z xs = z `Stream.cons` postscanl' f z xs

-- | Strict left scan. Like 'map', 'scanl'' too is a one to one transformation,
-- however it adds an extra element.
--
-- >>> Stream.toList $ Stream.scanl' (+) 0 $ Stream.fromList [1,2,3,4]
-- [0,1,3,6,10]
--
-- >>> Stream.toList $ Stream.scanl' (flip (:)) [] $ Stream.fromList [1,2,3,4]
-- [[],[1],[2,1],[3,2,1],[4,3,2,1]]
--
-- The output of 'scanl'' is the initial value of the accumulator followed by
-- all the intermediate steps and the final result of 'foldl''.
--
-- By streaming the accumulated state after each fold step, we can share the
-- state across multiple stages of stream composition. Each stage can modify or
-- extend the state, do some processing with it and emit it for the next stage,
-- thus modularizing the stream processing. This can be useful in
-- stateful or event-driven programming.
--
-- Consider the following monolithic example, computing the sum and the product
-- of the elements in a stream in one go using a @foldl'@:
--
-- >>> Stream.fold (Fold.foldl' (\(s, p) x -> (s + x, p * x)) (0,1)) $ Stream.fromList [1,2,3,4]
-- (10,24)
--
-- Using @scanl'@ we can make it modular by computing the sum in the first
-- stage and passing it down to the next stage for computing the product:
--
-- >>> :{
--   Stream.fold (Fold.foldl' (\(_, p) (s, x) -> (s, p * x)) (0,1))
--   $ Stream.scanl' (\(s, _) x -> (s + x, x)) (0,1)
--   $ Stream.fromList [1,2,3,4]
-- :}
-- (10,24)
--
-- IMPORTANT: 'scanl'' evaluates the accumulator to WHNF.  To avoid building
-- lazy expressions inside the accumulator, it is recommended that a strict
-- data structure is used for accumulator.
--
-- >>> scanl' step z = Stream.scan (Fold.foldl' step z)
-- >>> scanl' f z xs = Stream.scanlM' (\a b -> return (f a b)) (return z) xs
--
-- See also: 'usingStateT'
--
{-# INLINE scanl' #-}
scanl' :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b
scanl' :: forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> Stream m a -> Stream m b
scanl' b -> a -> b
f b
seed = forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> Stream m a -> Stream m b
scanlM' (\b
a a
b -> forall (m :: * -> *) a. Monad m => a -> m a
return (b -> a -> b
f b
a a
b)) (forall (m :: * -> *) a. Monad m => a -> m a
return b
seed)

{-# INLINE_NORMAL scanlM #-}
scanlM :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b
scanlM :: forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> Stream m a -> Stream m b
scanlM b -> a -> m b
fstep m b
begin (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream forall {m :: * -> *} {a}.
State StreamK m a -> Maybe (s, b) -> m (Step (Maybe (s, b)) b)
step' forall a. Maybe a
Nothing
  where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> Maybe (s, b) -> m (Step (Maybe (s, b)) b)
step' State StreamK m a
_ Maybe (s, b)
Nothing = do
        b
x <- m b
begin
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield b
x (forall a. a -> Maybe a
Just (s
state, b
x))
    step' State StreamK m a
gst (Just (s
st, b
acc)) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st
        case Step s a
r of
            Yield a
x s
s -> do
                b
y <- b -> a -> m b
fstep b
acc a
x
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield b
y (forall a. a -> Maybe a
Just (s
s, b
y))
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall a. a -> Maybe a
Just (s
s, b
acc))
            Step s a
Stop   -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

{-# INLINE scanl #-}
scanl :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b
scanl :: forall (m :: * -> *) b a.
Monad m =>
(b -> a -> b) -> b -> Stream m a -> Stream m b
scanl b -> a -> b
f b
seed = forall (m :: * -> *) b a.
Monad m =>
(b -> a -> m b) -> m b -> Stream m a -> Stream m b
scanlM (\b
a a
b -> forall (m :: * -> *) a. Monad m => a -> m a
return (b -> a -> b
f b
a a
b)) (forall (m :: * -> *) a. Monad m => a -> m a
return b
seed)

-- Adapted from the vector package
{-# INLINE_NORMAL scanl1M #-}
scanl1M :: Monad m => (a -> a -> m a) -> Stream m a -> Stream m a
scanl1M :: forall (m :: * -> *) a.
Monad m =>
(a -> a -> m a) -> Stream m a -> Stream m a
scanl1M a -> a -> m a
fstep (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> (s, Maybe a) -> m (Step (s, Maybe a) a)
step' (s
state, forall a. Maybe a
Nothing)
  where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> (s, Maybe a) -> m (Step (s, Maybe a) a)
step' State StreamK m a
gst (s
st, Maybe a
Nothing) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Step s a
r of
            Yield a
x s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
x (s
s, forall a. a -> Maybe a
Just a
x)
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (s
s, forall a. Maybe a
Nothing)
            Step s a
Stop   -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

    step' State StreamK m a
gst (s
st, Just a
acc) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Step s a
r of
            Yield a
y s
s -> do
                a
z <- a -> a -> m a
fstep a
acc a
y
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
z (s
s, forall a. a -> Maybe a
Just a
z)
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (s
s, forall a. a -> Maybe a
Just a
acc)
            Step s a
Stop   -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

{-# INLINE scanl1 #-}
scanl1 :: Monad m => (a -> a -> a) -> Stream m a -> Stream m a
scanl1 :: forall (m :: * -> *) a.
Monad m =>
(a -> a -> a) -> Stream m a -> Stream m a
scanl1 a -> a -> a
f = forall (m :: * -> *) a.
Monad m =>
(a -> a -> m a) -> Stream m a -> Stream m a
scanl1M (\a
x a
y -> forall (m :: * -> *) a. Monad m => a -> m a
return (a -> a -> a
f a
x a
y))

-- Adapted from the vector package

-- | Like 'scanl1'' but with a monadic step function.
--
{-# INLINE_NORMAL scanl1M' #-}
scanl1M' :: Monad m => (a -> a -> m a) -> Stream m a -> Stream m a
scanl1M' :: forall (m :: * -> *) a.
Monad m =>
(a -> a -> m a) -> Stream m a -> Stream m a
scanl1M' a -> a -> m a
fstep (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> (s, Maybe a) -> m (Step (s, Maybe a) a)
step' (s
state, forall a. Maybe a
Nothing)
  where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> (s, Maybe a) -> m (Step (s, Maybe a) a)
step' State StreamK m a
gst (s
st, Maybe a
Nothing) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Step s a
r of
            Yield a
x s
s -> a
x seq :: forall a b. a -> b -> b
`seq` forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
x (s
s, forall a. a -> Maybe a
Just a
x)
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (s
s, forall a. Maybe a
Nothing)
            Step s a
Stop   -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

    step' State StreamK m a
gst (s
st, Just a
acc) = a
acc seq :: forall a b. a -> b -> b
`seq` do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Step s a
r of
            Yield a
y s
s -> do
                a
z <- a -> a -> m a
fstep a
acc a
y
                a
z seq :: forall a b. a -> b -> b
`seq` forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
z (s
s, forall a. a -> Maybe a
Just a
z)
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (s
s, forall a. a -> Maybe a
Just a
acc)
            Step s a
Stop   -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

-- | Like 'scanl'' but for a non-empty stream. The first element of the stream
-- is used as the initial value of the accumulator. Does nothing if the stream
-- is empty.
--
-- >>> Stream.toList $ Stream.scanl1' (+) $ Stream.fromList [1,2,3,4]
-- [1,3,6,10]
--
{-# INLINE scanl1' #-}
scanl1' :: Monad m => (a -> a -> a) -> Stream m a -> Stream m a
scanl1' :: forall (m :: * -> *) a.
Monad m =>
(a -> a -> a) -> Stream m a -> Stream m a
scanl1' a -> a -> a
f = forall (m :: * -> *) a.
Monad m =>
(a -> a -> m a) -> Stream m a -> Stream m a
scanl1M' (\a
x a
y -> forall (m :: * -> *) a. Monad m => a -> m a
return (a -> a -> a
f a
x a
y))

-------------------------------------------------------------------------------
-- Filtering
-------------------------------------------------------------------------------

-- | Modify a @Stream m a -> Stream m a@ stream transformation that accepts a
-- predicate @(a -> b)@ to accept @((s, a) -> b)@ instead, provided a
-- transformation @Stream m a -> Stream m (s, a)@. Convenient to filter with
-- index or time.
--
-- >>> filterWithIndex = Stream.with Stream.indexed Stream.filter
--
-- /Pre-release/
{-# INLINE with #-}
with :: Monad m =>
       (Stream m a -> Stream m (s, a))
    -> (((s, a) -> b) -> Stream m (s, a) -> Stream m (s, a))
    -> (((s, a) -> b) -> Stream m a -> Stream m a)
with :: forall (m :: * -> *) a s b.
Monad m =>
(Stream m a -> Stream m (s, a))
-> (((s, a) -> b) -> Stream m (s, a) -> Stream m (s, a))
-> ((s, a) -> b)
-> Stream m a
-> Stream m a
with Stream m a -> Stream m (s, a)
f ((s, a) -> b) -> Stream m (s, a) -> Stream m (s, a)
comb (s, a) -> b
g = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a b. (a, b) -> b
snd forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((s, a) -> b) -> Stream m (s, a) -> Stream m (s, a)
comb (s, a) -> b
g forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m a -> Stream m (s, a)
f

-- Adapted from the vector package

-- | Same as 'filter' but with a monadic predicate.
--
-- >>> f p x = p x >>= \r -> return $ if r then Just x else Nothing
-- >>> filterM p = Stream.mapMaybeM (f p)
--
{-# INLINE_NORMAL filterM #-}
filterM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
filterM :: forall (m :: * -> *) a.
Monad m =>
(a -> m Bool) -> Stream m a -> Stream m a
filterM a -> m Bool
f (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> s -> m (Step s a)
step' s
state
  where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> s -> m (Step s a)
step' State StreamK m a
gst s
st = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Step s a
r of
            Yield a
x s
s -> do
                Bool
b <- a -> m Bool
f a
x
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ if Bool
b
                         then forall s a. a -> s -> Step s a
Yield a
x s
s
                         else forall s a. s -> Step s a
Skip s
s
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip s
s
            Step s a
Stop   -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

-- | Include only those elements that pass a predicate.
--
-- >>> filter p = Stream.filterM (return . p)
-- >>> filter p = Stream.mapMaybe (\x -> if p x then Just x else Nothing)
-- >>> filter p = Stream.scanMaybe (Fold.filtering p)
--
{-# INLINE filter #-}
filter :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
filter :: forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
filter a -> Bool
f = forall (m :: * -> *) a.
Monad m =>
(a -> m Bool) -> Stream m a -> Stream m a
filterM (forall (m :: * -> *) a. Monad m => a -> m a
return forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Bool
f)
-- filter p = scanMaybe (FL.filtering p)

-- | Drop repeated elements that are adjacent to each other using the supplied
-- comparison function.
--
-- >>> uniq = Stream.uniqBy (==)
--
-- To strip duplicate path separators:
--
-- >>> input = Stream.fromList "//a//b"
-- >>> f x y = x == '/' && y == '/'
-- >>> Stream.fold Fold.toList $ Stream.uniqBy f input
-- "/a/b"
--
-- Space: @O(1)@
--
-- /Pre-release/
--
{-# INLINE uniqBy #-}
uniqBy :: Monad m =>
    (a -> a -> Bool) -> Stream m a -> Stream m a
-- uniqBy eq = scanMaybe (FL.uniqBy eq)
uniqBy :: forall (m :: * -> *) a.
Monad m =>
(a -> a -> Bool) -> Stream m a -> Stream m a
uniqBy a -> a -> Bool
eq = forall (m :: * -> *) a. Monad m => Stream m (Maybe a) -> Stream m a
catMaybes forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a b.
Monad m =>
(Maybe a -> a -> b) -> Stream m a -> Stream m b
rollingMap Maybe a -> a -> Maybe a
f

    where

    f :: Maybe a -> a -> Maybe a
f Maybe a
pre a
curr =
        case Maybe a
pre of
            Maybe a
Nothing -> forall a. a -> Maybe a
Just a
curr
            Just a
x -> if a
x a -> a -> Bool
`eq` a
curr then forall a. Maybe a
Nothing else forall a. a -> Maybe a
Just a
curr

-- Adapted from the vector package

-- | Drop repeated elements that are adjacent to each other.
--
-- >>> uniq = Stream.uniqBy (==)
--
{-# INLINE_NORMAL uniq #-}
uniq :: (Eq a, Monad m) => Stream m a -> Stream m a
-- uniq = scanMaybe FL.uniq
uniq :: forall a (m :: * -> *). (Eq a, Monad m) => Stream m a -> Stream m a
uniq (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> (Maybe a, s) -> m (Step (Maybe a, s) a)
step' (forall a. Maybe a
Nothing, s
state)
  where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> (Maybe a, s) -> m (Step (Maybe a, s) a)
step' State StreamK m a
gst (Maybe a
Nothing, s
st) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Step s a
r of
            Yield a
x s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
x (forall a. a -> Maybe a
Just a
x, s
s)
            Skip  s
s   -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip  (forall a. Maybe a
Nothing, s
s)
            Step s a
Stop      -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop
    step' State StreamK m a
gst (Just a
x, s
st)  = do
         Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
         case Step s a
r of
             Yield a
y s
s | a
x forall a. Eq a => a -> a -> Bool
== a
y   -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall a. a -> Maybe a
Just a
x, s
s)
                       | Bool
otherwise -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
y (forall a. a -> Maybe a
Just a
y, s
s)
             Skip  s
s   -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall a. a -> Maybe a
Just a
x, s
s)
             Step s a
Stop      -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

-- | Deletes the first occurrence of the element in the stream that satisfies
-- the given equality predicate.
--
-- >>> input = Stream.fromList [1,3,3,5]
-- >>> Stream.fold Fold.toList $ Stream.deleteBy (==) 3 input
-- [1,3,5]
--
{-# INLINE_NORMAL deleteBy #-}
deleteBy :: Monad m => (a -> a -> Bool) -> a -> Stream m a -> Stream m a
-- deleteBy cmp x = scanMaybe (FL.deleteBy cmp x)
deleteBy :: forall (m :: * -> *) a.
Monad m =>
(a -> a -> Bool) -> a -> Stream m a -> Stream m a
deleteBy a -> a -> Bool
eq a
x (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> (s, Bool) -> m (Step (s, Bool) a)
step' (s
state, Bool
False)
  where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> (s, Bool) -> m (Step (s, Bool) a)
step' State StreamK m a
gst (s
st, Bool
False) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Step s a
r of
            Yield a
y s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$
                if a -> a -> Bool
eq a
x a
y then forall s a. s -> Step s a
Skip (s
s, Bool
True) else forall s a. a -> s -> Step s a
Yield a
y (s
s, Bool
False)
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (s
s, Bool
False)
            Step s a
Stop   -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

    step' State StreamK m a
gst (s
st, Bool
True) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Step s a
r of
            Yield a
y s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
y (s
s, Bool
True)
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (s
s, Bool
True)
            Step s a
Stop   -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

-- | Strip all leading and trailing occurrences of an element passing a
-- predicate and make all other consecutive occurrences uniq.
--
-- >> prune p = Stream.dropWhileAround p $ Stream.uniqBy (x y -> p x && p y)
--
-- @
-- > Stream.prune isSpace (Stream.fromList "  hello      world!   ")
-- "hello world!"
--
-- @
--
-- Space: @O(1)@
--
-- /Unimplemented/
{-# INLINE prune #-}
prune ::
    -- (Monad m, Eq a) =>
    (a -> Bool) -> Stream m a -> Stream m a
prune :: forall a (m :: * -> *). (a -> Bool) -> Stream m a -> Stream m a
prune = forall a. HasCallStack => [Char] -> a
error [Char]
"Not implemented yet!"

-- Possible implementation:
-- @repeated =
--      Stream.catMaybes . Stream.parseMany (Parser.groupBy (==) Fold.repeated)@
--
-- 'Fold.repeated' should return 'Just' when repeated, and 'Nothing' for a
-- single element.

-- | Emit only repeated elements, once.
--
-- /Unimplemented/
repeated :: -- (Monad m, Eq a) =>
    Stream m a -> Stream m a
repeated :: forall (m :: * -> *) a. Stream m a -> Stream m a
repeated = forall a. HasCallStack => a
undefined

------------------------------------------------------------------------------
-- Trimming
------------------------------------------------------------------------------

-- | Take all consecutive elements at the end of the stream for which the
-- predicate is true.
--
-- O(n) space, where n is the number elements taken.
--
-- /Unimplemented/
{-# INLINE takeWhileLast #-}
takeWhileLast :: -- Monad m =>
    (a -> Bool) -> Stream m a -> Stream m a
takeWhileLast :: forall a (m :: * -> *). (a -> Bool) -> Stream m a -> Stream m a
takeWhileLast = forall a. HasCallStack => a
undefined -- fromStreamD $ D.takeWhileLast n $ toStreamD m

-- | Like 'takeWhile' and 'takeWhileLast' combined.
--
-- O(n) space, where n is the number elements taken from the end.
--
-- /Unimplemented/
{-# INLINE takeWhileAround #-}
takeWhileAround :: -- Monad m =>
    (a -> Bool) -> Stream m a -> Stream m a
takeWhileAround :: forall a (m :: * -> *). (a -> Bool) -> Stream m a -> Stream m a
takeWhileAround = forall a. HasCallStack => a
undefined -- fromStreamD $ D.takeWhileAround n $ toStreamD m

-- Adapted from the vector package

-- | Discard first 'n' elements from the stream and take the rest.
--
{-# INLINE_NORMAL drop #-}
drop :: Monad m => Int -> Stream m a -> Stream m a
drop :: forall (m :: * -> *) a. Monad m => Int -> Stream m a -> Stream m a
drop Int
n (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream forall {a}.
(Ord a, Num a) =>
State StreamK m a -> (s, Maybe a) -> m (Step (s, Maybe a) a)
step' (s
state, forall a. a -> Maybe a
Just Int
n)
  where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> (s, Maybe a) -> m (Step (s, Maybe a) a)
step' State StreamK m a
gst (s
st, Just a
i)
      | a
i forall a. Ord a => a -> a -> Bool
> a
0 = do
          Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
          forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$
            case Step s a
r of
              Yield a
_ s
s -> forall s a. s -> Step s a
Skip (s
s, forall a. a -> Maybe a
Just (a
i forall a. Num a => a -> a -> a
- a
1))
              Skip s
s    -> forall s a. s -> Step s a
Skip (s
s, forall a. a -> Maybe a
Just a
i)
              Step s a
Stop      -> forall s a. Step s a
Stop
      | Bool
otherwise = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (s
st, forall a. Maybe a
Nothing)

    step' State StreamK m a
gst (s
st, Maybe a
Nothing) = do
      Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
      forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$
        case Step s a
r of
          Yield a
x s
s -> forall s a. a -> s -> Step s a
Yield a
x (s
s, forall a. Maybe a
Nothing)
          Skip  s
s   -> forall s a. s -> Step s a
Skip (s
s, forall a. Maybe a
Nothing)
          Step s a
Stop      -> forall s a. Step s a
Stop

-- Adapted from the vector package
data DropWhileState s a
    = DropWhileDrop s
    | DropWhileYield a s
    | DropWhileNext s

-- | Same as 'dropWhile' but with a monadic predicate.
--
{-# INLINE_NORMAL dropWhileM #-}
dropWhileM :: Monad m => (a -> m Bool) -> Stream m a -> Stream m a
-- dropWhileM p = scanMaybe (FL.droppingWhileM p)
dropWhileM :: forall (m :: * -> *) a.
Monad m =>
(a -> m Bool) -> Stream m a -> Stream m a
dropWhileM a -> m Bool
f (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream forall {a}.
State StreamK m a
-> DropWhileState s a -> m (Step (DropWhileState s a) a)
step' (forall s a. s -> DropWhileState s a
DropWhileDrop s
state)
  where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a
-> DropWhileState s a -> m (Step (DropWhileState s a) a)
step' State StreamK m a
gst (DropWhileDrop s
st) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Step s a
r of
            Yield a
x s
s -> do
                Bool
b <- a -> m Bool
f a
x
                if Bool
b
                then forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall s a. s -> DropWhileState s a
DropWhileDrop s
s)
                else forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall s a. a -> s -> DropWhileState s a
DropWhileYield a
x s
s)
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall s a. s -> DropWhileState s a
DropWhileDrop s
s)
            Step s a
Stop -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

    step' State StreamK m a
gst (DropWhileNext s
st) =  do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Step s a
r of
            Yield a
x s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall s a. a -> s -> DropWhileState s a
DropWhileYield a
x s
s)
            Skip s
s    -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall s a. s -> DropWhileState s a
DropWhileNext s
s)
            Step s a
Stop      -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

    step' State StreamK m a
_ (DropWhileYield a
x s
st) = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
x (forall s a. s -> DropWhileState s a
DropWhileNext s
st)

-- | Drop elements in the stream as long as the predicate succeeds and then
-- take the rest of the stream.
--
{-# INLINE dropWhile #-}
dropWhile :: Monad m => (a -> Bool) -> Stream m a -> Stream m a
-- dropWhile p = scanMaybe (FL.droppingWhile p)
dropWhile :: forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
dropWhile a -> Bool
f = forall (m :: * -> *) a.
Monad m =>
(a -> m Bool) -> Stream m a -> Stream m a
dropWhileM (forall (m :: * -> *) a. Monad m => a -> m a
return forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Bool
f)

-- | Drop @n@ elements at the end of the stream.
--
-- O(n) space, where n is the number elements dropped.
--
-- /Unimplemented/
{-# INLINE dropLast #-}
dropLast :: -- Monad m =>
    Int -> Stream m a -> Stream m a
dropLast :: forall (m :: * -> *) a. Int -> Stream m a -> Stream m a
dropLast = forall a. HasCallStack => a
undefined -- fromStreamD $ D.dropLast n $ toStreamD m

-- | Drop all consecutive elements at the end of the stream for which the
-- predicate is true.
--
-- O(n) space, where n is the number elements dropped.
--
-- /Unimplemented/
{-# INLINE dropWhileLast #-}
dropWhileLast :: -- Monad m =>
    (a -> Bool) -> Stream m a -> Stream m a
dropWhileLast :: forall a (m :: * -> *). (a -> Bool) -> Stream m a -> Stream m a
dropWhileLast = forall a. HasCallStack => a
undefined -- fromStreamD $ D.dropWhileLast n $ toStreamD m

-- | Like 'dropWhile' and 'dropWhileLast' combined.
--
-- O(n) space, where n is the number elements dropped from the end.
--
-- /Unimplemented/
{-# INLINE dropWhileAround #-}
dropWhileAround :: -- Monad m =>
    (a -> Bool) -> Stream m a -> Stream m a
dropWhileAround :: forall a (m :: * -> *). (a -> Bool) -> Stream m a -> Stream m a
dropWhileAround = forall a. HasCallStack => a
undefined -- fromStreamD $ D.dropWhileAround n $ toStreamD m

------------------------------------------------------------------------------
-- Inserting Elements
------------------------------------------------------------------------------

-- | @insertBy cmp elem stream@ inserts @elem@ before the first element in
-- @stream@ that is less than @elem@ when compared using @cmp@.
--
-- >>> insertBy cmp x = Stream.mergeBy cmp (Stream.fromPure x)
--
-- >>> input = Stream.fromList [1,3,5]
-- >>> Stream.fold Fold.toList $ Stream.insertBy compare 2 input
-- [1,2,3,5]
--
{-# INLINE_NORMAL insertBy #-}
insertBy :: Monad m => (a -> a -> Ordering) -> a -> Stream m a -> Stream m a
insertBy :: forall (m :: * -> *) a.
Monad m =>
(a -> a -> Ordering) -> a -> Stream m a -> Stream m a
insertBy a -> a -> Ordering
cmp a
a (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a
-> (s, Bool, Maybe a) -> m (Step (s, Bool, Maybe a) a)
step' (s
state, Bool
False, forall a. Maybe a
Nothing)
  where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a
-> (s, Bool, Maybe a) -> m (Step (s, Bool, Maybe a) a)
step' State StreamK m a
gst (s
st, Bool
False, Maybe a
_) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Step s a
r of
            Yield a
x s
s -> case a -> a -> Ordering
cmp a
a a
x of
                Ordering
GT -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
x (s
s, Bool
False, forall a. Maybe a
Nothing)
                Ordering
_  -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
a (s
s, Bool
True, forall a. a -> Maybe a
Just a
x)
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (s
s, Bool
False, forall a. Maybe a
Nothing)
            Step s a
Stop   -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
a (s
st, Bool
True, forall a. Maybe a
Nothing)

    step' State StreamK m a
_ (s
_, Bool
True, Maybe a
Nothing) = forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

    step' State StreamK m a
gst (s
st, Bool
True, Just a
prev) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Step s a
r of
            Yield a
x s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
prev (s
s, Bool
True, forall a. a -> Maybe a
Just a
x)
            Skip s
s    -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (s
s, Bool
True, forall a. a -> Maybe a
Just a
prev)
            Step s a
Stop      -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
prev (s
st, Bool
True, forall a. Maybe a
Nothing)

data LoopState x s = FirstYield s
                   | InterspersingYield s
                   | YieldAndCarry x s

-- intersperseM = intersperseMWith 1

-- | Insert an effect and its output before consuming an element of a stream
-- except the first one.
--
-- >>> input = Stream.fromList "hello"
-- >>> Stream.fold Fold.toList $ Stream.trace putChar $ Stream.intersperseM (putChar '.' >> return ',') input
-- h.,e.,l.,l.,o"h,e,l,l,o"
--
-- Be careful about the order of effects. In the above example we used trace
-- after the intersperse, if we use it before the intersperse the output would
-- be he.l.l.o."h,e,l,l,o".
--
-- >>> Stream.fold Fold.toList $ Stream.intersperseM (putChar '.' >> return ',') $ Stream.trace putChar input
-- he.l.l.o."h,e,l,l,o"
--
{-# INLINE_NORMAL intersperseM #-}
intersperseM :: Monad m => m a -> Stream m a -> Stream m a
intersperseM :: forall (m :: * -> *) a. Monad m => m a -> Stream m a -> Stream m a
intersperseM m a
m (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> LoopState a s -> m (Step (LoopState a s) a)
step' (forall x s. s -> LoopState x s
FirstYield s
state)
  where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> LoopState a s -> m (Step (LoopState a s) a)
step' State StreamK m a
gst (FirstYield s
st) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$
            case Step s a
r of
                Yield a
x s
s -> forall s a. s -> Step s a
Skip (forall x s. x -> s -> LoopState x s
YieldAndCarry a
x s
s)
                Skip s
s -> forall s a. s -> Step s a
Skip (forall x s. s -> LoopState x s
FirstYield s
s)
                Step s a
Stop -> forall s a. Step s a
Stop

    step' State StreamK m a
gst (InterspersingYield s
st) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Step s a
r of
            Yield a
x s
s -> do
                a
a <- m a
m
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
a (forall x s. x -> s -> LoopState x s
YieldAndCarry a
x s
s)
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ forall x s. s -> LoopState x s
InterspersingYield s
s
            Step s a
Stop -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

    step' State StreamK m a
_ (YieldAndCarry a
x s
st) = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
x (forall x s. s -> LoopState x s
InterspersingYield s
st)

-- | Insert a pure value between successive elements of a stream.
--
-- >>> input = Stream.fromList "hello"
-- >>> Stream.fold Fold.toList $ Stream.intersperse ',' input
-- "h,e,l,l,o"
--
{-# INLINE intersperse #-}
intersperse :: Monad m => a -> Stream m a -> Stream m a
intersperse :: forall (m :: * -> *) a. Monad m => a -> Stream m a -> Stream m a
intersperse a
a = forall (m :: * -> *) a. Monad m => m a -> Stream m a -> Stream m a
intersperseM (forall (m :: * -> *) a. Monad m => a -> m a
return a
a)

-- | Insert a side effect before consuming an element of a stream except the
-- first one.
--
-- >>> input = Stream.fromList "hello"
-- >>> Stream.fold Fold.drain $ Stream.trace putChar $ Stream.intersperseM_ (putChar '.') input
-- h.e.l.l.o
--
-- /Pre-release/
{-# INLINE_NORMAL intersperseM_ #-}
intersperseM_ :: Monad m => m b -> Stream m a -> Stream m a
intersperseM_ :: forall (m :: * -> *) b a.
Monad m =>
m b -> Stream m a -> Stream m a
intersperseM_ m b
m (Stream State StreamK m a -> s -> m (Step s a)
step1 s
state1) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a
-> Either (m (), s) s -> m (Step (Either (m (), s) s) a)
step (forall a b. a -> Either a b
Left (forall (f :: * -> *) a. Applicative f => a -> f a
pure (), s
state1))
  where
    {-# INLINE_LATE step #-}
    step :: State StreamK m a
-> Either (m (), s) s -> m (Step (Either (m (), s) s) a)
step State StreamK m a
gst (Left (m ()
eff, s
st)) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step1 State StreamK m a
gst s
st
        case Step s a
r of
            Yield a
x s
s -> m ()
eff forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return (forall s a. a -> s -> Step s a
Yield a
x (forall a b. b -> Either a b
Right s
s))
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall a b. a -> Either a b
Left (m ()
eff, s
s))
            Step s a
Stop -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

    step State StreamK m a
_ (Right s
st) = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left (forall (f :: * -> *) a. Functor f => f a -> f ()
void m b
m, s
st)

-- | Intersperse a monadic action into the input stream after every @n@
-- elements.
--
-- >> input = Stream.fromList "hello"
-- >> Stream.fold Fold.toList $ Stream.intersperseMWith 2 (return ',') input
-- "he,ll,o"
--
-- /Unimplemented/
{-# INLINE intersperseMWith #-}
intersperseMWith :: -- Monad m =>
    Int -> m a -> Stream m a -> Stream m a
intersperseMWith :: forall (m :: * -> *) a. Int -> m a -> Stream m a -> Stream m a
intersperseMWith Int
_n m a
_f Stream m a
_xs = forall a. HasCallStack => a
undefined

data SuffixState s a
    = SuffixElem s
    | SuffixSuffix s
    | SuffixYield a (SuffixState s a)

-- | Insert an effect and its output after consuming an element of a stream.
--
-- >>> input = Stream.fromList "hello"
-- >>> Stream.fold Fold.toList $ Stream.trace putChar $ Stream.intersperseMSuffix (putChar '.' >> return ',') input
-- h.,e.,l.,l.,o.,"h,e,l,l,o,"
--
-- /Pre-release/
{-# INLINE_NORMAL intersperseMSuffix #-}
intersperseMSuffix :: forall m a. Monad m => m a -> Stream m a -> Stream m a
intersperseMSuffix :: forall (m :: * -> *) a. Monad m => m a -> Stream m a -> Stream m a
intersperseMSuffix m a
action (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a
-> SuffixState s a -> m (Step (SuffixState s a) a)
step' (forall s a. s -> SuffixState s a
SuffixElem s
state)
    where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a
-> SuffixState s a -> m (Step (SuffixState s a) a)
step' State StreamK m a
gst (SuffixElem s
st) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Step s a
r of
            Yield a
x s
s -> forall s a. s -> Step s a
Skip (forall s a. a -> SuffixState s a -> SuffixState s a
SuffixYield a
x (forall s a. s -> SuffixState s a
SuffixSuffix s
s))
            Skip s
s -> forall s a. s -> Step s a
Skip (forall s a. s -> SuffixState s a
SuffixElem s
s)
            Step s a
Stop -> forall s a. Step s a
Stop

    step' State StreamK m a
_ (SuffixSuffix s
st) = do
        m a
action forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \a
r -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall s a. a -> SuffixState s a -> SuffixState s a
SuffixYield a
r (forall s a. s -> SuffixState s a
SuffixElem s
st))

    step' State StreamK m a
_ (SuffixYield a
x SuffixState s a
next) = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
x SuffixState s a
next

-- | Insert a side effect after consuming an element of a stream.
--
-- >>> input = Stream.fromList "hello"
-- >>> Stream.fold Fold.toList $ Stream.intersperseMSuffix_ (threadDelay 1000000) input
-- "hello"
--
-- /Pre-release/
--
{-# INLINE_NORMAL intersperseMSuffix_ #-}
intersperseMSuffix_ :: Monad m => m b -> Stream m a -> Stream m a
intersperseMSuffix_ :: forall (m :: * -> *) b a.
Monad m =>
m b -> Stream m a -> Stream m a
intersperseMSuffix_ m b
m (Stream State StreamK m a -> s -> m (Step s a)
step1 s
state1) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a -> Either s s -> m (Step (Either s s) a)
step (forall a b. a -> Either a b
Left s
state1)
  where
    {-# INLINE_LATE step #-}
    step :: State StreamK m a -> Either s s -> m (Step (Either s s) a)
step State StreamK m a
gst (Left s
st) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step1 State StreamK m a
gst s
st
        case Step s a
r of
            Yield a
x s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
x (forall a b. b -> Either a b
Right s
s)
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left s
s
            Step s a
Stop -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

    step State StreamK m a
_ (Right s
st) = m b
m forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return (forall s a. s -> Step s a
Skip (forall a b. a -> Either a b
Left s
st))

data SuffixSpanState s a
    = SuffixSpanElem s Int
    | SuffixSpanSuffix s
    | SuffixSpanYield a (SuffixSpanState s a)
    | SuffixSpanLast
    | SuffixSpanStop

-- | Like 'intersperseMSuffix' but intersperses an effectful action into the
-- input stream after every @n@ elements and after the last element.
--
-- >>> input = Stream.fromList "hello"
-- >>> Stream.fold Fold.toList $ Stream.intersperseMSuffixWith 2 (return ',') input
-- "he,ll,o,"
--
-- /Pre-release/
--
{-# INLINE_NORMAL intersperseMSuffixWith #-}
intersperseMSuffixWith :: forall m a. Monad m
    => Int -> m a -> Stream m a -> Stream m a
intersperseMSuffixWith :: forall (m :: * -> *) a.
Monad m =>
Int -> m a -> Stream m a -> Stream m a
intersperseMSuffixWith Int
n m a
action (Stream State StreamK m a -> s -> m (Step s a)
step s
state) =
    forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream State StreamK m a
-> SuffixSpanState s a -> m (Step (SuffixSpanState s a) a)
step' (forall s a. s -> Int -> SuffixSpanState s a
SuffixSpanElem s
state Int
n)
    where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a
-> SuffixSpanState s a -> m (Step (SuffixSpanState s a) a)
step' State StreamK m a
gst (SuffixSpanElem s
st Int
i) | Int
i forall a. Ord a => a -> a -> Bool
> Int
0 = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Step s a
r of
            Yield a
x s
s -> forall s a. s -> Step s a
Skip (forall s a. a -> SuffixSpanState s a -> SuffixSpanState s a
SuffixSpanYield a
x (forall s a. s -> Int -> SuffixSpanState s a
SuffixSpanElem s
s (Int
i forall a. Num a => a -> a -> a
- Int
1)))
            Skip s
s -> forall s a. s -> Step s a
Skip (forall s a. s -> Int -> SuffixSpanState s a
SuffixSpanElem s
s Int
i)
            Step s a
Stop -> if Int
i forall a. Eq a => a -> a -> Bool
== Int
n then forall s a. Step s a
Stop else forall s a. s -> Step s a
Skip forall s a. SuffixSpanState s a
SuffixSpanLast
    step' State StreamK m a
_ (SuffixSpanElem s
st Int
_) = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall s a. s -> SuffixSpanState s a
SuffixSpanSuffix s
st)

    step' State StreamK m a
_ (SuffixSpanSuffix s
st) = do
        m a
action forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \a
r -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall s a. a -> SuffixSpanState s a -> SuffixSpanState s a
SuffixSpanYield a
r (forall s a. s -> Int -> SuffixSpanState s a
SuffixSpanElem s
st Int
n))

    step' State StreamK m a
_ SuffixSpanState s a
SuffixSpanLast = do
        m a
action forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \a
r -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall s a. a -> SuffixSpanState s a -> SuffixSpanState s a
SuffixSpanYield a
r forall s a. SuffixSpanState s a
SuffixSpanStop)

    step' State StreamK m a
_ (SuffixSpanYield a
x SuffixSpanState s a
next) = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
x SuffixSpanState s a
next

    step' State StreamK m a
_ SuffixSpanState s a
SuffixSpanStop = forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

-- | Insert a side effect before consuming an element of a stream.
--
-- Definition:
--
-- >>> intersperseMPrefix_ m = Stream.mapM (\x -> void m >> return x)
--
-- >>> input = Stream.fromList "hello"
-- >>> Stream.fold Fold.toList $ Stream.trace putChar $ Stream.intersperseMPrefix_ (putChar '.' >> return ',') input
-- .h.e.l.l.o"hello"
--
-- Same as 'trace_'.
--
-- /Pre-release/
--
{-# INLINE intersperseMPrefix_ #-}
intersperseMPrefix_ :: Monad m => m b -> Stream m a -> Stream m a
intersperseMPrefix_ :: forall (m :: * -> *) b a.
Monad m =>
m b -> Stream m a -> Stream m a
intersperseMPrefix_ m b
m = forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Stream m a -> Stream m b
mapM (\a
x -> forall (f :: * -> *) a. Functor f => f a -> f ()
void m b
m forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return a
x)

------------------------------------------------------------------------------
-- Inserting Time
------------------------------------------------------------------------------

-- XXX This should be in Prelude, should we export this as a helper function?

-- | Block the current thread for specified number of seconds.
{-# INLINE sleep #-}
sleep :: MonadIO m => Double -> m ()
sleep :: forall (m :: * -> *). MonadIO m => Double -> m ()
sleep Double
n = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay forall a b. (a -> b) -> a -> b
$ forall a b. (RealFrac a, Integral b) => a -> b
round forall a b. (a -> b) -> a -> b
$ Double
n forall a. Num a => a -> a -> a
* Double
1000000

-- | Introduce a delay of specified seconds between elements of the stream.
--
-- Definition:
--
-- >>> sleep n = liftIO $ threadDelay $ round $ n * 1000000
-- >>> delay = Stream.intersperseM_ . sleep
--
-- Example:
--
-- >>> input = Stream.enumerateFromTo 1 3
-- >>> Stream.fold (Fold.drainMapM print) $ Stream.delay 1 input
-- 1
-- 2
-- 3
--
{-# INLINE delay #-}
delay :: MonadIO m => Double -> Stream m a -> Stream m a
delay :: forall (m :: * -> *) a.
MonadIO m =>
Double -> Stream m a -> Stream m a
delay = forall (m :: * -> *) b a.
Monad m =>
m b -> Stream m a -> Stream m a
intersperseM_ forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *). MonadIO m => Double -> m ()
sleep

-- | Introduce a delay of specified seconds after consuming an element of a
-- stream.
--
-- Definition:
--
-- >>> sleep n = liftIO $ threadDelay $ round $ n * 1000000
-- >>> delayPost = Stream.intersperseMSuffix_ . sleep
--
-- Example:
--
-- >>> input = Stream.enumerateFromTo 1 3
-- >>> Stream.fold (Fold.drainMapM print) $ Stream.delayPost 1 input
-- 1
-- 2
-- 3
--
-- /Pre-release/
--
{-# INLINE delayPost #-}
delayPost :: MonadIO m => Double -> Stream m a -> Stream m a
delayPost :: forall (m :: * -> *) a.
MonadIO m =>
Double -> Stream m a -> Stream m a
delayPost Double
n = forall (m :: * -> *) b a.
Monad m =>
m b -> Stream m a -> Stream m a
intersperseMSuffix_ forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay forall a b. (a -> b) -> a -> b
$ forall a b. (RealFrac a, Integral b) => a -> b
round forall a b. (a -> b) -> a -> b
$ Double
n forall a. Num a => a -> a -> a
* Double
1000000

-- | Introduce a delay of specified seconds before consuming an element of a
-- stream.
--
-- Definition:
--
-- >>> sleep n = liftIO $ threadDelay $ round $ n * 1000000
-- >>> delayPre = Stream.intersperseMPrefix_. sleep
--
-- Example:
--
-- >>> input = Stream.enumerateFromTo 1 3
-- >>> Stream.fold (Fold.drainMapM print) $ Stream.delayPre 1 input
-- 1
-- 2
-- 3
--
-- /Pre-release/
--
{-# INLINE delayPre #-}
delayPre :: MonadIO m => Double -> Stream m a -> Stream m a
delayPre :: forall (m :: * -> *) a.
MonadIO m =>
Double -> Stream m a -> Stream m a
delayPre = forall (m :: * -> *) b a.
Monad m =>
m b -> Stream m a -> Stream m a
intersperseMPrefix_forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *). MonadIO m => Double -> m ()
sleep

------------------------------------------------------------------------------
-- Reordering
------------------------------------------------------------------------------

-- | Returns the elements of the stream in reverse order.  The stream must be
-- finite. Note that this necessarily buffers the entire stream in memory.
--
-- Definition:
--
-- >>> reverse m = Stream.concatEffect $ Stream.fold Fold.toListRev m >>= return . Stream.fromList
--
{-# INLINE_NORMAL reverse #-}
reverse :: Monad m => Stream m a -> Stream m a
reverse :: forall (m :: * -> *) a. Monad m => Stream m a -> Stream m a
reverse Stream m a
m = forall (m :: * -> *) a. Monad m => m (Stream m a) -> Stream m a
concatEffect forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
fold forall (m :: * -> *) a. Monad m => Fold m a [a]
FL.toListRev Stream m a
m forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> forall (m :: * -> *) a. Applicative m => [a] -> Stream m a
fromList
{-
reverse m = Stream step Nothing
    where
    {-# INLINE_LATE step #-}
    step _ Nothing = do
        xs <- foldl' (flip (:)) [] m
        return $ Skip (Just xs)
    step _ (Just (x:xs)) = return $ Yield x (Just xs)
    step _ (Just []) = return Stop
-}

-- | Like 'reverse' but several times faster, requires an 'Unbox' instance.
--
-- /O(n) space/
--
-- /Pre-release/
{-# INLINE reverseUnbox #-}
reverseUnbox :: (MonadIO m, Unbox a) => Stream m a -> Stream m a
reverseUnbox :: forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Stream m a -> Stream m a
reverseUnbox =
    forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Stream m (Array a) -> Stream m a
A.flattenArraysRev -- unfoldMany A.readRev
        forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Applicative m => StreamK m a -> Stream m a
fromStreamK
        forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. StreamK m a -> StreamK m a
K.reverse
        forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => Stream m a -> StreamK m a
toStreamK
        forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Int -> Stream m a -> Stream m (Array a)
A.chunksOf Int
defaultChunkSize

-- | Buffer until the next element in sequence arrives. The function argument
-- determines the difference in sequence numbers. This could be useful in
-- implementing sequenced streams, for example, TCP reassembly.
--
-- /Unimplemented/
--
{-# INLINE reassembleBy #-}
reassembleBy
    :: -- Monad m =>
       Fold m a b
    -> (a -> a -> Int)
    -> Stream m a
    -> Stream m b
reassembleBy :: forall (m :: * -> *) a b.
Fold m a b -> (a -> a -> Int) -> Stream m a -> Stream m b
reassembleBy = forall a. HasCallStack => a
undefined

------------------------------------------------------------------------------
-- Position Indexing
------------------------------------------------------------------------------

-- Adapted from the vector package

-- |
-- >>> f = Fold.foldl' (\(i, _) x -> (i + 1, x)) (-1,undefined)
-- >>> indexed = Stream.postscan f
-- >>> indexed = Stream.zipWith (,) (Stream.enumerateFrom 0)
-- >>> indexedR n = fmap (\(i, a) -> (n - i, a)) . indexed
--
-- Pair each element in a stream with its index, starting from index 0.
--
-- >>> Stream.fold Fold.toList $ Stream.indexed $ Stream.fromList "hello"
-- [(0,'h'),(1,'e'),(2,'l'),(3,'l'),(4,'o')]
--
{-# INLINE_NORMAL indexed #-}
indexed :: Monad m => Stream m a -> Stream m (Int, a)
-- indexed = scanMaybe FL.indexing
indexed :: forall (m :: * -> *) a. Monad m => Stream m a -> Stream m (Int, a)
indexed (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream forall {b} {m :: * -> *} {a}.
Num b =>
State StreamK m a -> (s, b) -> m (Step (s, b) (b, a))
step' (s
state, Int
0)
  where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> (s, b) -> m (Step (s, b) (b, a))
step' State StreamK m a
gst (s
st, b
i) = b
i seq :: forall a b. a -> b -> b
`seq` do
         Step s a
r <- State StreamK m a -> s -> m (Step s a)
step (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st
         case Step s a
r of
             Yield a
x s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield (b
i, a
x) (s
s, b
iforall a. Num a => a -> a -> a
+b
1)
             Skip    s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (s
s, b
i)
             Step s a
Stop      -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

-- Adapted from the vector package

-- |
-- >>> f n = Fold.foldl' (\(i, _) x -> (i - 1, x)) (n + 1,undefined)
-- >>> indexedR n = Stream.postscan (f n)
--
-- >>> s n = Stream.enumerateFromThen n (n - 1)
-- >>> indexedR n = Stream.zipWith (,) (s n)
--
-- Pair each element in a stream with its index, starting from the
-- given index @n@ and counting down.
--
-- >>> Stream.fold Fold.toList $ Stream.indexedR 10 $ Stream.fromList "hello"
-- [(10,'h'),(9,'e'),(8,'l'),(7,'l'),(6,'o')]
--
{-# INLINE_NORMAL indexedR #-}
indexedR :: Monad m => Int -> Stream m a -> Stream m (Int, a)
-- indexedR n = scanMaybe (FL.indexingRev n)
indexedR :: forall (m :: * -> *) a.
Monad m =>
Int -> Stream m a -> Stream m (Int, a)
indexedR Int
m (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream forall {b} {m :: * -> *} {a}.
Num b =>
State StreamK m a -> (s, b) -> m (Step (s, b) (b, a))
step' (s
state, Int
m)
  where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> (s, b) -> m (Step (s, b) (b, a))
step' State StreamK m a
gst (s
st, b
i) = b
i seq :: forall a b. a -> b -> b
`seq` do
         Step s a
r <- State StreamK m a -> s -> m (Step s a)
step (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st
         case Step s a
r of
             Yield a
x s
s -> let i' :: b
i' = b
i forall a. Num a => a -> a -> a
- b
1
                          in forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield (b
i, a
x) (s
s, b
i')
             Skip    s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (s
s, b
i)
             Step s a
Stop      -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

-------------------------------------------------------------------------------
-- Time Indexing
-------------------------------------------------------------------------------

-- Note: The timestamp stream must be the second stream in the zip so that the
-- timestamp is generated after generating the stream element and not before.
-- If we do not do that then the following example will generate the same
-- timestamp for first two elements:
--
-- Stream.fold Fold.toList $ Stream.timestamped $ Stream.delay $ Stream.enumerateFromTo 1 3

-- | Pair each element in a stream with an absolute timestamp, using a clock of
-- specified granularity.  The timestamp is generated just before the element
-- is consumed.
--
-- >>> Stream.fold Fold.toList $ Stream.timestampWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
-- [(AbsTime (TimeSpec {sec = ..., nsec = ...}),1),(AbsTime (TimeSpec {sec = ..., nsec = ...}),2),(AbsTime (TimeSpec {sec = ..., nsec = ...}),3)]
--
-- /Pre-release/
--
{-# INLINE timestampWith #-}
timestampWith :: (MonadIO m)
    => Double -> Stream m a -> Stream m (AbsTime, a)
timestampWith :: forall (m :: * -> *) a.
MonadIO m =>
Double -> Stream m a -> Stream m (AbsTime, a)
timestampWith Double
g Stream m a
stream = forall (m :: * -> *) a b c.
Monad m =>
(a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipWith (forall a b c. (a -> b -> c) -> b -> a -> c
flip (,)) Stream m a
stream (forall (m :: * -> *). MonadIO m => Double -> Stream m AbsTime
absTimesWith Double
g)

-- TBD: check performance vs a custom implementation without using zipWith.
--
-- /Pre-release/
--
{-# INLINE timestamped #-}
timestamped :: (MonadIO m)
    => Stream m a -> Stream m (AbsTime, a)
timestamped :: forall (m :: * -> *) a.
MonadIO m =>
Stream m a -> Stream m (AbsTime, a)
timestamped = forall (m :: * -> *) a.
MonadIO m =>
Double -> Stream m a -> Stream m (AbsTime, a)
timestampWith Double
0.01

-- | Pair each element in a stream with relative times starting from 0, using a
-- clock with the specified granularity. The time is measured just before the
-- element is consumed.
--
-- >>> Stream.fold Fold.toList $ Stream.timeIndexWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
-- [(RelTime64 (NanoSecond64 ...),1),(RelTime64 (NanoSecond64 ...),2),(RelTime64 (NanoSecond64 ...),3)]
--
-- /Pre-release/
--
{-# INLINE timeIndexWith #-}
timeIndexWith :: (MonadIO m)
    => Double -> Stream m a -> Stream m (RelTime64, a)
timeIndexWith :: forall (m :: * -> *) a.
MonadIO m =>
Double -> Stream m a -> Stream m (RelTime64, a)
timeIndexWith Double
g Stream m a
stream = forall (m :: * -> *) a b c.
Monad m =>
(a -> b -> c) -> Stream m a -> Stream m b -> Stream m c
zipWith (forall a b c. (a -> b -> c) -> b -> a -> c
flip (,)) Stream m a
stream (forall (m :: * -> *). MonadIO m => Double -> Stream m RelTime64
relTimesWith Double
g)

-- | Pair each element in a stream with relative times starting from 0, using a
-- 10 ms granularity clock. The time is measured just before the element is
-- consumed.
--
-- >>> Stream.fold Fold.toList $ Stream.timeIndexed $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
-- [(RelTime64 (NanoSecond64 ...),1),(RelTime64 (NanoSecond64 ...),2),(RelTime64 (NanoSecond64 ...),3)]
--
-- /Pre-release/
--
{-# INLINE timeIndexed #-}
timeIndexed :: (MonadIO m)
    => Stream m a -> Stream m (RelTime64, a)
timeIndexed :: forall (m :: * -> *) a.
MonadIO m =>
Stream m a -> Stream m (RelTime64, a)
timeIndexed = forall (m :: * -> *) a.
MonadIO m =>
Double -> Stream m a -> Stream m (RelTime64, a)
timeIndexWith Double
0.01

------------------------------------------------------------------------------
-- Searching
------------------------------------------------------------------------------

-- | Find all the indices where the element in the stream satisfies the given
-- predicate.
--
-- >>> findIndices p = Stream.scanMaybe (Fold.findIndices p)
--
{-# INLINE_NORMAL findIndices #-}
findIndices :: Monad m => (a -> Bool) -> Stream m a -> Stream m Int
findIndices :: forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m Int
findIndices a -> Bool
p (Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream forall {b} {m :: * -> *} {a}.
Num b =>
State StreamK m a -> (s, b) -> m (Step (s, b) b)
step' (s
state, Int
0)
  where
    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a -> (s, b) -> m (Step (s, b) b)
step' State StreamK m a
gst (s
st, b
i) = b
i seq :: forall a b. a -> b -> b
`seq` do
      Step s a
r <- State StreamK m a -> s -> m (Step s a)
step (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st
      forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Step s a
r of
          Yield a
x s
s -> if a -> Bool
p a
x then forall s a. a -> s -> Step s a
Yield b
i (s
s, b
iforall a. Num a => a -> a -> a
+b
1) else forall s a. s -> Step s a
Skip (s
s, b
iforall a. Num a => a -> a -> a
+b
1)
          Skip s
s -> forall s a. s -> Step s a
Skip (s
s, b
i)
          Step s a
Stop   -> forall s a. Step s a
Stop

-- | Find all the indices where the value of the element in the stream is equal
-- to the given value.
--
-- >>> elemIndices a = Stream.findIndices (== a)
--
{-# INLINE elemIndices #-}
elemIndices :: (Monad m, Eq a) => a -> Stream m a -> Stream m Int
elemIndices :: forall (m :: * -> *) a.
(Monad m, Eq a) =>
a -> Stream m a -> Stream m Int
elemIndices a
a = forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m Int
findIndices (forall a. Eq a => a -> a -> Bool
== a
a)

{-# INLINE_NORMAL slicesBy #-}
slicesBy :: Monad m => (a -> Bool) -> Stream m a -> Stream m (Int, Int)
slicesBy :: forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m (Int, Int)
slicesBy a -> Bool
p (Stream State StreamK m a -> s -> m (Step s a)
step1 s
state1) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream forall {a} {m :: * -> *} {a}.
(Num a, Eq a) =>
State StreamK m a
-> Maybe (s, a, a) -> m (Step (Maybe (s, a, a)) (a, a))
step (forall a. a -> Maybe a
Just (s
state1, Int
0, Int
0))

    where

    {-# INLINE_LATE step #-}
    step :: State StreamK m a
-> Maybe (s, a, a) -> m (Step (Maybe (s, a, a)) (a, a))
step State StreamK m a
gst (Just (s
st, a
i, a
len)) = a
i seq :: forall a b. a -> b -> b
`seq` a
len seq :: forall a b. a -> b -> b
`seq` do
      Step s a
r <- State StreamK m a -> s -> m (Step s a)
step1 (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st
      forall (m :: * -> *) a. Monad m => a -> m a
return
        forall a b. (a -> b) -> a -> b
$ case Step s a
r of
              Yield a
x s
s ->
                if a -> Bool
p a
x
                then forall s a. a -> s -> Step s a
Yield (a
i, a
len forall a. Num a => a -> a -> a
+ a
1) (forall a. a -> Maybe a
Just (s
s, a
i forall a. Num a => a -> a -> a
+ a
len forall a. Num a => a -> a -> a
+ a
1, a
0))
                else forall s a. s -> Step s a
Skip (forall a. a -> Maybe a
Just (s
s, a
i, a
len forall a. Num a => a -> a -> a
+ a
1))
              Skip s
s -> forall s a. s -> Step s a
Skip (forall a. a -> Maybe a
Just (s
s, a
i, a
len))
              Step s a
Stop -> if a
len forall a. Eq a => a -> a -> Bool
== a
0 then forall s a. Step s a
Stop else forall s a. a -> s -> Step s a
Yield (a
i, a
len) forall a. Maybe a
Nothing
    step State StreamK m a
_ Maybe (s, a, a)
Nothing = forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

------------------------------------------------------------------------------
-- Rolling map
------------------------------------------------------------------------------

data RollingMapState s a = RollingMapGo s a

-- | Like 'rollingMap' but with an effectful map function.
--
-- /Pre-release/
--
{-# INLINE rollingMapM #-}
rollingMapM :: Monad m => (Maybe a -> a -> m b) -> Stream m a -> Stream m b
-- rollingMapM f = scanMaybe (FL.slide2 $ Window.rollingMapM f)
rollingMapM :: forall (m :: * -> *) a b.
Monad m =>
(Maybe a -> a -> m b) -> Stream m a -> Stream m b
rollingMapM Maybe a -> a -> m b
f (Stream State StreamK m a -> s -> m (Step s a)
step1 s
state1) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream forall {m :: * -> *} {a}.
State StreamK m a
-> RollingMapState s (Maybe a)
-> m (Step (RollingMapState s (Maybe a)) b)
step (forall s a. s -> a -> RollingMapState s a
RollingMapGo s
state1 forall a. Maybe a
Nothing)

    where

    step :: State StreamK m a
-> RollingMapState s (Maybe a)
-> m (Step (RollingMapState s (Maybe a)) b)
step State StreamK m a
gst (RollingMapGo s
s1 Maybe a
curr) = do
        Step s a
r <- State StreamK m a -> s -> m (Step s a)
step1 (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
s1
        case Step s a
r of
            Yield a
x s
s -> do
                !b
res <- Maybe a -> a -> m b
f Maybe a
curr a
x
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield b
res forall a b. (a -> b) -> a -> b
$ forall s a. s -> a -> RollingMapState s a
RollingMapGo s
s (forall a. a -> Maybe a
Just a
x)
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ forall s a. s -> a -> RollingMapState s a
RollingMapGo s
s Maybe a
curr
            Step s a
Stop   -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

-- rollingMap is a special case of an incremental sliding fold. It can be
-- written as:
--
-- > fld f = slidingWindow 1 (Fold.foldl' (\_ (x,y) -> f y x)
-- > rollingMap f = Stream.postscan (fld f) undefined

-- | Apply a function on every two successive elements of a stream. The first
-- argument of the map function is the previous element and the second argument
-- is the current element. When the current element is the first element, the
-- previous element is 'Nothing'.
--
-- /Pre-release/
--
{-# INLINE rollingMap #-}
rollingMap :: Monad m => (Maybe a -> a -> b) -> Stream m a -> Stream m b
-- rollingMap f = scanMaybe (FL.slide2 $ Window.rollingMap f)
rollingMap :: forall (m :: * -> *) a b.
Monad m =>
(Maybe a -> a -> b) -> Stream m a -> Stream m b
rollingMap Maybe a -> a -> b
f = forall (m :: * -> *) a b.
Monad m =>
(Maybe a -> a -> m b) -> Stream m a -> Stream m b
rollingMapM (\Maybe a
x a
y -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Maybe a -> a -> b
f Maybe a
x a
y)

-- | Like 'rollingMap' but requires at least two elements in the stream,
-- returns an empty stream otherwise.
--
-- This is the stream equivalent of the list idiom @zipWith f xs (tail xs)@.
--
-- /Pre-release/
--
{-# INLINE rollingMap2 #-}
rollingMap2 :: Monad m => (a -> a -> b) -> Stream m a -> Stream m b
rollingMap2 :: forall (m :: * -> *) a b.
Monad m =>
(a -> a -> b) -> Stream m a -> Stream m b
rollingMap2 a -> a -> b
f = forall (m :: * -> *) a. Monad m => Stream m (Maybe a) -> Stream m a
catMaybes forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a b.
Monad m =>
(Maybe a -> a -> b) -> Stream m a -> Stream m b
rollingMap Maybe a -> a -> Maybe b
g

    where

    g :: Maybe a -> a -> Maybe b
g Maybe a
Nothing a
_ = forall a. Maybe a
Nothing
    g (Just a
x) a
y = forall a. a -> Maybe a
Just (a -> a -> b
f a
x a
y)

------------------------------------------------------------------------------
-- Maybe Streams
------------------------------------------------------------------------------

-- XXX Will this always fuse properly?

-- | Map a 'Maybe' returning function to a stream, filter out the 'Nothing'
-- elements, and return a stream of values extracted from 'Just'.
--
-- Equivalent to:
--
-- >>> mapMaybe f = Stream.catMaybes . fmap f
--
{-# INLINE_NORMAL mapMaybe #-}
mapMaybe :: Monad m => (a -> Maybe b) -> Stream m a -> Stream m b
mapMaybe :: forall (m :: * -> *) a b.
Monad m =>
(a -> Maybe b) -> Stream m a -> Stream m b
mapMaybe a -> Maybe b
f = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a. HasCallStack => Maybe a -> a
fromJust forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
filter forall a. Maybe a -> Bool
isJust forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a b.
Monad m =>
(a -> b) -> Stream m a -> Stream m b
map a -> Maybe b
f

-- | Like 'mapMaybe' but maps a monadic function.
--
-- Equivalent to:
--
-- >>> mapMaybeM f = Stream.catMaybes . Stream.mapM f
--
-- >>> mapM f = Stream.mapMaybeM (\x -> Just <$> f x)
--
{-# INLINE_NORMAL mapMaybeM #-}
mapMaybeM :: Monad m => (a -> m (Maybe b)) -> Stream m a -> Stream m b
mapMaybeM :: forall (m :: * -> *) a b.
Monad m =>
(a -> m (Maybe b)) -> Stream m a -> Stream m b
mapMaybeM a -> m (Maybe b)
f = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a. HasCallStack => Maybe a -> a
fromJust forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
filter forall a. Maybe a -> Bool
isJust forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Stream m a -> Stream m b
mapM a -> m (Maybe b)
f

-- | In a stream of 'Maybe's, discard 'Nothing's and unwrap 'Just's.
--
-- >>> catMaybes = Stream.mapMaybe id
-- >>> catMaybes = fmap fromJust . Stream.filter isJust
--
-- /Pre-release/
--
{-# INLINE catMaybes #-}
catMaybes :: Monad m => Stream m (Maybe a) -> Stream m a
-- catMaybes = fmap fromJust . filter isJust
catMaybes :: forall (m :: * -> *) a. Monad m => Stream m (Maybe a) -> Stream m a
catMaybes (Stream State StreamK m (Maybe a) -> s -> m (Step s (Maybe a))
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
Stream forall {m :: * -> *} {a}. State StreamK m a -> s -> m (Step s a)
step1 s
state

    where

    {-# INLINE_LATE step1 #-}
    step1 :: State StreamK m a -> s -> m (Step s a)
step1 State StreamK m a
gst s
st = do
        Step s (Maybe a)
r <- State StreamK m (Maybe a) -> s -> m (Step s (Maybe a))
step (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State StreamK m a
gst) s
st
        case Step s (Maybe a)
r of
            Yield Maybe a
x s
s -> do
                forall (m :: * -> *) a. Monad m => a -> m a
return
                    forall a b. (a -> b) -> a -> b
$ case Maybe a
x of
                        Just a
a -> forall s a. a -> s -> Step s a
Yield a
a s
s
                        Maybe a
Nothing -> forall s a. s -> Step s a
Skip s
s
            Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip s
s
            Step s (Maybe a)
Stop -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

-- | Use a filtering fold on a stream.
--
-- >>> scanMaybe f = Stream.catMaybes . Stream.postscan f
--
{-# INLINE scanMaybe #-}
scanMaybe :: Monad m => Fold m a (Maybe b) -> Stream m a -> Stream m b
scanMaybe :: forall (m :: * -> *) a b.
Monad m =>
Fold m a (Maybe b) -> Stream m a -> Stream m b
scanMaybe Fold m a (Maybe b)
f = forall (m :: * -> *) a. Monad m => Stream m (Maybe a) -> Stream m a
catMaybes forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m b
postscan Fold m a (Maybe b)
f

------------------------------------------------------------------------------
-- Either streams
------------------------------------------------------------------------------

-- | Discard 'Right's and unwrap 'Left's in an 'Either' stream.
--
-- >>> catLefts = fmap (fromLeft undefined) . Stream.filter isLeft
--
-- /Pre-release/
--
{-# INLINE catLefts #-}
catLefts :: Monad m => Stream m (Either a b) -> Stream m a
catLefts :: forall (m :: * -> *) a b.
Monad m =>
Stream m (Either a b) -> Stream m a
catLefts = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall a b. a -> Either a b -> a
fromLeft forall a. HasCallStack => a
undefined) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
filter forall a b. Either a b -> Bool
isLeft

-- | Discard 'Left's and unwrap 'Right's in an 'Either' stream.
--
-- >>> catRights = fmap (fromRight undefined) . Stream.filter isRight
--
-- /Pre-release/
--
{-# INLINE catRights #-}
catRights :: Monad m => Stream m (Either a b) -> Stream m b
catRights :: forall (m :: * -> *) a b.
Monad m =>
Stream m (Either a b) -> Stream m b
catRights = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall b a. b -> Either a b -> b
fromRight forall a. HasCallStack => a
undefined) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a.
Monad m =>
(a -> Bool) -> Stream m a -> Stream m a
filter forall a b. Either a b -> Bool
isRight

-- | Remove the either wrapper and flatten both lefts and as well as rights in
-- the output stream.
--
-- >>> catEithers = fmap (either id id)
--
-- /Pre-release/
--
{-# INLINE catEithers #-}
catEithers :: Monad m => Stream m (Either a a) -> Stream m a
catEithers :: forall (m :: * -> *) a.
Monad m =>
Stream m (Either a a) -> Stream m a
catEithers = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either forall a. a -> a
id forall a. a -> a
id)

------------------------------------------------------------------------------
-- Splitting
------------------------------------------------------------------------------

-- | Split on an infixed separator element, dropping the separator.  The
-- supplied 'Fold' is applied on the split segments.  Splits the stream on
-- separator elements determined by the supplied predicate, separator is
-- considered as infixed between two segments:
--
-- >>> splitOn' p xs = Stream.fold Fold.toList $ Stream.splitOn p Fold.toList (Stream.fromList xs)
-- >>> splitOn' (== '.') "a.b"
-- ["a","b"]
--
-- An empty stream is folded to the default value of the fold:
--
-- >>> splitOn' (== '.') ""
-- [""]
--
-- If one or both sides of the separator are missing then the empty segment on
-- that side is folded to the default output of the fold:
--
-- >>> splitOn' (== '.') "."
-- ["",""]
--
-- >>> splitOn' (== '.') ".a"
-- ["","a"]
--
-- >>> splitOn' (== '.') "a."
-- ["a",""]
--
-- >>> splitOn' (== '.') "a..b"
-- ["a","","b"]
--
-- splitOn is an inverse of intercalating single element:
--
-- > Stream.intercalate (Stream.fromPure '.') Unfold.fromList . Stream.splitOn (== '.') Fold.toList === id
--
-- Assuming the input stream does not contain the separator:
--
-- > Stream.splitOn (== '.') Fold.toList . Stream.intercalate (Stream.fromPure '.') Unfold.fromList === id
--
{-# INLINE splitOn #-}
splitOn :: Monad m => (a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
splitOn :: forall (m :: * -> *) a b.
Monad m =>
(a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
splitOn a -> Bool
predicate Fold m a b
f =
    -- We can express the infix splitting in terms of optional suffix split
    -- fold.  After applying a suffix split fold repeatedly if the last segment
    -- ends with a suffix then we need to return the default output of the fold
    -- after that to make it an infix split.
    --
    -- Alternately, we can also express it using an optional prefix split fold.
    -- If the first segment starts with a prefix then we need to emit the
    -- default output of the fold before that to make it an infix split, and
    -- then apply prefix split fold repeatedly.
    --
    -- Since a suffix split fold can be easily expressed using a
    -- non-backtracking fold, we use that.
    forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m b
foldManyPost (forall (m :: * -> *) a b.
Monad m =>
(a -> Bool) -> Fold m a b -> Fold m a b
FL.takeEndBy_ a -> Bool
predicate Fold m a b
f)