module Streamly.Internal.Data.Stream.Time
    (
    
    
    
      periodic 
    , ticks 
    , ticksRate
    , interject
    
    , takeInterval
    , takeLastInterval
    , dropInterval
    , dropLastInterval
    
    , intervalsOf
    , groupsOfTimeout
    
    , sampleIntervalEnd
    , sampleIntervalStart
    , sampleBurst
    , sampleBurstEnd
    , sampleBurstStart
    
    , classifySessionsByGeneric
    , classifySessionsBy
    , classifySessionsOf
    , classifyKeepAliveSessions
    
    
    
    
    
    , bufferLatest
    , bufferLatestN
    , bufferOldestN
    )
where
import Control.Concurrent (threadDelay)
import Control.Exception (assert)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Heap (Entry(..))
import Data.Kind (Type)
import Data.Map (Map)
import Data.Maybe (isJust, isNothing)
import Data.Proxy (Proxy(..))
import Streamly.Data.Fold (Fold)
import Streamly.Internal.Data.Fold (Fold (..))
import Streamly.Internal.Data.IsMap (IsMap(..))
import Streamly.Internal.Data.Stream (Stream)
import Streamly.Internal.Data.Time.Units
    ( AbsTime
    , MilliSecond64(..)
    , addToAbsTime
    , toAbsTime
    , toRelTime
    )
import Streamly.Internal.Data.Time.Units (NanoSecond64(..), toRelTime64)
import qualified Data.Heap as H
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Data.Unfold as Unfold
import qualified Streamly.Internal.Data.Fold as Fold (Step(..))
import qualified Streamly.Internal.Data.IsMap as IsMap
import qualified Streamly.Internal.Data.Stream as Stream
    ( scanlMAfter'
    , timeIndexed
    , timestamped
    )
