{-# OPTIONS_GHC -Wno-deprecations #-}

-- |
-- Module      : Streamly.Internal.Data.Stream.IsStream.Reduce
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Reduce streams by streams, folds or parsers.

module Streamly.Internal.Data.Stream.IsStream.Reduce {-# DEPRECATED "Please use \"Streamly.Data.Stream.*\" instead." #-}
    (
    -- * Reduce By Streams
      dropPrefix
    , dropInfix
    , dropSuffix

    -- * Reduce By Folds
    -- |
    -- Reduce a stream by folding or parsing chunks of the stream.  Functions
    -- generally ending in these shapes:
    --
    -- @
    -- f (Fold m a b) -> t m a -> t m b
    -- f (Parser a m b) -> t m a -> t m b
    -- @

    -- ** Generic Folding
    -- | Apply folds on a stream.
    , foldMany
    , foldManyPost
    , refoldMany
    , foldSequence
    , foldIterateM
    , refoldIterateM

    -- ** Chunking
    -- | Element unaware grouping.
    , chunksOf
    , arraysOf
    , intervalsOf
    , chunksOfTimeout

    -- ** Splitting
    -- | Streams can be sliced into segments in space or in time. We use the
    -- term @chunk@ to refer to a spatial length of the stream (spatial window)
    -- and the term @session@ to refer to a length in time (time window).

    -- -- *** Using Element Separators
    , splitOn
    , splitOnSuffix
    , splitOnPrefix
    , splitOnAny

    -- , splitBy
    , splitWithSuffix
    -- , splitByPrefix

    -- -- *** Splitting By Sequences
    , splitBySeq
    , splitOnSeq
    , splitOnSuffixSeq
    -- , splitOnPrefixSeq

    -- Keeping the delimiters
    , splitWithSuffixSeq
    -- , splitByPrefixSeq
    -- , wordsBySeq

    -- Splitting using multiple sequence separators
    -- , splitOnAnySeq
    , splitOnSuffixSeqAny
    -- , splitOnAnyPrefixSeq

    -- -- *** Splitting By Streams
    -- -- | Splitting a stream using another stream as separator.

    -- ** Keyed Window Classification

    -- | Split the stream into chunks or windows by position or time. Each
    -- window can be associated with a key, all events associated with a
    -- particular key in the window can be folded to a single result.  The
    -- window termination can be dynamically controlled by the fold.
    --
    -- The term "chunk" is used for a window defined by position of elements
    -- and the term "session" is used for a time window.

    -- *** Tumbling Windows
    -- | A new window starts after the previous window is finished.

    -- , classifyChunksOf
    , classifySessionsByGeneric
    , classifySessionsBy
    , classifySessionsOf

    -- *** Keep Alive Windows
    -- | The window size is extended if an event arrives within the specified
    -- window size. This can represent sessions with idle or inactive timeout.

    -- , classifyKeepAliveChunks
    , classifyKeepAliveSessions

    {-
    -- *** Sliding Windows
    -- | A new window starts after the specified slide from the previous
    -- window. Therefore windows can overlap.
    , classifySlidingChunks
    , classifySlidingSessions
    -- *** Sliding Window Buffers
    -- , slidingChunkBuffer
    -- , slidingSessionBuffer
    -}

    -- * Reduce By Parsers
    -- ** Generic Parsing
    -- | Apply parsers on a stream.
    , parseMany
    , parseManyD
    , parseManyTill
    , parseSequence
    , parseIterate

    -- ** Grouping
    -- In imperative terms, grouped folding can be considered as a nested loop
    -- where we loop over the stream to group elements and then loop over
    -- individual groups to fold them to a single value that is yielded in the
    -- output stream.

    , wordsBy -- stripAndCompactBy
    , wordsOn
    , groups
    , groupsBy
    , groupsByRolling

    -- -- *** Searching Sequences
    -- , seqIndices -- search a sequence in the stream

    -- -- *** Searching Multiple Sequences
    -- , seqIndicesAny -- search any of the given sequence in the stream

    -- -- -- ** Searching Streams
    -- -- | Finding a stream within another stream.

    -- * Nested splitting
    , splitInnerBy
    , splitInnerBySuffix
    )
where

#include "inline.hs"

import Control.Exception (assert)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Heap (Entry(..))
import Data.Kind (Type)
import Data.Map (Map)
import Data.Maybe (isNothing)
import Data.Proxy (Proxy(..))
import Foreign.Storable (Storable)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Fold.Type (Fold (..))
import Streamly.Internal.Data.IsMap (IsMap(..))
import Streamly.Internal.Data.Refold.Type (Refold (..))
import Streamly.Internal.Data.Parser (Parser (..), ParseError)
import Streamly.Internal.Data.Array.Type (Array)
import Streamly.Internal.Data.Stream.IsStream.Common
    ( fold
    , interjectSuffix
    , intersperseM
    , map
    , scanlMAfter'
    , foldManyPost
    , splitOnSeq
    , fromPure)
import Streamly.Internal.Data.Stream.IsStream.Type
    (IsStream(..), fromStreamD, toStreamD, cons)
import Streamly.Internal.Data.Stream.Serial(toStreamK)
import Streamly.Internal.Data.Time.Units
       ( AbsTime, MilliSecond64(..), addToAbsTime, toRelTime
       , toAbsTime)
import Streamly.Internal.Data.Unboxed (Unbox)

import qualified Data.Heap as H
import qualified Streamly.Internal.Data.Unfold as Unfold
import qualified Streamly.Internal.Data.Array.Type as A
    (chunksOf, read)
import qualified Streamly.Internal.Data.Fold as FL
    (Fold, Step(..), takeEndBy_, takeEndBy, catMaybes, take)
import qualified Streamly.Internal.Data.IsMap as IsMap
import qualified Streamly.Internal.Data.Parser.ParserD as PRD (Parser(..))
import qualified Streamly.Internal.Data.Stream.IsStream.Type as IsStream
import qualified Streamly.Internal.Data.Stream.StreamD as D
    ( foldMany
    , groupsOf
    , refoldMany
    , foldIterateM
    , refoldIterateM
    , parseManyD
    , parseIterateD
    , groupsBy
    , groupsRollingBy
    , wordsBy
    , splitOnSuffixSeq
    , splitInnerBy
    , splitInnerBySuffix
    )
import qualified Streamly.Internal.Data.Stream.IsStream.Expand as Expand
import qualified Streamly.Internal.Data.Stream.IsStream.Transform as Transform

import Prelude hiding (concatMap, map)

-- $setup
-- >>> :m
-- >>> :set -fno-warn-deprecations
-- >>> import Prelude hiding (zipWith, concatMap, concat)
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import Streamly.Internal.Data.Stream.IsStream as Stream
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Internal.Data.Fold as Fold
-- >>> import qualified Streamly.Internal.Data.Unfold as Unfold
-- >>> import qualified Streamly.Internal.Data.Parser as Parser
-- >>> import qualified Streamly.Data.Array as Array

------------------------------------------------------------------------------
-- Trimming
------------------------------------------------------------------------------

-- | Drop prefix from the input stream if present.
--
-- Space: @O(1)@
--
-- /Unimplemented/
{-# INLINE dropPrefix #-}
dropPrefix ::
    -- (Eq a, IsStream t, Monad m) =>
    t m a -> t m a -> t m a
dropPrefix :: forall (t :: * -> * -> *) m a. t m a -> t m a -> t m a
dropPrefix = forall a. HasCallStack => [Char] -> a
error [Char]
"Not implemented yet!"

-- | Drop all matching infix from the input stream if present. Infix stream
-- may be consumed multiple times.
--
-- Space: @O(n)@ where n is the length of the infix.
--
-- /Unimplemented/
{-# INLINE dropInfix #-}
dropInfix ::
    -- (Eq a, IsStream t, Monad m) =>
    t m a -> t m a -> t m a
dropInfix :: forall (t :: * -> * -> *) m a. t m a -> t m a -> t m a
dropInfix = forall a. HasCallStack => [Char] -> a
error [Char]
"Not implemented yet!"

-- | Drop suffix from the input stream if present. Suffix stream may be
-- consumed multiple times.
--
-- Space: @O(n)@ where n is the length of the suffix.
--
-- /Unimplemented/
{-# INLINE dropSuffix #-}
dropSuffix ::
    -- (Eq a, IsStream t, Monad m) =>
    t m a -> t m a -> t m a
dropSuffix :: forall (t :: * -> * -> *) m a. t m a -> t m a -> t m a
dropSuffix = forall a. HasCallStack => [Char] -> a
error [Char]
"Not implemented yet!"

------------------------------------------------------------------------------
-- Folding
------------------------------------------------------------------------------

-- Splitting operations that take a predicate and a Fold can be
-- expressed using parseMany. Operations like chunksOf, intervalsOf, split*,
-- can be expressed using parseMany when used with an appropriate Parser.
--
-- XXX We need takeGE/takeBetween to implement "some" using "many".

-- | Apply a 'Fold' repeatedly on a stream and emit the fold outputs in the
-- output stream.
--
-- To sum every two contiguous elements in a stream:
--
-- >>> f = Fold.take 2 Fold.sum
-- >>> Stream.toList $ Stream.foldMany f $ Stream.fromList [1..10]
-- [3,7,11,15,19]
--
-- On an empty stream the output is empty:
--
-- >>> Stream.toList $ Stream.foldMany f $ Stream.fromList []
-- []
--
-- Note @Stream.foldMany (Fold.take 0)@ would result in an infinite loop in a
-- non-empty stream.
--
-- @since 0.8.0
--
{-# INLINE foldMany #-}
foldMany
    :: (IsStream t, Monad m)
    => Fold m a b
    -> t m a
    -> t m b
foldMany :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Fold m a b -> t m a -> t m b
foldMany Fold m a b
f t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> Stream m b
D.foldMany Fold m a b
f (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)

-- | Like 'foldMany' but using the 'Refold' type instead of 'Fold'.
--
-- /Pre-release/
{-# INLINE refoldMany #-}
refoldMany :: (IsStream t, Monad m) =>
    Refold m c a b -> m c -> t m a -> t m b
refoldMany :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) c a b.
(IsStream t, Monad m) =>
Refold m c a b -> m c -> t m a -> t m b
refoldMany Refold m c a b
f m c
action = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) x a b.
Monad m =>
Refold m x a b -> m x -> Stream m a -> Stream m b
D.refoldMany Refold m c a b
f m c
action forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD

-- | Apply a stream of folds to an input stream and emit the results in the
-- output stream.
--
-- /Unimplemented/
--
{-# INLINE foldSequence #-}
foldSequence
       :: -- (IsStream t, Monad m) =>
       t m (Fold m a b)
    -> t m a
    -> t m b
foldSequence :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
t m (Fold m a b) -> t m a -> t m b
foldSequence t m (Fold m a b)
_f t m a
_m = forall a. HasCallStack => a
undefined

-- | Iterate a fold generator on a stream. The initial value @b@ is used to
-- generate the first fold, the fold is applied on the stream and the result of
-- the fold is used to generate the next fold and so on.
--
-- @
-- >>> import Data.Monoid (Sum(..))
-- >>> f x = return (Fold.take 2 (Fold.sconcat x))
-- >>> s = Stream.map Sum $ Stream.fromList [1..10]
-- >>> Stream.toList $ Stream.map getSum $ Stream.foldIterateM f (pure 0) s
-- [3,10,21,36,55,55]
--
-- @
--
-- This is the streaming equivalent of monad like sequenced application of
-- folds where next fold is dependent on the previous fold.
--
-- /Pre-release/
--
{-# INLINE foldIterateM #-}
foldIterateM ::
       (IsStream t, Monad m) => (b -> m (Fold m a b)) -> m b -> t m a -> t m b
foldIterateM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> m (Fold m a b)) -> m b -> t m a -> t m b
foldIterateM b -> m (Fold m a b)
f m b
i t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
Monad m =>
(b -> m (Fold m a b)) -> m b -> Stream m a -> Stream m b
D.foldIterateM b -> m (Fold m a b)
f m b
i (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)

-- | Like 'foldIterateM' but using the 'Refold' type instead. This could be
-- much more efficient due to stream fusion.
--
-- /Internal/
{-# INLINE refoldIterateM #-}
refoldIterateM :: (IsStream t, Monad m) =>
    Refold m b a b -> m b -> t m a -> t m b
refoldIterateM :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
Refold m b a b -> m b -> t m a -> t m b
refoldIterateM Refold m b a b
c m b
i t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
Monad m =>
Refold m b a b -> m b -> Stream m a -> Stream m b
D.refoldIterateM Refold m b a b
c m b
i (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)

------------------------------------------------------------------------------
-- Parsing
------------------------------------------------------------------------------

-- | Apply a 'Parser' repeatedly on a stream and emit the parsed values in the
-- output stream.
--
-- This is the streaming equivalent of the 'Streamly.Internal.Data.Parser.many'
-- parse combinator.
--
-- >>> Stream.toList $ Stream.parseMany (Parser.takeBetween 0 2 Fold.sum) $ Stream.fromList [1..10]
-- [Right 3,Right 7,Right 11,Right 15,Right 19]
--
-- @
-- > Stream.toList $ Stream.parseMany (Parser.line Fold.toList) $ Stream.fromList "hello\\nworld"
-- ["hello\\n","world"]
--
-- @
--
-- @
-- foldMany f = parseMany (fromFold f)
-- @
--
-- Known Issues: When the parser fails there is no way to get the remaining
-- stream.
--
-- /Pre-release/
--
{-# INLINE parseMany #-}
parseMany
    :: (IsStream t, Monad m)
    => Parser a m b
    -> t m a
    -> t m (Either ParseError b)
parseMany :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Parser a m b -> t m a -> t m (Either ParseError b)
parseMany Parser a m b
p t m a
m =
    forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
Monad m =>
Parser a m b -> Stream m a -> Stream m (Either ParseError b)
D.parseManyD Parser a m b
p (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)

-- | Same as parseMany but for StreamD streams.
--
-- /Internal/
--
{-# INLINE parseManyD #-}
parseManyD
    :: (IsStream t, Monad m)
    => PRD.Parser a m b
    -> t m a
    -> t m (Either ParseError b)
parseManyD :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Parser a m b -> t m a -> t m (Either ParseError b)
parseManyD Parser a m b
p t m a
m =
    forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
Monad m =>
Parser a m b -> Stream m a -> Stream m (Either ParseError b)
D.parseManyD Parser a m b
p (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)

-- | Apply a stream of parsers to an input stream and emit the results in the
-- output stream.
--
-- /Unimplemented/
--
{-# INLINE parseSequence #-}
parseSequence
       :: -- (IsStream t, Monad m) =>
       t m (Parser a m b)
    -> t m a
    -> t m b
parseSequence :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
t m (Parser a m b) -> t m a -> t m b
parseSequence t m (Parser a m b)
_f t m a
_m = forall a. HasCallStack => a
undefined

-- XXX Change the parser arguments' order
--
-- | @parseManyTill collect test stream@ tries the parser @test@ on the input,
-- if @test@ fails it backtracks and tries @collect@, after @collect@ succeeds
-- @test@ is tried again and so on. The parser stops when @test@ succeeds.  The
-- output of @test@ is discarded and the output of @collect@ is emitted in the
-- output stream. The parser fails if @collect@ fails.
--
-- /Unimplemented/
--
{-# INLINE parseManyTill #-}
parseManyTill ::
    -- (IsStream t, MonadThrow m) =>
       Parser a m b
    -> Parser a m x
    -> t m a
    -> t m b
parseManyTill :: forall a (m :: * -> *) b x (t :: (* -> *) -> * -> *).
Parser a m b -> Parser a m x -> t m a -> t m b
parseManyTill = forall a. HasCallStack => a
undefined

-- | Iterate a parser generating function on a stream. The initial value @b@ is
-- used to generate the first parser, the parser is applied on the stream and
-- the result is used to generate the next parser and so on.
--
-- >>> import Data.Monoid (Sum(..))
-- >>> Stream.toList $ fmap getSum $ Stream.rights $ Stream.parseIterate (\b -> Parser.takeBetween 0 2 (Fold.sconcat b)) (Sum 0) $ fmap Sum $ Stream.fromList [1..10]
-- [3,10,21,36,55,55]
--
-- This is the streaming equivalent of monad like sequenced application of
-- parsers where next parser is dependent on the previous parser.
--
-- /Pre-release/
--
{-# INLINE parseIterate #-}
parseIterate
    :: (IsStream t, Monad m)
    => (b -> Parser a m b)
    -> b
    -> t m a
    -> t m (Either ParseError b)
parseIterate :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> Parser a m b) -> b -> t m a -> t m (Either ParseError b)
parseIterate b -> Parser a m b
f b
i t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$
    forall (m :: * -> *) b a.
Monad m =>
(b -> Parser a m b)
-> b -> Stream m a -> Stream m (Either ParseError b)
D.parseIterateD b -> Parser a m b
f b
i (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)

------------------------------------------------------------------------------
-- Grouping
------------------------------------------------------------------------------

-- | @groupsBy cmp f $ S.fromList [a,b,c,...]@ assigns the element @a@ to the
-- first group, if @b \`cmp` a@ is 'True' then @b@ is also assigned to the same
-- group.  If @c \`cmp` a@ is 'True' then @c@ is also assigned to the same
-- group and so on. When the comparison fails a new group is started. Each
-- group is folded using the fold @f@ and the result of the fold is emitted in
-- the output stream.
--
-- >>> Stream.toList $ Stream.groupsBy (>) Fold.toList $ Stream.fromList [1,3,7,0,2,5]
-- [[1,3,7],[0,2,5]]
--
-- @since 0.7.0
{-# INLINE groupsBy #-}
groupsBy
    :: (IsStream t, Monad m)
    => (a -> a -> Bool)
    -> Fold m a b
    -> t m a
    -> t m b
groupsBy :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> a -> Bool) -> Fold m a b -> t m a -> t m b
groupsBy a -> a -> Bool
cmp Fold m a b
f t m a
m = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
Monad m =>
(a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
D.groupsBy a -> a -> Bool
cmp Fold m a b
f (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)

-- | Unlike @groupsBy@ this function performs a rolling comparison of two
-- successive elements in the input stream. @groupsByRolling cmp f $ S.fromList
-- [a,b,c,...]@ assigns the element @a@ to the first group, if @a \`cmp` b@ is
-- 'True' then @b@ is also assigned to the same group.  If @b \`cmp` c@ is
-- 'True' then @c@ is also assigned to the same group and so on. When the
-- comparison fails a new group is started. Each group is folded using the fold
-- @f@.
--
-- >>> Stream.toList $ Stream.groupsByRolling (\a b -> a + 1 == b) Fold.toList $ Stream.fromList [1,2,3,7,8,9]
-- [[1,2,3],[7,8,9]]
--
-- @since 0.7.0
{-# INLINE groupsByRolling #-}
groupsByRolling
    :: (IsStream t, Monad m)
    => (a -> a -> Bool)
    -> Fold m a b
    -> t m a
    -> t m b
groupsByRolling :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> a -> Bool) -> Fold m a b -> t m a -> t m b
groupsByRolling a -> a -> Bool
cmp Fold m a b
f t m a
m =  forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
Monad m =>
(a -> a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
D.groupsRollingBy a -> a -> Bool
cmp Fold m a b
f (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)

-- |
-- > groups = groupsBy (==)
-- > groups = groupsByRolling (==)
--
-- Groups contiguous spans of equal elements together in individual groups.
--
-- >>> Stream.toList $ Stream.groups Fold.toList $ Stream.fromList [1,1,2,2]
-- [[1,1],[2,2]]
--
-- @since 0.7.0
{-# INLINE groups #-}
groups :: (IsStream t, Monad m, Eq a) => Fold m a b -> t m a -> t m b
groups :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m, Eq a) =>
Fold m a b -> t m a -> t m b
groups = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> a -> Bool) -> Fold m a b -> t m a -> t m b
groupsBy forall a. Eq a => a -> a -> Bool
(==)

------------------------------------------------------------------------------
-- Splitting - by a predicate
------------------------------------------------------------------------------

-- In general we can use deintercalate for splitting.  Then we can also use
-- uniqBy to condense the separators.  One way to generalize splitting is to
-- output:
--
-- data Segment a b = Empty | Segment b | Separator a
--
-- XXX splitOn and splitOnSuffix have a different behavior on an empty stream,
-- is that desirable?

-- | Split on an infixed separator element, dropping the separator.  The
-- supplied 'Fold' is applied on the split segments.  Splits the stream on
-- separator elements determined by the supplied predicate, separator is
-- considered as infixed between two segments:
--
-- >>> splitOn' p xs = Stream.toList $ Stream.splitOn p Fold.toList (Stream.fromList xs)
-- >>> splitOn' (== '.') "a.b"
-- ["a","b"]
--
-- An empty stream is folded to the default value of the fold:
--
-- >>> splitOn' (== '.') ""
-- [""]
--
-- If one or both sides of the separator are missing then the empty segment on
-- that side is folded to the default output of the fold:
--
-- >>> splitOn' (== '.') "."
-- ["",""]
--
-- >>> splitOn' (== '.') ".a"
-- ["","a"]
--
-- >>> splitOn' (== '.') "a."
-- ["a",""]
--
-- >>> splitOn' (== '.') "a..b"
-- ["a","","b"]
--
-- splitOn is an inverse of intercalating single element:
--
-- > Stream.intercalate (Stream.fromPure '.') Unfold.fromList . Stream.splitOn (== '.') Fold.toList === id
--
-- Assuming the input stream does not contain the separator:
--
-- > Stream.splitOn (== '.') Fold.toList . Stream.intercalate (Stream.fromPure '.') Unfold.fromList === id
--
-- @since 0.7.0

{-# INLINE splitOn #-}
splitOn
    :: (IsStream t, Monad m)
    => (a -> Bool) -> Fold m a b -> t m a -> t m b
splitOn :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> Bool) -> Fold m a b -> t m a -> t m b
splitOn a -> Bool
predicate Fold m a b
f =
    -- We can express the infix splitting in terms of optional suffix split
    -- fold.  After applying a suffix split fold repeatedly if the last segment
    -- ends with a suffix then we need to return the default output of the fold
    -- after that to make it an infix split.
    --
    -- Alternately, we can also express it using an optional prefix split fold.
    -- If the first segment starts with a prefix then we need to emit the
    -- default output of the fold before that to make it an infix split, and
    -- then apply prefix split fold repeatedly.
    --
    -- Since a suffix split fold can be easily expressed using a
    -- non-backtracking fold, we use that.
    forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Fold m a b -> t m a -> t m b
foldManyPost (forall (m :: * -> *) a b.
Monad m =>
(a -> Bool) -> Fold m a b -> Fold m a b
FL.takeEndBy_ a -> Bool
predicate Fold m a b
f)

-- | Split on a suffixed separator element, dropping the separator.  The
-- supplied 'Fold' is applied on the split segments.
--
-- >>> splitOnSuffix' p xs = Stream.toList $ Stream.splitOnSuffix p Fold.toList (Stream.fromList xs)
-- >>> splitOnSuffix' (== '.') "a.b."
-- ["a","b"]
--
-- >>> splitOnSuffix' (== '.') "a."
-- ["a"]
--
-- An empty stream results in an empty output stream:
--
-- >>> splitOnSuffix' (== '.') ""
-- []
--
-- An empty segment consisting of only a suffix is folded to the default output
-- of the fold:
--
-- >>> splitOnSuffix' (== '.') "."
-- [""]
--
-- >>> splitOnSuffix' (== '.') "a..b.."
-- ["a","","b",""]
--
-- A suffix is optional at the end of the stream:
--
-- >>> splitOnSuffix' (== '.') "a"
-- ["a"]
--
-- >>> splitOnSuffix' (== '.') ".a"
-- ["","a"]
--
-- >>> splitOnSuffix' (== '.') "a.b"
-- ["a","b"]
--
-- > lines = splitOnSuffix (== '\n')
--
-- 'splitOnSuffix' is an inverse of 'intercalateSuffix' with a single element:
--
-- > Stream.intercalateSuffix (Stream.fromPure '.') Unfold.fromList . Stream.splitOnSuffix (== '.') Fold.toList === id
--
-- Assuming the input stream does not contain the separator:
--
-- > Stream.splitOnSuffix (== '.') Fold.toList . Stream.intercalateSuffix (Stream.fromPure '.') Unfold.fromList === id
--
-- @since 0.7.0

{-# INLINE splitOnSuffix #-}
splitOnSuffix
    :: (IsStream t, Monad m)
    => (a -> Bool) -> Fold m a b -> t m a -> t m b
splitOnSuffix :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> Bool) -> Fold m a b -> t m a -> t m b
splitOnSuffix a -> Bool
predicate Fold m a b
f = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Fold m a b -> t m a -> t m b
foldMany (forall (m :: * -> *) a b.
Monad m =>
(a -> Bool) -> Fold m a b -> Fold m a b
FL.takeEndBy_ a -> Bool
predicate Fold m a b
f)

-- | Split on a prefixed separator element, dropping the separator.  The
-- supplied 'Fold' is applied on the split segments.
--
-- @
-- > splitOnPrefix' p xs = Stream.toList $ Stream.splitOnPrefix p (Fold.toList) (Stream.fromList xs)
-- > splitOnPrefix' (== '.') ".a.b"
-- ["a","b"]
-- @
--
-- An empty stream results in an empty output stream:
-- @
-- > splitOnPrefix' (== '.') ""
-- []
-- @
--
-- An empty segment consisting of only a prefix is folded to the default output
-- of the fold:
--
-- @
-- > splitOnPrefix' (== '.') "."
-- [""]
--
-- > splitOnPrefix' (== '.') ".a.b."
-- ["a","b",""]
--
-- > splitOnPrefix' (== '.') ".a..b"
-- ["a","","b"]
--
-- @
--
-- A prefix is optional at the beginning of the stream:
--
-- @
-- > splitOnPrefix' (== '.') "a"
-- ["a"]
--
-- > splitOnPrefix' (== '.') "a.b"
-- ["a","b"]
-- @
--
-- 'splitOnPrefix' is an inverse of 'intercalatePrefix' with a single element:
--
-- > Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList . Stream.splitOnPrefix (== '.') Fold.toList === id
--
-- Assuming the input stream does not contain the separator:
--
-- > Stream.splitOnPrefix (== '.') Fold.toList . Stream.intercalatePrefix (Stream.fromPure '.') Unfold.fromList === id
--
-- /Unimplemented/

{-# INLINE splitOnPrefix #-}
splitOnPrefix :: -- (IsStream t, MonadCatch m) =>
    (a -> Bool) -> Fold m a b -> t m a -> t m b
splitOnPrefix :: forall a (m :: * -> *) b (t :: (* -> *) -> * -> *).
(a -> Bool) -> Fold m a b -> t m a -> t m b
splitOnPrefix a -> Bool
_predicate Fold m a b
_f = forall a. HasCallStack => a
undefined
    -- parseMany (Parser.sliceBeginBy predicate f)

-- | Like 'splitOn' after stripping leading, trailing, and repeated separators.
-- Therefore, @".a..b."@ with '.' as the separator would be parsed as
-- @["a","b"]@.  In other words, its like parsing words from whitespace
-- separated text.
--
-- >>> wordsBy' p xs = Stream.toList $ Stream.wordsBy p Fold.toList (Stream.fromList xs)
--
-- >>> wordsBy' (== ',') ""
-- []
--
-- >>> wordsBy' (== ',') ","
-- []
--
-- >>> wordsBy' (== ',') ",a,,b,"
-- ["a","b"]
--
-- > words = wordsBy isSpace
--
-- @since 0.7.0

-- It is equivalent to splitting in any of the infix/prefix/suffix styles
-- followed by removal of empty segments.
{-# INLINE wordsBy #-}
wordsBy
    :: (IsStream t, Monad m)
    => (a -> Bool) -> Fold m a b -> t m a -> t m b
wordsBy :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> Bool) -> Fold m a b -> t m a -> t m b
wordsBy a -> Bool
predicate Fold m a b
f t m a
m =
    forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
Monad m =>
(a -> Bool) -> Fold m a b -> Stream m a -> Stream m b
D.wordsBy a -> Bool
predicate Fold m a b
f (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)

-- | Like 'splitOnSuffix' but keeps the suffix attached to the resulting
-- splits.
--
-- >>> splitWithSuffix' p xs = Stream.toList $ splitWithSuffix p Fold.toList (Stream.fromList xs)
--
-- >>> splitWithSuffix' (== '.') ""
-- []
--
-- >>> splitWithSuffix' (== '.') "."
-- ["."]
--
-- >>> splitWithSuffix' (== '.') "a"
-- ["a"]
--
-- >>> splitWithSuffix' (== '.') ".a"
-- [".","a"]
--
-- >>> splitWithSuffix' (== '.') "a."
-- ["a."]
--
-- >>> splitWithSuffix' (== '.') "a.b"
-- ["a.","b"]
--
-- >>> splitWithSuffix' (== '.') "a.b."
-- ["a.","b."]
--
-- >>> splitWithSuffix' (== '.') "a..b.."
-- ["a.",".","b.","."]
--
-- @since 0.7.0

{-# INLINE splitWithSuffix #-}
splitWithSuffix
    :: (IsStream t, Monad m)
    => (a -> Bool) -> Fold m a b -> t m a -> t m b
splitWithSuffix :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> Bool) -> Fold m a b -> t m a -> t m b
splitWithSuffix a -> Bool
predicate Fold m a b
f = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Fold m a b -> t m a -> t m b
foldMany (forall (m :: * -> *) a b.
Monad m =>
(a -> Bool) -> Fold m a b -> Fold m a b
FL.takeEndBy a -> Bool
predicate Fold m a b
f)

------------------------------------------------------------------------------
-- Splitting - on a delimiter sequence
------------------------------------------------------------------------------

-- Int list examples for splitOn:
--
-- >>> splitList [] [1,2,3,3,4]
-- > [[1],[2],[3],[3],[4]]
--
-- >>> splitList [5] [1,2,3,3,4]
-- > [[1,2,3,3,4]]
--
-- >>> splitList [1] [1,2,3,3,4]
-- > [[],[2,3,3,4]]
--
-- >>> splitList [4] [1,2,3,3,4]
-- > [[1,2,3,3],[]]
--
-- >>> splitList [2] [1,2,3,3,4]
-- > [[1],[3,3,4]]
--
-- >>> splitList [3] [1,2,3,3,4]
-- > [[1,2],[],[4]]
--
-- >>> splitList [3,3] [1,2,3,3,4]
-- > [[1,2],[4]]
--
-- >>> splitList [1,2,3,3,4] [1,2,3,3,4]
-- > [[],[]]

-- This can be implemented easily using Rabin Karp
-- | Split on any one of the given patterns.
--
-- /Unimplemented/
--
{-# INLINE splitOnAny #-}
splitOnAny :: -- (IsStream t, Monad m, Unbox a, Integral a) =>
    [Array a] -> Fold m a b -> t m a -> t m b
splitOnAny :: forall a (m :: * -> *) b (t :: (* -> *) -> * -> *).
[Array a] -> Fold m a b -> t m a -> t m b
splitOnAny [Array a]
_subseq Fold m a b
_f t m a
_m =
    forall a. HasCallStack => a
undefined -- D.fromStreamD $ D.splitOnAny f subseq (D.toStreamD m)

-- XXX use a non-monadic intersperse to remove the MonadAsync constraint.
-- XXX Use two folds, one ring buffer fold for separator sequence and the other
-- split consumer fold. The input is fed to the ring fold first and the
-- rejected input is fed to the split fold. If the separator matches, the ring
-- fold would consume all.
--
-- | Like 'splitOnSeq' but splits the separator as well, as an infix token.
--
-- >>> splitOn'_ pat xs = Stream.toList $ Stream.splitBySeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
--
-- >>> splitOn'_ "" "hello"
-- ["h","","e","","l","","l","","o"]
--
-- >>> splitOn'_ "hello" ""
-- [""]
--
-- >>> splitOn'_ "hello" "hello"
-- ["","hello",""]
--
-- >>> splitOn'_ "x" "hello"
-- ["hello"]
--
-- >>> splitOn'_ "h" "hello"
-- ["","h","ello"]
--
-- >>> splitOn'_ "o" "hello"
-- ["hell","o",""]
--
-- >>> splitOn'_ "e" "hello"
-- ["h","e","llo"]
--
-- >>> splitOn'_ "l" "hello"
-- ["he","l","","l","o"]
--
-- >>> splitOn'_ "ll" "hello"
-- ["he","ll","o"]
--
-- /Pre-release/
{-# INLINE splitBySeq #-}
splitBySeq
    :: (IsStream t, MonadAsync m, Storable a, Unbox a, Enum a, Eq a)
    => Array a -> Fold m a b -> t m a -> t m b
splitBySeq :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m, Storable a, Unbox a, Enum a, Eq a) =>
Array a -> Fold m a b -> t m a -> t m b
splitBySeq Array a
patt Fold m a b
f t m a
m =
    forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a -> t m a
intersperseM (forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> SerialT m a -> m b
fold Fold m a b
f (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. (Monad m, Unbox a) => Array a -> Stream m a
A.read Array a
patt)) forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadIO m, Storable a, Unbox a, Enum a, Eq a) =>
Array a -> Fold m a b -> t m a -> t m b
splitOnSeq Array a
patt Fold m a b
f t m a
m

-- | Like 'splitSuffixBy' but the separator is a sequence of elements, instead
-- of a predicate for a single element.
--
-- >>> splitOnSuffixSeq_ pat xs = Stream.toList $ Stream.splitOnSuffixSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
--
-- >>> splitOnSuffixSeq_ "." ""
-- []
--
-- >>> splitOnSuffixSeq_ "." "."
-- [""]
--
-- >>> splitOnSuffixSeq_ "." "a"
-- ["a"]
--
-- >>> splitOnSuffixSeq_ "." ".a"
-- ["","a"]
--
-- >>> splitOnSuffixSeq_ "." "a."
-- ["a"]
--
-- >>> splitOnSuffixSeq_ "." "a.b"
-- ["a","b"]
--
-- >>> splitOnSuffixSeq_ "." "a.b."
-- ["a","b"]
--
-- >>> splitOnSuffixSeq_ "." "a..b.."
-- ["a","","b",""]
--
-- > lines = splitOnSuffixSeq "\n"
--
-- 'splitOnSuffixSeq' is an inverse of 'intercalateSuffix'. The following law
-- always holds:
--
-- > intercalateSuffix . splitOnSuffixSeq == id
--
-- The following law holds when the separator is non-empty and contains none of
-- the elements present in the input lists:
--
-- > splitSuffixOn . intercalateSuffix == id
--
-- >>> splitOnSuffixSeq pat f = Stream.foldMany (Fold.takeEndBySeq_ pat f)
--
-- /Pre-release/
{-# INLINE splitOnSuffixSeq #-}
splitOnSuffixSeq
    :: (IsStream t, MonadIO m, Storable a, Unbox a, Enum a, Eq a)
    => Array a -> Fold m a b -> t m a -> t m b
splitOnSuffixSeq :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadIO m, Storable a, Unbox a, Enum a, Eq a) =>
Array a -> Fold m a b -> t m a -> t m b
splitOnSuffixSeq Array a
patt Fold m a b
f t m a
m =
    forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
(MonadIO m, Storable a, Unbox a, Enum a, Eq a) =>
Bool -> Array a -> Fold m a b -> Stream m a -> Stream m b
D.splitOnSuffixSeq Bool
False Array a
patt Fold m a b
f (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)
-- XXX This may have a problem if the stream terminates and we start extracting
-- the fold and then the fold terminates before consuming all the buffered
-- input.  We have no way to supply the buffered input back to the driver.
-- splitOnSuffixSeq patt f =
--     foldMany (FL.takeEndBySeq_ patt f)

-- | Like 'splitOn' but drops any empty splits.
--
-- /Unimplemented/
{-# INLINE wordsOn #-}
wordsOn :: -- (IsStream t, Monad m, Unbox a, Eq a) =>
    Array a -> Fold m a b -> t m a -> t m b
wordsOn :: forall a (m :: * -> *) b (t :: (* -> *) -> * -> *).
Array a -> Fold m a b -> t m a -> t m b
wordsOn Array a
_subseq Fold m a b
_f t m a
_m =
    forall a. HasCallStack => a
undefined -- D.fromStreamD $ D.wordsOn f subseq (D.toStreamD m)

-- | Like 'splitOnSuffixSeq' but keeps the suffix intact in the splits.
--
-- >>> splitWithSuffixSeq' pat xs = Stream.toList $ Stream.splitWithSuffixSeq (Array.fromList pat) Fold.toList (Stream.fromList xs)
--
-- >>> splitWithSuffixSeq' "." ""
-- []
--
-- >>> splitWithSuffixSeq' "." "."
-- ["."]
--
-- >>> splitWithSuffixSeq' "." "a"
-- ["a"]
--
-- >>> splitWithSuffixSeq' "." ".a"
-- [".","a"]
--
-- >>> splitWithSuffixSeq' "." "a."
-- ["a."]
--
-- >>> splitWithSuffixSeq' "." "a.b"
-- ["a.","b"]
--
-- >>> splitWithSuffixSeq' "." "a.b."
-- ["a.","b."]
--
-- >>> splitWithSuffixSeq' "." "a..b.."
-- ["a.",".","b.","."]
--
-- >>> splitWithSuffixSeq pat f = Stream.foldMany (Fold.takeEndBySeq pat f)
--
-- /Pre-release/
{-# INLINE splitWithSuffixSeq #-}
splitWithSuffixSeq
    :: (IsStream t, MonadIO m, Storable a, Unbox a, Enum a, Eq a)
    => Array a -> Fold m a b -> t m a -> t m b
splitWithSuffixSeq :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadIO m, Storable a, Unbox a, Enum a, Eq a) =>
Array a -> Fold m a b -> t m a -> t m b
splitWithSuffixSeq Array a
patt Fold m a b
f t m a
m =
    forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
(MonadIO m, Storable a, Unbox a, Enum a, Eq a) =>
Bool -> Array a -> Fold m a b -> Stream m a -> Stream m b
D.splitOnSuffixSeq Bool
True Array a
patt Fold m a b
f (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m a
m)
-- splitWithSuffixSeq patt f =
--     foldMany (FL.takeEndBySeq patt f)

-- This can be implemented easily using Rabin Karp
-- | Split post any one of the given patterns.
--
-- /Unimplemented/
{-# INLINE splitOnSuffixSeqAny #-}
splitOnSuffixSeqAny :: -- (IsStream t, Monad m, Unbox a, Integral a) =>
    [Array a] -> Fold m a b -> t m a -> t m b
splitOnSuffixSeqAny :: forall a (m :: * -> *) b (t :: (* -> *) -> * -> *).
[Array a] -> Fold m a b -> t m a -> t m b
splitOnSuffixSeqAny [Array a]
_subseq Fold m a b
_f t m a
_m = forall a. HasCallStack => a
undefined
    -- D.fromStreamD $ D.splitPostAny f subseq (D.toStreamD m)

------------------------------------------------------------------------------
-- Chunking
------------------------------------------------------------------------------

-- | Group the input stream into groups of @n@ elements each and then fold each
-- group using the provided fold function.
--
-- >>> Stream.toList $ Stream.chunksOf 2 Fold.sum (Stream.enumerateFromTo 1 10)
-- [3,7,11,15,19]
--
-- This can be considered as an n-fold version of 'take' where we apply
-- 'take' repeatedly on the leftover stream until the stream exhausts.
--
-- @chunksOf n f = foldMany (FL.take n f)@
--
-- @since 0.7.0
{-# INLINE chunksOf #-}
chunksOf
    :: (IsStream t, Monad m)
    => Int -> Fold m a b -> t m a -> t m b
chunksOf :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Int -> Fold m a b -> t m a -> t m b
chunksOf Int
n Fold m a b
f = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a b.
Monad m =>
Int -> Fold m a b -> Stream m a -> Stream m b
D.groupsOf Int
n Fold m a b
f forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD

-- | @arraysOf n stream@ groups the elements in the input stream into arrays of
-- @n@ elements each.
--
-- Same as the following but may be more efficient:
--
-- > arraysOf n = Stream.foldMany (A.writeN n)
--
-- /Pre-release/
{-# INLINE arraysOf #-}
arraysOf :: (IsStream t, MonadIO m, Unbox a)
    => Int -> t m a -> t m (Array a)
arraysOf :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadIO m, Unbox a) =>
Int -> t m a -> t m (Array a)
arraysOf Int
n = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a.
(MonadIO m, Unbox a) =>
Int -> Stream m a -> Stream m (Array a)
A.chunksOf Int
n forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD

-- XXX we can implement this by repeatedly applying the 'lrunFor' fold.
-- XXX add this example after fixing the serial stream rate control
--
-- | Group the input stream into windows of @n@ second each and then fold each
-- group using the provided fold function.
--
-- >>> Stream.toList $ Stream.take 5 $ Stream.intervalsOf 1 Fold.sum $ Stream.constRate 2 $ Stream.enumerateFrom 1
-- [...,...,...,...,...]
--
-- @since 0.7.0
{-# INLINE intervalsOf #-}
intervalsOf
    :: (IsStream t, MonadAsync m)
    => Double -> Fold m a b -> t m a -> t m b
intervalsOf :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
Double -> Fold m a b -> t m a -> t m b
intervalsOf Double
n Fold m a b
f t m a
xs =
    forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> Bool) -> Fold m a b -> t m a -> t m b
splitWithSuffix forall a. Maybe a -> Bool
isNothing (forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Fold m (Maybe a) b
FL.catMaybes Fold m a b
f)
        (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
Double -> m a -> t m a -> t m a
interjectSuffix Double
n (forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing) (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
map forall a. a -> Maybe a
Just t m a
xs))

-- XXX This can be implemented more efficiently by sharing a Clock.Timer across
-- parallel threads and resetting it whenever a span is emitted.
--
-- | Like 'chunksOf' but if the chunk is not completed within the specified
-- time interval then emit whatever we have collected till now. The chunk
-- timeout is reset whenever a chunk is emitted. The granularity of the clock
-- is 100 ms.
--
-- >>> s = Stream.delayPost 0.3 $ Stream.fromList [1..1000]
-- >>> f = Stream.mapM_ print $ Stream.chunksOfTimeout 5 1 Fold.toList s
--
-- /Pre-release/
{-# INLINE chunksOfTimeout #-}
chunksOfTimeout :: (IsStream t, MonadAsync m, Functor (t m))
    => Int -> Double -> FL.Fold m a b -> t m a -> t m b
chunksOfTimeout :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m, Functor (t m)) =>
Int -> Double -> Fold m a b -> t m a -> t m b
chunksOfTimeout Int
n Double
timeout Fold m a b
f =
      forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
map forall a b. (a, b) -> b
snd
    forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
(IsStream t, MonadAsync m, Ord k) =>
Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> t m (AbsTime, (k, a))
-> t m (k, b)
classifySessionsBy
        Double
0.1 Bool
False (forall a b. a -> b -> a
const (forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False)) Double
timeout (forall (m :: * -> *) a b.
Monad m =>
Int -> Fold m a b -> Fold m a b
FL.take Int
n Fold m a b
f)
    forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m, Functor (t m)) =>
t m a -> t m (AbsTime, a)
Transform.timestamped
    forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
map ((),)

------------------------------------------------------------------------------
-- Windowed classification
------------------------------------------------------------------------------

-- TODO: To mark the position in space or time we can have Indexed or
-- TimeStamped types. That can make it easy to deal with the position indices
-- or timestamps.

------------------------------------------------------------------------------
-- Keyed Sliding Windows
------------------------------------------------------------------------------

{-
{-# INLINABLE classifySlidingChunks #-}
classifySlidingChunks
    :: (IsStream t, MonadAsync m, Ord k)
    => Int              -- ^ window size
    -> Int              -- ^ window slide
    -> Fold m a b       -- ^ Fold to be applied to window events
    -> t m (k, a, Bool) -- ^ window key, data, close event
    -> t m (k, b)
classifySlidingChunks wsize wslide (Fold step initial extract) str
    = undefined

-- XXX Another variant could be to slide the window on an event, e.g. in TCP we
-- slide the send window when an ack is received and we slide the receive
-- window when a sequence is complete. Sliding is stateful in case of TCP,
-- sliding releases the send buffer or makes data available to the user from
-- the receive buffer.
{-# INLINABLE classifySlidingSessions #-}
classifySlidingSessions
    :: (IsStream t, MonadAsync m, Ord k)
    => Double         -- ^ timer tick in seconds
    -> Double         -- ^ time window size
    -> Double         -- ^ window slide
    -> Fold m a b     -- ^ Fold to be applied to window events
    -> t m (k, a, Bool, AbsTime) -- ^ window key, data, close flag, timestamp
    -> t m (k, b)
classifySlidingSessions tick interval slide (Fold step initial extract) str
    = undefined
-}

------------------------------------------------------------------------------
-- Sliding Window Buffers
------------------------------------------------------------------------------

-- These buffered versions could be faster than concurrent incremental folds of
-- all overlapping windows as in many cases we may not need all the values to
-- compute the fold, we can just compute the result using the old value and new
-- value.  However, we may need the buffer once in a while, for example for
-- string search we usually compute the hash incrementally but when the hash
-- matches the hash of the pattern we need to compare the whole string.
--
-- XXX we should be able to implement sequence based splitting combinators
-- using this combinator.

{-
-- | Buffer n elements of the input in a ring buffer. When t new elements are
-- collected, slide the window to remove the same number of oldest elements,
-- insert the new elements, and apply an incremental fold on the sliding
-- window, supplying the outgoing elements, the new ring buffer as arguments.
slidingChunkBuffer
    :: (IsStream t, Monad m, Ord a, Unbox a)
    => Int -- window size
    -> Int -- window slide
    -> Fold m (Ring a, Array a) b
    -> t m a
    -> t m b
slidingChunkBuffer = undefined

-- Buffer n seconds worth of stream elements of the input in a radix tree.
-- Every t seconds, remove the items that are older than n seconds, and apply
-- an incremental fold on the sliding window, supplying the outgoing elements,
-- and the new radix tree buffer as arguments.
slidingSessionBuffer
    :: (IsStream t, Monad m, Ord a, Unbox a)
    => Int    -- window size
    -> Int    -- tick size
    -> Fold m (RTree a, Array a) b
    -> t m a
    -> t m b
slidingSessionBuffer = undefined
-}

------------------------------------------------------------------------------
-- Keyed Session Windows
------------------------------------------------------------------------------

{-
-- | Keyed variable size space windows. Close the window if we do not receive a
-- window event in the next "spaceout" elements.
{-# INLINABLE classifyChunksBy #-}
classifyChunksBy
    :: (IsStream t, MonadAsync m, Ord k)
    => Int   -- ^ window spaceout (spread)
    -> Bool  -- ^ reset the spaceout when a chunk window element is received
    -> Fold m a b       -- ^ Fold to be applied to chunk window elements
    -> t m (k, a, Bool) -- ^ chunk key, data, last element
    -> t m (k, b)
classifyChunksBy spanout reset (Fold step initial extract) str = undefined

-- | Like 'classifyChunksOf' but the chunk size is reset if an element is
-- received within the chunk size window. The chunk gets closed only if no
-- element is received within the chunk window.
--
{-# INLINABLE classifyKeepAliveChunks #-}
classifyKeepAliveChunks
    :: (IsStream t, MonadAsync m, Ord k)
    => Int   -- ^ window spaceout (spread)
    -> Fold m a b       -- ^ Fold to be applied to chunk window elements
    -> t m (k, a, Bool) -- ^ chunk key, data, last element
    -> t m (k, b)
classifyKeepAliveChunks spanout = classifyChunksBy spanout True
-}

data SessionState t m f s b = SessionState
    { forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> AbsTime
sessionCurTime :: !AbsTime  -- ^ time since last event
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> AbsTime
sessionEventTime :: !AbsTime -- ^ time as per last event
    -- We can use the Map size instead of maintaining a count, but if we have
    -- to switch to HashMap then it can be useful.
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> Int
sessionCount :: !Int -- ^ total number of sessions in progress
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> Heap (Entry AbsTime (Key f))
sessionTimerHeap :: H.Heap (H.Entry AbsTime (Key f)) -- ^ heap for timeouts
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> f s
sessionKeyValueMap :: f s -- ^ Stored sessions for keys
    , forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> t m (Key f, b)
sessionOutputStream :: t (m :: Type -> Type) (Key f, b) -- ^ Completed sessions
    }

data SessionEntry s = LiveSession !AbsTime !s | ZombieSession

-- delete from map and output the fold accumulator
ejectEntry :: (Monad m, IsStream t, IsMap f) =>
    (acc -> m b)
    -> heap
    -> f entry
    -> t m (Key f, b)
    -> Int
    -> acc
    -> Key f
    -> m (heap, f entry, t m (Key f, b), Int)
ejectEntry :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) (f :: * -> *) acc b
       heap entry.
(Monad m, IsStream t, IsMap f) =>
(acc -> m b)
-> heap
-> f entry
-> t m (Key f, b)
-> Int
-> acc
-> Key f
-> m (heap, f entry, t m (Key f, b), Int)
ejectEntry acc -> m b
extract heap
hp f entry
mp t m (Key f, b)
out Int
cnt acc
acc Key f
key = do
    b
sess <- acc -> m b
extract acc
acc
    let out1 :: t m (Key f, b)
out1 = (Key f
key, b
sess) forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a -> t m a
`cons` t m (Key f, b)
out
    let mp1 :: f entry
mp1 = forall (f :: * -> *) a. IsMap f => Key f -> f a -> f a
IsMap.mapDelete Key f
key f entry
mp
    forall (m :: * -> *) a. Monad m => a -> m a
return (heap
hp, f entry
mp1, t m (Key f, b)
out1, Int
cnt forall a. Num a => a -> a -> a
- Int
1)

{-# NOINLINE flush #-}
flush :: (Monad m, IsMap f, IsStream t) =>
       (s -> m b)
    -> SessionState t m f (SessionEntry s) b
    -> m (SessionState t m f (SessionEntry s) b)
flush :: forall (m :: * -> *) (f :: * -> *) (t :: (* -> *) -> * -> *) s b.
(Monad m, IsMap f, IsStream t) =>
(s -> m b)
-> SessionState t m f (SessionEntry s) b
-> m (SessionState t m f (SessionEntry s) b)
flush s -> m b
extract session :: SessionState t m f (SessionEntry s) b
session@SessionState{f (SessionEntry s)
t m (Key f, b)
Int
Heap (Entry AbsTime (Key f))
AbsTime
sessionOutputStream :: t m (Key f, b)
sessionKeyValueMap :: f (SessionEntry s)
sessionTimerHeap :: Heap (Entry AbsTime (Key f))
sessionCount :: Int
sessionEventTime :: AbsTime
sessionCurTime :: AbsTime
sessionOutputStream :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> t m (Key f, b)
sessionKeyValueMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> f s
sessionTimerHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> Heap (Entry AbsTime (Key f))
sessionCount :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> Int
sessionEventTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> AbsTime
sessionCurTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> AbsTime
..} = do
    (Heap (Entry AbsTime (Key f))
hp', f (SessionEntry s)
mp', t m (Key f, b)
out, Int
count) <-
        forall {f :: * -> *} {t :: (* -> *) -> * -> *} {p}.
(IsMap f, IsStream t) =>
(Heap (Entry p (Key f)), f (SessionEntry s), t m (Key f, b), Int)
-> m (Heap (Entry p (Key f)), f (SessionEntry s), t m (Key f, b),
      Int)
ejectAll
            ( Heap (Entry AbsTime (Key f))
sessionTimerHeap
            , f (SessionEntry s)
sessionKeyValueMap
            , forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
IsStream.nil
            , Int
sessionCount
            )
    forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ SessionState t m f (SessionEntry s) b
session
        { sessionCount :: Int
sessionCount = Int
count
        , sessionTimerHeap :: Heap (Entry AbsTime (Key f))
sessionTimerHeap = Heap (Entry AbsTime (Key f))
hp'
        , sessionKeyValueMap :: f (SessionEntry s)
sessionKeyValueMap = f (SessionEntry s)
mp'
        , sessionOutputStream :: t m (Key f, b)
sessionOutputStream = t m (Key f, b)
out
        }

    where

    ejectAll :: (Heap (Entry p (Key f)), f (SessionEntry s), t m (Key f, b), Int)
-> m (Heap (Entry p (Key f)), f (SessionEntry s), t m (Key f, b),
      Int)
ejectAll (Heap (Entry p (Key f))
hp, f (SessionEntry s)
mp, t m (Key f, b)
out, !Int
cnt) = do
        let hres :: Maybe (Entry p (Key f), Heap (Entry p (Key f)))
hres = forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry p (Key f))
hp
        case Maybe (Entry p (Key f), Heap (Entry p (Key f)))
hres of
            Just (Entry p
_ Key f
key, Heap (Entry p (Key f))
hp1) -> do
                (Heap (Entry p (Key f)), f (SessionEntry s), t m (Key f, b), Int)
r <- case forall (f :: * -> *) a. IsMap f => Key f -> f a -> Maybe a
IsMap.mapLookup Key f
key f (SessionEntry s)
mp of
                    Maybe (SessionEntry s)
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry p (Key f))
hp1, f (SessionEntry s)
mp, t m (Key f, b)
out, Int
cnt)
                    Just SessionEntry s
ZombieSession ->
                        forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry p (Key f))
hp1, forall (f :: * -> *) a. IsMap f => Key f -> f a -> f a
IsMap.mapDelete Key f
key f (SessionEntry s)
mp, t m (Key f, b)
out, Int
cnt)
                    Just (LiveSession AbsTime
_ s
acc) ->
                        forall (m :: * -> *) (t :: (* -> *) -> * -> *) (f :: * -> *) acc b
       heap entry.
(Monad m, IsStream t, IsMap f) =>
(acc -> m b)
-> heap
-> f entry
-> t m (Key f, b)
-> Int
-> acc
-> Key f
-> m (heap, f entry, t m (Key f, b), Int)
ejectEntry s -> m b
extract Heap (Entry p (Key f))
hp1 f (SessionEntry s)
mp t m (Key f, b)
out Int
cnt s
acc Key f
key
                (Heap (Entry p (Key f)), f (SessionEntry s), t m (Key f, b), Int)
-> m (Heap (Entry p (Key f)), f (SessionEntry s), t m (Key f, b),
      Int)
ejectAll (Heap (Entry p (Key f)), f (SessionEntry s), t m (Key f, b), Int)
r
            Maybe (Entry p (Key f), Heap (Entry p (Key f)))
Nothing -> do
                forall a. HasCallStack => Bool -> a -> a
assert (forall (f :: * -> *) a. IsMap f => f a -> Bool
IsMap.mapNull f (SessionEntry s)
mp) (forall (m :: * -> *) a. Monad m => a -> m a
return ())
                forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry p (Key f))
hp, f (SessionEntry s)
mp, t m (Key f, b)
out, Int
cnt)

{-# NOINLINE ejectOne #-}
ejectOne :: (IsMap f, IsStream t, Monad m) =>
       Bool
    -> (acc -> m b)
    -> ( H.Heap (Entry AbsTime (Key f))
       , f (SessionEntry acc)
       , t m (Key f, b)
       , Int
       )
    -> m ( H.Heap (Entry AbsTime (Key f))
         , f (SessionEntry acc)
         , t m (Key f, b), Int
         )
ejectOne :: forall (f :: * -> *) (t :: (* -> *) -> * -> *) (m :: * -> *) acc b.
(IsMap f, IsStream t, Monad m) =>
Bool
-> (acc -> m b)
-> (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
    t m (Key f, b), Int)
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      t m (Key f, b), Int)
ejectOne Bool
reset acc -> m b
extract = forall {f :: * -> *} {t :: (* -> *) -> * -> *}.
(IsMap f, IsStream t) =>
(Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
 t m (Key f, b), Int)
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      t m (Key f, b), Int)
go

    where

    go :: (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
 t m (Key f, b), Int)
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      t m (Key f, b), Int)
go (Heap (Entry AbsTime (Key f))
hp, f (SessionEntry acc)
mp, t m (Key f, b)
out, !Int
cnt) = do
        let hres :: Maybe (Entry AbsTime (Key f), Heap (Entry AbsTime (Key f)))
hres = forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry AbsTime (Key f))
hp
        case Maybe (Entry AbsTime (Key f), Heap (Entry AbsTime (Key f)))
hres of
            Just (Entry AbsTime
expiry Key f
key, Heap (Entry AbsTime (Key f))
hp1) ->
                case forall (f :: * -> *) a. IsMap f => Key f -> f a -> Maybe a
IsMap.mapLookup Key f
key f (SessionEntry acc)
mp of
                    Maybe (SessionEntry acc)
Nothing -> (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
 t m (Key f, b), Int)
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      t m (Key f, b), Int)
go (Heap (Entry AbsTime (Key f))
hp1, f (SessionEntry acc)
mp, t m (Key f, b)
out, Int
cnt)
                    Just SessionEntry acc
ZombieSession ->
                        (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
 t m (Key f, b), Int)
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      t m (Key f, b), Int)
go (Heap (Entry AbsTime (Key f))
hp1, forall (f :: * -> *) a. IsMap f => Key f -> f a -> f a
IsMap.mapDelete Key f
key f (SessionEntry acc)
mp, t m (Key f, b)
out, Int
cnt)
                    Just (LiveSession AbsTime
expiry1 acc
acc) -> do
                        if Bool -> Bool
not Bool
reset Bool -> Bool -> Bool
|| AbsTime
expiry1 forall a. Ord a => a -> a -> Bool
<= AbsTime
expiry
                        then forall (m :: * -> *) (t :: (* -> *) -> * -> *) (f :: * -> *) acc b
       heap entry.
(Monad m, IsStream t, IsMap f) =>
(acc -> m b)
-> heap
-> f entry
-> t m (Key f, b)
-> Int
-> acc
-> Key f
-> m (heap, f entry, t m (Key f, b), Int)
ejectEntry acc -> m b
extract Heap (Entry AbsTime (Key f))
hp1 f (SessionEntry acc)
mp t m (Key f, b)
out Int
cnt acc
acc Key f
key
                        else
                            -- reset the session timeout and continue
                            let hp2 :: Heap (Entry AbsTime (Key f))
hp2 = forall a. Ord a => a -> Heap a -> Heap a
H.insert (forall p a. p -> a -> Entry p a
Entry AbsTime
expiry1 Key f
key) Heap (Entry AbsTime (Key f))
hp1
                            in (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
 t m (Key f, b), Int)
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      t m (Key f, b), Int)
go (Heap (Entry AbsTime (Key f))
hp2, f (SessionEntry acc)
mp, t m (Key f, b)
out, Int
cnt)
            Maybe (Entry AbsTime (Key f), Heap (Entry AbsTime (Key f)))
Nothing -> do
                forall a. HasCallStack => Bool -> a -> a
assert (forall (f :: * -> *) a. IsMap f => f a -> Bool
IsMap.mapNull f (SessionEntry acc)
mp) (forall (m :: * -> *) a. Monad m => a -> m a
return ())
                forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime (Key f))
hp, f (SessionEntry acc)
mp, t m (Key f, b)
out, Int
cnt)

{-# NOINLINE ejectExpired #-}
ejectExpired :: (IsMap f, IsStream t, Monad m) =>
       Bool
    -> (Int -> m Bool)
    -> (acc -> m b)
    -> SessionState t m f (SessionEntry acc) b
    -> AbsTime
    -> m (SessionState t m f (SessionEntry acc) b)
ejectExpired :: forall (f :: * -> *) (t :: (* -> *) -> * -> *) (m :: * -> *) acc b.
(IsMap f, IsStream t, Monad m) =>
Bool
-> (Int -> m Bool)
-> (acc -> m b)
-> SessionState t m f (SessionEntry acc) b
-> AbsTime
-> m (SessionState t m f (SessionEntry acc) b)
ejectExpired Bool
reset Int -> m Bool
ejectPred acc -> m b
extract session :: SessionState t m f (SessionEntry acc) b
session@SessionState{f (SessionEntry acc)
t m (Key f, b)
Int
Heap (Entry AbsTime (Key f))
AbsTime
sessionOutputStream :: t m (Key f, b)
sessionKeyValueMap :: f (SessionEntry acc)
sessionTimerHeap :: Heap (Entry AbsTime (Key f))
sessionCount :: Int
sessionEventTime :: AbsTime
sessionCurTime :: AbsTime
sessionOutputStream :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> t m (Key f, b)
sessionKeyValueMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> f s
sessionTimerHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> Heap (Entry AbsTime (Key f))
sessionCount :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> Int
sessionEventTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> AbsTime
sessionCurTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> AbsTime
..} AbsTime
curTime = do
    (Heap (Entry AbsTime (Key f))
hp', f (SessionEntry acc)
mp', t m (Key f, b)
out, Int
count) <-
        forall {f :: * -> *} {t :: (* -> *) -> * -> *}.
(IsMap f, IsStream t) =>
Heap (Entry AbsTime (Key f))
-> f (SessionEntry acc)
-> t m (Key f, b)
-> Int
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      t m (Key f, b), Int)
ejectLoop
            Heap (Entry AbsTime (Key f))
sessionTimerHeap f (SessionEntry acc)
sessionKeyValueMap forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
IsStream.nil Int
sessionCount
    forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ SessionState t m f (SessionEntry acc) b
session
        { sessionCurTime :: AbsTime
sessionCurTime = AbsTime
curTime
        , sessionCount :: Int
sessionCount = Int
count
        , sessionTimerHeap :: Heap (Entry AbsTime (Key f))
sessionTimerHeap = Heap (Entry AbsTime (Key f))
hp'
        , sessionKeyValueMap :: f (SessionEntry acc)
sessionKeyValueMap = f (SessionEntry acc)
mp'
        , sessionOutputStream :: t m (Key f, b)
sessionOutputStream = t m (Key f, b)
out
        }

    where

    ejectLoop :: Heap (Entry AbsTime (Key f))
-> f (SessionEntry acc)
-> t m (Key f, b)
-> Int
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      t m (Key f, b), Int)
ejectLoop Heap (Entry AbsTime (Key f))
hp f (SessionEntry acc)
mp t m (Key f, b)
out !Int
cnt = do
        let hres :: Maybe (Entry AbsTime (Key f), Heap (Entry AbsTime (Key f)))
hres = forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry AbsTime (Key f))
hp
        case Maybe (Entry AbsTime (Key f), Heap (Entry AbsTime (Key f)))
hres of
            Just (Entry AbsTime
expiry Key f
key, Heap (Entry AbsTime (Key f))
hp1) -> do
                (Bool
eject, Bool
force) <-
                    if AbsTime
curTime forall a. Ord a => a -> a -> Bool
>= AbsTime
expiry
                    then forall (m :: * -> *) a. Monad m => a -> m a
return (Bool
True, Bool
False)
                    else do
                        Bool
r <- Int -> m Bool
ejectPred Int
cnt
                        forall (m :: * -> *) a. Monad m => a -> m a
return (Bool
r, Bool
r)
                if Bool
eject
                then
                    case forall (f :: * -> *) a. IsMap f => Key f -> f a -> Maybe a
IsMap.mapLookup Key f
key f (SessionEntry acc)
mp of
                        Maybe (SessionEntry acc)
Nothing -> Heap (Entry AbsTime (Key f))
-> f (SessionEntry acc)
-> t m (Key f, b)
-> Int
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      t m (Key f, b), Int)
ejectLoop Heap (Entry AbsTime (Key f))
hp1 f (SessionEntry acc)
mp t m (Key f, b)
out Int
cnt
                        Just SessionEntry acc
ZombieSession ->
                            Heap (Entry AbsTime (Key f))
-> f (SessionEntry acc)
-> t m (Key f, b)
-> Int
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      t m (Key f, b), Int)
ejectLoop Heap (Entry AbsTime (Key f))
hp1 (forall (f :: * -> *) a. IsMap f => Key f -> f a -> f a
IsMap.mapDelete Key f
key f (SessionEntry acc)
mp) t m (Key f, b)
out Int
cnt
                        Just (LiveSession AbsTime
expiry1 acc
acc) -> do
                            if AbsTime
