-- |
-- Module      : Streamly.Internal.Data.Fold.Window
-- Copyright   : (c) 2020 Composewell Technologies
-- License     : Apache-2.0
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
-- Simple incremental statistical measures over a stream of data. All
-- operations use numerically stable floating point arithmetic.
--
-- Measurements can be performed over the entire input stream or on a sliding
-- window of fixed or variable size.  Where possible, measures are computed
-- online without buffering the input stream.
--
-- Currently there is no overflow detection.
--
-- For more advanced statistical measures see the @streamly-statistics@
-- package.

-- XXX A window fold can be driven either using the Ring.slidingWindow
-- combinator or by zipping nthLast fold and last fold.

module Streamly.Internal.Data.Fold.Window
    (
    -- * Incremental Folds
    -- | Folds of type @Fold m (a, Maybe a) b@ are incremental sliding window
    -- folds. An input of the form @(x, Nothing)@ indicates that the element
    -- @x@ is being inserted in the window without ejecting an old value,
    -- increasing the window size by 1. An input of the form @(x, Just y)@
    -- indicates that @x@ is being inserted in the window and @y@ is being
    -- removed from the window, so the window size remains the same. The
    -- window size can only increase and never decrease.
    --
    -- You can compute the statistics over the entire stream using sliding
    -- window folds by keeping the second element of the input tuple as
    -- @Nothing@.
    --
      windowLmap
    , cumulative

    , windowRollingMap
    , windowRollingMapM

    -- ** Sums
    , windowLength
    , windowSum
    , windowSumInt
    , windowPowerSum
    , windowPowerSumFrac

    -- ** Location
    , windowMinimum
    , windowMaximum
    , windowRange
    , windowMean
    )
where

import Control.Monad.IO.Class (MonadIO (liftIO))
import Data.Bifunctor(bimap)
import Foreign.Storable (Storable, peek)

