module Signal.Subscriber ( Subscriber
, subscriber
, send
, Event(..)
) where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.IO.Class
import Disposable
import Scheduler
import Signal.Event
import Signal.Subscriber.Internal
-- | Builds a new 'Subscriber' whose 'onEvent' field is the supplied handler.
--
-- The subscriber starts out not disposed, with a lock counter of zero and
-- the creating thread recorded in 'lockedThread'. Its disposable set is
-- seeded with one disposable that sets the 'disposed' flag to 'True'.
subscriber
    :: Scheduler s
    => (Event v -> SchedulerIO s ())  -- ^ stored as 'onEvent'
    -> IO (Subscriber s v)
subscriber handler = do
    disposedFlag <- newTVarIO False
    -- Disposing this entry is what flips the 'disposed' flag.
    flagDisposable <- newDisposable (atomically (writeTVar disposedFlag True))
    dispSet <- newDisposableSet
    addDisposable dispSet flagDisposable
    -- The owner slot needs some initial ThreadId; the creating thread is used.
    ownerVar <- myThreadId >>= newTVarIO
    lockCountVar <- newTVarIO 0
    return Subscriber
        { onEvent = handler
        , disposables = dispSet
        , lockedThread = ownerVar
        , threadLockCounter = lockCountVar
        , disposed = disposedFlag
        }
-- | Tries to take the subscriber's thread lock on behalf of @tid@.
--
-- Returns 'False' straight away, leaving the lock untouched, when the
-- subscriber is already disposed. Otherwise the transaction retries while a
-- different thread holds the lock (counter above zero and a different owner);
-- once it can proceed it records @tid@ as the owner, increments the counter
-- and returns 'True'. A thread that already holds the lock may acquire it
-- again, which simply raises the counter.
acquireSubscriber :: Subscriber s v -> ThreadId -> STM Bool
acquireSubscriber sub tid = do
    isDisposed <- readTVar (disposed sub)
    if isDisposed
        then return False
        else takeLock >> return True
  where
    takeLock :: STM ()
    takeLock = do
        count <- readTVar (threadLockCounter sub)
        owner <- readTVar (lockedThread sub)
        -- Proceed only when the lock is free or already ours.
        unless (count <= 0 || owner == tid) retry
        writeTVar (lockedThread sub) tid
        writeTVar (threadLockCounter sub) (count + 1)
-- | Gives up one level of the lock that @tid@ holds on the subscriber.
--
-- Throws a 'userError' (via 'throwSTM') when @tid@ is not the thread
-- recorded in 'lockedThread', or when the lock counter is not greater than
-- zero. On success the counter is decremented by one; the recorded owner is
-- left in place.
releaseSubscriber :: Subscriber s v -> ThreadId -> STM ()
releaseSubscriber sub tid = do
    ltid <- readTVar $ lockedThread sub
    unless (ltid == tid) $ throwSTM $ userError $ "Locked thread " ++ show ltid ++ " does not match current thread " ++ show tid
    tlc <- readTVar (threadLockCounter sub)
    unless (tlc > 0) $ throwSTM $ userError "Thread lock count is not greater than zero"
    -- Undo exactly one acquisition; the original applied the counter to 1
    -- (@tlc 1@) instead of subtracting.
    writeTVar (threadLockCounter sub) $ tlc - 1
-- | Delivers an event to the subscriber's 'onEvent' handler.
--
-- The subscriber's thread lock is acquired for the calling thread first; if
-- acquisition reports 'False' (the subscriber is already disposed) nothing is
-- delivered. A 'NextEvent' is passed straight to the handler. Any other event
-- marks the subscriber as disposed and, only if it was not marked before,
-- runs 'disposeSubscriber' and then the handler. The lock is released once
-- delivery returns.
send :: forall s v. Scheduler s => Subscriber s v -> Event v -> SchedulerIO s ()
send s ev = do
    tid <- liftIO myThreadId
    acquired <- liftIO $ atomically $ acquireSubscriber s tid
    when acquired $ do
        deliver ev
        liftIO $ atomically $ releaseSubscriber s tid
  where
    deliver :: Event v -> SchedulerIO s ()
    deliver next@(NextEvent _) = onEvent s next
    deliver final = do
        -- swapTVar returns the previous flag, so only the first event of
        -- this kind reaches the handler.
        alreadyDisposed <- liftIO $ atomically $ swapTVar (disposed s) True
        unless alreadyDisposed $ do
            liftIO $ disposeSubscriber s
            onEvent s final