module Data.SouSiT.STM ( -- * Sinks stmSink, stmSink', tchanSink, -- * Sources stmSource, stmSource', tchanSource ) where import Data.SouSiT.Source import Data.SouSiT.Sink import Control.Monad import Control.Concurrent.STM import Control.Monad.IO.Class -- | A sink that executes (atomically) a STM action for every input received. -- The sink continues as long as the action returns Nothing. When the action -- returns Just, then that value is the result of the sink. stmSink :: MonadIO m => (a -> STM (Maybe r)) -> Sink a m (Maybe r) stmSink f = maybeSink (liftIO . atomically . f) -- | A sink that executes (atomically) a STM action for every input received. -- The sink never terminates. stmSink' :: MonadIO m => (a -> STM ()) -> Sink a m () stmSink' f = actionSink (liftIO . atomically . f) -- | Sink that writes all items into a TChan. tchanSink :: MonadIO m => TChan a -> Sink a m () tchanSink chan = stmSink' (writeTChan chan) -- | Source that executes a STM action to get a new item. When the action returns 'Nothing' -- then the source is depleted. stmSource :: MonadIO m => STM (Maybe a) -> FeedSource m a stmSource f = actionSource (liftIO . atomically $ f) -- | Source that executes a STM action to get a new item. Does never run out of items. stmSource' :: MonadIO m => STM a -> FeedSource m a stmSource' f = actionSource (liftIO . atomically $ liftM Just f) -- | Source that reads from a TChan. Does never run out of items (just waits for new ones -- written to the TChan). tchanSource :: MonadIO m => TChan a -> FeedSource m a tchanSource c = stmSource' (readTChan c)