expiry1 forall a. Ord a => a -> a -> Bool
<= AbsTime
curTime Bool -> Bool -> Bool
|| Bool -> Bool
not Bool
reset Bool -> Bool -> Bool
|| Bool
force
                            then do
                                (Heap (Entry AbsTime (Key f))
hp2,f (SessionEntry acc)
mp1,t m (Key f, b)
out1,Int
cnt1) <-
                                    forall (m :: * -> *) (t :: (* -> *) -> * -> *) (f :: * -> *) acc b
       heap entry.
(Monad m, IsStream t, IsMap f) =>
(acc -> m b)
-> heap
-> f entry
-> t m (Key f, b)
-> Int
-> acc
-> Key f
-> m (heap, f entry, t m (Key f, b), Int)
ejectEntry acc -> m b
extract Heap (Entry AbsTime (Key f))
hp1 f (SessionEntry acc)
mp t m (Key f, b)
out Int
cnt acc
acc Key f
key
                                Heap (Entry AbsTime (Key f))
-> f (SessionEntry acc)
-> t m (Key f, b)
-> Int
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      t m (Key f, b), Int)
ejectLoop Heap (Entry AbsTime (Key f))
hp2 f (SessionEntry acc)
mp1 t m (Key f, b)
out1 Int
cnt1
                            else
                                -- reset the session timeout and continue
                                let hp2 :: Heap (Entry AbsTime (Key f))
