{-# LANGUAGE Safe #-} {-# LANGUAGE ScopedTypeVariables #-} module Signal.Subscriber ( Subscriber , subscriber , send , Event(..) ) where import Control.Concurrent import Control.Concurrent.STM import Control.Monad import Control.Monad.IO.Class import Disposable import Scheduler import Signal.Event import Signal.Subscriber.Internal -- | Constructs a subscriber. subscriber :: Scheduler s => (Event v -> SchedulerIO s ()) -- ^ An action to run when each event is received. -> IO (Subscriber s v) -- ^ The constructed subscriber. subscriber f = do b <- atomically $ newTVar False d <- newDisposable $ atomically $ writeTVar b True ds <- newDisposableSet addDisposable ds d tid <- myThreadId lt <- atomically $ newTVar tid tlc <- atomically $ newTVar 0 return Subscriber { onEvent = f, disposables = ds, lockedThread = lt, threadLockCounter = tlc, disposed = b } -- | Acquires a subscriber for the specified thread. -- -- This is used to ensure that a subscriber never receives multiple events concurrently. acquireSubscriber :: Subscriber s v -> ThreadId -> STM Bool acquireSubscriber sub tid = do d <- readTVar (disposed sub) if d then return False else do -- TODO: Skip all this synchronization for singleton scheduler types. tlc <- readTVar (threadLockCounter sub) lt <- readTVar (lockedThread sub) when (tlc > 0 && lt /= tid) retry writeTVar (lockedThread sub) tid writeTVar (threadLockCounter sub) $ tlc + 1 return True -- | Releases a subscriber from the specified thread's ownership. -- -- This must match a previous call to 'acquireSubscriber'. releaseSubscriber :: Subscriber s v -> ThreadId -> STM () releaseSubscriber sub tid = do -- TODO: Skip all this synchronization for singleton scheduler types. always $ fmap (== tid) $ readTVar (lockedThread sub) tlc <- readTVar (threadLockCounter sub) always $ return $ tlc > 0 writeTVar (threadLockCounter sub) $ tlc - 1 -- | Synchronously sends an event to a subscriber. send :: forall s v. Scheduler s => Subscriber s v -> Event v -> SchedulerIO s () send s ev = let sendAndDispose :: Event v -> SchedulerIO s () sendAndDispose ev = do liftIO $ disposeSubscriber s onEvent s ev send' :: Event v -> SchedulerIO s () send' ev@(NextEvent _) = onEvent s ev send' ev = do wasDisposed <- liftIO $ atomically $ swapTVar (disposed s) True unless wasDisposed $ sendAndDispose ev in do tid <- liftIO myThreadId b <- liftIO $ atomically $ acquireSubscriber s tid when b $ send' ev >> liftIO (atomically (releaseSubscriber s tid))