{-# LANGUAGE Safe #-} {-# LANGUAGE ScopedTypeVariables #-} module Signal.Channel ( Channel , newChannel , ChannelCapacity(..) , newReplayChannel , Signal , Subscriber , Scheduler ) where import Control.Concurrent.STM import Control.Monad hiding (mapM_) import Control.Monad.IO.Class import Data.Foldable import Data.Sequence as Seq import Disposable import Prelude hiding (mapM_, length, drop) import Scheduler import Signal import Signal.Subscriber import Signal.Subscriber.Internal -- | A controllable signal, represented by a 'Subscriber' and 'Signal' pair. -- -- Values sent to the subscriber will automatically be broadcast to all of the signal's subscribers. -- In effect, the subscriber is the write end, while the signal is the read end. type Channel s v = (Subscriber s v, Signal s v) -- | Determines how many events a replay channel will save. data ChannelCapacity = LimitedCapacity Int -- ^ The channel will only save the specified number of events. | UnlimitedCapacity -- ^ The channel will save an unlimited number of events. deriving (Eq, Show) -- | Creates a simple channel which broadcasts all values sent to it. -- -- Sending an 'ErrorEvent' or 'CompletedEvent' will terminate the channel. newChannel :: Scheduler s => IO (Channel s v) newChannel = newReplayChannel $ LimitedCapacity 0 -- | Like 'newChannel', but new subscriptions to the returned signal will receive all values -- (up to the specified capacity) which have been sent thus far. -- -- Sending an 'ErrorEvent' or 'CompletedEvent' will terminate the channel. Any terminating event -- will be replayed to future subscribers, assuming sufficient capacity. newReplayChannel :: forall s v. Scheduler s => ChannelCapacity -> IO (Channel s v) newReplayChannel cap = do subs <- atomically $ newTVar Seq.empty disposed <- atomically $ newTVar False events <- atomically $ newTVar Seq.empty let addSubscriber :: Subscriber s v -> STM (Seq (Event v), Bool) addSubscriber sub = do d <- readTVar disposed unless d $ modifyTVar' subs (|> sub) seq <- readTVar events return (seq, d) s :: Signal s v s = signal $ \sub -> do (events, d) <- liftIO $ atomically $ addSubscriber sub -- TODO: Allow these sends to be interrupted through disposal. mapM_ (send sub) events when d $ liftIO $ disposeSubscriber sub return EmptyDisposable limit :: Int -> Seq (Event v) -> Seq (Event v) limit n seq | n <= 0 = Seq.empty | length seq > n = drop (length seq - n) seq | otherwise = seq addEvent' :: ChannelCapacity -> Event v -> Seq (Event v) -> Seq (Event v) addEvent' (LimitedCapacity c) ev seq = limit c $ seq |> ev addEvent' UnlimitedCapacity ev seq = seq |> ev addEvent :: Event v -> STM (Seq (Subscriber s v)) addEvent ev@(NextEvent _) = do d <- readTVar disposed unless d $ modifyTVar' events $ addEvent' cap ev readTVar subs addEvent ev = do d <- swapTVar disposed True unless d $ modifyTVar' events $ addEvent' cap ev swapTVar subs Seq.empty onEvent :: Event v -> SchedulerIO s () onEvent ev = do subs <- liftIO $ atomically $ addEvent ev mapM_ (`send` ev) subs sub <- subscriber onEvent return (sub, s)