hp2 = forall a. Ord a => a -> Heap a -> Heap a
H.insert (forall p a. p -> a -> Entry p a
Entry AbsTime
expiry1 Key f
key) Heap (Entry AbsTime (Key f))
hp1
                                in Heap (Entry AbsTime (Key f))
-> f (SessionEntry acc)
-> t m (Key f, b)
-> Int
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      t m (Key f, b), Int)
ejectLoop Heap (Entry AbsTime (Key f))
hp2 f (SessionEntry acc)
mp t m (Key f, b)
out Int
cnt
                else forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime (Key f))
hp, f (SessionEntry acc)
mp, t m (Key f, b)
out, Int
cnt)
            Maybe (Entry AbsTime (Key f), Heap (Entry AbsTime (Key f)))
Nothing -> do
                forall a. HasCallStack => Bool -> a -> a
assert (forall (f :: * -> *) a. IsMap f => f a -> Bool
IsMap.mapNull f (SessionEntry acc)
mp) (forall (m :: * -> *) a. Monad m => a -> m a
return ())
                forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime (Key f))
hp, f (SessionEntry acc)
mp, t m (Key f, b)
out, Int
cnt)

-- XXX Use mutable IORef in accumulator
{-# INLINE classifySessionsByGeneric #-}
classifySessionsByGeneric
    :: forall t m f a b. (IsStream t, MonadAsync m, IsMap f)
    => Proxy (f :: (Type -> Type))
    -> Double         -- ^ timer tick in seconds
    -> Bool           -- ^ reset the timer when an event is received
    -> (Int -> m Bool) -- ^ predicate to eject sessions based on session count
    -> Double         -- ^ session timeout in seconds
    -> Fold m a b  -- ^ Fold to be applied to session data
    -> t m (AbsTime, (Key f, a)) -- ^ timestamp, (session key, session data)
    -> t m (Key f, b) -- ^ session key, fold result
classifySessionsByGeneric :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) a b.
(IsStream t, MonadAsync m, IsMap f) =>
Proxy f
-> Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> t m (AbsTime, (Key f, a))
-> t m (Key f, b)
classifySessionsByGeneric Proxy f
_ Double
tick Bool
reset Int -> m Bool
ejectPred Double
tmout
    (Fold s -> a -> m (Step s b)
step m (Step s b)
initial s -> m b
extract) t m (AbsTime, (Key f, a))
input =
    forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
