-- |
-- Module: Data.Conduit.FoldDebounce
-- Description: Regulate input traffic from conduit Source with Control.FoldDebounce
-- Maintainer: Toshio Ito <debug.ito@gmail.com>
--
-- Synopsis:
--
-- > module Main (main) where
-- >
-- > import Data.Conduit (ConduitT, yield, runConduit, (.|))
-- > import qualified Data.Conduit.List as CL
-- > import Data.Void (Void)
-- > import Control.Concurrent (threadDelay)
-- > import Control.Monad.IO.Class (liftIO)
-- > import Control.Monad.Trans.Resource (ResourceT, runResourceT)
-- >
-- > import qualified Data.Conduit.FoldDebounce as F
-- >
-- > fastSource :: Int -> ConduitT () Int (ResourceT IO) ()
-- > fastSource max_num = fastStream' 0 where
-- >   fastStream' count = do
-- >     yield count
-- >     if count >= max_num
-- >       then return ()
-- >       else do
-- >         liftIO $ threadDelay 100000
-- >         fastStream' (count + 1)
-- >
-- > printSink :: Show a => ConduitT a Void (ResourceT IO) ()
-- > printSink = CL.mapM_ (liftIO . putStrLn . show)
-- >
-- > main :: IO ()
-- > main = do
-- >   putStrLn "-- Before debounce"
-- >   runResourceT $ runConduit $ fastSource 10 .| printSink
-- >   let debouncer = F.debounce F.Args { F.cb = undefined, -- anything will do
-- >                                       F.fold = (\list num -> list ++ [num]),
-- >                                       F.init = [] }
-- >                              F.def { F.delay = 500000 }
-- >   putStrLn "-- After debounce"
-- >   runResourceT $ runConduit $ debouncer (fastSource 10) .| printSink
--
-- Result:
--
-- > -- Before debounce
-- > 0
-- > 1
-- > 2
-- > 3
-- > 4
-- > 5
-- > 6
-- > 7
-- > 8
-- > 9
-- > 10
-- > -- After debounce
-- > [0,1,2,3,4]
-- > [5,6,7,8,9]
-- > [10]
--
-- This module regulates (slows down) data stream from conduit source
-- using "Control.FoldDebounce".
--
-- The data from the original source (type @i@) are pulled and folded
-- together to create output data (type @o@). The output data then
-- come out of the debounced source at a predefined interval
-- (specified by the 'delay' option).
--
-- See "Control.FoldDebounce" for detail.
module Data.Conduit.FoldDebounce
    ( debounce
      -- * Re-exports
    , Args (..)
    , Opts
    , def
      -- ** Accessors for 'Opts'
    , delay
    , alwaysResetTimer
      -- * Preset parameters
    , forStack
    , forMonoid
    , forVoid
    ) where

import           Control.Monad                (void)
import           Data.Monoid                  (Monoid)
import           Data.Void                    (Void)
import           Prelude                      hiding (init)

import           Control.Concurrent.STM       (TVar, atomically, newTChanIO, newTVarIO, readTChan,
                                               readTVar, writeTChan, writeTVar)
import           Control.FoldDebounce         (Args (Args, cb, fold, init), Opts, alwaysResetTimer,
                                               def, delay)
import qualified Control.FoldDebounce         as F
import           Control.Monad.IO.Class       (MonadIO, liftIO)
import           Control.Monad.Trans.Class    (lift)
import           Control.Monad.Trans.Resource (MonadResource, MonadUnliftIO, allocate, register,
                                               release, resourceForkIO, runResourceT)
import           Data.Conduit                 (ConduitT, await, bracketP, runConduit, yield, (.|))

-- | Debounce conduit source with "Control.FoldDebounce". The data
-- stream from the original source (type @i@) is debounced and folded
-- into the data stream of the type @o@.
--
-- Note that the original source is connected to a sink in another
-- thread. You may need some synchronization if the original source
-- has side-effects.
--
-- Internally the original source is run in a forked thread that feeds
-- a FoldDebounce trigger; the trigger's callback pushes folded values
-- into a channel, and the returned conduit yields whatever arrives on
-- that channel until an end-of-stream marker is received.
debounce :: (MonadResource m, MonadUnliftIO m)
            => Args i o -- ^ mandatory argument for FoldDebounce. 'cb'
                        -- field is ignored, so you can set anything
                        -- to that.
            -> Opts i o -- ^ optional argument for FoldDebounce
            -> ConduitT () i m () -- ^ original source
            -> ConduitT () o m () -- ^ debounced source
