{-# LANGUAGE CPP                       #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts          #-}

-- |
-- Module      : Streamly.Internal.Data.Fold.Types
-- Copyright   : (c) 2019 Composewell Technologies
--               (c) 2013 Gabriel Gonzalez
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- = Stream Consumers
--
-- We can classify stream consumers in the following categories in order of
-- increasing complexity and power:
--
-- == Accumulators
--
-- These are the simplest folds that never fail and never terminate, they
-- accumulate the input values forever and always remain @partial@ and
-- @complete@ at the same time. It means that we can keep adding more input to
-- them or at any time retrieve a consistent result. A
-- 'Streamly.Internal.Data.Fold.sum' operation is an example of an accumulator.
--
-- We can distribute an input stream to two or more accumulators using a @tee@
-- style composition.  Accumulators cannot be applied on a stream one after the
-- other, which we call a @split@ style composition, as the first one itself
-- will never terminate, therefore, the next one will never get to run.
--
-- == Splitters
--
-- Splitters are accumulators that can terminate. When applied on a stream
-- splitters consume part of the stream, thereby, splitting it.  Splitters can
-- be used in a @split@ style composition where one splitter can be applied
-- after the other on an input stream. We can apply a splitter repeatedly on an
-- input stream splitting and consuming it in fragments.  Splitters never fail,
-- therefore, they do not need backtracking, but they can lookahead and return
-- unconsumed input. The 'Streamly.Internal.Data.Parser.take' operation is an
-- example of a splitter. It terminates after consuming @n@ items. Coupled with
-- an accumulator it can be used to split the stream into chunks of fixed size.
--
-- Consider the example of @takeWhile@ operation, it needs to inspect an
-- element for termination decision. However, it does not consume the element
-- on which it terminates. To implement @takeWhile@ a splitter will have to
-- implement a way to return unconsumed input to the driver.
--
-- == Parsers
--
-- Parsers are splitters that can fail and backtrack. Parsers can be composed
-- using an @alternative@ style composition where they can backtrack and apply
-- another parser if one parser fails. 'Streamly.Internal.Data.Parser.satisfy'
-- is a simple example of a parser, it would succeed if the condition is
-- satisfied and it would fail otherwise, on failure an alternative parser can
-- be used on the same input.
--
-- = Types for Stream Consumers
--
-- We use the 'Fold' type to implement the Accumulator and Splitter
-- functionality.  Parsers are represented by the
-- 'Streamly.Internal.Data.Parser.Parser' type.  This is a sweet spot to
-- balance ease of use, type safety and performance.  Using separate
-- Accumulator and Splitter types would encode more information in the types,
-- but it would make the library harder to use, implement and maintain.
-- Combining Accumulator, Splitter and Parser into a single
-- 'Streamly.Internal.Data.Parser.Parser' type would improve ease of use even
-- further, but at the cost of type safety and performance.
--
-- One of the design requirements that we have placed for better ease of use
-- and code reuse is that 'Streamly.Internal.Data.Parser.Parser' type should be
-- a strict superset of the 'Fold' type i.e. it can do everything that a 'Fold'
-- can do and more. Therefore, folds can be easily upgraded to parsers and we
-- can use parser combinators on folds as well when needed.
--
-- = Fold Design
--
-- A fold is represented by a collection of "initial", "step" and "extract"
-- functions. The "initial" action generates the initial state of the fold. The
-- state is internal to the fold and maintains the accumulated output. The
-- "step" function is invoked using the current state and the next input value
-- and results in a @Yield@ or @Stop@. A @Yield@ returns the next intermediate
-- state of the fold, a @Stop@ indicates that the fold has terminated and
-- returns the final value of the accumulator.
--
-- Every @Yield@ indicates that a new accumulated output is available.  The
-- accumulated output can be extracted from the state at any point using
-- "extract". "extract" can never fail. A fold returns a valid output even
-- without any input i.e. even if you call "extract" on "initial" state it
-- provides an output. This is not true for parsers.
--
-- In general, "extract" is used in two cases:
--
-- * When the fold is used as a scan @extract@ is called on the intermediate
-- state every time it is yielded by the fold, the resulting value is yielded
-- as a stream.
-- * When the fold is used as a regular fold, @extract@ is called once when
-- we are done feeding input to the fold.
--
-- = Alternate Designs
--
-- An alternate and simpler design would be to return the intermediate output
-- via @Yield@ along with the state, instead of using "extract" on the yielded
-- state and remove the extract function altogether.
--
-- This may even facilitate more efficient implementation.  Extract from the
-- intermediate state after each yield may be more costly compared to the fold
-- step itself yielding the output. The fold may have more efficient ways to
-- retrieve the output rather than stuffing it in the state and using extract
-- on the state.
--
-- However, removing extract altogether may lead to less optimal code in some
-- cases because the driver of the fold needs to thread around the intermediate
-- output to return it if the stream stops before the fold could @Stop@.  When
-- using this approach, the @splitParse (FL.take filesize)@ benchmark shows a
-- 2x worse performance even after ensuring everything fuses.  So we keep the
-- "extract" approach to ensure better perf in all cases.
--
-- But we could still yield both state and the output in @Yield@, the output
-- can be used for the scan use case, instead of using extract. Extract would
-- then be used only for the case when the stream stops before the fold
-- completes.

module Streamly.Internal.Data.Fold.Types
    ( Fold (..)
    , Fold2 (..)
    , simplify
    , toListRevF  -- experimental
    -- $toListRevF
    , lmap
    , lmapM
    , lfilter
    , lfilterM
    , lcatMaybes
    , ltake
    , ltakeWhile
    , lsessionsOf
    , lchunksOf
    , lchunksOf2

    , duplicate
    , initialize
    , runStep
    )
where

import Control.Applicative (liftA2)
import Control.Concurrent (threadDelay, forkIO, killThread)
import Control.Concurrent.MVar (MVar, newMVar, takeMVar, putMVar)
import Control.Exception (SomeException(..), catch, mask)
import Control.Monad (void)
import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Control (control)
import Data.Maybe (isJust, fromJust)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import Streamly.Internal.Data.Strict (Tuple'(..), Tuple3'(..), Either'(..))
import Streamly.Internal.Data.SVar (MonadAsync)

------------------------------------------------------------------------------
-- Monadic left folds
------------------------------------------------------------------------------

-- | Represents a left fold over an input stream consisting of values of type
-- @a@ to a single value of type @b@ in 'Monad' @m@.
--
-- The fold uses an intermediate state @s@ as accumulator. The @step@ function
-- updates the state and returns the new state. When the fold is done
-- the final result of the fold is extracted from the intermediate state
-- using the @extract@ function.
--
-- @since 0.7.0

data Fold m a b =
  -- | @Fold @ @ step @ @ initial @ @ extract@
  --
  -- The state type @s@ is existentially quantified, so it is private to each
  -- fold. @step@ combines the current state with one input element to produce
  -- the next state, @initial@ is the action that produces the starting state,
  -- and @extract@ turns a state into the fold result.
  forall s. Fold (s -> a -> m s) (m s) (s -> m b)

-- | Experimental type to provide a side input to the fold for generating the
-- initial state. For example, if we have to fold chunks of a stream and write
-- each chunk to a different file, then we can generate the file name using a
-- monadic action. This is a generalized version of 'Fold'.
--
data Fold2 m c a b =
  -- | @Fold @ @ step @ @ inject @ @ extract@
  --
  -- Same shape as 'Fold', except that the starting state is not a fixed
  -- action: @inject@ builds it from a side input of type @c@. The state type
  -- @s@ is existentially quantified and therefore private to the fold.
  forall s. Fold2 (s -> a -> m s) (c -> m s) (s -> m b)

-- | Convert the more general type 'Fold2' into the simpler type 'Fold' by
-- fixing the side input.
--
-- The supplied value is passed to the @inject@ function of the 'Fold2' and the
-- resulting action becomes the @initial@ action of the returned 'Fold'. The
-- @step@ and @extract@ functions are reused unchanged.
simplify :: Fold2 m c a b -> c -> Fold m a b
simplify (Fold2 step inject extract) c = Fold step (inject c) extract

-- | Maps a function on the output of the fold (the type @b@).
--
-- Only the @extract@ function is changed; the @step@ and @initial@ functions,
-- and therefore the internal state, are reused as they are.
instance Functor m => Functor (Fold m a) where
    {-# INLINE fmap #-}
    fmap f (Fold step start done) = Fold step start done'
        where
        -- '$!' evaluates the extract action @done x@ to weak head normal
        -- form before 'fmap' is applied to it. It forces the action, not
        -- the value that the action produces.
        done' x = fmap f $! done x

-- | The fold resulting from '<*>' distributes its input to both the argument
-- folds and combines their output using the supplied function.
--
-- 'pure' builds a fold with a unit state that ignores every input and always
-- extracts the given value.
instance Applicative m => Applicative (Fold m a) where
    {-# INLINE pure #-}
    pure b = Fold (\() _ -> pure ()) (pure ()) (\() -> pure b)

    {-# INLINE (<*>) #-}
    (Fold stepL beginL doneL) <*> (Fold stepR beginR doneR) =
        -- The combined state is a pair holding the states of both folds.
        -- Every effect (initial, step, extract) of the left fold is sequenced
        -- before the corresponding effect of the right fold.
        let step (Tuple' xL xR) a = Tuple' <$> stepL xL a <*> stepR xR a
            begin = Tuple' <$> beginL <*> beginR
            done (Tuple' xL xR) = doneL xL <*> doneR xR
        in  Fold step begin done

-- | Combines the outputs of the folds (the type @b@) using their 'Semigroup'
-- instances.
--
-- Both folds receive the same input (see the 'Applicative' instance); only
-- their results are combined with '<>'.
instance (Semigroup b, Monad m) => Semigroup (Fold m a b) where
    {-# INLINE (<>) #-}
    (<>) = liftA2 (<>)

-- | Combines the outputs of the folds (the type @b@) using their 'Monoid'
-- instances.
--
-- 'mempty' is the fold that ignores its input and returns the 'mempty' of the
-- output type.
instance (Semigroup b, Monoid b, Monad m) => Monoid (Fold m a b) where
    {-# INLINE mempty #-}
    mempty = pure mempty

    {-# INLINE mappend #-}
    mappend = (<>)

-- | Combines the fold outputs (type @b@) using their 'Num' instances.
--
-- Literals become folds that ignore their input ('pure'), unary operations
-- are mapped over the output ('fmap'), and binary operations run both folds
-- on the same input and combine their outputs ('liftA2').
instance (Monad m, Num b) => Num (Fold m a b) where
    {-# INLINE fromInteger #-}
    fromInteger = pure . fromInteger

    {-# INLINE negate #-}
    negate = fmap negate

    {-# INLINE abs #-}
    abs = fmap abs

    {-# INLINE signum #-}
    signum = fmap signum

    {-# INLINE (+) #-}
    (+) = liftA2 (+)

    {-# INLINE (*) #-}
    (*) = liftA2 (*)

    {-# INLINE (-) #-}
    (-) = liftA2 (-)

-- | Combines the fold outputs (type @b@) using their 'Fractional' instances.
--
-- Rational literals become folds that ignore their input, 'recip' is mapped
-- over the output, and '/' runs both folds on the same input and divides
-- their outputs.
instance (Monad m, Fractional b) => Fractional (Fold m a b) where
    {-# INLINE fromRational #-}
    fromRational = pure . fromRational

    {-# INLINE recip #-}
    recip = fmap recip

    {-# INLINE (/) #-}
    (/) = liftA2 (/)

-- | Combines the fold outputs using their 'Floating' instances.
--
-- 'pi' is a fold that ignores its input, unary functions are mapped over the
-- fold output, and the binary functions ('**', 'logBase') run both folds on
-- the same input and combine their outputs.
instance (Monad m, Floating b) => Floating (Fold m a b) where
    {-# INLINE pi #-}
    pi = pure pi

    {-# INLINE exp #-}
    exp = fmap exp

    {-# INLINE sqrt #-}
    sqrt = fmap sqrt

    {-# INLINE log #-}
    log = fmap log

    {-# INLINE sin #-}
    sin = fmap sin

    {-# INLINE tan #-}
    tan = fmap tan

    {-# INLINE cos #-}
    cos = fmap cos

    {-# INLINE asin #-}
    asin = fmap asin

    {-# INLINE atan #-}
    atan = fmap atan

    {-# INLINE acos #-}
    acos = fmap acos

    {-# INLINE sinh #-}
    sinh = fmap sinh

    {-# INLINE tanh #-}
    tanh = fmap tanh

    {-# INLINE cosh #-}
    cosh = fmap cosh

    {-# INLINE asinh #-}
    asinh = fmap asinh

    {-# INLINE atanh #-}
    atanh = fmap atanh

    {-# INLINE acosh #-}
    acosh = fmap acosh

    {-# INLINE (**) #-}
    (**) = liftA2 (**)

    {-# INLINE logBase #-}
    logBase = liftA2 logBase

------------------------------------------------------------------------------
-- Internal APIs
------------------------------------------------------------------------------

-- $toListRevF
-- This is more efficient than 'Streamly.Internal.Data.Fold.toList'. toList is
-- exactly the same as reversing the list after 'toListRevF'.

-- | Buffers the input stream to a list in the reverse order of the input.
--
-- Each input element is consed onto the front of the accumulated list, so the
-- most recent element comes first.
--
-- /Warning!/ working on large lists accumulated as buffers in memory could be
-- very inefficient, consider using "Streamly.Array" instead.
--
-- @since 0.7.0

--  xn : ... : x2 : x1 : []
{-# INLINABLE toListRevF #-}
toListRevF :: Monad m => Fold m a [a]
toListRevF = Fold (\xs x -> return $ x:xs) (return []) return

-- | @(lmap f fold)@ maps the function @f@ on the input of the fold.
--
-- >>> S.fold (FL.lmap (\x -> x * x) FL.sum) (S.enumerateFromTo 1 100)
-- 338350
--
-- Only the @step@ function is wrapped; the initial state and the extraction
-- of the result are those of the original fold.
--
-- @since 0.7.0
{-# INLINABLE lmap #-}
lmap :: (a -> b) -> Fold m b r -> Fold m a r
lmap f (Fold step begin done) = Fold step' begin done
  where
    -- Transform the element before handing it to the original step.
    step' x a = step x (f a)

-- | @(lmapM f fold)@ maps the monadic function @f@ on the input of the fold.
--
-- For every input element the effect of @f@ runs first and its result is then
-- fed to the @step@ function of the original fold.
--
-- @since 0.7.0
{-# INLINABLE lmapM #-}
lmapM :: Monad m => (a -> m b) -> Fold m b r -> Fold m a r
lmapM f (Fold step begin done) = Fold step' begin done
  where
    step' x a = f a >>= step x

------------------------------------------------------------------------------
-- Filtering
------------------------------------------------------------------------------

-- | Include only those elements that pass a predicate.
--
-- >>> S.fold (lfilter (> 5) FL.sum) [1..10]
-- 40
--
-- Elements that fail the predicate leave the fold state untouched.
--
-- @since 0.7.0
{-# INLINABLE lfilter #-}
lfilter :: Monad m => (a -> Bool) -> Fold m a r -> Fold m a r
lfilter f (Fold step begin done) = Fold step' begin done
  where
    step' x a = if f a then step x a else return x

-- | Like 'lfilter' but with a monadic predicate.
--
-- The predicate effect is run for every input element; the element is passed
-- to the original fold only when the predicate returns 'True', otherwise the
-- state is returned unchanged.
--
-- @since 0.7.0
{-# INLINABLE lfilterM #-}
lfilterM :: Monad m => (a -> m Bool) -> Fold m a r -> Fold m a r
lfilterM f (Fold step begin done) = Fold step' begin done
  where
    step' x a = do
      use <- f a
      if use then step x a else return x

-- | Transform a fold from a pure input to a 'Maybe' input, consuming only
-- 'Just' values.
--
-- 'Nothing' elements are dropped by the 'isJust' filter before they reach the
-- mapped fold, so the partial 'fromJust' is only ever applied to 'Just'
-- values.
{-# INLINE lcatMaybes #-}
lcatMaybes :: Monad m => Fold m a b -> Fold m (Maybe a) b
lcatMaybes = lfilter isJust . lmap fromJust

------------------------------------------------------------------------------
-- Parsing
------------------------------------------------------------------------------

-- XXX These should become terminating folds.
--
-- | Take first @n@ elements from the stream and discard the rest.
--
-- The returned fold does not terminate: once @n@ elements have been passed to
-- the original fold, further input is still accepted but leaves the state
-- unchanged. If @n@ is zero or negative no element reaches the original fold.
--
-- @since 0.7.0
{-# INLINABLE ltake #-}
ltake :: Monad m => Int -> Fold m a b -> Fold m a b
ltake n (Fold step initial done) = Fold step' initial' done'
    where
    -- The state pairs the number of elements consumed so far with the state
    -- of the original fold.
    initial' = fmap (Tuple' 0) initial

    step' st@(Tuple' i r) a
        | i < n = do
            res <- step r a
            return $ Tuple' (i + 1) res
        | otherwise = return st

    done' (Tuple' _ r) = done r

-- | Takes elements from the input as long as the predicate succeeds.
--
-- The element on which the predicate first fails is not passed to the
-- original fold. From then on all further input is ignored, even if the
-- predicate would succeed again. The returned fold itself does not terminate.
--
-- @since 0.7.0
{-# INLINABLE ltakeWhile #-}
ltakeWhile :: Monad m => (a -> Bool) -> Fold m a b -> Fold m a b
ltakeWhile predicate (Fold step initial done) = Fold step' initial' done'
    where
    -- 'Left'' means "still taking", 'Right'' means "stopped". Both carry the
    -- state of the original fold.
    initial' = fmap Left' initial

    step' (Left' r) a
        | predicate a = fmap Left' $ step r a
        | otherwise = return (Right' r)
    step' r _ = return r

    done' (Left' r) = done r
    done' (Right' r) = done r

------------------------------------------------------------------------------
-- Nesting
------------------------------------------------------------------------------
--
-- | Modify the fold such that when the fold is done, instead of returning the
-- accumulator, it returns a fold. The returned fold starts from where we left
-- i.e. it uses the last accumulator value as the initial value of the
-- accumulator. Thus we can resume the fold later and feed it more input.
--
-- >> do
-- >    more <- S.fold (FL.duplicate FL.sum) (S.enumerateFromTo 1 10)
-- >    evenMore <- S.fold (FL.duplicate more) (S.enumerateFromTo 11 20)
-- >    S.fold evenMore (S.enumerateFromTo 21 30)
-- > 465
--
-- The @step@ and @initial@ functions are unchanged; only extraction differs,
-- wrapping the current state into a new fold with the same @step@ and
-- @extract@ functions.
--
-- @since 0.7.0
{-# INLINABLE duplicate #-}
duplicate :: Applicative m => Fold m a b -> Fold m a (Fold m a b)
duplicate (Fold step begin done) =
    Fold step begin (\x -> pure (Fold step (pure x) done))

-- | Run the initialization effect of a fold. The returned fold would use the
-- value returned by this effect as its initial value.
--
-- The @initial@ action of the returned fold is a plain 'return' of the
-- already computed state, so the original initialization effect is not run
-- again when the returned fold is used.
--
{-# INLINABLE initialize #-}
initialize :: Monad m => Fold m a b -> m (Fold m a b)
initialize (Fold step initial extract) = do
    i <- initial
    return $ Fold step (return i) extract

-- | Run one step of a fold and store the accumulator as an initial value in
-- the returned fold.
--
-- The @initial@ action of the supplied fold is run first, then its @step@
-- function is applied to the resulting state and the given input element.
{-# INLINABLE runStep #-}
runStep :: Monad m => Fold m a b -> a -> m (Fold m a b)
runStep (Fold step initial extract) a = do
    i <- initial
    r <- step i a
    return $ Fold step (return r) extract

------------------------------------------------------------------------------
-- Parsing
------------------------------------------------------------------------------

-- XXX These can be expressed using foldChunks repeatedly on the input of a
-- fold.

-- | For every n input items, apply the first fold and supply the result to the
-- next fold.
--
-- A chunk is flushed lazily: the result of the first fold is extracted and
-- passed to the second fold only when the next input element arrives after
-- the chunk is full, or when the final result is extracted. Extraction always
-- passes the current (possibly empty or partial) chunk result to the second
-- fold before extracting from it.
--
{-# INLINE lchunksOf #-}
lchunksOf :: Monad m => Int -> Fold m a b -> Fold m b c -> Fold m a c
lchunksOf n (Fold step1 initial1 extract1) (Fold step2 initial2 extract2) =
    Fold step' initial' extract'

    where

    -- State: number of items in the current chunk, state of the chunk fold,
    -- state of the collecting fold.
    initial' = Tuple3' 0 <$> initial1 <*> initial2

    step' (Tuple3' i r1 r2) a
        -- Room left in the current chunk: keep feeding the chunk fold.
        | i < n = do
            res <- step1 r1 a
            return $ Tuple3' (i + 1) res r2
        -- Chunk is full: hand its result to the collecting fold, then
        -- restart the chunk fold with the current element.
        | otherwise = do
            res <- extract1 r1
            acc2 <- step2 r2 res

            i1 <- initial1
            acc1 <- step1 i1 a
            return $ Tuple3' 1 acc1 acc2

    extract' (Tuple3' _ r1 r2) = do
        res <- extract1 r1
        acc2 <- step2 r2 res
        extract2 acc2

-- | Like 'lchunksOf', except that the collecting fold is a 'Fold2': its
-- initial state is produced from the side input by the @inject@ function.
--
-- For every n input items the first fold is applied and its result is
-- supplied to the second fold. A chunk is flushed only when the next input
-- element arrives after the chunk is full, or when the final result is
-- extracted. Extraction always passes the current chunk result to the second
-- fold before extracting from it.
{-# INLINE lchunksOf2 #-}
lchunksOf2 :: Monad m => Int -> Fold m a b -> Fold2 m x b c -> Fold2 m x a c
lchunksOf2 n (Fold step1 initial1 extract1) (Fold2 step2 inject2 extract2) =
    Fold2 step' inject' extract'

    where

    -- State: number of items in the current chunk, state of the chunk fold,
    -- state of the collecting fold.
    inject' x = Tuple3' 0 <$> initial1 <*> inject2 x

    step' (Tuple3' i r1 r2) a
        -- Room left in the current chunk: keep feeding the chunk fold.
        | i < n = do
            res <- step1 r1 a
            return $ Tuple3' (i + 1) res r2
        -- Chunk is full: hand its result to the collecting fold, then
        -- restart the chunk fold with the current element.
        | otherwise = do
            res <- extract1 r1
            acc2 <- step2 r2 res

            i1 <- initial1
            acc1 <- step1 i1 a
            return $ Tuple3' 1 acc1 acc2

    extract' (Tuple3' _ r1 r2) = do
        res <- extract1 r1
        acc2 <- step2 r2 res
        extract2 acc2

-- | Group the input stream into windows of @n@ seconds each and then fold
-- each group using the provided fold function.
--
-- For example, we can copy and distribute a stream to multiple folds where
-- each fold can group the input differently e.g. by one second, one minute and
-- one hour windows respectively and fold each resulting stream of folds.
--
-- @
--
-- -----Fold m a b----|-Fold n a c-|-Fold n a c-|-...-|----Fold m a c
--
-- @
{-# INLINE lsessionsOf #-}
lsessionsOf :: MonadAsync m => Double -> Fold m a b -> Fold m b c -> Fold m a c
lsessionsOf n (Fold step1 initial1 extract1) (Fold step2 initial2 extract2) =
    Fold step' initial' extract'

    where

    -- The composed fold's state is a 'Tuple3'' holding:
    --
    -- * the 'ThreadId' of the forked timer thread,
    -- * @mv1@: an 'MVar' with the current state of the first (inner) fold,
    -- * @mv2@: an 'MVar' with either the current state of the second (outer)
    --   fold ('Right') or an exception raised in the timer thread ('Left').

    -- XXX MVar may be expensive we need a cheaper synch mechanism here
    --
    -- Initialise both folds, stash their states in MVars and fork the timer
    -- thread. The fork happens under 'mask' so that the exception handler is
    -- installed before any asynchronous exception can be delivered; the timer
    -- loop itself runs inside 'restore'.
    initial' = do
        i1 <- initial1
        i2 <- initial2
        mv1 <- liftIO $ newMVar i1
        mv2 <- liftIO $ newMVar (Right i2)
        t <- control $ \run ->
            mask $ \restore -> do
                tid <- forkIO $ catch (restore $ void $ run (timerThread mv1 mv2))
                                      (handleChildException mv2)
                run (return tid)
        return $ Tuple3' t mv1 mv2

    -- Feed one input element to the first fold. The state lives in @mv1@, so
    -- the tuple itself is returned unchanged. Holding @mv1@ between
    -- 'takeMVar' and 'putMVar' serialises this step against the timer thread.
    step' acc@(Tuple3' _ mv1 _) a = do
        r1 <- liftIO $ takeMVar mv1
        res <- step1 r1 a
        liftIO $ putMVar mv1 res
        return acc

    -- Take the second fold's state out of @mv2@, kill the timer thread, then
    -- either rethrow the exception recorded by the timer thread or extract
    -- the final result from the second fold.
    extract' (Tuple3' tid _ mv2) = do
        r2 <- liftIO $ takeMVar mv2
        liftIO $ killThread tid
        case r2 of
            Left e -> throwM e
            Right x -> extract2 x

    -- Timer loop; never returns normally. On each iteration:
    --
    -- 1. sleep for @n * 1000000@ microseconds (i.e. @n@ seconds, rounded to
    --    the nearest microsecond),
    -- 2. swap the first fold's state in @mv1@ for a fresh 'initial1',
    -- 3. extract the result of the window that just ended,
    -- 4. step the second fold with that result, unless @mv2@ already holds
    --    an exception, in which case it is left as is.
    timerThread mv1 mv2 = do
        liftIO $ threadDelay (round $ n * 1000000)

        r1 <- liftIO $ takeMVar mv1
        i1 <- initial1
        liftIO $ putMVar mv1 i1

        res1 <- extract1 r1
        r2 <- liftIO $ takeMVar mv2
        res <- case r2 of
                    Left _ -> return r2
                    Right x -> fmap Right $ step2 x res1
        liftIO $ putMVar mv2 res
        timerThread mv1 mv2

    -- Exception handler for the timer thread: record the exception in @mv2@
    -- so that extract' can rethrow it. If an exception is already recorded,
    -- the earlier one is kept.
    handleChildException ::
        MVar (Either SomeException a) -> SomeException -> IO ()
    handleChildException mv2 e = do
        r2 <- takeMVar mv2
        let r = case r2 of
                    Left _ -> r2
                    Right _ -> Left e
        putMVar mv2 r