Unfold m a b -> t m a -> t m b
Expand.unfoldMany
        (forall a c (m :: * -> *) b.
(a -> c) -> Unfold m c b -> Unfold m a b
Unfold.lmap (forall (m :: * -> *) a. SerialT m a -> Stream m a
toStreamK forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> t m (Key f, b)
sessionOutputStream) forall (m :: * -> *) a. Applicative m => Unfold m (StreamK m a) a
Unfold.fromStreamK)
        forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
(IsStream t, Monad m) =>
(b -> a -> m b) -> m b -> (b -> m b) -> t m a -> t m b
scanlMAfter' forall {t :: (* -> *) -> * -> *} {f :: * -> *}.
(IsStream t, IsMap f) =>
SessionState t m f (SessionEntry s) b
-> Maybe (AbsTime, (Key f, a))
-> m (SessionState t m f (SessionEntry s) b)
sstep (forall (m :: * -> *) a. Monad m => a -> m a
return forall {m :: * -> *} {s} {b}. SessionState SerialT m f s b
szero) (forall (m :: * -> *) (f :: * -> *) (t :: (* -> *) -> * -> *) s b.
(Monad m, IsMap f, IsStream t) =>
(s -> m b)
-> SessionState t m f (SessionEntry s) b
-> m (SessionState t m f (SessionEntry s) b)
flush s -> m b
extract)
        forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
