-- |
-- Module      : Streamly.Internal.Data.Fold.Async
-- Copyright   : (c) 2019 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
module Streamly.Internal.Data.Fold.Async
    (
    -- * Trimming
      takeInterval

    -- * Splitting
    , intervalsOf
    )
where

import Control.Concurrent (threadDelay, forkIO, killThread)
import Control.Concurrent.MVar (MVar, newMVar, swapMVar, readMVar)
import Control.Exception (SomeException(..), catch, mask)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Control (control)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Tuple.Strict (Tuple3'(..))

import Streamly.Internal.Data.Fold.Type

-- $setup
-- >>> :m
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Internal.Data.Fold.Async as Fold

-- XXX We can use asyncClock here. A parser can be used to return an input that
-- arrives after the timeout.
-- XXX If n is 0 return immediately in initial.
-- XXX we should probably discard the input received after the timeout like
-- takeEndBy_.
--
-- | @takeInterval n fold@ uses @fold@ to fold the input items arriving within
-- a window of first @n@ seconds.
--
-- >>> input = Stream.delay 0.1 $ Stream.fromList [1..]
-- >>> Stream.fold (Fold.takeInterval 1.0 Fold.toList) input
-- [1,2,3,4,5,6,7,8,9,10,11]
--
-- Stops when @fold@ stops or when the timeout occurs. Note that the fold needs
-- an input after the timeout to stop. For example, if no input is pushed to
-- the fold until one hour after the timeout had occurred, then the fold will
-- be done only after consuming that input.
--
-- /Pre-release/
--
{-# INLINE takeInterval #-}
takeInterval :: MonadAsync m => Double -> Fold m a b -> Fold m a b
takeInterval :: Double -> Fold m a b -> Fold m a b
takeInterval Double
n (Fold s -> a -> m (Step s b)
step m (Step s b)
initial s -> m b
done) = (Tuple3' s (MVar Bool) ThreadId
 -> a -> m (Step (Tuple3' s (MVar Bool) ThreadId) b))
-> m (Step (Tuple3' s (MVar Bool) ThreadId) b)
-> (Tuple3' s (MVar Bool) ThreadId -> m b)
-> Fold m a b
forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> Fold m a b
Fold Tuple3' s (MVar Bool) ThreadId
-> a -> m (Step (Tuple3' s (MVar Bool) ThreadId) b)
step' m (Step (Tuple3' s (MVar Bool) ThreadId) b)
initial' Tuple3' s (MVar Bool) ThreadId -> m b
forall b c. Tuple3' s b c -> m b
done'

    where

    initial' :: m (Step (Tuple3' s (MVar Bool) ThreadId) b)
initial' = do
        Step s b
res <- m (Step s b)
initial
        case Step s b
res of
            Partial s
s -> do
                MVar Bool
mv <- IO (MVar Bool) -> m (MVar Bool)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (MVar Bool) -> m (MVar Bool))
-> IO (MVar Bool) -> m (MVar Bool)
forall a b. (a -> b) -> a -> b
$ Bool -> IO (MVar Bool)
forall a. a -> IO (MVar a)
newMVar Bool
False
                ThreadId
t <-
                    (RunInBase m IO -> IO (StM m ThreadId)) -> m ThreadId
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
(RunInBase m b -> b (StM m a)) -> m a
control ((RunInBase m IO -> IO (StM m ThreadId)) -> m ThreadId)
-> (RunInBase m IO -> IO (StM m ThreadId)) -> m ThreadId
forall a b. (a -> b) -> a -> b
$ \RunInBase m IO
run ->
                        ((forall a. IO a -> IO a) -> IO (StM m ThreadId))
-> IO (StM m ThreadId)
forall b. ((forall a. IO a -> IO a) -> IO b) -> IO b
mask (((forall a. IO a -> IO a) -> IO (StM m ThreadId))
 -> IO (StM m ThreadId))
-> ((forall a. IO a -> IO a) -> IO (StM m ThreadId))
-> IO (StM m ThreadId)
forall a b. (a -> b) -> a -> b
$ \forall a. IO a -> IO a
restore -> do
                            ThreadId
tid <-
                                IO () -> IO ThreadId
forkIO
                                  (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ IO () -> (SomeException -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
catch
                                        (IO () -> IO ()
forall a. IO a -> IO a
restore (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO (StM m ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (StM m ()) -> IO ()) -> IO (StM m ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ m () -> IO (StM m ())
RunInBase m IO
run (MVar Bool -> m ()
forall (m :: * -> *). MonadIO m => MVar Bool -> m ()
timerThread MVar Bool
mv))
                                        (MVar Bool -> SomeException -> IO ()
handleChildException MVar Bool
mv)
                            m ThreadId -> IO (StM m ThreadId)
RunInBase m IO
run (ThreadId -> m ThreadId
forall (m :: * -> *) a. Monad m => a -> m a
return ThreadId
tid)
                Step (Tuple3' s (MVar Bool) ThreadId) b
-> m (Step (Tuple3' s (MVar Bool) ThreadId) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Tuple3' s (MVar Bool) ThreadId) b
 -> m (Step (Tuple3' s (MVar Bool) ThreadId) b))
-> Step (Tuple3' s (MVar Bool) ThreadId) b
-> m (Step (Tuple3' s (MVar Bool) ThreadId) b)
forall a b. (a -> b) -> a -> b
$ Tuple3' s (MVar Bool) ThreadId
-> Step (Tuple3' s (MVar Bool) ThreadId) b
forall s b. s -> Step s b
Partial (Tuple3' s (MVar Bool) ThreadId
 -> Step (Tuple3' s (MVar Bool) ThreadId) b)
-> Tuple3' s (MVar Bool) ThreadId
-> Step (Tuple3' s (MVar Bool) ThreadId) b
forall a b. (a -> b) -> a -> b
$ s -> MVar Bool -> ThreadId -> Tuple3' s (MVar Bool) ThreadId
forall a b c. a -> b -> c -> Tuple3' a b c
Tuple3' s
s MVar Bool
mv ThreadId
t
            Done b
b -> Step (Tuple3' s (MVar Bool) ThreadId) b
-> m (Step (Tuple3' s (MVar Bool) ThreadId) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Tuple3' s (MVar Bool) ThreadId) b
 -> m (Step (Tuple3' s (MVar Bool) ThreadId) b))
-> Step (Tuple3' s (MVar Bool) ThreadId) b
-> m (Step (Tuple3' s (MVar Bool) ThreadId) b)
forall a b. (a -> b) -> a -> b
$ b -> Step (Tuple3' s (MVar Bool) ThreadId) b
forall s b. b -> Step s b
Done b
b

    step' :: Tuple3' s (MVar Bool) ThreadId
-> a -> m (Step (Tuple3' s (MVar Bool) ThreadId) b)
step' (Tuple3' s
s MVar Bool
mv ThreadId
t) a
a = do
        Bool
val <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ MVar Bool -> IO Bool
forall a. MVar a -> IO a
readMVar MVar Bool
mv
        if Bool
val
        then do
            Step s b
res <- s -> a -> m (Step s b)
step s
s a
a
            case Step s b
res of
                Partial s
sres -> b -> Step (Tuple3' s (MVar Bool) ThreadId) b
forall s b. b -> Step s b
Done (b -> Step (Tuple3' s (MVar Bool) ThreadId) b)
-> m b -> m (Step (Tuple3' s (MVar Bool) ThreadId) b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> s -> m b
done s
sres
                Done b
bres -> Step (Tuple3' s (MVar Bool) ThreadId) b
-> m (Step (Tuple3' s (MVar Bool) ThreadId) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Tuple3' s (MVar Bool) ThreadId) b
 -> m (Step (Tuple3' s (MVar Bool) ThreadId) b))
-> Step (Tuple3' s (MVar Bool) ThreadId) b
-> m (Step (Tuple3' s (MVar Bool) ThreadId) b)
forall a b. (a -> b) -> a -> b
$ b -> Step (Tuple3' s (MVar Bool) ThreadId) b
forall s b. b -> Step s b
Done b
bres
        else do
            Step s b
res <- s -> a -> m (Step s b)
step s
s a
a
            case Step s b
res of
                Partial s
fs -> Step (Tuple3' s (MVar Bool) ThreadId) b
-> m (Step (Tuple3' s (MVar Bool) ThreadId) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Tuple3' s (MVar Bool) ThreadId) b
 -> m (Step (Tuple3' s (MVar Bool) ThreadId) b))
-> Step (Tuple3' s (MVar Bool) ThreadId) b
-> m (Step (Tuple3' s (MVar Bool) ThreadId) b)
forall a b. (a -> b) -> a -> b
$ Tuple3' s (MVar Bool) ThreadId
-> Step (Tuple3' s (MVar Bool) ThreadId) b
forall s b. s -> Step s b
Partial (Tuple3' s (MVar Bool) ThreadId
 -> Step (Tuple3' s (MVar Bool) ThreadId) b)
-> Tuple3' s (MVar Bool) ThreadId
-> Step (Tuple3' s (MVar Bool) ThreadId) b
forall a b. (a -> b) -> a -> b
$ s -> MVar Bool -> ThreadId -> Tuple3' s (MVar Bool) ThreadId
forall a b c. a -> b -> c -> Tuple3' a b c
Tuple3' s
fs MVar Bool
mv ThreadId
t
                Done b
b -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (ThreadId -> IO ()
killThread ThreadId
t) m ()
-> m (Step (Tuple3' s (MVar Bool) ThreadId) b)
-> m (Step (Tuple3' s (MVar Bool) ThreadId) b)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Step (Tuple3' s (MVar Bool) ThreadId) b
-> m (Step (Tuple3' s (MVar Bool) ThreadId) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> Step (Tuple3' s (MVar Bool) ThreadId) b
forall s b. b -> Step s b
Done b
b)

    done' :: Tuple3' s b c -> m b
done' (Tuple3' s
s b
_ c
_) = s -> m b
done s
s

    timerThread :: MVar Bool -> m ()
timerThread MVar Bool
mv = do
        IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay (Double -> Int
forall a b. (RealFrac a, Integral b) => a -> b
round (Double -> Int) -> Double -> Int
forall a b. (a -> b) -> a -> b
$ Double
n Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000000)
        -- Use IORef + CAS? instead of MVar since its a Bool?
        IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar Bool -> Bool -> IO Bool
forall a. MVar a -> a -> IO a
swapMVar MVar Bool
mv Bool
True

    handleChildException :: MVar Bool -> SomeException -> IO ()
    handleChildException :: MVar Bool -> SomeException -> IO ()
handleChildException MVar Bool
mv SomeException
_ = IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar Bool -> Bool -> IO Bool
forall a. MVar a -> a -> IO a
swapMVar MVar Bool
mv Bool
True

-- For example, we can copy and distribute a stream to multiple folds where
-- each fold can group the input differently e.g. by one second, one minute and
-- one hour windows respectively and fold each resulting stream of folds.

-- | Group the input stream into windows of n second each using the first fold
-- and then fold the resulting groups using the second fold.
--
-- >>> intervals = Fold.intervalsOf 0.5 Fold.toList Fold.toList
-- >>> Stream.fold intervals $ Stream.delay 0.2 $ Stream.fromList [1..10]
-- [[1,2,3,4],[5,6,7],[8,9,10]]
--
-- > intervalsOf n split = many (takeInterval n split)
--
-- /Pre-release/
--
{-# INLINE intervalsOf #-}
intervalsOf :: MonadAsync m => Double -> Fold m a b -> Fold m b c -> Fold m a c
intervalsOf :: Double -> Fold m a b -> Fold m b c -> Fold m a c
intervalsOf Double
n Fold m a b
split = Fold m a b -> Fold m b c -> Fold m a c
forall (m :: * -> *) a b c.
Monad m =>
Fold m a b -> Fold m b c -> Fold m a c
many (Double -> Fold m a b -> Fold m a b
forall (m :: * -> *) a b.
MonadAsync m =>
Double -> Fold m a b -> Fold m a b
takeInterval Double
n Fold m a b
split)