import Streamly.Internal.Data.Stream.Concurrent
-- | @periodic action n@ is an infinite stream that repeatedly sleeps for @n@
-- seconds and then runs @action@, yielding its result.
--
-- The delay happens /before/ every run of the action, including the first
-- one, and does not account for the time taken by the action itself, so the
-- gap between two outputs is @n@ seconds plus the run time of @action@.
{-# INLINE periodic #-}
periodic :: MonadIO m => m a -> Double -> Stream m a
periodic action n = Stream.repeatM timed

    where

    -- 'threadDelay' takes microseconds, hence the scaling of seconds by 10^6.
    timed = liftIO (threadDelay (round $ n * 1000000)) >> action
-- | @ticks n@ is an infinite stream of @()@ values, one emitted after every
-- @n@ second delay. It is 'periodic' specialised to an action that does
-- nothing.
{-# INLINE ticks #-}
ticks :: MonadIO m => Double -> Stream m ()
ticks = periodic (return ())
-- | An infinite stream of @()@ values evaluated through 'parEval' with the
-- given 'Rate' installed in its configuration.
--
-- NOTE(review): the pacing itself is done by 'parEval' / 'rate' from the
-- concurrent stream module; presumably the ticks are emitted at the supplied
-- rate - confirm against that module's documentation.
{-# INLINE ticksRate #-}
ticksRate :: MonadAsync m => Rate -> Stream m ()
ticksRate r = parEval (rate (Just r)) $ Stream.repeatM (return ())
-- | @interject f n xs@ merges the stream @xs@ with the stream
-- @'periodic' f n@, i.e. the results of running @f@ after every @n@ second
-- delay are injected into @xs@.
--
-- The two streams are combined with 'parListEagerFst', with @xs@ as the first
-- stream.
--
-- NOTE(review): presumably the combined stream ends as soon as @xs@ ends (the
-- periodic stream is infinite) - confirm against 'parListEagerFst'.
{-# INLINE interject #-}
interject :: MonadAsync m => m a -> Double -> Stream m a -> Stream m a
interject f n xs = parListEagerFst [xs, periodic f n]
-- | @takeInterval d@ yields the elements of the input stream that arrive
-- before the first timer tick, which fires after @d@ seconds.
--
-- Implementation: the input elements are wrapped in 'Just' and a 'Nothing'
-- marker is interjected every @d@ seconds. The stream is cut at the first
-- marker and the surviving 'Just' values are unwrapped.
--
-- The prefix must be taken while elements are 'Just'. Taking while they are
-- 'Nothing' would stop at the very first input element and produce an empty
-- stream.
{-# INLINE takeInterval #-}
takeInterval :: MonadAsync m => Double -> Stream m a -> Stream m a
takeInterval d =
    Stream.catMaybes
        . Stream.takeWhile isJust
        . interject (return Nothing) d
        . fmap Just
-- | Placeholder for an operation over the last interval of a stream, taking
-- a duration argument.
--
-- /Unimplemented/: evaluating this function raises an error.
{-# INLINE takeLastInterval #-}
takeLastInterval :: Double -> Stream m a -> Stream m a
takeLastInterval = error "takeLastInterval: not implemented"
-- | @dropInterval d@ discards the elements of the input stream that arrive
-- before the first timer tick, which fires after @d@ seconds, and yields
-- everything that arrives afterwards.
--
-- Implementation: the input elements are wrapped in 'Just' and a 'Nothing'
-- marker is interjected every @d@ seconds. Everything up to the first marker
-- is dropped, and the remaining 'Just' values are unwrapped (which also
-- removes the later markers).
--
-- The prefix must be dropped while elements are 'Just'. Dropping while they
-- are 'Nothing' would remove no input elements at all.
{-# INLINE dropInterval #-}
dropInterval :: MonadAsync m => Double -> Stream m a -> Stream m a
dropInterval d =
    Stream.catMaybes
        . Stream.dropWhile isJust
        . interject (return Nothing) d
        . fmap Just
-- | Placeholder for an operation over the last interval of a stream. Note
-- that, as declared, it takes an 'Int' argument.
--
-- /Unimplemented/: evaluating this function raises an error.
{-# INLINE dropLastInterval #-}
dropLastInterval :: Int -> Stream m a -> Stream m a
dropLastInterval = error "dropLastInterval: not implemented"
-- | @intervalsOf n f xs@ splits @xs@ into time windows delimited by a timer
-- that ticks every @n@ seconds, folds the elements of each window with @f@
-- and emits one fold result per window.
--
-- Implementation: input elements are wrapped in 'Just' and a 'Nothing' marker
-- is interjected every @n@ seconds. The fold is run repeatedly; each run ends
-- when it sees a marker, and only the 'Just' payloads are fed to @f@.
{-# INLINE intervalsOf #-}
intervalsOf :: MonadAsync m => Double -> Fold m a b -> Stream m a -> Stream m b
intervalsOf n f xs =
    Stream.foldMany
        (Fold.takeEndBy isNothing (Fold.catMaybes f))
        (interject (return Nothing) n (fmap Just xs))
-- | @groupsOfTimeout n timeout f@ folds the input with @f@ in groups of at
-- most @n@ elements (via 'Fold.take') and emits each group's result.
--
-- It is built on 'classifySessionsBy': every element is timestamped and
-- tagged with the same unit key @()@, so the whole stream is one session key.
-- The session classifier is invoked with a tick of @0.1@, the reset flag off,
-- an eject predicate that always answers 'False', and @timeout@ as the
-- session timeout. The unit key is stripped from the results.
--
-- NOTE(review): presumably a group is also emitted when @timeout@ seconds
-- pass before it collects @n@ elements - confirm against
-- 'classifySessionsBy'.
{-# INLINE groupsOfTimeout #-}
groupsOfTimeout :: MonadAsync m
    => Int -> Double -> Fold m a b -> Stream m a -> Stream m b
groupsOfTimeout n timeout f =
      fmap snd
    . classifySessionsBy
        0.1 False (const (return False)) timeout (Fold.take n f)
    . Stream.timestamped
    . fmap ((),)
-- | State threaded through the session classification machinery.
--
-- Type parameters: @t@ is the output stream type constructor, @m@ the
-- underlying monad, @f@ the map container (its key type is @Key f@), @s@ the
-- per-key value stored in the map and @b@ the result emitted for a key.
data SessionState t m f s b = SessionState
    { sessionCurTime :: !AbsTime
      -- ^ Set by 'ejectExpired' to the current time it was called with.
    , sessionEventTime :: !AbsTime
      -- ^ NOTE(review): presumably the timestamp of the most recent input
      -- event - confirm against the code that updates it.
    , sessionCount :: !Int
      -- ^ Session counter; it is decremented each time a session is ejected.
    , sessionTimerHeap :: H.Heap (H.Entry AbsTime (Key f))
      -- ^ Heap of (expiry time, key) entries; the ejection routines pop
      -- entries from it via 'H.uncons'.
    , sessionKeyValueMap :: f s
      -- ^ Per-key session values.
    , sessionOutputStream :: t (m :: Type -> Type) (Key f, b)
      -- ^ (key, result) pairs produced by the most recent ejection pass.
    }
-- | An entry of the session map.
data SessionEntry s
    = LiveSession !AbsTime !s
      -- ^ A session in progress: the expiry time recorded for it, and its
      -- accumulator.
    | ZombieSession
      -- ^ A tombstone; the ejection routines delete it from the map without
      -- emitting any output for it.
-- | Eject a single session.
--
-- Runs @extract@ on the session accumulator, prepends the resulting
-- @(key, result)@ pair to the output stream, deletes the key from the map and
-- decrements the session count. The heap is passed through unchanged; the
-- caller is expected to have already removed the corresponding heap entry.
ejectEntry :: (Monad m, IsMap f) =>
    (acc -> m b)
    -> heap
    -> f entry
    -> Stream m (Key f, b)
    -> Int
    -> acc
    -> Key f
    -> m (heap, f entry, Stream m (Key f, b), Int)
ejectEntry extract hp mp out cnt acc key = do
    sess <- extract acc
    let out1 = Stream.cons (key, sess) out
    let mp1 = IsMap.mapDelete key mp
    return (hp, mp1, out1, cnt - 1)
-- | Eject every session, regardless of its expiry time.
--
-- The timer heap is drained entry by entry. For each entry the key is looked
-- up in the session map:
--
-- * key absent: the heap entry is stale and is simply discarded;
-- * 'ZombieSession': the tombstone is deleted from the map;
-- * 'LiveSession': the session is ejected with 'ejectEntry'.
--
-- The returned state carries the updated heap, map and count, and an output
-- stream (built afresh from 'Stream.nil') holding the ejected results.
{-# NOINLINE flush #-}
flush :: (Monad m, IsMap f) =>
       (s -> m b)
    -> SessionState Stream m f (SessionEntry s) b
    -> m (SessionState Stream m f (SessionEntry s) b)
flush extract session@SessionState{..} = do
    (hp', mp', out, count) <-
        ejectAll
            ( sessionTimerHeap
            , sessionKeyValueMap
            , Stream.nil
            , sessionCount
            )
    return $ session
        { sessionCount = count
        , sessionTimerHeap = hp'
        , sessionKeyValueMap = mp'
        , sessionOutputStream = out
        }

    where

    ejectAll (hp, mp, out, !cnt) = do
        let hres = H.uncons hp
        case hres of
            Just (Entry _ key, hp1) -> do
                r <- case IsMap.mapLookup key mp of
                    Nothing -> return (hp1, mp, out, cnt)
                    Just ZombieSession ->
                        return (hp1, IsMap.mapDelete key mp, out, cnt)
                    Just (LiveSession _ acc) ->
                        ejectEntry extract hp1 mp out cnt acc key
                ejectAll r
            Nothing -> do
                -- Once the heap is empty no session may remain in the map.
                assert (IsMap.mapNull mp) (return ())
                return (hp, mp, out, cnt)
-- | Eject at most one live session, the first eligible one found when
-- popping entries off the timer heap.
--
-- For each popped heap entry the key is looked up in the session map:
--
-- * key absent: stale heap entry, keep looking;
-- * 'ZombieSession': delete the tombstone from the map, keep looking;
-- * 'LiveSession': if @reset@ is off, or the expiry recorded in the session
--   is not later than the heap entry's expiry, eject it with 'ejectEntry' and
--   stop. Otherwise the heap entry is out of date: re-insert the key with the
--   session's recorded expiry and keep looking.
--
-- If the heap runs out, the state is returned as is.
{-# NOINLINE ejectOne #-}
ejectOne :: (IsMap f, Monad m) =>
       Bool
    -> (acc -> m b)
    -> ( H.Heap (Entry AbsTime (Key f))
       , f (SessionEntry acc)
       , Stream m (Key f, b)
       , Int
       )
    -> m ( H.Heap (Entry AbsTime (Key f))
         , f (SessionEntry acc)
         , Stream m (Key f, b), Int
         )
ejectOne reset extract = go

    where

    go (hp, mp, out, !cnt) = do
        let hres = H.uncons hp
        case hres of
            Just (Entry expiry key, hp1) ->
                case IsMap.mapLookup key mp of
                    Nothing -> go (hp1, mp, out, cnt)
                    Just ZombieSession ->
                        go (hp1, IsMap.mapDelete key mp, out, cnt)
                    Just (LiveSession expiry1 acc) -> do
                        if not reset || expiry1 <= expiry
                        then ejectEntry extract hp1 mp out cnt acc key
                        else
                            -- Re-queue the key under the session's recorded
                            -- expiry and continue with the next heap entry.
                            let hp2 = H.insert (Entry expiry1 key) hp1
                            in go (hp2, mp, out, cnt)
            Nothing -> do
                -- Once the heap is empty no session may remain in the map.
                assert (IsMap.mapNull mp) (return ())
                return (hp, mp, out, cnt)
-- | Eject the sessions that are due at @curTime@, plus any that the eject
-- predicate asks to be evicted.
--
-- The timer heap is examined from its minimum entry:
--
-- * If the entry's expiry is not later than @curTime@ it is a candidate for
--   normal ejection.
-- * Otherwise @ejectPred@ is consulted with the current session count; if it
--   answers 'True' the entry becomes a candidate for /forced/ ejection.
-- * If neither holds, the loop stops and the state is returned with the heap
--   entry left in place.
--
-- For a candidate, the key is looked up in the session map: an absent key is
-- a stale heap entry and is skipped; a 'ZombieSession' is deleted from the
-- map; a 'LiveSession' is ejected with 'ejectEntry' when its recorded expiry
-- is not later than @curTime@, or @reset@ is off, or the ejection is forced.
-- Otherwise the key is re-inserted into the heap under the session's recorded
-- expiry. In all of these cases the loop continues with the next entry.
--
-- The returned state has 'sessionCurTime' set to @curTime@ and an output
-- stream (built afresh from 'Stream.nil') holding the ejected results.
{-# NOINLINE ejectExpired #-}
ejectExpired :: (IsMap f, Monad m) =>
       Bool
    -> (Int -> m Bool)
    -> (acc -> m b)
    -> SessionState Stream m f (SessionEntry acc) b
    -> AbsTime
    -> m (SessionState Stream m f (SessionEntry acc) b)
ejectExpired reset ejectPred extract session@SessionState{..} curTime = do
    (hp', mp', out, count) <-
        ejectLoop
            sessionTimerHeap sessionKeyValueMap Stream.nil sessionCount
    return $ session
        { sessionCurTime = curTime
        , sessionCount = count
        , sessionTimerHeap = hp'
        , sessionKeyValueMap = mp'
        , sessionOutputStream = out
        }

    where

    ejectLoop hp mp out !cnt = do
        let hres = H.uncons hp
        case hres of
            Just (Entry expiry key, hp1) -> do
                -- @force@ is set only when the ejection is requested by the
                -- predicate rather than by the timer.
                (eject, force) <-
                    if curTime >= expiry
                    then return (True, False)
                    else do
                        r <- ejectPred cnt
                        return (r, r)
                if eject
                then
                    case IsMap.mapLookup key mp of
                        Nothing -> ejectLoop hp1 mp out cnt
                        Just ZombieSession ->
                            ejectLoop hp1 (IsMap.mapDelete key mp) out cnt
                        Just (LiveSession expiry1 acc) -> do
                            if expiry1 <= curTime || not reset || force
                            then do
                                (hp2, mp1, out1, cnt1) <-
                                    ejectEntry extract hp1 mp out cnt acc key
                                ejectLoop hp2 mp1 out1 cnt1
                            else
                                -- Re-queue the key under the session's
                                -- recorded expiry and continue.
                                let hp2 = H.insert (Entry expiry1 key) hp1
                                in ejectLoop hp2 mp out cnt
                else return (hp, mp, out, cnt)
            Nothing -> do
                -- Once the heap is empty no session may remain in the map.
                assert (IsMap.mapNull mp) (return ())
                return (hp, mp, out, cnt)
{-# INLINE classifySessionsByGeneric #-}
-- | Split a stream of timestamped, keyed events into per-key sessions and run
-- the supplied fold over each session. A @(key, result)@ pair is emitted
-- whenever a session's fold terminates or the session is ejected.
--
-- The input is interleaved with timer ticks (a 'Nothing' event injected every
-- @tick@ seconds via 'interject'); each tick advances the internal clock by
-- @tick@ and hands the state to @ejectExpired@.
--
-- The fold must consume at least one input: if its initial step is already
-- 'Fold.Done' this function calls 'error'.
classifySessionsByGeneric
    :: forall m f a b. (MonadAsync m, IsMap f)
    => Proxy (f :: (Type -> Type)) -- ^ selects the map type holding the sessions
    -> Double         -- ^ timer tick, in seconds
    -> Bool           -- ^ forwarded to @ejectOne@ / @ejectExpired@;
                      -- presumably "reset the timer on every event" - confirm
    -> (Int -> m Bool) -- ^ given the current session count, decides whether
                       -- a session must be ejected before adding a new one
    -> Double         -- ^ session timeout, in seconds
    -> Fold m a b  -- ^ fold applied to the values of each session
    -> Stream m (AbsTime, (Key f, a)) -- ^ (timestamp, (session key, value))
    -> Stream m (Key f, b) -- ^ (session key, fold result)
classifySessionsByGeneric _ tick reset ejectPred tmout
    (Fold step initial extract final) input =
    -- Every scan state carries the outputs produced by that step in
    -- 'sessionOutputStream'; unfoldMany flattens them into the result.
    Stream.unfoldMany (Unfold.lmap sessionOutputStream Unfold.fromStream)
        $ Stream.scanlMAfter' sstep (return szero) (flush final)
        $ interject (return Nothing) tick
        $ fmap Just input

    where

    -- Both durations are given in seconds and rounded to whole milliseconds.
    timeoutMs = toRelTime (round (tmout * 1000) :: MilliSecond64)
    tickMs = toRelTime (round (tick * 1000) :: MilliSecond64)

    -- Initial state: clocks at zero, no sessions, nothing to output.
    szero = SessionState
        { sessionCurTime = toAbsTime (0 :: MilliSecond64)
        , sessionEventTime = toAbsTime (0 :: MilliSecond64)
        , sessionCount = 0
        , sessionTimerHeap = H.empty
        , sessionKeyValueMap = IsMap.mapEmpty :: f s
        , sessionOutputStream = Stream.nil
        }

    -- An input event arrived: feed it to the fold of the session for its key.
    sstep session@SessionState{..} (Just (timestamp, (key, value))) = do
        -- The clock never goes backwards: an event older than the latest
        -- event time seen so far does not rewind it.
        let curTime = max sessionEventTime timestamp
            mOld = IsMap.mapLookup key sessionKeyValueMap
        -- The fold finished on this event: emit its result now.
        let done fb = do
                -- The timer heap is left untouched here. A session that was
                -- live is only overwritten with ZombieSession in the map and
                -- removed from the count.
                let (mp, cnt) = case mOld of
                        Just (LiveSession _ _) ->
                            ( IsMap.mapInsert
                                key ZombieSession sessionKeyValueMap
                            , sessionCount - 1
                            )
                        _ -> (sessionKeyValueMap, sessionCount)
                return $ session
                    { sessionCurTime = curTime
                    , sessionEventTime = curTime
                    , sessionCount = cnt
                    , sessionKeyValueMap = mp
                    , sessionOutputStream = Stream.fromPure (key, fb)
                    }
            -- The fold wants more input: store the new accumulator.
            partial fs1 = do
                let expiry = addToAbsTime timestamp timeoutMs
                (hp1, mp1, out1, cnt1) <- do
                        let vars = (sessionTimerHeap, sessionKeyValueMap,
                                           Stream.nil, sessionCount)
                        case mOld of
                            -- Key not present: this is a new session.
                            Nothing -> do
                                -- Make room first if the predicate asks for it.
                                eject <- ejectPred sessionCount
                                (hp, mp, out, cnt) <-
                                    if eject
                                    then ejectOne reset extract vars
                                    else return vars

                                -- Register the new session's expiry timer.
                                let hp' = H.insert (Entry expiry key) hp
                                 in return (hp', mp, out, cnt + 1)
                            -- Key already present: heap and count unchanged.
                            -- NOTE(review): a ZombieSession entry also lands
                            -- here and becomes live again below without
                            -- sessionCount being incremented - confirm that
                            -- this is intended.
                            Just _ -> return vars

                let acc = LiveSession expiry fs1
                    mp2 = IsMap.mapInsert key acc mp1
                return $ SessionState
                    { sessionCurTime = curTime
                    , sessionEventTime = curTime
                    , sessionCount = cnt1
                    , sessionTimerHeap = hp1
                    , sessionKeyValueMap = mp2
                    , sessionOutputStream = out1
                    }
        -- Resume the live accumulator if there is one, otherwise start the
        -- fold afresh (also for a ZombieSession entry).
        res0 <- do
            case mOld of
                Just (LiveSession _ acc) -> return $ Fold.Partial acc
                _ -> initial
        case res0 of
            Fold.Done _ ->
                error $ "classifySessionsBy: "
                    ++ "The supplied fold must consume at least one input"
            Fold.Partial fs -> do
                res <- step fs value
                case res of
                    Fold.Done fb -> done fb
                    Fold.Partial fs1 -> partial fs1

    -- A timer tick arrived: advance the clock by one tick and let
    -- ejectExpired process the state.
    sstep sessionState@SessionState{..} Nothing =
        let curTime = addToAbsTime sessionCurTime tickMs
        in ejectExpired reset ejectPred extract sessionState curTime
{-# INLINE classifySessionsBy #-}
-- | 'classifySessionsByGeneric' specialised to a 'Map' keyed by @k@ as the
-- session store. All arguments are passed through unchanged.
classifySessionsBy
    :: (MonadAsync m, Ord k)
    => Double         -- ^ timer tick, in seconds
    -> Bool           -- ^ forwarded as the @reset@ flag of
                      -- 'classifySessionsByGeneric'
    -> (Int -> m Bool) -- ^ given the current session count, decides whether
                       -- a session must be ejected before adding a new one
    -> Double         -- ^ session timeout, in seconds
    -> Fold m a b  -- ^ fold applied to the values of each session
    -> Stream m (AbsTime, (k, a)) -- ^ (timestamp, (session key, value))
    -> Stream m (k, b) -- ^ (session key, fold result)
classifySessionsBy = classifySessionsByGeneric (Proxy :: Proxy (Map k))
{-# INLINE classifyKeepAliveSessions #-}
-- | 'classifySessionsBy' with a timer tick of @1@ and the @reset@ flag set to
-- 'True'.
classifyKeepAliveSessions ::
       (MonadAsync m, Ord k)
    => (Int -> m Bool) -- ^ given the current session count, decides whether
                       -- a session must be ejected before adding a new one
    -> Double -- ^ session timeout, in seconds
    -> Fold m a b -- ^ fold applied to the values of each session
    -> Stream m (AbsTime, (k, a)) -- ^ (timestamp, (session key, value))
    -> Stream m (k, b)
classifyKeepAliveSessions = classifySessionsBy 1 True
{-# INLINE classifySessionsOf #-}
-- | 'classifySessionsBy' with a timer tick of @1@ and the @reset@ flag set to
-- 'False'.
classifySessionsOf ::
       (MonadAsync m, Ord k)
    => (Int -> m Bool) -- ^ given the current session count, decides whether
                       -- a session must be ejected before adding a new one
    -> Double -- ^ session timeout, in seconds
    -> Fold m a b -- ^ fold applied to the values of each session
    -> Stream m (AbsTime, (k, a)) -- ^ (timestamp, (session key, value))
    -> Stream m (k, b)
classifySessionsOf = classifySessionsBy 1 False
{-# INLINE sampleIntervalEnd #-}
-- | Fold every interval produced by @intervalsOf n@ with 'Fold.latest' and
-- keep only the 'Just' results, i.e. emit the last element seen in each
-- interval and nothing for an interval that yields 'Nothing'.
--
-- NOTE(review): @n@ is handed to 'intervalsOf' as is; presumably seconds -
-- confirm against 'intervalsOf'.
sampleIntervalEnd :: MonadAsync m => Double -> Stream m a -> Stream m a
sampleIntervalEnd n = Stream.catMaybes . intervalsOf n Fold.latest
{-# INLINE sampleIntervalStart #-}
-- | Fold every interval produced by @intervalsOf n@ with 'Fold.one' and keep
-- only the 'Just' results, i.e. emit the first element seen in each interval
-- and nothing for an interval that yields 'Nothing'.
--
-- NOTE(review): @n@ is handed to 'intervalsOf' as is; presumably seconds -
-- confirm against 'intervalsOf'.
sampleIntervalStart :: MonadAsync m => Double -> Stream m a -> Stream m a
sampleIntervalStart n = Stream.catMaybes . intervalsOf n Fold.one
-- | State of the burst-sampling scan in 'sampleBurst'. @t@ is the timestamp
-- type and @x@ the element type; all fields are strict.
data BurstState t x =
      BurstNone
      -- ^ No sample is being held; also the initial state of the scan.
    | BurstWait !t !x
      -- ^ A sample is being held, together with the time of the most recent
      -- event of the current burst.
    | BurstDone !x
      -- ^ The burst ended; the carried sample is emitted.
    | BurstDoneNext !x !t !x
      -- ^ The first field is emitted as the sample of the burst that ended;
      -- the remaining time and sample already belong to the next burst.
{-# INLINE sampleBurst #-}
-- | Emit one sample per burst of events, where a burst ends once no event
-- has been seen for at least @gap@ seconds.
--
-- When @sampleAtEnd@ is 'True' the held sample is replaced by every new event
-- of the burst, so the last element is emitted; when 'False' the first
-- element of the burst is kept.
--
-- The input is interleaved with a 'Nothing' poll event every 0.01 seconds
-- (hard-coded below) so that the end of a burst is noticed even when no
-- further input arrives.
sampleBurst :: MonadAsync m => Bool -> Double -> Stream m a -> Stream m a
sampleBurst sampleAtEnd gap xs =
    Stream.mapMaybe extract
        $ Stream.scan (Fold.foldl' step BurstNone)
        $ Stream.timeIndexed
        $ interject (return Nothing) 0.01 (fmap Just xs)

    where

    -- The gap converted from seconds to nanoseconds.
    gap1 = toRelTime64 (NanoSecond64 (round (gap * 10^(9::Int))))

    {-# INLINE step #-}
    -- Idle, or just emitted: an event starts a new burst, a poll keeps or
    -- returns us to the idle state.
    step BurstNone (t1, Just x1) = BurstWait t1 x1
    step BurstNone _ = BurstNone
    step (BurstDone _) (t1, Just x1) = BurstWait t1 x1
    step (BurstDone _) _ = BurstNone
    -- Holding a sample and a poll arrives: emit once the gap has elapsed.
    step old@(BurstWait t0 x0) (t1, Nothing)
        | t1 - t0 >= gap1 = BurstDone x0
        | otherwise = old
    -- Holding a sample and an event arrives. If the gap had already elapsed
    -- (no poll was seen in between) the held sample is emitted and the new
    -- event starts the next burst; otherwise the burst is extended.
    step (BurstWait t0 x0) (t1, Just x1)
        | t1 - t0 >= gap1 = BurstDoneNext x0 t1 x1
        | sampleAtEnd = BurstWait t1 x1
        | otherwise = BurstWait t1 x0
    -- BurstDoneNext has already emitted its first field; its remaining
    -- (time, sample) pair is treated exactly like a BurstWait.
    step (BurstDoneNext _ t0 x0) (t1, Nothing)
        | t1 - t0 >= gap1 = BurstDone x0
        | otherwise =  BurstWait t0 x0
    step (BurstDoneNext _ t0 x0) (t1, Just x1)
        | t1 - t0 >= gap1 = BurstDoneNext x0 t1 x1
        | sampleAtEnd = BurstWait t1 x1
        | otherwise = BurstWait t1 x0

    {-# INLINE extract #-}
    -- Only the two "done" states carry a sample to emit.
    extract (BurstDoneNext x _ _) = Just x
    extract (BurstDone x) = Just x
    extract _ = Nothing
{-# INLINE sampleBurstEnd #-}
-- | 'sampleBurst' with @sampleAtEnd@ set to 'True': the held sample is
-- replaced by each new event of a burst, so the last one is emitted. The
-- 'Double' argument is the burst gap passed on to 'sampleBurst'.
sampleBurstEnd :: MonadAsync m => Double -> Stream m a -> Stream m a
sampleBurstEnd = sampleBurst True
{-# INLINE sampleBurstStart #-}
-- | 'sampleBurst' with @sampleAtEnd@ set to 'False': the first event of a
-- burst is kept as the sample to emit. The 'Double' argument is the burst
-- gap passed on to 'sampleBurst'.
sampleBurstStart :: MonadAsync m => Double -> Stream m a -> Stream m a
sampleBurstStart = sampleBurst False
{-# INLINE bufferOldestN #-}
-- | Unimplemented placeholder: evaluating it raises an error.
--
-- NOTE(review): going by the name, presumably meant to buffer the oldest N
-- elements of the stream - confirm the intended contract before implementing.
bufferOldestN ::
    Int -> Stream m a -> Stream m a
bufferOldestN = error "bufferOldestN: not implemented"
{-# INLINE bufferLatestN #-}
-- | Unimplemented placeholder: evaluating it raises an error.
--
-- NOTE(review): going by the name, presumably meant to buffer the latest N
-- elements of the stream - confirm the intended contract before implementing.
bufferLatestN ::
    Int -> Stream m a -> Stream m a
bufferLatestN = error "bufferLatestN: not implemented"
{-# INLINE bufferLatest #-}
-- | Unimplemented placeholder: evaluating it raises an error.
--
-- NOTE(review): going by the name and the 'Maybe' result, presumably meant to
-- expose only the most recent element of the stream - confirm the intended
-- contract before implementing.
bufferLatest ::
    Stream m a -> Stream m (Maybe a)
bufferLatest = error "bufferLatest: not implemented"