Double -> m a -> t m a -> t m a
interjectSuffix Double
tick (forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing)
        forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, Monad m) =>
(a -> b) -> t m a -> t m b
map forall a. a -> Maybe a
Just t m (AbsTime, (Key f, a))
input

    where

    timeoutMs :: RelTime
timeoutMs = forall a. TimeUnit a => a -> RelTime
toRelTime (forall a b. (RealFrac a, Integral b) => a -> b
round (Double
tmout forall a. Num a => a -> a -> a
* Double
1000) :: MilliSecond64)
    tickMs :: RelTime
tickMs = forall a. TimeUnit a => a -> RelTime
toRelTime (forall a b. (RealFrac a, Integral b) => a -> b
round (Double
tick forall a. Num a => a -> a -> a
* Double
1000) :: MilliSecond64)
    szero :: SessionState SerialT m f s b
szero = SessionState
        { sessionCurTime :: AbsTime
sessionCurTime = forall a. TimeUnit a => a -> AbsTime
toAbsTime (MilliSecond64
0 :: MilliSecond64)
        , sessionEventTime :: AbsTime
sessionEventTime = forall a. TimeUnit a => a -> AbsTime
toAbsTime (MilliSecond64
0 :: MilliSecond64)
        , sessionCount :: Int
sessionCount = Int
0
        , sessionTimerHeap :: Heap (Entry AbsTime (Key f))
sessionTimerHeap = forall a. Heap a
H.empty
        , sessionKeyValueMap :: f s
sessionKeyValueMap = forall (f :: * -> *) a. IsMap f => f a
IsMap.mapEmpty :: f s
        , sessionOutputStream :: SerialT m (Key f, b)
sessionOutputStream = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
IsStream.nil
        }

    -- We can eject sessions based on the current session count to limit
    -- memory consumption. There are two possible strategies:
    --
    -- 1) Eject old sessions or sessions beyond a certain/lower timeout
    -- threshold even before timeout, effectively reduce the timeout.
    -- 2) Drop creation of new sessions but keep accepting new events for the
    -- old ones.
    --
    -- We use the first strategy as of now.

    -- Got a new stream input element
    sstep :: SessionState t m f (SessionEntry s) b
