{-# LANGUAGE CPP #-}
-- |
-- Module      : Streamly.Data.Stream
-- Copyright   : (c) 2017 Composewell Technologies
--
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : released
-- Portability : GHC
--
-- Streams represented as state machines that fuse together when composed
-- statically, eliminating function calls and intermediate constructor
-- allocations, thus generating tight, efficient loops. Suitable for high
-- performance looping operations.
--
-- If you need to call these operations recursively in a loop (i.e. composed
-- dynamically) then it is recommended to use the continuation passing style
-- (CPS) stream operations from the "Streamly.Data.StreamK" module. 'Stream'
-- and 'StreamK' types are interconvertible.  See more details in the
-- documentation below regarding 'Stream' vs 'StreamK'.
--
-- Please refer to "Streamly.Internal.Data.Stream" for more functions that have
-- not yet been released.
--
-- Check out the <https://github.com/composewell/streamly-examples>
-- repository for many more real-world examples of stream programming.

module Streamly.Data.Stream
    (
    -- * Setup
    -- | To execute the code examples provided in this module in ghci, please
    -- run the following commands first.
    --
    -- $setup

    -- * Overview
    -- $overview

    -- * The Stream Type
      Stream

    -- * Construction
    -- | Functions whose types end in the general shape @b -> Stream m a@.
    --
    -- Useful Idioms:
    --
    -- >>> fromIndices f = fmap f $ Stream.enumerateFrom 0
    -- >>> fromIndicesM f = Stream.mapM f $ Stream.enumerateFrom 0
    -- >>> fromListM = Stream.sequence . Stream.fromList
    -- >>> fromFoldable = StreamK.toStream . StreamK.fromFoldable
    -- >>> fromFoldableM = Stream.sequence . fromFoldable

    -- ** Primitives
    -- | A fused 'Stream' is usually not constructed using these primitives;
    -- streams are typically generated by converting containers like lists into
    -- streams, or by using the custom generation functions provided in this
    -- module. The 'cons' primitive in this module has a rare use in fusing a
    -- small number of elements. On the other hand, it is common to construct a
    -- 'StreamK' stream using the StreamK.'StreamK.cons' primitive.
    , nil
    , nilM
    , cons
    , consM

    -- ** Unfolding
    -- | 'unfoldrM' is the most general way of generating a stream efficiently.
    -- All other generation operations can be expressed using it.
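    --
    -- For example, generating a bounded counting stream (a small illustration,
    -- assuming the ghci setup described above):
    --
    -- >>> f n = if n > 3 then Nothing else Just (n, n + 1)
    -- >>> Stream.toList $ Stream.unfoldr f 1
    -- [1,2,3]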
    , unfoldr
    , unfoldrM

    -- ** From Values
    -- | Generate a monadic stream from a seed value or values.
    , fromPure
    , fromEffect
    , repeat
    , repeatM
    , replicate
    , replicateM

    -- Note: Using enumeration functions, e.g. 'Prelude.enumFromThen', turns
    -- out to be slightly faster than idioms like @[from, then..]@.
    --
    -- ** Enumeration
    -- | We can use the 'Enum' type class to enumerate a type producing a list
    -- and then convert it to a stream:
    --
    -- @
    -- 'fromList' $ 'Prelude.enumFromThen' from then
    -- @
    --
    -- However, this is not particularly efficient.
    -- The 'Enumerable' type class provides corresponding functions that
    -- generate a stream instead of a list, efficiently.
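    --
    -- A small illustration:
    --
    -- >>> Stream.toList $ Stream.enumerateFromTo 1 4
    -- [1,2,3,4]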

    , Enumerable (..)
    , enumerate
    , enumerateTo

    -- ** Iteration
    , iterate
    , iterateM

    -- ** From Containers
    -- | Convert an input structure, container or source into a stream. All of
    -- these can be expressed in terms of primitives.
    , fromList

    -- ** From Unfolds
    -- | Most of the above stream generation operations can also be expressed
    -- using the corresponding unfolds in the "Streamly.Data.Unfold" module.
    , unfold -- XXX rename to fromUnfold?

    -- * Elimination
    -- | Functions whose types end in the general shape @Stream m a -> m b@ or
    -- @Stream m a -> m (b, Stream m a)@.
    --

-- EXPLANATION: In imperative terms a fold can be considered as a loop over the stream
-- that reduces the stream to a single value.
-- Left and right folds both use a fold function @f@ and an identity element
-- @z@ (@zero@) to deconstruct a recursive data structure and reconstruct a
-- new data structure. The new structure may be a recursive construction (a
-- container) or a non-recursive single value reduction of the original
-- structure.
--
-- Right and left folds are mathematical duals of each other; they are
-- functionally equivalent. Operationally, a left fold on a left associated
-- structure behaves exactly the same way as a right fold on a right
-- associated structure. Similarly, a left fold on a right associated structure
-- behaves the same way as a right fold on a left associated structure.
-- However, the behavior of a right fold on a right associated structure is
-- operationally different (even though functionally equivalent) from a left
-- fold on the same structure.
--
-- On right associated structures like Haskell @cons@ lists or Streamly
-- streams, a lazy right fold is naturally suitable for lazy recursive
-- reconstruction of a new structure, while a strict left fold is naturally
-- suitable for efficient reduction. In right folds control is in the hands of
-- the @puller@ whereas in left folds control is in the hands of the
-- @pusher@.
--
-- The behavior of right and left folds are described in detail in the
-- individual fold's documentation.  To illustrate the two folds for right
-- associated @cons@ lists:
--
-- > foldr :: (a -> b -> b) -> b -> [a] -> b
-- > foldr f z [] = z
-- > foldr f z (x:xs) = x `f` foldr f z xs
-- >
-- > foldl :: (b -> a -> b) -> b -> [a] -> b
-- > foldl f z [] = z
-- > foldl f z (x:xs) = foldl f (z `f` x) xs
--
-- @foldr@ is conceptually equivalent to:
--
-- > foldr f z [] = z
-- > foldr f z [x] = f x z
-- > foldr f z xs = foldr f (foldr f z (tail xs)) [head xs]
--
-- @foldl@ is conceptually equivalent to:
--
-- > foldl f z [] = z
-- > foldl f z [x] = f z x
-- > foldl f z xs = foldl f (foldl f z (init xs)) [last xs]
--
-- Left and right folds are duals of each other.
--
-- @
-- foldr f z xs = foldl (flip f) z (reverse xs)
-- foldl f z xs = foldr (flip f) z (reverse xs)
-- @
--
-- More generally:
--
-- @
-- foldr f z xs = foldl g id xs z where g k x = k . f x
-- foldl f z xs = foldr g id xs z where g x k = k . flip f x
-- @
--

-- NOTE: Folds are inherently serial as each step needs to use the result of
-- the previous step. However, it is possible to fold parts of the stream in
-- parallel and then combine the results using a monoid.

    -- ** Primitives
    -- Consuming a part of the stream and returning the rest. Functions
    -- whose types end in the general shape @Stream m a -> m (b, Stream m a)@.
    , uncons

    -- ** Strict Left Folds
    -- XXX Need to have a general parse operation here which can be used to
    -- express all others.
    , fold -- XXX rename to run? We can have a Stream.run and Fold.run.
    -- XXX fold1 can be achieved using Monoids or Refolds.
    -- XXX We can call this just "break" and parseBreak as "munch"
    , foldBreak

    -- XXX should we have a Fold returning function in stream module?
    -- , foldAdd
    -- , buildl

    -- ** Parsing
    , parse
    -- , parseBreak

    -- ** Lazy Right Folds
    -- | Consuming a stream to build a right associated expression, suitable
    -- for lazy evaluation. Evaluation of the input happens when the output of
    -- the fold is evaluated; the fold output is a lazy thunk.
    --
    -- This is suitable for stream transformation operations, for example,
    -- mapping a function over the stream.
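    --
    -- A small illustration, lazily rebuilding a list:
    --
    -- >>> Stream.foldr (:) [] $ Stream.fromList [1,2,3]
    -- [1,2,3]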
    , foldrM
    , foldr

    -- ** Specific Folds
    -- | Usually you can use the folds in "Streamly.Data.Fold". However, some
    -- commonly used folds, or folds that may have a performance edge in some
    -- cases, are provided here.
    --
    -- Useful idioms:
    --
    -- >>> foldlM' f a = Stream.fold (Fold.foldlM' f a)
    -- >>> foldl1 f = Stream.fold (Fold.foldl1' f)
    -- >>> foldl' f a = Stream.fold (Fold.foldl' f a)
    -- >>> drain = Stream.fold Fold.drain
    -- >>> mapM_ f = Stream.fold (Fold.drainMapM f)
    -- >>> length = Stream.fold Fold.length
    -- >>> head = Stream.fold Fold.one
    , toList

    -- * Mapping
    -- | Stateless one-to-one transformations. Use 'fmap' for mapping a pure
    -- function on a stream.

    -- EXPLANATION:
    -- In imperative terms a map operation can be considered as a loop over
    -- the stream that transforms the stream into another stream by performing
    -- an operation on each element of the stream.
    --
    -- 'map' is the least powerful transformation operation, with the strictest
    -- guarantees. A map (1) is a stateless loop, which means that no state is
    -- allowed to be carried from one iteration to another, therefore
    -- operations on different elements are guaranteed not to affect each
    -- other; and (2) is a strictly one-to-one transformation of stream
    -- elements, which means it guarantees that no elements can be added or
    -- removed from the stream, it can merely transform them.
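    --
    -- A small illustration of effectful mapping with 'mapM':
    --
    -- >>> Stream.toList $ Stream.mapM (\x -> pure (x * 2)) $ Stream.fromList [1,2,3]
    -- [2,4,6]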
    , sequence
    , mapM
    , trace
    , tap
    , delay

    -- * Scanning
    -- | Stateful one-to-one transformations.
    --

    {-
    -- ** Left scans
    -- | We can perform scans using folds with the 'scan' combinator in the
    -- next section. However, the combinators supplied in this section are
    -- better amenable to stream fusion when combined with other operations.
    -- Note that 'postscan' using folds fuses well and does not require custom
    -- combinators like these.
    , scanl'
    , scanlM'
    , scanl1'
    , scanl1M'
    -}

    -- ** Scanning By 'Fold'
    -- | Useful idioms:
    --
    -- >>> scanl' f z = Stream.scan (Fold.foldl' f z)
    -- >>> scanlM' f z = Stream.scan (Fold.foldlM' f z)
    -- >>> postscanl' f z = Stream.postscan (Fold.foldl' f z)
    -- >>> postscanlM' f z = Stream.postscan (Fold.foldlM' f z)
    -- >>> scanl1' f = Stream.catMaybes . Stream.scan (Fold.foldl1' f)
    -- >>> scanl1M' f = Stream.catMaybes . Stream.scan (Fold.foldlM1' f)
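    --
    -- For example, a running sum (a small illustration):
    --
    -- >>> Stream.toList $ Stream.scan Fold.sum $ Stream.fromList [1,2,3,4]
    -- [0,1,3,6,10]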
    , scan
    , postscan
    -- XXX postscan1 can be implemented using Monoids or Refolds.

    -- ** Specific Scans
    -- Indexing can be considered as a special type of zipping where we zip a
    -- stream with an index stream.
    , indexed

    -- * Insertion
    -- | Add elements to the stream.

    -- Inserting elements is a special case of interleaving/merging streams.
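    --
    -- A small illustration of 'intersperse':
    --
    -- >>> Stream.toList $ Stream.intersperse ',' $ Stream.fromList "hello"
    -- "h,e,l,l,o"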
    , insertBy
    , intersperseM
    , intersperseM_
    , intersperse

    -- * Filtering
    -- | Remove elements from the stream.

    -- ** Stateless Filters
    -- | 'mapMaybeM' is the most general stateless filtering operation. All
    -- other stateless filtering operations can be expressed using it.

    -- EXPLANATION:
    -- In imperative terms a filter over a stream corresponds to a loop with a
    -- @continue@ clause for the cases when the predicate fails.
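    --
    -- For example, 'filter' and 'filterM' can be expressed using 'mapMaybe'
    -- and 'mapMaybeM' (illustrative definitions):
    --
    -- >>> filter p = Stream.mapMaybe (\x -> if p x then Just x else Nothing)
    -- >>> filterM p = Stream.mapMaybeM (\x -> (\b -> if b then Just x else Nothing) <$> p x)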

    , mapMaybe
    , mapMaybeM
    , filter
    , filterM

    -- Filter and concat
    , catMaybes
    , catLefts
    , catRights
    , catEithers

    -- ** Stateful Filters
    -- | 'scanMaybe' is the most general stateful filtering operation. The
    -- filtering folds (folds returning a 'Maybe' type) in
    -- "Streamly.Internal.Data.Fold" can be used along with 'scanMaybe' to
    -- perform stateful filtering operations in general.
    --
    -- Useful idioms:
    --
    -- >>> deleteBy cmp x = Stream.scanMaybe (Fold.deleteBy cmp x)
    -- >>> findIndices p = Stream.scanMaybe (Fold.findIndices p)
    -- >>> elemIndices a = findIndices (== a)
    -- >>> uniq = Stream.scanMaybe (Fold.uniqBy (==))
    , scanMaybe
    , take
    , takeWhile
    , takeWhileM
    , drop
    , dropWhile
    , dropWhileM

    -- XXX These are available as scans in folds. We need to check the
    -- performance though. If these are common and we need convenient stream
    -- ops then we can expose these.

    -- , deleteBy
    -- , uniq
    -- , uniqBy

    -- -- ** Sampling
    -- , strideFromThen

    -- -- ** Searching
    -- Finding the presence or location of an element, a sequence of elements
    -- or another stream within a stream.

    -- -- ** Searching Elements
    -- , findIndices
    -- , elemIndices

    -- * Combining Two Streams
    -- | Note that these operations are suitable for statically fusing a few
    -- streams; they have quadratic, O(n^2), time complexity with respect to
    -- the number of streams. If you want to compose many streams dynamically
    -- using binary combining operations, see the corresponding operations in
    -- "Streamly.Data.StreamK".
    --
    -- When fusing more than two streams it is more efficient if the binary
    -- operations are composed as a balanced tree rather than a right
    -- associative or left associative one, e.g.:
    --
    -- >>> s1 = Stream.fromList [1,2] `Stream.append` Stream.fromList [3,4]
    -- >>> s2 = Stream.fromList [5,6] `Stream.append` Stream.fromList [7,8]
    -- >>> s = s1 `Stream.append` s2

    -- ** Appending
    , append

    -- ** Interleaving
    , interleave

    -- ** Merging
    , mergeBy
    , mergeByM

    -- ** Zipping
    , zipWith
    , zipWithM
    -- , ZipStream (..)

    -- ** Cross Product
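    -- | A small illustration of the cross product of two streams:
    --
    -- >>> Stream.toList $ Stream.crossWith (,) (Stream.fromList [1,2]) (Stream.fromList [3,4])
    -- [(1,3),(1,4),(2,3),(2,4)]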
    -- XXX The argument order in this operation is such that it seems we are
    -- transforming the first stream using the second stream because the second
    -- stream is evaluated many times or buffered and better be finite, first
    -- stream could potentially be infinite. In the tradition of using the
    -- transformed stream at the end we can have a flipped version called
    -- "crossMap" or "nestWith".
    , crossWith
    -- , cross
    -- , joinInner
    -- , CrossStream (..)

    -- * Unfold Each
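    -- | For example, a sketch of flattening a stream of lists with
    -- 'unfoldMany', assuming "Streamly.Data.Unfold" is imported qualified as
    -- @Unfold@; this should yield @[1,2,3,4]@:
    --
    -- > Stream.toList $ Stream.unfoldMany Unfold.fromList $ Stream.fromList [[1,2],[3,4]]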
    , unfoldMany
    , intercalate
    , intercalateSuffix

    -- * Stream of Streams
    -- | Stream operations like map and filter represent loop processing in
    -- imperative programming terms. Similarly, the imperative concept of
    -- nested loops is represented by streams of streams. The 'concatMap'
    -- operation represents nested looping: it loops over the input stream
    -- and, for each element, generates another stream, then loops over that
    -- inner stream as well, producing effects and generating a single output
    -- stream.
    --
    -- One-dimensional loops are just a special case of nested loops.  For
    -- example, 'concatMap' can degenerate to a simple map operation:
    --
    -- > map f m = S.concatMap (\x -> S.fromPure (f x)) m
    --
    -- Similarly, 'concatMap' can perform filtering by mapping an element to a
    -- 'nil' stream:
    --
    -- > filter p m = S.concatMap (\x -> if p x then S.fromPure x else S.nil) m
    --
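    -- A runnable illustration of nested looping:
    --
    -- >>> Stream.toList $ Stream.concatMap (\x -> Stream.fromList [x, x]) $ Stream.fromList [1,2]
    -- [1,1,2,2]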

    , concatEffect
    , concatMap
    , concatMapM

    -- * Repeated Fold
    -- | Useful idioms:
    --
    -- >>> splitWithSuffix p f = Stream.foldMany (Fold.takeEndBy p f)
    -- >>> splitOnSuffix p f = Stream.foldMany (Fold.takeEndBy_ p f)
    -- >>> groupsBy eq f = Stream.parseMany (Parser.groupBy eq f)
    -- >>> groupsByRolling eq f = Stream.parseMany (Parser.groupByRolling eq f)
    -- >>> groupsOf n f = Stream.foldMany (Fold.take n f)
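    --
    -- For example, summing pairs of contiguous elements (a small
    -- illustration):
    --
    -- >>> Stream.toList $ Stream.foldMany (Fold.take 2 Fold.sum) $ Stream.fromList [1,2,3,4,5,6]
    -- [3,7,11]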
    , foldMany -- XXX Rename to foldRepeat
    , groupsOf
    , parseMany

    -- * Splitting
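    -- | Small illustrations of splitting a character stream on a separator,
    -- keeping and dropping empty segments respectively:
    --
    -- >>> Stream.toList $ Stream.splitOn (== ',') Fold.toList $ Stream.fromList "a,b,,c"
    -- ["a","b","","c"]
    --
    -- >>> Stream.toList $ Stream.wordsBy (== ',') Fold.toList $ Stream.fromList "a,b,,c"
    -- ["a","b","c"]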
    , splitOn
    , wordsBy

    -- * Buffered Operations
    -- | Operations that require buffering of the stream.
    -- Reverse is essentially a left fold followed by an unfold.
    , reverse

    -- * Multi-Stream Folds
    -- | Operations that consume multiple streams at the same time.
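    --
    -- A small illustration:
    --
    -- >>> Stream.eqBy (==) (Stream.fromList [1,2,3]) (Stream.fromList [1,2,3])
    -- True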
    , eqBy
    , cmpBy
    , isPrefixOf
    , isSubsequenceOf

    -- trimming sequences
    , stripPrefix

    -- Exceptions and resource management depend on the "exceptions" package
    -- XXX We can have IO Stream operations not depending on "exceptions"
    -- in Exception.Base

    -- * Exceptions
    -- | Note that the stream exception handling routines catch and handle
    -- exceptions only in the stream generation steps and not in the consumer
    -- of the stream. For example, if we are folding or parsing a stream, any
    -- exceptions in the fold or parse steps won't be observed by the stream
    -- exception handlers. Exceptions in the fold or parse steps can be handled
    -- using the fold or parse exception handling routines. You can wrap the
    -- stream elimination function in a monad level exception handler to
    -- observe exceptions in both the stream and its consumer.
    --
    -- Most of these combinators inhibit stream fusion, therefore, when
    -- possible, they should be called in an outer loop to mitigate the cost.
    -- For example, instead of calling them on a stream of chars call them on a
    -- stream of arrays before flattening it to a stream of chars.
    --
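    -- For example, a sketch of substituting an empty stream on any exception
    -- in the stream generation steps (assuming the @ScopedTypeVariables@
    -- extension and @SomeException@ from "Control.Exception" are in scope):
    --
    -- > recover = Stream.handle (\(_ :: SomeException) -> Stream.nil)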
    , onException
    , handle

    -- * Resource Management
    -- | 'bracket' is the most general resource management operation; all other
    -- operations can be expressed using it. These functions have an IO suffix
    -- because the allocation and cleanup functions are IO actions. For
    -- generalized allocation and cleanup functions, see the functions without
    -- the IO suffix in the "streamly" package.
    --
    -- Note that these operations bracket the stream generation only, they do
    -- not cover the stream consumer. This means that if an exception occurs in
    -- the consumer of the stream (e.g. in a fold or parse step) then the
    -- exception won't be observed by the stream resource handlers; in that
    -- case the resource cleanup handler runs when the stream is garbage
    -- collected.
    --
    -- Monad level resource management can always be used around the stream
    -- elimination functions; such a function can observe exceptions in both
    -- the stream and its consumer.
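    --
    -- For example, a sketch of reading lines from a file with guaranteed
    -- handle cleanup (assuming the "System.IO" functions are in scope; EOF
    -- handling is elided for brevity):
    --
    -- > readLines path =
    -- >     Stream.bracketIO (openFile path ReadMode) hClose
    -- >         (\h -> Stream.repeatM (hGetLine h))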
    , before
    , afterIO
    , finallyIO
    , bracketIO
    , bracketIO3

    -- * Transforming Inner Monad

    , morphInner
    , liftInner
    , runReaderT
    , runStateT

    -- XXX Arrays could be different types, therefore, this should be in
    -- specific array module. Or maybe we should abstract over array types.
    -- * Stream of Arrays
    , Array.chunksOf
    )
where

import Streamly.Internal.Data.Stream
import Prelude
       hiding (filter, drop, dropWhile, take, takeWhile, zipWith, foldr,
               foldl, map, mapM, mapM_, sequence, all, any, sum, product, elem,
               notElem, maximum, minimum, head, last, tail, length, null,
               reverse, iterate, init, and, or, lookup, foldr1, (!!),
               scanl, scanl1, repeat, replicate, concatMap, span)

import qualified Streamly.Internal.Data.Array.Type as Array

#include "DocTestDataStream.hs"

-- $overview
--
-- Streamly is a framework for modular data flow based programming and
-- declarative concurrency. The powerful stream fusion framework in streamly
-- allows high performance combinatorial programming even when using byte level
-- streams. The Streamly API is similar to Haskell lists.
--
-- == Console Echo Example
--
-- In the following example, 'repeatM' generates an infinite stream of 'String'
-- by repeatedly performing the 'getLine' IO action. 'mapM' then applies
-- 'putStrLn' to each element in the stream, converting it to a stream of '()'.
-- Finally, 'drain' folds the stream to IO, discarding the () values and thus
-- producing only effects.
--
-- >>> import Data.Function ((&))
--
-- >>> :{
-- echo =
--  Stream.repeatM getLine       -- Stream IO String
--      & Stream.mapM putStrLn   -- Stream IO ()
--      & Stream.fold Fold.drain -- IO ()
-- :}
--
-- This is a console echo program. It is an example of a declarative loop
-- written using streaming combinators.  Compare it with an imperative @while@
-- loop.
--
-- Hopefully, this gives you an idea of how we can program declaratively by
-- representing loops using streams. In this module, you can find all
-- "Data.List" like functions and many more powerful combinators to perform
-- common programming tasks.
--
-- == Stream Fusion
--
-- The fused 'Stream' type in this module employs stream fusion for C-like
-- performance when looping over data. It represents the stream as a state
-- machine using an explicit state, and a step function working on the state. A
-- typical stream operation consumes elements from the previous state machine
-- in a stream pipeline, transforms the elements and yields new values for the
-- next stage to consume. The stream operations are modular, each representing
-- a single task; they have no knowledge of the previous or next operation on
-- the elements.
--
-- A typical stream pipeline consists of a stream producer, several stream
-- transformation operations and a stream consumer. All these operations taken
-- together form a closed loop processing the stream elements. Elements are
-- transferred between stages using a boxed data constructor. However, all the
-- stages of the pipeline are fused together by GHC, eliminating the boxing of
-- intermediate constructors, and thus forming a tight C-like loop without any
-- boxed data being used in the loop.
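--
-- For example, the following pipeline, a producer, a transformation and a
-- consumer, fuses into a single loop summing the even numbers:
--
-- >>> Stream.fold Fold.sum $ Stream.filter even $ Stream.fromList [1..10]
-- 30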
--
-- Stream fusion works effectively when:
--
-- * the stream pipeline is composed statically (known at compile time)
-- * all the operations forming the loop are inlined
-- * the loop is not recursively defined, recursion breaks inlining
--
-- If these conditions cannot be met, the CPS style stream type 'StreamK' may
-- turn out to be a better choice than the fused stream type 'Stream'.
--
-- == Stream vs StreamK
--
-- The fused stream model avoids constructor allocations and function call
-- overheads. However, the stream is represented as a state machine, and to
-- generate stream elements it has to navigate the decision tree of the state
-- machine. Moreover, the state machine is cranked for each element in the
-- stream. This performs extremely well when the number of states is limited.
-- The state machine starts getting expensive as the number of states increases.
-- For example, generating a stream from a list requires a single state and is
-- very efficient, even if it has millions of elements. However, using 'cons'
-- to construct a million element stream would be a disaster.
--
-- A typical worst case scenario for the fused stream model is a large number
-- of 'cons' or 'append' operations. A few static 'cons' or 'append' operations
-- are very fast, much faster than a CPS style stream, because CPS involves a
-- function call for each element whereas the fused stream involves only a few
-- conditional branches in the state machine. However, constructing a large
-- stream using 'cons' introduces as many states in the state machine as the
-- number of elements. If we compose 'cons' as a balanced binary tree it will
-- take @n * log n@ time to navigate the tree, and @n * n@ if it is a right
-- associative composition.
--
-- Operations like 'cons' or 'append' are typically called recursively to
-- construct a lazy infinite stream. For such use cases the CPS style 'StreamK'
-- should be used. CPS streams do not have a state machine that needs to be
-- cranked for each element; past state has no effect on the future element
-- processing. However, CPS incurs a function call overhead for each element
-- processed; this overhead could be large compared to a fused state machine
-- even if the latter has many states. However, because of its linear
-- performance characteristics, beyond a certain threshold of stream
-- compositions the CPS stream performs much better than the quadratic fused
-- stream operations.
--
-- As a general guideline, you need to use 'StreamK' when you have to use
-- 'cons', 'append' or other operations having quadratic complexity at a large
-- scale. Typically, in such cases you need to compose the stream recursively,
-- by calling an operation in a loop. The decision to compose the stream is
-- taken at run time rather than statically at compile time.
--
-- Typically you would compose a 'StreamK' of chunks of data so that the
-- StreamK overhead is not high, and then process the chunks using statically
-- fused 'Stream' pipeline operations.
--
-- 'Stream' and 'StreamK' types can be interconverted. See
-- "Streamly.Data.StreamK" module for conversion operations.