{-# OPTIONS_GHC -Wno-deprecations #-}

-- |
-- Module      : Streamly.Internal.Data.Stream.IsStream.Transform
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Stream.IsStream.Transform {-# DEPRECATED "Please use \"Streamly.Data.Stream.*\" instead." #-}
    (
    -- * Piping
    -- | Pass through a 'Pipe'.
      transform

    -- * Folding
    , foldrS
    , foldrSShared
    , foldrT

    -- * Mapping
    -- | Stateless one-to-one maps.
    , map
    , sequence
    , mapM
    , smapM

    -- * Mapping Side Effects (Observation)
    -- | See also the intersperse*_ combinators.
    , trace
    , trace_
    , tap
    , tapOffsetEvery
    , tapAsync
    , tapAsyncK
    , distributeAsync_
    , pollCounts

    -- * Scanning By 'Fold'
    , scan
    , scanMany
    , postscan

    -- * Scanning
    -- | Left scans. Stateful, mostly one-to-one maps.
    , scanl'
    , scanlM'
    , scanlMAfter'
    , postscanlMAfter'
    , postscanl'
    , postscanlM'
    , prescanl'
    , prescanlM'
    , scanl1'
    , scanl1M'

    -- XXX Once we have pipes the contravariant transformations can be
    -- represented by attaching pipes before a transformation.
    --
    -- , lscanl'
    -- , lscanlM'
    -- , lscanl1'
    -- , lscanl1M'
    --
    -- , lpostscanl'
    -- , lpostscanlM'
    -- , lprescanl'
    -- , lprescanlM'

    -- * Filtering
    -- | Produce a subset of the stream using criteria based on the values of
    -- the elements. We can use a concatMap and scan for filtering but these
    -- combinators are more efficient and convenient.

    , with
    , deleteBy
    , filter
    , filterM
    , uniq
    , uniqBy
    , nubBy
    , prune
    , repeated

    -- * Trimming
    -- | Produce a subset of the stream trimmed at ends.

    , take
    , takeLast
    , takeLastInterval
    , takeWhile
    , takeWhileM
    , takeWhileLast
    , takeWhileAround
    , drop
    , dropLast
    , dropLastInterval
    , dropWhile
    , dropWhileM
    , dropWhileLast
    , dropWhileAround

    -- * Inserting Elements
    -- | Produce a superset of the stream. This is the opposite of
    -- filtering/sampling.  We can always use concatMap and scan for inserting
    -- but these combinators are more efficient and convenient.

    -- Element agnostic (Opposite of sampling)
    , intersperse
    , intersperseM -- XXX naming
    , intersperseMWith

    , intersperseMSuffix
    , intersperseMSuffixWith
    , interjectSuffix

    -- , interspersePrefix
    -- , interspersePrefixBySpan

    -- * Inserting Side Effects/Time
    , intersperseM_ -- XXX naming
    , delay
    , intersperseMSuffix_
    , delayPost
    , intersperseMPrefix_
    , delayPre

    -- * Element Aware Insertion
    -- | Opposite of filtering
    , insertBy
    -- , intersperseByBefore
    -- , intersperseByAfter

    -- * Reordering
    , reverse
    , reverse'
    , reassembleBy

    -- * Position Indexing
    , indexed
    , indexedR

    -- * Time Indexing
    , timestamped
    , timestampWith
    -- , timestampedR -- timer
    , timeIndexed
    , timeIndexWith

    -- * Searching
    , findIndices -- XXX indicesBy
    , elemIndices -- XXX indicesOf

    -- * Rolling map
    -- | Map using the previous element.
    , rollingMapM
    , rollingMap
    , rollingMap2

    -- * Maybe Streams
    -- Move these to Streamly.Data.Maybe.Stream?
    , catMaybes -- XXX justs (like lefts/rights)
    , mapMaybe
    , mapMaybeM

    -- * Either Streams
    -- Move these to Streamly.Data.Either.Stream?
    , lefts
    , rights
    , both

    -- * Concurrent Evaluation
    -- ** Concurrent Pipelines
    -- | Run streaming stages concurrently.

    , mkAsync
    , mkParallel
    , applyAsync
    , (|$)
    , (|&)

    -- ** Concurrency Control
    , maxThreads

    -- ** Buffering and Sampling
    -- | Evaluate strictly using a buffer of results.  When the buffer becomes
    -- full we can block, drop the new elements, drop the oldest element and
    -- insert the new at the end or keep dropping elements uniformly to match
    -- the rate of the consumer.
    , maxBuffer
    , sampleOld
    , sampleNew
    , sampleRate

    -- ** Rate Limiting
    -- | Evaluate the stream at uniform intervals to maintain a specified
    -- evaluation rate.
    , Rate (..)
    , rate
    , avgRate
    , minRate
    , maxRate
    , constRate

    -- * Diagnostics
    , inspectMode

    -- * Deprecated
    , scanx
    )
where

#include "inline.hs"

import Control.Concurrent (threadDelay)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Class (MonadTrans(..))
import Data.Either (isLeft, isRight, fromLeft, fromRight)
import Data.Kind (Type)
import Data.Maybe (isJust, fromJust)
import Streamly.Internal.Data.Fold.Type (Fold (..))
import Streamly.Internal.Data.Pipe.Type (Pipe (..))
import Streamly.Internal.Data.Stream.IsStream.Combinators
      ( inspectMode, maxBuffer, maxThreads, rate, avgRate, minRate
      , maxRate, constRate)
import Streamly.Internal.Data.Stream.IsStream.Common
    ( absTimesWith
    , drop
    , findIndices
    , map
    , postscanlM'
    , relTimesWith
    , reverse
    , reverse'
    , scanlMAfter'
    , postscanlMAfter'
    , smapM
    , take
    , takeWhile
    , interjectSuffix
    , intersperseM
    , mkAsync
    , mkParallel
    , zipWith
    )
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream.IsStream.Type
    (IsStream(..), fromStreamD, toStreamD, toConsK)
import Streamly.Internal.Data.Stream.Serial (SerialT)
import Streamly.Internal.Data.SVar (Rate(..))
import Streamly.Internal.Data.Time.Units (AbsTime, RelTime64)

import qualified Streamly.Data.Fold as FL
import qualified Streamly.Internal.Data.Stream.Concurrent as Concur
import qualified Streamly.Internal.Data.Stream.Parallel as Par
import qualified Streamly.Internal.Data.Stream.Serial as Serial
import qualified Streamly.Internal.Data.Stream.StreamD as D
    (transform, foldrT, tap, tapOffsetEvery, mapM, scan
    , scanMany, postscan, scanlx', scanlM', scanl', postscanl', prescanl'
    , prescanlM', scanl1M', scanl1', filter, filterM, uniq, deleteBy, takeWhileM
    , dropWhile, dropWhileM, insertBy, intersperse
    , intersperseM_, intersperseMSuffix, intersperseMSuffix_
    , intersperseMSuffixWith, indexed, indexedR, rollingMap, rollingMapM
    , rollingMap2, mapMaybe, mapMaybeM)
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
    (foldrS, foldrSShared, mapMWith)
import qualified Prelude

import Prelude hiding
       ( filter, drop, dropWhile, take, takeWhile, foldr, map, mapM, sequence
       , reverse, foldr1 , scanl, scanl1, zipWith)

--
-- $setup
-- >>> :m
-- >>> :set -fno-warn-deprecations
-- >>> import Control.Concurrent (threadDelay)
-- >>> import Data.Function ((&))
-- >>> import Streamly.Prelude ((|$))
-- >>> import Prelude hiding ( filter, drop, dropWhile, take, takeWhile, foldr, map, mapM, sequence, reverse, foldr1 , scanl, scanl1)
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import Streamly.Internal.Data.Stream.IsStream as Stream
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Internal.Data.Fold as Fold
-- >>> import System.IO (stdout, hSetBuffering, BufferMode(LineBuffering))
--
-- >>> hSetBuffering stdout LineBuffering

------------------------------------------------------------------------------
-- Piping
------------------------------------------------------------------------------

-- | Use a 'Pipe' to transform a stream.
--
-- The input is converted with 'toStreamD', run through 'D.transform' and
-- converted back with 'fromStreamD'.
--
-- /Pre-release/
--
{-# INLINE transform #-}
transform :: (IsStream t, Monad m) => Pipe m a b -> t m a -> t m b
transform pipe xs = fromStreamD $ D.transform pipe (toStreamD xs)

------------------------------------------------------------------------------
-- Transformation Folds
------------------------------------------------------------------------------

-- | Right fold to a streaming monad.
--
-- > foldrS Stream.cons Stream.nil === id
--
-- 'foldrS' can be used to perform stateless stream to stream transformations
-- like map and filter in general. It can be coupled with a scan to perform
-- stateful transformations. However, note that the custom map and filter
-- routines can be much more efficient than this due to better stream fusion.
--
-- >>> Stream.toList $ Stream.foldrS Stream.cons Stream.nil $ Stream.fromList [1..5]
-- [1,2,3,4,5]
--
-- Find if any element in the stream is odd:
--
-- >>> Stream.toList $ Stream.foldrS (\x xs -> if odd x then (Stream.fromPure True) else xs) (Stream.fromPure False) $ (Stream.fromList (2:4:5:undefined) :: Stream.SerialT IO Int)
-- [True]
--
-- Map (+2) on odd elements and filter out the even elements:
--
-- >>> Stream.toList $ Stream.foldrS (\x xs -> if odd x then (x + 2) `Stream.cons` xs else xs) Stream.nil $ (Stream.fromList [1..5] :: Stream.SerialT IO Int)
-- [3,5,7]
--
-- 'foldrM' can also be represented in terms of 'foldrS', however, the former
-- is much more efficient:
--
-- > foldrM f z s = runIdentityT $ foldrS (\x xs -> lift $ f x (runIdentityT xs)) (lift z) s
--
-- /Pre-release/
{-# INLINE foldrS #-}
foldrS :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b
foldrS f z xs =
    fromStream
        $ K.foldrS
            -- The step function works on @t m b@, so the StreamK tail is
            -- wrapped before calling it and the result unwrapped after.
            (\y ys -> toStream $ f y (fromStream ys))
            (toStream z)
            (toStream xs)

-- | Like 'foldrS' but implemented using 'K.foldrSShared' instead of
-- 'K.foldrS'. The step function and the initial value are converted to and
-- from the underlying StreamK representation in the same way.
--
-- NOTE(review): presumably the "Shared" variant shares the concurrency state
-- of the folded stream with the result -- confirm against 'K.foldrSShared'.
{-# INLINE foldrSShared #-}
foldrSShared :: IsStream t => (a -> t m b -> t m b) -> t m b -> t m a -> t m b
foldrSShared f z xs =
    fromStream
        $ K.foldrSShared
            (\y ys -> toStream $ f y (fromStream ys))
            (toStream z)
            (toStream xs)

-- | Right fold to a transformer monad.  This is the most general right fold
-- function. 'foldrS' is a special case of 'foldrT', however 'foldrS'
-- implementation can be more efficient:
--
-- > foldrS = foldrT
-- > foldrM f z s = runIdentityT $ foldrT (\x xs -> lift $ f x (runIdentityT xs)) (lift z) s
--
-- 'foldrT' can be used to translate streamly streams to other transformer
-- monads e.g.  to a different streaming type.
--
-- /Pre-release/
{-# INLINE foldrT #-}
foldrT :: (IsStream t, Monad m, Monad (s m), MonadTrans s)
    => (a -> s m b -> s m b) -> s m b -> t m a -> s m b
foldrT f z s = D.foldrT f z (toStreamD s)

------------------------------------------------------------------------------
-- Transformation by Mapping
------------------------------------------------------------------------------

-- |
-- @
-- mapM f = sequence . map f
-- @
--
-- Apply a monadic function to each element of the stream and replace it with
-- the output of the resulting action.
--
-- @
-- >>> drain $ Stream.mapM putStr $ Stream.fromList ["a", "b", "c"]
-- abc
--
-- >>> :{
--    drain $ Stream.replicateM 10 (return 1)
--      & (fromSerial . Stream.mapM (\x -> threadDelay 1000000 >> print x))
-- :}
-- 1
-- ...
-- 1
--
-- > drain $ Stream.replicateM 10 (return 1)
--  & (fromAsync . Stream.mapM (\x -> threadDelay 1000000 >> print x))
-- @
--
-- /Concurrent (do not use with 'fromParallel' on infinite streams)/
--
-- @since 0.1.0
{-# INLINE_EARLY mapM #-}
mapM :: forall t m a b. (IsStream t, MonadAsync m) =>
    (a -> m b) -> t m a -> t m b
-- The results are consed using the 'consM' of the stream type @t@, selected
-- via the type application.
mapM f = fromStream . K.mapMWith (toConsK (consM @t)) f . toStream

-- Rewrite 'mapM' to the 'SerialT' specific implementation where the rule
-- applies.
{-# RULES "mapM serial" mapM = mapMSerial #-}
{-# INLINE mapMSerial #-}
mapMSerial :: Monad m => (a -> m b) -> SerialT m a -> SerialT m b
mapMSerial = Serial.mapM

-- |
-- @
-- sequence = mapM id
-- @
--
-- Replace the elements of a stream of monadic actions with the outputs of
-- those actions.
--
-- @
-- >>> drain $ Stream.sequence $ Stream.fromList [putStr "a", putStr "b", putStrLn "c"]
-- abc
--
-- >>> :{
-- drain $ Stream.replicateM 3 (return $ threadDelay 1000000 >> print 1)
--  & (fromSerial . Stream.sequence)
-- :}
-- 1
-- 1
-- 1
--
-- >>> :{
-- drain $ Stream.replicateM 3 (return $ threadDelay 1000000 >> print 1)
--  & (fromAsync . Stream.sequence)
-- :}
-- 1
-- 1
-- 1
--
-- @
--
-- /Concurrent (do not use with 'fromParallel' on infinite streams)/
--
-- @since 0.1.0
{-# INLINE sequence #-}
sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a
sequence = mapM id

------------------------------------------------------------------------------
-- Mapping side effects
------------------------------------------------------------------------------

-- | Tap the data flowing through a stream into a 'Fold'. For example, you may
-- add a tap to log the contents flowing through the stream. The fold is used
-- only for effects, its result is discarded.
--
-- @
--                   Fold m a b
--                       |
-- -----stream m a ---------------stream m a-----
--
-- @
--
-- >>> Stream.drain $ Stream.tap (Fold.drainBy print) (Stream.enumerateFromTo 1 2)
-- 1
-- 2
--
-- Compare with 'trace'.
--
-- @since 0.7.0
{-# INLINE tap #-}
tap :: (IsStream t, Monad m) => FL.Fold m a b -> t m a -> t m a
tap f xs = fromStreamD $ D.tap f (toStreamD xs)

-- XXX Remove this. It can be expressed in terms of Fold.sampleFromThen.
--
-- | @tapOffsetEvery offset n@ taps every @n@th element in the stream
-- starting at @offset@. @offset@ can be between @0@ and @n - 1@. Offset 0
-- means start at the first element in the stream. If the offset is outside
-- this range then @offset `mod` n@ is used as offset.
--
-- >>> Stream.drain $ Stream.tapOffsetEvery 0 2 (Fold.rmapM print Fold.toList) $ Stream.enumerateFromTo 0 10
-- [0,2,4,6,8,10]
--
-- /Pre-release/
--
{-# INLINE tapOffsetEvery #-}
tapOffsetEvery :: (IsStream t, Monad m)
    => Int -> Int -> FL.Fold m a b -> t m a -> t m a
tapOffsetEvery offset n f xs =
    fromStreamD $ D.tapOffsetEvery offset n f (toStreamD xs)

-- | Redirect a copy of the stream to a supplied fold and run it concurrently
-- in an independent thread. The fold may buffer some elements. The buffer size
-- is determined by the prevailing 'maxBuffer' setting.
--
-- @
--               Stream m a -> m b
--                       |
-- -----stream m a ---------------stream m a-----
--
-- @
--
-- @
-- >>> Stream.drain $ Stream.tapAsync (Fold.drainBy print) (Stream.enumerateFromTo 1 2)
-- 1
-- 2
--
-- @
--
-- Exceptions from the concurrently running fold are propagated to the current
-- computation.  Note that, because of buffering in the fold, exceptions may be
-- delayed and may not correspond to the current element being processed in the
-- parent stream, but we guarantee that before the parent stream stops the tap
-- finishes and all exceptions from it are drained.
--
-- >>> tapAsync f = Stream.tapAsyncK (Stream.fold f . Stream.adapt)
--
-- Compare with 'tap'.
--
-- /Pre-release/
{-# INLINE tapAsync #-}
tapAsync :: (IsStream t, MonadAsync m) => FL.Fold m a b -> t m a -> t m a
tapAsync f xs = fromStreamD $ Par.tapAsyncF f (toStreamD xs)

-- XXX We should keep only tapAsyncK as tapAsync can be implemented in terms
-- of tapAsyncK and it has better performance. Rename this to tapAsync.

-- | Like 'tapAsync' but uses a stream fold function instead of a 'Fold' type.
--
-- /Pre-release/
{-# INLINE tapAsyncK #-}
tapAsyncK :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> t m a
-- The user function consumes @t m a@, so the StreamK handed over by
-- 'Par.tapAsyncK' is wrapped with 'fromStream' before calling it.
tapAsyncK f m = fromStream $ Par.tapAsyncK (f . fromStream) (toStream m)

-- XXX rename this to tapManyAsync_?
--
-- | Concurrently distribute a stream to a collection of fold functions,
-- discarding the outputs of the folds.
--
-- @
-- > Stream.drain $ Stream.distributeAsync_ [Stream.mapM_ print, Stream.mapM_ print] (Stream.enumerateFromTo 1 2)
-- 1
-- 2
-- 1
-- 2
--
-- @
--
-- @
-- distributeAsync_ = flip (foldr tapAsyncK)
-- @
--
-- /Pre-release/
--
{-# INLINE distributeAsync_ #-}
distributeAsync_ :: (Foldable f, IsStream t, MonadAsync m)
    => f (t m a -> m b) -> t m a -> t m a
distributeAsync_ = flip (Prelude.foldr tapAsyncK)

-- | @pollCounts predicate fold stream@ counts those elements in the stream
-- that pass the @predicate@. The resulting count stream is sent to another
-- thread which folds it using the stream fold function @fold@; any
-- transformation of the count stream can be composed into @fold@.  The thread
-- is automatically cleaned up if the stream stops or aborts due to exception.
--
-- For example, to print the counts received by the fold, pausing one second
-- after each:
--
-- @
-- > Stream.drain $ Stream.pollCounts (const True) (Stream.mapM_ print . Stream.delayPost 1)
--           $ Stream.enumerateFrom 0
-- @
--
-- Note: This may not work correctly on 32-bit machines.
--
-- /Pre-release/
--
{-# INLINE pollCounts #-}
pollCounts ::
       (IsStream t, MonadAsync m)
    => (a -> Bool)
    -> (t m Int -> m b)
    -> t m a
    -> t m a
pollCounts predicate f xs =
    fromStreamD
        -- The user fold consumes @t m Int@, so the count stream is wrapped
        -- with 'fromStreamD' before it is handed over.
        $ Concur.tapCountD predicate (f . fromStreamD)
        $ toStreamD xs

-- | Apply a monadic function to each element flowing through the stream and
-- discard the results.
--
-- >>> Stream.drain $ Stream.trace print (Stream.enumerateFromTo 1 2)
-- 1
-- 2
--
-- Compare with 'tap'.
--
-- @since 0.7.0
{-# INLINE trace #-}
trace :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m a
trace f = mapM (\x -> void (f x) >> return x)

-- | Perform a side effect before yielding each element of the stream and
-- discard the results.
--
-- @
-- >>> Stream.drain $ Stream.trace_ (print "got here") (Stream.enumerateFromTo 1 2)
-- "got here"
-- "got here"
--
-- @
--
-- Same as 'intersperseMPrefix_' but always serial.
--
-- See also: 'trace'
--
-- /Pre-release/
{-# INLINE trace_ #-}
trace_ :: (IsStream t, Monad m) => m b -> t m a -> t m a
trace_ eff = fromStreamD . D.mapM (\x -> eff >> return x) . toStreamD

------------------------------------------------------------------------------
-- Scanning with a Fold
------------------------------------------------------------------------------

-- | Scan a stream using the given monadic fold.
--
-- >>> Stream.toList $ Stream.takeWhile (< 10) $ Stream.scan Fold.sum (Stream.fromList [1..10])
-- [0,1,3,6]
--
-- @since 0.7.0
{-# INLINE scan #-}
scan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
scan fld m = fromStreamD $ D.scan fld $ toStreamD m

-- | Like 'scan' but restarts scanning afresh when the scanning fold
-- terminates.
--
-- /Pre-release/
{-# INLINE scanMany #-}
scanMany :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
scanMany fld m = fromStreamD $ D.scanMany fld $ toStreamD m

-- | Postscan a stream using the given monadic fold.
--
-- The following example extracts the input stream up to a point where the
-- running average of elements is no more than 10:
--
-- >>> import Data.Maybe (fromJust)
-- >>> let avg = Fold.teeWith (/) Fold.sum (fmap fromIntegral Fold.length)
-- >>> :{
--  Stream.toList
--   $ Stream.map (fromJust . fst)
--   $ Stream.takeWhile (\(_,x) -> x <= 10)
--   $ Stream.postscan (Fold.tee Fold.last avg) (Stream.enumerateFromTo 1.0 100.0)
-- :}
-- [1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,9.0,10.0,11.0,12.0,13.0,14.0,15.0,16.0,17.0,18.0,19.0]
--
-- @since 0.7.0
{-# INLINE postscan #-}
postscan :: (IsStream t, Monad m) => Fold m a b -> t m a -> t m b
postscan fld = fromStreamD . D.postscan fld . toStreamD

------------------------------------------------------------------------------
-- Scanning - Transformation by Folding
------------------------------------------------------------------------------

-- XXX It may be useful to have a version of scan where we can keep the
-- accumulator independent of the value emitted. So that we do not necessarily
-- have to keep a value in the accumulator which we are not using. We can pass
-- an extraction function that will take the accumulator and the current value
-- of the element and emit the next value in the stream. That will also make it
-- possible to modify the accumulator after using it. In fact, the step function
-- can return new accumulator and the value to be emitted. The signature would
-- be more like mapAccumL. Or we can change the signature of scanx to
-- accommodate this.
--
-- | Strict left scan with an extraction function. Like 'scanl'', but applies a
-- user supplied extraction function (the third argument) at each step. This is
-- designed to work with the @foldl@ library. The suffix @x@ is a mnemonic for
-- extraction.
--
-- /Since 0.2.0/
--
-- /Since: 0.7.0 (Monad m constraint)/
{-# DEPRECATED scanx "Please use scanl followed by map instead." #-}
{-# INLINE scanx #-}
scanx :: (IsStream t, Monad m) => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
scanx step begin done = fromStreamD . D.scanlx' step begin done . toStreamD

-- XXX this needs to be concurrent
-- XXX because of the use of D.cons for appending, scanlM' has quadratic
-- complexity when iterated over a stream. We should use StreamK style scanlM'
-- for linear performance on iteration.
--
-- | Like 'scanl'' but with a monadic step function and a monadic seed.
--
-- /Since: 0.4.0/
--
-- /Since: 0.8.0 (signature change)/
{-# INLINE scanlM' #-}
scanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
scanlM' step begin m = fromStreamD $ D.scanlM' step begin $ toStreamD m

-- XXX because of the use of D.cons for appending, scanl' has quadratic
-- complexity when iterated over a stream. We should use StreamK style scanlM'
-- for linear performance on iteration.

-- | Strict left scan. Like 'map', 'scanl'' too is a one to one transformation,
-- however it adds an extra element.
--
-- @
-- >>> Stream.toList $ Stream.scanl' (+) 0 $ fromList [1,2,3,4]
-- [0,1,3,6,10]
--
-- @
--
-- @
-- >>> Stream.toList $ Stream.scanl' (flip (:)) [] $ Stream.fromList [1,2,3,4]
-- [[],[1],[2,1],[3,2,1],[4,3,2,1]]
--
-- @
--
-- The output of 'scanl'' is the initial value of the accumulator followed by
-- all the intermediate steps and the final result of 'foldl''.
--
-- By streaming the accumulated state after each fold step, we can share the
-- state across multiple stages of stream composition. Each stage can modify or
-- extend the state, do some processing with it and emit it for the next stage,
-- thus modularizing the stream processing. This can be useful in
-- stateful or event-driven programming.
--
-- Consider the following monolithic example, computing the sum and the product
-- of the elements in a stream in one go using a @foldl'@:
--
-- @
-- >>> Stream.foldl' (\(s, p) x -> (s + x, p * x)) (0,1) $ Stream.fromList [1,2,3,4]
-- (10,24)
--
-- @
--
-- Using @scanl'@ we can make it modular by computing the sum in the first
-- stage and passing it down to the next stage for computing the product:
--
-- @
-- >>> :{
--   Stream.foldl' (\(_, p) (s, x) -> (s, p * x)) (0,1)
--   $ Stream.scanl' (\(s, _) x -> (s + x, x)) (0,1)
--   $ Stream.fromList [1,2,3,4]
-- :}
-- (10,24)
--
-- @
--
-- IMPORTANT: 'scanl'' evaluates the accumulator to WHNF.  To avoid building
-- lazy expressions inside the accumulator, it is recommended that a strict
-- data structure is used for accumulator.
--
-- >>> scanl' step z = scan (Fold.foldl' step z)
-- >>> scanl' f z xs = scanlM' (\a b -> return (f a b)) (return z) xs
-- >>> scanl' f z xs = z `Stream.cons` postscanl' f z xs
--
-- See also: 'usingStateT'
--
-- @since 0.2.0
{-# INLINE scanl' #-}
scanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
-- Thin wrapper: view the input through 'toStreamD', run 'D.scanl'' with the
-- pure step and initial accumulator, and convert back with 'fromStreamD'.
scanl' step z m = fromStreamD $ D.scanl' step z $ toStreamD m

-- | Like 'scanl'' but does not stream the initial value of the accumulator.
--
-- >>> postscanl' step z = postscan (Fold.foldl' step z)
-- >>> postscanl' f z = postscanlM' (\a b -> return (f a b)) (return z)
-- >>> postscanl' f z xs = Stream.drop 1 $ Stream.scanl' f z xs
--
-- @since 0.7.0
{-# INLINE postscanl' #-}
postscanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
-- Thin wrapper: view the input through 'toStreamD', run 'D.postscanl'', and
-- convert back with 'fromStreamD'.
postscanl' step z m = fromStreamD $ D.postscanl' step z $ toStreamD m

-- XXX prescanl does not sound very useful, enable only if there is a
-- compelling use case.
--
-- | Like scanl' but does not stream the final value of the accumulator.
--
-- /Pre-release/
{-# INLINE prescanl' #-}
prescanl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b
-- Thin wrapper: view the input through 'toStreamD', run 'D.prescanl'', and
-- convert back with 'fromStreamD'.
prescanl' step z m = fromStreamD $ D.prescanl' step z $ toStreamD m

-- XXX this needs to be concurrent
-- | Like prescanl' but with a monadic step function and a monadic seed.
--
-- /Pre-release/
{-# INLINE prescanlM' #-}
prescanlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> m b -> t m a -> t m b
-- Thin wrapper: view the input through 'toStreamD', run 'D.prescanlM'' with
-- the monadic step and monadic seed, and convert back with 'fromStreamD'.
prescanlM' step z m = fromStreamD $ D.prescanlM' step z $ toStreamD m

-- XXX this needs to be concurrent
-- | Like 'scanl1'' but with a monadic step function.
--
-- @since 0.6.0
{-# INLINE scanl1M' #-}
scanl1M' :: (IsStream t, Monad m) => (a -> a -> m a) -> t m a -> t m a
-- Thin wrapper: view the input through 'toStreamD', run 'D.scanl1M'', and
-- convert back with 'fromStreamD'.
scanl1M' step m = fromStreamD $ D.scanl1M' step $ toStreamD m

-- | Like 'scanl'' but for a non-empty stream. The first element of the stream
-- is used as the initial value of the accumulator. Does nothing if the stream
-- is empty.
--
-- @
-- >>> Stream.toList $ Stream.scanl1' (+) $ fromList [1,2,3,4]
-- [1,3,6,10]
--
-- @
--
-- @since 0.6.0
{-# INLINE scanl1' #-}
scanl1' :: (IsStream t, Monad m) => (a -> a -> a) -> t m a -> t m a
-- Thin wrapper: view the input through 'toStreamD', run 'D.scanl1'', and
-- convert back with 'fromStreamD'.
scanl1' step m = fromStreamD $ D.scanl1' step $ toStreamD m

------------------------------------------------------------------------------
-- Filtering
------------------------------------------------------------------------------

-- | Modify a @t m a -> t m a@ stream transformation that accepts a predicate
-- @(a -> b)@ to accept @((s, a) -> b)@ instead, provided a transformation @t m
-- a -> t m (s, a)@. Convenient to filter with index or time.
--
-- @
-- filterWithIndex = with indexed filter
-- filterWithAbsTime = with timestamped filter
-- filterWithRelTime = with timeIndexed filter
-- @
--
-- /Pre-release/
{-# INLINE with #-}
with :: forall (t :: (Type -> Type) -> Type -> Type) m a b s. Functor (t m) =>
       (t m a -> t m (s, a))
    -> (((s, a) -> b) -> t m (s, a) -> t m (s, a))
    -> (((s, a) -> b) -> t m a -> t m a)
-- Pipeline, right to left: @f@ pairs every element with an annotation @s@,
-- @comb g@ runs the supplied combinator on the annotated stream, and
-- @fmap snd@ strips the annotation again.
with f comb g = fmap snd . comb g . f

-- | Include only those elements that pass a predicate.
--
-- @since 0.1.0
{-# INLINE filter #-}
filter :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
-- Thin wrapper: view the input through 'toStreamD', run 'D.filter' with the
-- predicate, and convert back with 'fromStreamD'.
filter p m = fromStreamD $ D.filter p $ toStreamD m

-- | Same as 'filter' but with a monadic predicate.
--
-- @since 0.4.0
{-# INLINE filterM #-}
filterM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
-- Thin wrapper: view the input through 'toStreamD', run 'D.filterM' with the
-- monadic predicate, and convert back with 'fromStreamD'.
filterM p m = fromStreamD $ D.filterM p $ toStreamD m

-- | Drop repeated elements that are adjacent to each other using the supplied
-- comparison function.
--
-- @uniq = uniqBy (==)@
--
-- To strip duplicate path separators:
--
-- @
-- f x y = x == '/' && x == y
-- Stream.toList $ Stream.uniqBy f $ Stream.fromList "//a//b"
-- "/a/b"
-- @
--
-- Space: @O(1)@
--
-- See also: 'nubBy'.
--
-- /Pre-release/
--
{-# INLINE uniqBy #-}
uniqBy :: (IsStream t, Monad m, Functor (t m)) =>
    (a -> a -> Bool) -> t m a -> t m a
uniqBy eq = catMaybes . rollingMap f

    where

    -- 'rollingMap' passes the previous element (if any) together with the
    -- current one. An element that compares equal to its predecessor is
    -- mapped to 'Nothing' so that 'catMaybes' removes it; the first element
    -- has no predecessor and is always kept.
    f pre curr =
        case pre of
            Nothing -> Just curr
            Just x -> if x `eq` curr then Nothing else Just curr

-- | Drop repeated elements that are adjacent to each other.
--
-- @since 0.6.0
{-# INLINE uniq #-}
uniq :: (Eq a, IsStream t, Monad m) => t m a -> t m a
-- Thin wrapper: 'D.uniq' applied between 'toStreamD' and 'fromStreamD'.
uniq = fromStreamD . D.uniq . toStreamD

-- | Strip all leading and trailing occurrences of an element passing a
-- predicate and make all other consecutive occurrences uniq.
--
-- @
-- prune p = dropWhileAround p . uniqBy (\\x y -> p x && p y)
-- @
--
-- @
-- > Stream.prune isSpace (Stream.fromList "  hello      world!   ")
-- "hello world!"
--
-- @
--
-- Space: @O(1)@
--
-- /Unimplemented/
{-# INLINE prune #-}
prune ::
    -- (IsStream t, Monad m, Eq a) =>
    (a -> Bool) -> t m a -> t m a
-- Placeholder: evaluating any use of this function raises an 'ErrorCall'.
prune = error "Not implemented yet!"

-- Possible implementation:
-- @repeated =
--      Stream.catMaybes . Stream.parseMany (Parser.groupBy (==) Fold.repeated)@
--
-- 'Fold.repeated' should return 'Just' when repeated, and 'Nothing' for a
-- single element.
--
-- | Emit only repeated elements, once.
--
-- /Unimplemented/
repeated :: -- (IsStream t, Monad m, Eq a) =>
    t m a -> t m a
-- Placeholder: fails with the same explicit message as 'prune' instead of a
-- bare 'undefined', so the failure is self-explanatory.
repeated = error "Not implemented yet!"

-- We can have more efficient implementations for nubOrd and nubInt by using
-- Set and IntSet to find/remove duplication. For Hashable we can use a
-- hashmap. Use rewrite rules to specialize to more efficient impls.
--
-- | Drop repeated elements anywhere in the stream.
--
-- /Caution: not scalable for infinite streams/
--
-- /See also: nubWindowBy/
--
-- /Unimplemented/
--
{-# INLINE nubBy #-}
nubBy :: -- (IsStream t, Monad m) =>
    (a -> a -> Bool) -> t m a -> t m a
-- Placeholder: fails with the same explicit message as 'prune' instead of a
-- bare 'undefined'.
-- Sketch from the original stub: fromStreamD . D.nubBy . toStreamD
nubBy = error "Not implemented yet!"

-- | Deletes the first occurrence of the element in the stream that satisfies
-- the given equality predicate.
--
-- @
-- >>> Stream.toList $ Stream.deleteBy (==) 3 $ Stream.fromList [1,3,3,5]
-- [1,3,5]
--
-- @
--
-- @since 0.6.0
{-# INLINE deleteBy #-}
deleteBy :: (IsStream t, Monad m) => (a -> a -> Bool) -> a -> t m a -> t m a
-- Thin wrapper: run 'D.deleteBy' on the 'toStreamD' form of the input and
-- convert the result back with 'fromStreamD'.
deleteBy cmp x m = fromStreamD $ D.deleteBy cmp x (toStreamD m)

------------------------------------------------------------------------------
-- Lossy Buffering
------------------------------------------------------------------------------

-- XXX We could use 'maxBuffer Block/Drop/Rotate/Sample' instead. However we
-- may want to have the evaluation rate independent of the sampling rate. To
-- support that we can decouple evaluation and sampling in independent stages.
-- The sampling stage would strictly evaluate and sample, the evaluation stage
-- would control the evaluation rate.

-- | Evaluate the input stream continuously and keep only the oldest @n@
-- elements in the buffer, discard the new ones when the buffer is full.  When
-- the output stream is evaluated it consumes the values from the buffer in a
-- FIFO manner.
--
-- /Unimplemented/
--
{-# INLINE sampleOld #-}
sampleOld :: -- (IsStream t, Monad m) =>
    Int -> t m a -> t m a
-- Placeholder: fails with the same explicit message as 'prune' instead of a
-- bare 'undefined'.
sampleOld = error "Not implemented yet!"

-- | Evaluate the input stream continuously and keep only the latest @n@
-- elements in a ring buffer, keep discarding the older ones to make space for
-- the new ones.  When the output stream is evaluated it consumes the values
-- from the buffer in a FIFO manner.
--
-- /Unimplemented/
--
{-# INLINE sampleNew #-}
sampleNew :: -- (IsStream t, Monad m) =>
    Int -> t m a -> t m a
-- Placeholder: fails with the same explicit message as 'prune' instead of a
-- bare 'undefined'.
sampleNew = error "Not implemented yet!"

-- | Like 'sampleNew' but samples at uniform intervals to match the consumer
-- rate. Note that 'sampleNew' leads to non-uniform sampling depending on the
-- consumer pattern.
--
-- /Unimplemented/
--
{-# INLINE sampleRate #-}
sampleRate :: -- (IsStream t, Monad m) =>
    Double -> t m a -> t m a
-- Placeholder: fails with the same explicit message as 'prune' instead of a
-- bare 'undefined'.
sampleRate = error "Not implemented yet!"

------------------------------------------------------------------------------
-- Trimming
------------------------------------------------------------------------------

-- | Same as 'takeWhile' but with a monadic predicate.
--
-- @since 0.4.0
{-# INLINE takeWhileM #-}
takeWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
-- Thin wrapper: view the input through 'toStreamD', run 'D.takeWhileM' with
-- the monadic predicate, and convert back with 'fromStreamD'.
takeWhileM p m = fromStreamD $ D.takeWhileM p $ toStreamD m

-- See the lastN fold for impl hints. Use a Data.Array based ring buffer.
--
-- takeLast n = Stream.concatM . fmap Array.toStream . fold Fold.lastN
--
-- | Take @n@ elements at the end of the stream.
--
-- O(n) space, where n is the number of elements taken.
--
-- /Unimplemented/
{-# INLINE takeLast #-}
takeLast :: -- (IsStream t, Monad m) =>
    Int -> t m a -> t m a
-- Placeholder: fails with the same explicit message as 'prune' instead of a
-- bare 'undefined'.
-- Sketch from the original stub: fromStreamD $ D.takeLast n $ toStreamD m
takeLast = error "Not implemented yet!"

-- | Take time interval @i@ seconds at the end of the stream.
--
-- O(n) space, where n is the number of elements taken.
--
-- /Unimplemented/
{-# INLINE takeLastInterval #-}
takeLastInterval :: -- (IsStream t, Monad m) =>
    Double -> t m a -> t m a
-- Placeholder: fails with the same explicit message as 'prune' instead of a
-- bare 'undefined'.
-- Sketch from the original stub: fromStreamD $ D.takeLast n $ toStreamD m
takeLastInterval = error "Not implemented yet!"

-- | Take all consecutive elements at the end of the stream for which the
-- predicate is true.
--
-- O(n) space, where n is the number of elements taken.
--
-- /Unimplemented/
{-# INLINE takeWhileLast #-}
takeWhileLast :: -- (IsStream t, Monad m) =>
    (a -> Bool) -> t m a -> t m a
-- Placeholder: fails with the same explicit message as 'prune' instead of a
-- bare 'undefined'.
-- Sketch from the original stub:
-- fromStreamD $ D.takeWhileLast n $ toStreamD m
takeWhileLast = error "Not implemented yet!"

-- | Like 'takeWhile' and 'takeWhileLast' combined.
--
-- O(n) space, where n is the number of elements taken from the end.
--
-- /Unimplemented/
{-# INLINE takeWhileAround #-}
takeWhileAround :: -- (IsStream t, Monad m) =>
    (a -> Bool) -> t m a -> t m a
-- Placeholder: fails with the same explicit message as 'prune' instead of a
-- bare 'undefined'.
-- Sketch from the original stub:
-- fromStreamD $ D.takeWhileAround n $ toStreamD m
takeWhileAround = error "Not implemented yet!"

-- | Drop elements in the stream as long as the predicate succeeds and then
-- take the rest of the stream.
--
-- @since 0.1.0
{-# INLINE dropWhile #-}
dropWhile :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> t m a
-- Thin wrapper: view the input through 'toStreamD', run 'D.dropWhile' with
-- the predicate, and convert back with 'fromStreamD'.
dropWhile p m = fromStreamD $ D.dropWhile p $ toStreamD m

-- | Same as 'dropWhile' but with a monadic predicate.
--
-- @since 0.4.0
{-# INLINE dropWhileM #-}
dropWhileM :: (IsStream t, Monad m) => (a -> m Bool) -> t m a -> t m a
-- Thin wrapper: view the input through 'toStreamD', run 'D.dropWhileM' with
-- the monadic predicate, and convert back with 'fromStreamD'.
dropWhileM p m = fromStreamD $ D.dropWhileM p $ toStreamD m

-- | Drop @n@ elements at the end of the stream.
--
-- O(n) space, where n is the number of elements dropped.
--
-- /Unimplemented/
{-# INLINE dropLast #-}
dropLast :: -- (IsStream t, Monad m) =>
    Int -> t m a -> t m a
-- Placeholder: fails with the same explicit message as 'prune' instead of a
-- bare 'undefined'.
-- Sketch from the original stub: fromStreamD $ D.dropLast n $ toStreamD m
dropLast = error "Not implemented yet!"

-- | Drop time interval @i@ seconds at the end of the stream.
--
-- O(n) space, where n is the number of elements dropped.
--
-- /Unimplemented/
{-# INLINE dropLastInterval #-}
dropLastInterval :: -- (IsStream t, Monad m) =>
    Int -> t m a -> t m a
-- Placeholder: fails with the same explicit message as 'prune' instead of a
-- bare 'undefined'.
dropLastInterval = error "Not implemented yet!"

-- | Drop all consecutive elements at the end of the stream for which the
-- predicate is true.
--
-- O(n) space, where n is the number of elements dropped.
--
-- /Unimplemented/
{-# INLINE dropWhileLast #-}
dropWhileLast :: -- (IsStream t, Monad m) =>
    (a -> Bool) -> t m a -> t m a
-- Placeholder: fails with the same explicit message as 'prune' instead of a
-- bare 'undefined'.
-- Sketch from the original stub:
-- fromStreamD $ D.dropWhileLast n $ toStreamD m
dropWhileLast = error "Not implemented yet!"

-- | Like 'dropWhile' and 'dropWhileLast' combined.
--
-- O(n) space, where n is the number of elements dropped from the end.
--
-- /Unimplemented/
{-# INLINE dropWhileAround #-}
dropWhileAround :: -- (IsStream t, Monad m) =>
    (a -> Bool) -> t m a -> t m a
-- Placeholder: fails with the same explicit message as 'prune' instead of a
-- bare 'undefined'.
-- Sketch from the original stub:
-- fromStreamD $ D.dropWhileAround n $ toStreamD m
dropWhileAround = error "Not implemented yet!"

------------------------------------------------------------------------------
-- Inserting Elements
------------------------------------------------------------------------------

-- | @insertBy cmp elem stream@ inserts @elem@ before the first element in
-- @stream@ that is greater than @elem@ when compared using @cmp@.
--
-- @
-- insertBy cmp x = 'mergeBy' cmp ('fromPure' x)
-- @
--
-- @
-- >>> Stream.toList $ Stream.insertBy compare 2 $ Stream.fromList [1,3,5]
-- [1,2,3,5]
--
-- @
--
-- @since 0.6.0
{-# INLINE insertBy #-}
insertBy ::
       (IsStream t, Monad m) => (a -> a -> Ordering) -> a -> t m a -> t m a
-- Thin wrapper: run 'D.insertBy' on the 'toStreamD' form of the input and
-- convert the result back with 'fromStreamD'.
insertBy cmp x m = fromStreamD $ D.insertBy cmp x (toStreamD m)

-- | Insert a pure value between successive elements of a stream.
--
-- >>> Stream.toList $ Stream.intersperse ',' $ Stream.fromList "hello"
-- "h,e,l,l,o"
--
-- @since 0.7.0
{-# INLINE intersperse #-}
intersperse :: (IsStream t, MonadAsync m) => a -> t m a -> t m a
-- Thin wrapper: 'D.intersperse' applied between 'toStreamD' and
-- 'fromStreamD'.
intersperse a = fromStreamD . D.intersperse a . toStreamD

-- | Insert a side effect before consuming an element of a stream except the
-- first one.
--
-- >>> Stream.drain $ Stream.trace putChar $ Stream.intersperseM_ (putChar '.') $ Stream.fromList "hello"
-- h.e.l.l.o
--
-- /Pre-release/
{-# INLINE intersperseM_ #-}
intersperseM_ :: (IsStream t, Monad m) => m b -> t m a -> t m a
-- Thin wrapper: 'D.intersperseM_' applied between 'toStreamD' and
-- 'fromStreamD'.
intersperseM_ m = fromStreamD . D.intersperseM_ m . toStreamD

-- | Intersperse a monadic action into the input stream after every @n@
-- elements.
--
-- @
-- > Stream.toList $ Stream.intersperseMWith 2 (return ',') $ Stream.fromList "hello"
-- "he,ll,o"
--
-- @
--
-- /Unimplemented/
{-# INLINE intersperseMWith #-}
intersperseMWith :: -- IsStream t =>
    Int -> m a -> t m a -> t m a
-- Placeholder: all arguments are ignored; fails with the same explicit
-- message as 'prune' instead of a bare 'undefined'.
intersperseMWith _n _f _xs = error "Not implemented yet!"

-- | Insert an effect and its output after consuming an element of a stream.
--
-- >>> Stream.toList $ Stream.trace putChar $ intersperseMSuffix (putChar '.' >> return ',') $ Stream.fromList "hello"
-- h.,e.,l.,l.,o.,"h,e,l,l,o,"
--
-- /Pre-release/
{-# INLINE intersperseMSuffix #-}
intersperseMSuffix :: (IsStream t, Monad m) => m a -> t m a -> t m a
-- Thin wrapper: 'D.intersperseMSuffix' applied between 'toStreamD' and
-- 'fromStreamD'.
intersperseMSuffix m = fromStreamD . D.intersperseMSuffix m . toStreamD

-- | Insert a side effect after consuming an element of a stream.
--
-- @
-- >>> Stream.mapM_ putChar $ Stream.intersperseMSuffix_ (threadDelay 1000000) $ Stream.fromList "hello"
-- hello
--
-- @
--
-- /Pre-release/
--
{-# INLINE intersperseMSuffix_ #-}
intersperseMSuffix_ :: (IsStream t, Monad m) => m b -> t m a -> t m a
-- Thin wrapper: 'D.intersperseMSuffix_' applied between 'toStreamD' and
-- 'fromStreamD'.
intersperseMSuffix_ m = fromStreamD . D.intersperseMSuffix_ m . toStreamD

-- XXX Use an offset argument, like tapOffsetEvery
--
-- | Like 'intersperseMSuffix' but intersperses an effectful action into the
-- input stream after every @n@ elements and after the last element.
--
-- >>> Stream.toList $ Stream.intersperseMSuffixWith 2 (return ',') $ Stream.fromList "hello"
-- "he,ll,o,"
--
-- /Pre-release/
--
{-# INLINE intersperseMSuffixWith #-}
intersperseMSuffixWith :: (IsStream t, Monad m)
    => Int -> m a -> t m a -> t m a
-- Thin wrapper: 'D.intersperseMSuffixWith' applied between 'toStreamD' and
-- 'fromStreamD'.
intersperseMSuffixWith n eff =
    fromStreamD . D.intersperseMSuffixWith n eff . toStreamD

-- | Insert a side effect before consuming an element of a stream.
--
-- >>> Stream.toList $ Stream.trace putChar $ Stream.intersperseMPrefix_ (putChar '.' >> return ',') $ Stream.fromList "hello"
-- .h.e.l.l.o"hello"
--
-- Same as 'trace_' but may be concurrent.
--
-- /Concurrent/
--
-- /Pre-release/
--
{-# INLINE intersperseMPrefix_ #-}
intersperseMPrefix_ :: (IsStream t, MonadAsync m) => m b -> t m a -> t m a
-- Implemented with this module's 'mapM': for every element the action is
-- run, its result is discarded with 'void', and the element itself is
-- returned unchanged.
intersperseMPrefix_ m = mapM (\x -> void m >> return x)

------------------------------------------------------------------------------
-- Inserting Time
------------------------------------------------------------------------------

-- Note: delay must be serial.
--
-- | Introduce a delay of specified seconds before consuming an element of the
-- stream except the first one.
--
-- >>> Stream.mapM_ print $ Stream.timestamped $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),1)
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),2)
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),3)
--
-- @since 0.8.0
--
{-# INLINE delay #-}
delay :: (IsStream t, MonadIO m) => Double -> t m a -> t m a
-- 'threadDelay' takes microseconds, so the seconds argument is scaled by
-- 10^6 and rounded to an 'Int'.
delay n = intersperseM_ $ liftIO $ threadDelay $ round $ n * 1000000

-- Note: delay must be serial.
--
-- | Introduce a delay of specified seconds after consuming an element of a
-- stream.
--
-- >>> Stream.mapM_ print $ Stream.timestamped $ Stream.delayPost 1 $ Stream.enumerateFromTo 1 3
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),1)
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),2)
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),3)
--
-- /Pre-release/
--
{-# INLINE delayPost #-}
delayPost :: (IsStream t, MonadIO m) => Double -> t m a -> t m a
-- 'threadDelay' takes microseconds, so the seconds argument is scaled by
-- 10^6 and rounded to an 'Int'.
delayPost n = intersperseMSuffix_ $ liftIO $ threadDelay $ round $ n * 1000000

-- Note: delay must be serial, that's why 'trace_' is used.
--
-- | Introduce a delay of specified seconds before consuming an element of a
-- stream.
--
-- >>> Stream.mapM_ print $ Stream.timestamped $ Stream.delayPre 1 $ Stream.enumerateFromTo 1 3
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),1)
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),2)
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),3)
--
-- /Pre-release/
--
{-# INLINE delayPre #-}
delayPre :: (IsStream t, MonadIO m) => Double -> t m a -> t m a
-- 'threadDelay' takes microseconds, so the seconds argument is scaled by
-- 10^6 and rounded to an 'Int'.
delayPre n = trace_ $ liftIO $ threadDelay $ round $ n * 1000000

------------------------------------------------------------------------------
-- Reorder in sequence
------------------------------------------------------------------------------

-- | Buffer until the next element in sequence arrives. The function argument
-- determines the difference in sequence numbers. This could be useful in
-- implementing sequenced streams, for example, TCP reassembly.
--
-- This is only a placeholder: the definition is 'undefined', so evaluating
-- it fails at runtime.
--
-- /Unimplemented/
--
{-# INLINE reassembleBy #-}
reassembleBy
    :: -- (IsStream t, Monad m) =>
       Fold m a b
    -> (a -> a -> Int)
    -> t m a
    -> t m b
reassembleBy = undefined

------------------------------------------------------------------------------
-- Position Indexing
------------------------------------------------------------------------------

-- | Pair each element in a stream with its index, starting from index 0.
--
-- > indexed = Stream.postscanl' (\(i, _) x -> (i + 1, x)) (-1,undefined)
-- > indexed = Stream.zipWith (,) (Stream.enumerateFrom 0)
--
-- >>> Stream.toList $ Stream.indexed $ Stream.fromList "hello"
-- [(0,'h'),(1,'e'),(2,'l'),(3,'l'),(4,'o')]
--
-- @since 0.6.0
{-# INLINE indexed #-}
indexed :: (IsStream t, Monad m) => t m a -> t m (Int, a)
indexed = fromStreamD . D.indexed . toStreamD

-- | Pair each element in a stream with its index, starting from the
-- given index @n@ and counting down.
--
-- > indexedR n = Stream.postscanl' (\(i, _) x -> (i - 1, x)) (n + 1,undefined)
-- > indexedR n = Stream.zipWith (,) (Stream.enumerateFromThen n (n - 1))
--
-- >>> Stream.toList $ Stream.indexedR 10 $ Stream.fromList "hello"
-- [(10,'h'),(9,'e'),(8,'l'),(7,'l'),(6,'o')]
--
-- @since 0.6.0
{-# INLINE indexedR #-}
indexedR :: (IsStream t, Monad m) => Int -> t m a -> t m (Int, a)
indexedR n = fromStreamD . D.indexedR n . toStreamD

-------------------------------------------------------------------------------
-- Time Indexing
-------------------------------------------------------------------------------

-- Note: The timestamp stream must be the second stream in the zip so that the
-- timestamp is generated after generating the stream element and not before.
-- If we do not do that then the following example will generate the same
-- timestamp for first two elements:
--
-- Stream.mapM_ print $ Stream.timestamped $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
--
-- | Pair each element in a stream with an absolute timestamp, using a clock of
-- specified granularity.  The timestamp is generated just before the element
-- is consumed.
--
-- >>> Stream.mapM_ print $ Stream.timestampWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),1)
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),2)
-- (AbsTime (TimeSpec {sec = ..., nsec = ...}),3)
--
-- /Pre-release/
--
{-# INLINE timestampWith #-}
timestampWith :: (IsStream t, MonadAsync m, Functor (t m))
    => Double -> t m a -> t m (AbsTime, a)
-- @flip (,)@ puts the timestamp first in the pair even though the timestamp
-- stream is the second argument of the zip.
timestampWith g stream = zipWith (flip (,)) stream (absTimesWith g)

-- TBD: check performance vs a custom implementation without using zipWith.
--
-- | Pair each element in a stream with an absolute timestamp, using
-- 'timestampWith' with a clock granularity argument of @0.01@.
--
-- > timestamped = timestampWith 0.01
--
-- /Pre-release/
--
{-# INLINE timestamped #-}
timestamped :: (IsStream t, MonadAsync m, Functor (t m))
    => t m a -> t m (AbsTime, a)
timestamped = timestampWith 0.01

-- | Pair each element in a stream with relative times starting from 0, using a
-- clock with the specified granularity. The time is measured just before the
-- element is consumed.
--
-- >>> Stream.mapM_ print $ Stream.timeIndexWith 0.01 $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
-- (RelTime64 (NanoSecond64 ...),1)
-- (RelTime64 (NanoSecond64 ...),2)
-- (RelTime64 (NanoSecond64 ...),3)
--
-- /Pre-release/
--
{-# INLINE timeIndexWith #-}
timeIndexWith :: (IsStream t, MonadAsync m, Functor (t m))
    => Double -> t m a -> t m (RelTime64, a)
-- The time stream is the second argument of the zip; @flip (,)@ puts the time
-- first in the resulting pair.
timeIndexWith g stream = zipWith (flip (,)) stream (relTimesWith g)

-- | Pair each element in a stream with relative times starting from 0, using a
-- 10 ms granularity clock. The time is measured just before the element is
-- consumed.
--
-- > timeIndexed = timeIndexWith 0.01
--
-- >>> Stream.mapM_ print $ Stream.timeIndexed $ Stream.delay 1 $ Stream.enumerateFromTo 1 3
-- (RelTime64 (NanoSecond64 ...),1)
-- (RelTime64 (NanoSecond64 ...),2)
-- (RelTime64 (NanoSecond64 ...),3)
--
-- /Pre-release/
--
{-# INLINE timeIndexed #-}
timeIndexed :: (IsStream t, MonadAsync m, Functor (t m))
    => t m a -> t m (RelTime64, a)
timeIndexed = timeIndexWith 0.01

------------------------------------------------------------------------------
-- Searching
------------------------------------------------------------------------------

-- | Find all the indices where the value of the element in the stream is equal
-- to the given value.
--
-- > elemIndices a = findIndices (== a)
--
-- @since 0.5.0
{-# INLINE elemIndices #-}
elemIndices :: (IsStream t, Eq a, Monad m) => a -> t m a -> t m Int
elemIndices a = findIndices (== a)

------------------------------------------------------------------------------
-- Rolling map
------------------------------------------------------------------------------

-- XXX this is not a one-to-one map so calling it map may not be right.
-- We can perhaps call it zipWithTail or rollWith.
--
-- | Apply a function on every two successive elements of a stream. The first
-- argument of the map function is the previous element and the second argument
-- is the current element. When the current element is the first element, the
-- previous element is 'Nothing'.
--
-- /Pre-release/
--
{-# INLINE rollingMap #-}
rollingMap :: (IsStream t, Monad m) => (Maybe a -> a -> b) -> t m a -> t m b
rollingMap f m = fromStreamD $ D.rollingMap f $ toStreamD m

-- | Like 'rollingMap' but with an effectful map function.
--
-- /Pre-release/
--
{-# INLINE rollingMapM #-}
rollingMapM :: (IsStream t, Monad m) => (Maybe a -> a -> m b) -> t m a -> t m b
rollingMapM f m = fromStreamD $ D.rollingMapM f $ toStreamD m

-- | Like 'rollingMap' but requires at least two elements in the stream,
-- returns an empty stream otherwise.
--
-- This is the stream equivalent of the list idiom @zipWith f xs (tail xs)@.
--
-- /Pre-release/
--
{-# INLINE rollingMap2 #-}
rollingMap2 :: (IsStream t, Monad m) => (a -> a -> b) -> t m a -> t m b
rollingMap2 f m = fromStreamD $ D.rollingMap2 f $ toStreamD m

------------------------------------------------------------------------------
-- Maybe Streams
------------------------------------------------------------------------------

-- | Map a 'Maybe' returning function to a stream, filter out the 'Nothing'
-- elements, and return a stream of values extracted from 'Just'.
--
-- Equivalent to:
--
-- @
-- mapMaybe f = Stream.map 'fromJust' . Stream.filter 'isJust' . Stream.map f
-- @
--
-- @since 0.3.0
{-# INLINE mapMaybe #-}
mapMaybe :: (IsStream t, Monad m) => (a -> Maybe b) -> t m a -> t m b
mapMaybe f m = fromStreamD $ D.mapMaybe f $ toStreamD m

-- | Like 'mapMaybe' but maps a monadic function.
--
-- Equivalent to:
--
-- @
-- mapMaybeM f = Stream.map 'fromJust' . Stream.filter 'isJust' . Stream.mapM f
-- @
--
-- /Concurrent (do not use with 'fromParallel' on infinite streams)/
--
-- @since 0.3.0
{-# INLINE_EARLY mapMaybeM #-}
mapMaybeM :: (IsStream t, MonadAsync m, Functor (t m))
          => (a -> m (Maybe b)) -> t m a -> t m b
-- 'fromJust' cannot fail here: it is only applied to elements that passed
-- @filter isJust@. The "mapMaybeM serial" rewrite rule in this module names
-- 'mapMaybeMSerial' as the replacement for this definition.
mapMaybeM f = fmap fromJust . filter isJust . mapM f

-- 'SerialT'-specific version of 'mapMaybeM', built directly on
-- @D.mapMaybeM@ instead of the generic @fmap fromJust . filter isJust . mapM f@
-- pipeline. The rewrite rule substitutes it for 'mapMaybeM'.
{-# RULES "mapMaybeM serial" mapMaybeM = mapMaybeMSerial #-}
{-# INLINE mapMaybeMSerial #-}
mapMaybeMSerial :: Monad m => (a -> m (Maybe b)) -> SerialT m a -> SerialT m b
mapMaybeMSerial f m = fromStreamD $ D.mapMaybeM f $ toStreamD m

-- | In a stream of 'Maybe's, discard 'Nothing's and unwrap 'Just's.
--
-- /Pre-release/
--
{-# INLINE catMaybes #-}
catMaybes :: (IsStream t, Monad m, Functor (t m)) => t m (Maybe a) -> t m a
-- 'fromJust' cannot fail here: only elements that passed @filter isJust@
-- reach it.
catMaybes = fmap fromJust . filter isJust

------------------------------------------------------------------------------
-- Either streams
------------------------------------------------------------------------------

-- | Discard 'Right's and unwrap 'Left's in an 'Either' stream.
--
-- /Pre-release/
--
{-# INLINE lefts #-}
lefts :: (IsStream t, Monad m, Functor (t m)) => t m (Either a b) -> t m a
-- The 'undefined' default of 'fromLeft' is never used: only elements that
-- passed @filter isLeft@ reach it.
lefts = fmap (fromLeft undefined) . filter isLeft

-- | Discard 'Left's and unwrap 'Right's in an 'Either' stream.
--
-- /Pre-release/
--
{-# INLINE rights #-}
rights :: (IsStream t, Monad m, Functor (t m)) => t m (Either a b) -> t m b
-- The 'undefined' default of 'fromRight' is never used: only elements that
-- passed @filter isRight@ reach it.
rights = fmap (fromRight undefined) . filter isRight

-- | Remove the 'Either' wrapper and flatten both lefts as well as rights into
-- the output stream.
--
-- > both = fmap (either id id)
--
-- /Pre-release/
--
{-# INLINE both #-}
both :: Functor (t m) => t m (Either a a) -> t m a
both = fmap (either id id)

------------------------------------------------------------------------------
-- Concurrent Application
------------------------------------------------------------------------------

-- | Parallel transform application operator; applies a stream transformation
-- function @t m a -> t m b@ to a stream @t m a@ concurrently; the input stream
-- is evaluated asynchronously in an independent thread yielding elements to a
-- buffer and the transformation function runs in another thread consuming the
-- input from the buffer.  '|$' is just like regular function application
-- operator '$' except that it is concurrent.
--
-- If you read the signature as @(t m a -> t m b) -> (t m a -> t m b)@ you can
-- look at it as a transformation that converts a transform function to a
-- buffered concurrent transform function.
--
-- The following code prints a value every second even though each stage adds a
-- 1 second delay.
--
-- >>> :{
-- Stream.drain $
--    Stream.mapM (\x -> threadDelay 1000000 >> print x)
--      |$ Stream.replicateM 3 (threadDelay 1000000 >> return 1)
-- :}
-- 1
-- 1
-- 1
--
-- /Concurrent/
--
-- /Since: 0.3.0 ("Streamly")/
--
-- @since 0.8.0
{-# INLINE (|$) #-}
(|$) :: (IsStream t, MonadAsync m) => (t m a -> t m b) -> (t m a -> t m b)
-- (|$) f = f . mkAsync
(|$) f = f . mkParallel

infixr 0 |$

-- | Same as '|$'.
--
-- /Internal/
--
{-# INLINE applyAsync #-}
applyAsync :: (IsStream t, MonadAsync m)
    => (t m a -> t m b) -> (t m a -> t m b)
applyAsync = (|$)

-- | Same as '|$' but with arguments reversed.
--
-- > (|&) = flip (|$)
--
-- /Concurrent/
--
-- /Since: 0.3.0 ("Streamly")/
--
-- @since 0.8.0
{-# INLINE (|&) #-}
(|&) :: (IsStream t, MonadAsync m) => t m a -> (t m a -> t m b) -> t m b
x |& f = f |$ x

infixl 1 |&