-> Maybe (AbsTime, (Key f, a))
-> m (SessionState t m f (SessionEntry s) b)
sstep session :: SessionState t m f (SessionEntry s) b
session@SessionState{f (SessionEntry s)
t m (Key f, b)
Int
Heap (Entry AbsTime (Key f))
AbsTime
sessionOutputStream :: t m (Key f, b)
sessionKeyValueMap :: f (SessionEntry s)
sessionTimerHeap :: Heap (Entry AbsTime (Key f))
sessionCount :: Int
sessionEventTime :: AbsTime
sessionCurTime :: AbsTime
sessionOutputStream :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> t m (Key f, b)
sessionKeyValueMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> f s
sessionTimerHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> Heap (Entry AbsTime (Key f))
sessionCount :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> Int
sessionEventTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> AbsTime
sessionCurTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> AbsTime
..} (Just (AbsTime
timestamp, (Key f
key, a
value))) = do
        -- XXX instead of a heap we could use a timer wheel.
        --
        -- XXX if the key is an Int, we can also use an IntMap for slightly
        -- better performance.
        --
        -- How it works:
        --
        -- Values for each key are collected in a map using the supplied fold.
        -- When we insert a key in the Map we insert an entry into the heap as
        -- well with the session expiry as the sort key.  The Map entry
        -- consists of the fold result, and the expiry time of the session. If
        -- "reset" is True the expiry time is readjusted whenever a new event
        -- is processed. If the fold terminates and a new session is started
        -- for the same key the expiry time is set to the first timestamp of
        -- the new session.
        --
        -- The heap must have at most one entry for any given key. The heap is
        -- processed periodically to remove the expired entries.  We pick up an
        -- expired entry from the top of the heap and if the session has
        -- expired based on the expiry time in the Map entry then we remove the
        -- session from the Map and yield its fold output. Otherwise, we
        -- reinsert the entry into the heap based on the current expiry in the
        -- Map entry.
        --
        -- If an entry is removed from the Map and not removed from the heap
        -- and in the meantime it is inserted again in the Map (using the same
        -- key) then how do we avoid inserting multiple entries in the heap?
        -- For this reason we maintain the invariant that the Map entry is
        -- removed only when the heap entry is removed. Even if the fold has
        -- finished we still keep a dummy Map entry (ZombieSession) until the
        -- heap entry is removed. That way if we have a Map entry we do not
        -- insert a heap entry because we know it is already there.
        -- XXX The ZombieSession mechanism does not work as expected as we
        -- ignore ZombieSession when inserting a new entry. Anyway, we can
        -- remove this mechanism as at most only two heap entries may be
        -- created and they will be ultimately cleaned up.
        --
        -- Heap processing needs the map and map processing needs the heap,
        -- therefore we cannot separate the two for modularity unless we have a
        -- way to achieve mutual recursion.
        --
        let curTime :: AbsTime
