-- | -- Module: Data.Conduit.FoldDebounce -- Description: Regulate input traffic from conduit Source with Control.FoldDebounce -- Maintainer: Toshio Ito -- -- Synopsis: -- -- > module Main (main) where -- > -- > import Data.Conduit (ConduitT, yield, runConduit, (.|)) -- > import qualified Data.Conduit.List as CL -- > import Data.Void (Void) -- > import Control.Concurrent (threadDelay) -- > import Control.Monad.IO.Class (liftIO) -- > import Control.Monad.Trans.Resource (ResourceT, runResourceT) -- > -- > import qualified Data.Conduit.FoldDebounce as F -- > -- > fastSource :: Int -> ConduitT () Int (ResourceT IO) () -- > fastSource max_num = fastStream' 0 where -- > fastStream' count = do -- > yield count -- > if count >= max_num -- > then return () -- > else do -- > liftIO $ threadDelay 100000 -- > fastStream' (count + 1) -- > -- > printSink :: Show a => ConduitT a Void (ResourceT IO) () -- > printSink = CL.mapM_ (liftIO . putStrLn . show) -- > -- > main :: IO () -- > main = do -- > putStrLn "-- Before debounce" -- > runResourceT $ runConduit $ fastSource 10 .| printSink -- > let debouncer = F.debounce F.Args { F.cb = undefined, -- anything will do -- > F.fold = (\list num -> list ++ [num]), -- > F.init = [] } -- > F.def { F.delay = 500000 } -- > putStrLn "-- After debounce" -- > runResourceT $ runConduit $ debouncer (fastSource 10) .| printSink -- -- Result: -- -- > -- Before debounce -- > 0 -- > 1 -- > 2 -- > 3 -- > 4 -- > 5 -- > 6 -- > 7 -- > 8 -- > 9 -- > 10 -- > -- After debounce -- > [0,1,2,3,4] -- > [5,6,7,8,9] -- > [10] -- -- This module regulates (slows down) data stream from conduit source -- using "Control.FoldDebounce". -- -- The data from the original source (type @i@) are pulled and folded -- together to create an output data (type @o@). The output data then -- comes out of the debounced source in a predefined interval -- (specified by 'delay' option). -- -- See "Control.FoldDebounce" for detail. module Data.Conduit.FoldDebounce ( debounce, -- * Re-exports Args(..), Opts, def, -- ** Accessors for 'Opts' delay, alwaysResetTimer, -- * Preset parameters forStack, forMonoid, forVoid ) where import Prelude hiding (init) import Control.Monad (void) import Data.Monoid (Monoid) import Data.Void (Void) import Control.FoldDebounce (Args(Args,cb,fold,init), Opts, delay, alwaysResetTimer, def) import qualified Control.FoldDebounce as F import Data.Conduit (ConduitT, await, (.|), bracketP, yield, runConduit) import Control.Monad.Trans.Resource (MonadResource, MonadUnliftIO, allocate, register, release, resourceForkIO, runResourceT) import Control.Monad.Trans.Class (lift) import Control.Monad.IO.Class (MonadIO, liftIO) import Control.Concurrent.STM (newTChanIO, writeTChan, readTChan, atomically, TVar, readTVar, newTVarIO, writeTVar) -- | Debounce conduit source with "Control.FoldDebounce". The data -- stream from the original source (type @i@) is debounced and folded -- into the data stream of the type @o@. -- -- Note that the original source is connected to a sink in another -- thread. You may need some synchronization if the original source -- has side-effects. debounce :: (MonadResource m, MonadUnliftIO m) => Args i o -- ^ mandatory argument for FoldDebounce. 'cb' -- field is ignored, so you can set anything -- to that. -> Opts i o -- ^ optional argument for FoldDebounce -> ConduitT () i m () -- ^ original source -> ConduitT () o m () -- ^ debounced source debounce args opts src = bracketP initOutTermed finishOutTermed debounceWith where initOutTermed = newTVarIO False finishOutTermed = atomically . flip writeTVar True debounceWith out_termed = do out_chan <- liftIO $ newTChanIO lift $ runResourceT $ do void $ register $ atomically $ writeTChan out_chan OutFinished (_, trig) <- allocate (F.new args { F.cb = atomically . writeTChan out_chan . OutData } opts) (F.close) void $ resourceForkIO $ lift $ runConduit (src .| trigSink trig out_termed) keepYield out_chan keepYield out_chan = do mgot <- liftIO $ atomically $ readTChan out_chan case mgot of OutFinished -> return () OutData got -> yield got >> keepYield out_chan -- | Internal data type for output channel. data OutData o = OutData o | OutFinished trigSink :: (MonadIO m) => F.Trigger i o -> TVar Bool -> ConduitT i Void m () trigSink trig out_termed = trigSink' where trigSink' = do mgot <- await termed <- liftIO $ atomically $ readTVar out_termed case (termed, mgot) of (True, _) -> return () (False, Nothing) -> return () (False, Just got) -> do liftIO $ F.send trig got trigSink' -- | 'Args' for stacks. Input events are accumulated in a stack, i.e., -- the last event is at the head of the list. forStack :: Args i [i] forStack = F.forStack undefined -- | 'Args' for monoids. Input events are appended to the tail. forMonoid :: Monoid i => Args i i forMonoid = F.forMonoid undefined -- | 'Args' that discards input events. The data stream from the -- debounced source indicates the presence of data from the original -- source. forVoid :: Args i () forVoid = F.forVoid undefined