import Streamly.Internal.Data.Fold.Type (Fold(..), Step(..))
import Streamly.Internal.Data.Tuple.Strict
    (Tuple'(..), Tuple3Fused' (Tuple3Fused'))

import Foreign.ForeignPtr.Unsafe (unsafeForeignPtrToPtr)

import qualified Streamly.Internal.Data.Fold.Type as Fold
import qualified Streamly.Internal.Data.Ring as Ring

import Prelude hiding (length, sum, minimum, maximum)

-- $setup
-- >>> import Data.Bifunctor(bimap)
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Internal.Data.Fold.Window as Fold
-- >>> import qualified Streamly.Internal.Data.Ring as Ring
-- >>> import qualified Streamly.Data.Stream as Stream
-- >>> import Prelude hiding (length, sum, minimum, maximum)

-------------------------------------------------------------------------------
-- Utilities
-------------------------------------------------------------------------------

-- | Map a function on the incoming as well as outgoing element of a rolling
-- window fold.
--
-- >>> lmap f = Fold.lmap (bimap f (f <$>))
--
{-# INLINE windowLmap #-}
windowLmap :: (c -> a) -> Fold m (a, Maybe a) b -> Fold m (c, Maybe c) b
-- The same function is applied to the incoming element and, when present, to
-- the outgoing (ejected) element of the input tuple.
windowLmap f = Fold.lmap (bimap f (f <$>))

-- | Convert an incremental fold to a cumulative fold using the entire input
-- stream as a single window.
--
-- >>> cumulative f = Fold.lmap (\x -> (x, Nothing)) f
--
{-# INLINE cumulative #-}
cumulative :: Fold m (a, Maybe a) b -> Fold m a b
-- Every input is paired with 'Nothing', i.e. nothing is ever ejected from the
-- window. Uses a tuple section, as the original definition does.
cumulative = Fold.lmap (, Nothing)

-- XXX Exchange the first two arguments of rollingMap or exchange the order in
-- the fold input tuple.

-- | Apply an effectful function on the latest and the oldest element of the
-- window.
{-# INLINE windowRollingMapM #-}
windowRollingMapM :: Monad m =>
    (Maybe a -> a -> m (Maybe b)) -> Fold m (a, Maybe a) (Maybe b)
windowRollingMapM f = Fold.foldlM' f1 initial

    where

    -- The result before any input has been consumed.
    initial = return Nothing

    -- The previous accumulator is ignored; the result depends only on the
    -- current input. Note the argument order: the outgoing element (if any)
    -- is passed first, the incoming element second.
    f1 _ (a, ma) = f ma a

-- | Apply a pure function on the latest and the oldest element of the window.
--
-- >>> windowRollingMap f = Fold.windowRollingMapM (\x y -> return $ f x y)
--
{-# INLINE windowRollingMap #-}
windowRollingMap :: Monad m =>
    (Maybe a -> a -> Maybe b) -> Fold m (a, Maybe a) (Maybe b)
windowRollingMap f = Fold.foldl' f1 initial

    where

    -- The result before any input has been consumed.
    initial = Nothing

    -- The previous accumulator is ignored; the result depends only on the
    -- current input. Note the argument order: the outgoing element (if any)
    -- is passed first, the incoming element second.
    f1 _ (a, ma) = f ma a

-------------------------------------------------------------------------------
-- Sum
-------------------------------------------------------------------------------

-- XXX Overflow.
--
-- | The sum of all the elements in a rolling window. The input elements are
-- required to be integral numbers.
--
-- This was written in the hope that it would be a tiny bit faster than 'sum'
-- for 'Integral' values. But it turns out that 'sum' is 2% faster than this
-- even for integral values!
--
-- /Internal/
--
{-# INLINE windowSumInt #-}
windowSumInt :: forall m a. (Monad m, Integral a) => Fold m (a, Maybe a) a
windowSumInt = Fold step initial extract extract

    where

    -- Start with a zero sum.
    initial = return $ Partial (0 :: a)

    -- Add the incoming element and, if an element is being ejected from the
    -- window, subtract it.
    step s (a, ma) =
        return
            $ Partial
                $ case ma of
                    Nothing -> s + a
                    Just old -> s + a - old

    -- The state is the sum itself.
    extract = return

-- XXX Overflow.
--
-- | Sum of all the elements in a rolling window:
--
-- \(S = \sum_{i=1}^n x_{i}\)
--
-- This is the first power sum.
--
-- >>> sum = powerSum 1
--
-- Uses Kahan-Babuska-Neumaier style summation for numerical stability of
-- floating precision arithmetic.
--
-- /Space/: \(\mathcal{O}(1)\)
--
-- /Time/: \(\mathcal{O}(n)\)
--
{-# INLINE windowSum #-}
windowSum :: forall m a. (Monad m, Num a) => Fold m (a, Maybe a) a
windowSum = Fold step initial extract extract

    where

    initial =
        return
            $ Partial
            $ Tuple'
                (0 :: a) -- running sum
                (0 :: a) -- accumulated rounding error

    step (Tuple' total err) (new, mOld) =
        let incr =
                case mOld of
                    -- XXX new may be large and err may be small we may lose it
                    Nothing -> new - err
                    -- XXX if (new - old) is large we may lose err
                    Just old -> (new - old) - err
            -- total is large and incr may be small, we may round incr here but
            -- we will accumulate the rounding error in err1 in the next step.
            total1 = total + incr
            -- Accumulate any rounding error in err1
            -- XXX In the Nothing case above we may lose err, therefore we
            -- should use ((total1 - total) - new) + err here.
            -- Or even in the just case if (new - old) is large we may lose
            -- err, so we should use ((total1 - total) + (old - new)) + err.
            err1 = (total1 - total) - incr
        in return $ Partial $ Tuple' total1 err1

    -- Only the running sum is reported; the error term is internal state.
    extract (Tuple' total _) = return total

-- | The number of elements in the rolling window.
--
-- This is the \(0\)th power sum.
--
-- >>> length = powerSum 0
--
{-# INLINE windowLength #-}
windowLength :: (Monad m, Num b) => Fold m (a, Maybe a) b
windowLength = Fold.foldl' step 0

    where

    -- The window grows only when an element is inserted without ejecting an
    -- old one; an insertion paired with an ejection leaves the count as is.
    step w (_, Nothing) = w + 1
    step w _ = w

-- | Sum of the \(k\)th power of all the elements in a rolling window:
--
-- \(S_k = \sum_{i=1}^n x_{i}^k\)
--
-- >>> powerSum k = lmap (^ k) sum
--
-- /Space/: \(\mathcal{O}(1)\)
--
-- /Time/: \(\mathcal{O}(n)\)
{-# INLINE windowPowerSum #-}
windowPowerSum :: (Monad m, Num a) => Int -> Fold m (a, Maybe a) a
-- Raise both the incoming and the outgoing element to the @k@th power and
-- feed them to 'windowSum'.
windowPowerSum k = windowLmap (^ k) windowSum

-- | Like 'powerSum' but powers can be negative or fractional. This is slower
-- than 'powerSum' for positive integral powers.
--
-- >>> powerSumFrac p = lmap (** p) sum
--
{-# INLINE windowPowerSumFrac #-}
windowPowerSumFrac :: (Monad m, Floating a) => a -> Fold m (a, Maybe a) a
-- Raise both the incoming and the outgoing element to the power @p@ using
-- '**' and feed them to 'windowSum'.
windowPowerSumFrac p = windowLmap (** p) windowSum

-------------------------------------------------------------------------------
-- Location
-------------------------------------------------------------------------------

-- XXX Remove MonadIO constraint

-- | Determine the maximum and minimum in a rolling window.
--
-- If you want to compute the range of the entire stream @Fold.teeWith (,)
-- Fold.maximum Fold.minimum@ would be much faster.
--
-- /Space/: \(\mathcal{O}(w)\) where \(w\) is the window size.
--
-- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size.
--
{-# INLINE windowRange #-}
windowRange :: (MonadIO m, Storable a, Ord a) => Int -> Fold m a (Maybe (a, a))
windowRange n = Fold step initial extract extract

    where

    -- XXX Use Ring unfold and then fold for composing maximum and minimum to
    -- get the range.

    -- Allocate a ring of @n@ elements. The fold state is the ring, the
    -- current ring head pointer and the count of elements inserted so far.
    initial =
        if n <= 0
        then error "range: window size must be > 0"
        else
            let f (a, b) = Partial $ Tuple3Fused' a b (0 :: Int)
             in fmap f $ liftIO $ Ring.new n

    -- Insert the new element at the ring head and bump the insert count.
    step (Tuple3Fused' rb rh i) a = do
        rh1 <- liftIO $ Ring.unsafeInsert rb rh a
        return $ Partial $ Tuple3Fused' rb rh1 (i + 1)

    -- XXX We need better Ring array APIs so that we can unfold the ring to a
    -- stream and fold the stream using a fold of our choice.
    --
    -- We could just scan the stream to get a stream of ring buffers and then
    -- map required folds over those, but we need to be careful that all those
    -- rings refer to the same mutable ring, therefore, downstream needs to
    -- process those strictly before it can change.
    --
    -- Choose the ring fold by whether fewer than @n@ elements have been
    -- inserted so far (ring not yet full) or not (ring full).
    foldFunc i
        | i < n = Ring.unsafeFoldRingM
        | otherwise = Ring.unsafeFoldRingFullM

    -- 'Nothing' if no element has been inserted, otherwise the (minimum,
    -- maximum) pair computed by traversing the ring.
    extract (Tuple3Fused' rb rh i) =
        if i == 0
        then return Nothing
        else do
            -- Here we use "ringStart" over "ringHead" as "ringHead" will be
            -- uninitialized if the ring is not full.
            -- Using "unsafeForeignPtrToPtr" here is safe as we touch the ring
            -- again in "foldFunc".
            x <- liftIO $ peek (unsafeForeignPtrToPtr (Ring.ringStart rb))
            -- Seed both the minimum and the maximum with the first element
            -- of the ring and refine them over the ring contents.
            let accum (mn, mx) a = return (min mn a, max mx a)
            fmap Just $ foldFunc i rh accum (x, x) rb

-- | Find the minimum element in a rolling window.
--
-- This implementation traverses the entire window buffer to compute the
-- minimum whenever we demand it.  It performs better than the dequeue based
-- implementation in @streamly-statistics@ package when the window size is
-- small (< 30).
--
-- If you want to compute the minimum of the entire stream
-- 'Streamly.Data.Fold.minimum' is much faster.
--
-- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size.
--
{-# INLINE windowMinimum #-}
windowMinimum :: (MonadIO m, Storable a, Ord a) => Int -> Fold m a (Maybe a)
-- The minimum is the first component of the pair produced by 'windowRange'.
windowMinimum n = fmap (fmap fst) $ windowRange n

-- | The maximum element in a rolling window.
--
-- See the performance related comments in 'windowMinimum'.
--
-- If you want to compute the maximum of the entire stream 'Fold.maximum' would
-- be much faster.
--
-- /Time/: \(\mathcal{O}(n*w)\) where \(w\) is the window size.
--
{-# INLINE windowMaximum #-}
windowMaximum :: (MonadIO m, Storable a, Ord a) => Int -> Fold m a (Maybe a)
-- The maximum is the second component of the pair produced by 'windowRange'.
windowMaximum n = fmap (fmap snd) $ windowRange n

-- | Arithmetic mean of elements in a sliding window:
--
-- \(\mu = \frac{\sum_{i=1}^n x_{i}}{n}\)
--
-- This is also known as the Simple Moving Average (SMA) when used in the
-- sliding window and Cumulative Moving Average (CMA) when used on the entire
-- stream.
--
-- >>> mean = Fold.teeWith (/) sum length
--
-- /Space/: \(\mathcal{O}(1)\)
--
-- /Time/: \(\mathcal{O}(n)\)
{-# INLINE windowMean #-}
windowMean :: forall m a. (Monad m, Fractional a) => Fold m (a, Maybe a) a
-- Run 'windowSum' and 'windowLength' over the same input and divide the sum
-- by the length.
windowMean = Fold.teeWith (/) windowSum windowLength