curTime = forall a. Ord a => a -> a -> a
max AbsTime
sessionEventTime AbsTime
timestamp
            mOld :: Maybe (SessionEntry s)
mOld = forall (f :: * -> *) a. IsMap f => Key f -> f a -> Maybe a
IsMap.mapLookup Key f
key f (SessionEntry s)
sessionKeyValueMap
        let done :: b -> m (SessionState t m f (SessionEntry s) b)
done b
fb = do
                -- deleting a key from the heap is expensive, so we never
                -- delete a key from heap, we just purge it from the Map and it
                -- gets purged from the heap on timeout. We just need an extra
                -- lookup in the Map when the key is purged from the heap, that
                -- should not be expensive.
                --
                let (f (SessionEntry s)
mp, Int
cnt) = case Maybe (SessionEntry s)
mOld of
                        Just (LiveSession AbsTime
_ s
_) ->
                            ( forall (f :: * -> *) a. IsMap f => Key f -> a -> f a -> f a
IsMap.mapInsert
                                Key f
key forall s. SessionEntry s
ZombieSession f (SessionEntry s)
sessionKeyValueMap
                            , Int
sessionCount forall a. Num a => a -> a -> a
- Int
1
                            )
                        Maybe (SessionEntry s)
_ -> (f (SessionEntry s)
sessionKeyValueMap, Int
sessionCount)
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ SessionState t m f (SessionEntry s) b
session
                    { sessionCurTime :: AbsTime
sessionCurTime = AbsTime
curTime
                    , sessionEventTime :: AbsTime
sessionEventTime = AbsTime
curTime
                    , sessionCount :: Int
sessionCount = Int
cnt
                    , sessionKeyValueMap :: f (SessionEntry s)
sessionKeyValueMap = f (SessionEntry s)
mp
                    , sessionOutputStream :: t m (Key f, b)
sessionOutputStream = forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
fromPure (Key f
key, b
fb)
                    }
            partial :: s -> m (SessionState t m f (SessionEntry s) b)
partial s
fs1 = do
                let expiry :: AbsTime
expiry = AbsTime -> RelTime -> AbsTime
addToAbsTime AbsTime
timestamp RelTime
timeoutMs
                (Heap (Entry AbsTime (Key f))
hp1, f (SessionEntry s)
mp1, t m (Key f, b)
out1, Int
cnt1) <- do
                        let vars :: (Heap (Entry AbsTime (Key f)), f (SessionEntry s), t m a, Int)
vars = (Heap (Entry AbsTime (Key f))
sessionTimerHeap, f (SessionEntry s)
sessionKeyValueMap,
                                           forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
IsStream.nil, Int
sessionCount)
                        case Maybe (SessionEntry s)
mOld of
                            -- inserting new entry
                            Maybe (SessionEntry s)
Nothing -> do
                                -- Eject a session from heap and map if needed
                                Bool
eject <- Int -> m Bool
ejectPred Int
sessionCount
                                (Heap (Entry AbsTime (Key f))
hp, f (SessionEntry s)
mp, t m (Key f, b)
out, Int
cnt) <-
                                    if Bool
eject
                                    then forall (f :: * -> *) (t :: (* -> *) -> * -> *) (m :: * -> *) acc b.
(IsMap f, IsStream t, Monad m) =>
Bool
-> (acc -> m b)
-> (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
    t m (Key f, b), Int)
-> m (Heap (Entry AbsTime (Key f)), f (SessionEntry acc),
      t m (Key f, b), Int)
ejectOne Bool
reset s -> m b
extract forall {m :: * -> *} {a}.
(Heap (Entry AbsTime (Key f)), f (SessionEntry s), t m a, Int)
vars
                                    else forall (m :: * -> *) a. Monad m => a -> m a
return forall {m :: * -> *} {a}.
(Heap (Entry AbsTime (Key f)), f (SessionEntry s), t m a, Int)
vars

                                -- Insert the new session in heap
                                let hp' :: Heap (Entry AbsTime (Key f))
hp' = forall a. Ord a => a -> Heap a -> Heap a
H.insert (forall p a. p -> a -> Entry p a
Entry AbsTime
expiry Key f
key) Heap (Entry AbsTime (Key f))
hp
                                 in forall (m :: * -> *) a. Monad m => a -> m a
return (Heap (Entry AbsTime (Key f))
hp', f (SessionEntry s)
mp, t m (Key f, b)
out, Int
cnt forall a. Num a => a -> a -> a
+ Int
1)
                            -- updating old entry
                            Just SessionEntry s
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return forall {m :: * -> *} {a}.
(Heap (Entry AbsTime (Key f)), f (SessionEntry s), t m a, Int)
vars

                let acc :: SessionEntry s
acc = forall s. AbsTime -> s -> SessionEntry s
LiveSession AbsTime
expiry s
fs1
                    mp2 :: f (SessionEntry s)
mp2 = forall (f :: * -> *) a. IsMap f => Key f -> a -> f a -> f a
IsMap.mapInsert Key f
key SessionEntry s
acc f (SessionEntry s)
mp1
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ SessionState
                    { sessionCurTime :: AbsTime
sessionCurTime = AbsTime
curTime
                    , sessionEventTime :: AbsTime
sessionEventTime = AbsTime
curTime
                    , sessionCount :: Int
sessionCount = Int
cnt1
                    , sessionTimerHeap :: Heap (Entry AbsTime (Key f))
sessionTimerHeap = Heap (Entry AbsTime (Key f))
hp1
                    , sessionKeyValueMap :: f (SessionEntry s)
sessionKeyValueMap = f (SessionEntry s)
mp2
                    , sessionOutputStream :: t m (Key f, b)
sessionOutputStream = t m (Key f, b)
out1
                    }
        Step s b