debounce args opts src = bracketP initOutTermed finishOutTermed debounceWith
  where
    -- Flag shared with the worker thread; it starts out 'False' and is
    -- read by 'trigSink' before every send.
    initOutTermed :: IO (TVar Bool)
    initOutTermed = newTVarIO False

    -- Finalizer handed to 'bracketP': raises the flag so that the
    -- worker thread stops feeding the trigger.
    finishOutTermed :: TVar Bool -> IO ()
    finishOutTermed out_termed = atomically $ writeTVar out_termed True

    -- Set up the channel, the trigger and the worker thread, then start
    -- yielding debounced values.
    debounceWith out_termed = do
      out_chan <- liftIO newTChanIO
      lift $ runResourceT $ do
        -- Cleanup action that posts the end-of-stream marker.
        -- NOTE(review): this relies on ResourceT running cleanups in
        -- reverse registration order (so 'F.close' below runs first) and
        -- only after the forked thread has also finished -- confirm
        -- against the resourcet documentation.
        void $ register $ atomically $ writeTChan out_chan OutFinished
        -- The caller-supplied 'cb' is replaced: folded outputs are
        -- wrapped in 'OutData' and written to the channel instead.
        (_, trig) <- allocate
                       (F.new (args { F.cb = atomically . writeTChan out_chan . OutData }) opts)
                       F.close
        -- Run the original source in its own thread, sending every
        -- element to the trigger.
        void $ resourceForkIO $ lift $ runConduit (src .| trigSink trig out_termed)
      keepYield out_chan

    -- Block on the channel and yield each payload downstream; stop at
    -- the first 'OutFinished'.
    keepYield out_chan = do
      mgot <- liftIO $ atomically $ readTChan out_chan
      case mgot of
        OutFinished -> return ()
        OutData got -> yield got >> keepYield out_chan

-- | Message carried on the internal output channel that connects the
-- FoldDebounce callback to the debounced conduit.
data OutData o = OutData o   -- ^ one folded output value
               | OutFinished -- ^ end-of-stream marker; nothing follows it

-- | Sink that forwards every awaited input to the FoldDebounce
-- trigger.
--
-- The loop stops, without sending, as soon as either the termination
-- flag is found to be 'True' or upstream is exhausted. The flag is
-- read /after/ each 'await', so an element that was awaited while the
-- flag was being raised is dropped rather than sent.
trigSink :: (MonadIO m) => F.Trigger i o -> TVar Bool -> ConduitT i Void m ()
trigSink trig out_termed = trigSink'
  where
    trigSink' = do
      mgot <- await
      termed <- liftIO $ atomically $ readTVar out_termed
      case (termed, mgot) of
        -- The debounced source has been finalized: stop immediately.
        (True, _) -> return ()
        -- Upstream is exhausted.
        (False, Nothing) -> return ()
        -- Normal case: hand the element to the trigger and continue.
        (False, Just got) -> do
          liftIO $ F.send trig got
          trigSink'


-- | 'Args' for stacks. Input events are accumulated in a stack, i.e.,
-- the last event is at the head of the list.
--
-- The callback slot of the underlying 'F.forStack' is filled with
-- 'undefined'; that is fine for 'debounce', which overrides 'cb'
-- before creating the trigger.
forStack :: Args i [i]
forStack = F.forStack undefined

-- | 'Args' for monoids. Input events are appended to the tail.
--
-- The callback slot of the underlying 'F.forMonoid' is filled with
-- 'undefined'; that is fine for 'debounce', which overrides 'cb'
-- before creating the trigger.
forMonoid :: Monoid i => Args i i
forMonoid = F.forMonoid undefined

-- | 'Args' that discards input events. The data stream from the
-- debounced source indicates the presence of data from the original
-- source.
--
-- The action slot of the underlying 'F.forVoid' is filled with
-- 'undefined'; that is fine for 'debounce', which overrides 'cb'
-- before creating the trigger.
forVoid :: Args i ()
forVoid = F.forVoid undefined