res0 <- do
            case Maybe (SessionEntry s)
mOld of
                Just (LiveSession AbsTime
_ s
acc) -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. s -> Step s b
FL.Partial s
acc
                Maybe (SessionEntry s)
_ -> m (Step s b)
initial
        case Step s b
res0 of
            FL.Done b
_ ->
                forall a. HasCallStack => [Char] -> a
error forall a b. (a -> b) -> a -> b
$ [Char]
"classifySessionsBy: "
                    forall a. [a] -> [a] -> [a]
++ [Char]
"The supplied fold must consume at least one input"
            FL.Partial s
fs -> do
                Step s b
res <- s -> a -> m (Step s b)
step s
fs a
value
                case Step s b
res of
                    FL.Done b
fb -> forall {m :: * -> *} {t :: (* -> *) -> * -> *} {b} {m :: * -> *}.
(Monad m, IsStream t) =>
b -> m (SessionState t m f (SessionEntry s) b)
done b
fb
                    FL.Partial s
fs1 -> forall {t :: (* -> *) -> * -> *}.
IsStream t =>
s -> m (SessionState t m f (SessionEntry s) b)
partial s
fs1

    -- Got a timer tick event
    sstep sessionState :: SessionState t m f (SessionEntry s) b
sessionState@SessionState{f (SessionEntry s)
t m (Key f, b)
Int
Heap (Entry AbsTime (Key f))
AbsTime
sessionOutputStream :: t m (Key f, b)
sessionKeyValueMap :: f (SessionEntry s)
sessionTimerHeap :: Heap (Entry AbsTime (Key f))
sessionCount :: Int
sessionEventTime :: AbsTime
sessionCurTime :: AbsTime
sessionOutputStream :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> t m (Key f, b)
sessionKeyValueMap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> f s
sessionTimerHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> Heap (Entry AbsTime (Key f))
sessionCount :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> Int
sessionEventTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> AbsTime
sessionCurTime :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) s b.
SessionState t m f s b -> AbsTime
..} Maybe (AbsTime, (Key f, a))
Nothing =
        let curTime :: AbsTime
curTime = AbsTime -> RelTime -> AbsTime
addToAbsTime AbsTime
sessionCurTime RelTime
tickMs
        in forall (f :: * -> *) (t :: (* -> *) -> * -> *) (m :: * -> *) acc b.
(IsMap f, IsStream t, Monad m) =>
Bool
-> (Int -> m Bool)
-> (acc -> m b)
-> SessionState t m f (SessionEntry acc) b
-> AbsTime
-> m (SessionState t m f (SessionEntry acc) b)
ejectExpired Bool
reset Int -> m Bool
ejectPred s -> m b
extract SessionState t m f (SessionEntry s) b
sessionState AbsTime
curTime

-- | @classifySessionsBy tick keepalive predicate timeout fold stream@
-- classifies an input event @stream@ consisting of  @(timestamp, (key,
-- value))@ into sessions based on the @key@, folding all the values
-- corresponding to the same key into a session using the supplied @fold@.
--
-- When the fold terminates or a @timeout@ occurs, a tuple consisting of the
-- session key and the folded value is emitted in the output stream. The
-- timeout is measured from the first event in the session.  If the @keepalive@
-- option is set to 'True' the timeout is reset to 0 whenever an event is
-- received.
--
-- The @timestamp@ in the input stream is an absolute time from some epoch,
-- characterizing the time when the input event was generated.  The notion of
-- current time is maintained by a monotonic event time clock using the
-- timestamps seen in the input stream. The latest timestamp seen till now is
-- used as the base for the current time.  When no new events are seen, a timer
-- is started with a clock resolution of @tick@ seconds. This timer is used to
-- detect session timeouts in the absence of new events.
--
-- To ensure an upper bound on the memory used the number of sessions can be
-- limited to an upper bound. If the ejection @predicate@ returns 'True', the
-- oldest session is ejected before inserting a new session.
--
-- When the stream ends any buffered sessions are ejected immediately.
--
-- If a session key is received even after a session has finished, another
-- session is created for that key.
--
-- >>> :{
-- Stream.mapM_ print
--     $ Stream.classifySessionsBy 1 False (const (return False)) 3 (Fold.take 3 Fold.toList)
--     $ Stream.timestamped
--     $ Stream.delay 0.1
--     $ Stream.fromList ((,) <$> [1,2,3] <*> ['a','b','c'])
-- :}
-- (1,"abc")
-- (2,"abc")
-- (3,"abc")
--
-- /Pre-release/
{-# INLINE classifySessionsBy #-}
classifySessionsBy
    :: (IsStream t, MonadAsync m, Ord k)
    => Double         -- ^ timer tick in seconds
    -> Bool           -- ^ reset the timer when an event is received
    -> (Int -> m Bool) -- ^ predicate to eject sessions based on session count
    -> Double         -- ^ session timeout in seconds
    -> Fold m a b  -- ^ Fold to be applied to session data
    -> t m (AbsTime, (k, a)) -- ^ timestamp, (session key, session data)
    -> t m (k, b) -- ^ session key, fold result
classifySessionsBy :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
(IsStream t, MonadAsync m, Ord k) =>
Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> t m (AbsTime, (k, a))
-> t m (k, b)
classifySessionsBy = forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) a b.
(IsStream t, MonadAsync m, IsMap f) =>
Proxy f
-> Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> t m (AbsTime, (Key f, a))
-> t m (Key f, b)
classifySessionsByGeneric (forall {k} (t :: k). Proxy t
Proxy :: Proxy (Map k))

-- | Same as 'classifySessionsBy' with a timer tick of 1 second and keepalive
-- option set to 'True'.
--
-- @
-- classifyKeepAliveSessions = classifySessionsBy 1 True
-- @
--
-- /Pre-release/
--
{-# INLINE classifyKeepAliveSessions #-}
classifyKeepAliveSessions ::
       (IsStream t, MonadAsync m, Ord k)
    => (Int -> m Bool) -- ^ predicate to eject sessions on session count
    -> Double -- ^ session inactive timeout
    -> Fold m a b -- ^ Fold to be applied to session payload data
    -> t m (AbsTime, (k, a)) -- ^ timestamp, (session key, session data)
    -> t m (k, b)
classifyKeepAliveSessions :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
(IsStream t, MonadAsync m, Ord k) =>
(Int -> m Bool)
-> Double -> Fold m a b -> t m (AbsTime, (k, a)) -> t m (k, b)
classifyKeepAliveSessions = forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
(IsStream t, MonadAsync m, Ord k) =>
Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> t m (AbsTime, (k, a))
-> t m (k, b)
classifySessionsBy Double
1 Bool
True

------------------------------------------------------------------------------
-- Keyed tumbling windows
------------------------------------------------------------------------------

-- Tumbling windows is a special case of sliding windows where the window slide
-- is the same as the window size. Or it can be a special case of session
-- windows where the reset flag is set to False.

-- XXX instead of using the early termination flag in the stream, we can use an
-- early terminating fold instead.

{-
-- | Split the stream into fixed size chunks of specified size. Within each
-- such chunk fold the elements in buckets identified by the keys. A particular
-- bucket fold can be terminated early if a closing flag is encountered in an
-- element for that key.
--
-- @since 0.7.0
{-# INLINABLE classifyChunksOf #-}
classifyChunksOf
    :: (IsStream t, MonadAsync m, Ord k)
    => Int              -- ^ window size
    -> Fold m a b       -- ^ Fold to be applied to window events
    -> t m (k, a, Bool) -- ^ window key, data, close event
    -> t m (k, b)
classifyChunksOf wsize = classifyChunksBy wsize False
-}

-- | Same as 'classifySessionsBy' with a timer tick of 1 second and keepalive
-- option set to 'False'.
--
-- @
-- classifySessionsOf = classifySessionsBy 1 False
-- @
--
-- /Pre-release/
--
{-# INLINE classifySessionsOf #-}
classifySessionsOf ::
       (IsStream t, MonadAsync m, Ord k)
    => (Int -> m Bool) -- ^ predicate to eject sessions on session count
    -> Double -- ^ time window size
    -> Fold m a b -- ^ Fold to be applied to session data
    -> t m (AbsTime, (k, a)) -- ^ timestamp, (session key, session data)
    -> t m (k, b)
classifySessionsOf :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
(IsStream t, MonadAsync m, Ord k) =>
(Int -> m Bool)
-> Double -> Fold m a b -> t m (AbsTime, (k, a)) -> t m (k, b)
classifySessionsOf = forall (t :: (* -> *) -> * -> *) (m :: * -> *) k a b.
(IsStream t, MonadAsync m, Ord k) =>
Double
-> Bool
-> (Int -> m Bool)
-> Double
-> Fold m a b
-> t m (AbsTime, (k, a))
-> t m (k, b)
classifySessionsBy Double
1 Bool
False

------------------------------------------------------------------------------
-- Nested Split
------------------------------------------------------------------------------

-- | @splitInnerBy splitter joiner stream@ splits the inner containers @f a@ of
-- an input stream @t m (f a)@ using the @splitter@ function. Container
-- elements @f a@ are collected until a split occurs, then all the elements
-- before the split are joined using the @joiner@ function.
--
-- For example, if we have a stream of @Array Word8@, we may want to split the
-- stream into arrays representing lines separated by '\n' byte such that the
-- resulting stream after a split would be one array for each line.
--
-- CAUTION! This is not a true streaming function as the container size after
-- the split and merge may not be bounded.
--
-- /Pre-release/
{-# INLINE splitInnerBy #-}
splitInnerBy
    :: (IsStream t, Monad m)
    => (f a -> m (f a, Maybe (f a)))  -- splitter
    -> (f a -> f a -> m (f a))        -- joiner
    -> t m (f a)
    -> t m (f a)
splitInnerBy :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) a.
(IsStream t, Monad m) =>
(f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a)
splitInnerBy f a -> m (f a, Maybe (f a))
splitter f a -> f a -> m (f a)
joiner t m (f a)
xs =
    forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) (f :: * -> *) a.
Monad m =>
(f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a)
D.splitInnerBy f a -> m (f a, Maybe (f a))
splitter f a -> f a -> m (f a)
joiner forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m (f a)
xs

-- | Like 'splitInnerBy' but splits assuming the separator joins the segment in
-- a suffix style.
--
-- /Pre-release/
{-# INLINE splitInnerBySuffix #-}
splitInnerBySuffix
    :: (IsStream t, Monad m, Eq (f a), Monoid (f a))
    => (f a -> m (f a, Maybe (f a)))  -- splitter
    -> (f a -> f a -> m (f a))        -- joiner
    -> t m (f a)
    -> t m (f a)
splitInnerBySuffix :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) (f :: * -> *) a.
(IsStream t, Monad m, Eq (f a), Monoid (f a)) =>
(f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> t m (f a) -> t m (f a)
splitInnerBySuffix f a -> m (f a, Maybe (f a))
splitter f a -> f a -> m (f a)
joiner t m (f a)
xs =
    forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
fromStreamD forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) (f :: * -> *) a.
(Monad m, Eq (f a), Monoid (f a)) =>
(f a -> m (f a, Maybe (f a)))
-> (f a -> f a -> m (f a)) -> Stream m (f a) -> Stream m (f a)
D.splitInnerBySuffix f a -> m (f a, Maybe (f a))
splitter f a -> f a -> m (f a)
joiner forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
toStreamD t m (f a)
xs