module Control.Concurrent.STM.Firehose (Firehose, Subscription, newFirehose, writeEvent, subscribe, unsubscribe, readEvent, getQueue) where
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue
import Control.Monad (filterM)
newtype Firehose a = Firehose (TVar [TBMQueue a])
data Subscription a = Subscription (TBMQueue a) (Firehose a)
newFirehose :: STM (Firehose a)
newFirehose = Firehose `fmap` newTVar []
writeEvent :: Firehose a -> a -> STM ()
writeEvent (Firehose lst) element = readTVar lst >>= mapM_ (flip tryWriteTBMQueue element)
subscribe :: Int
-> Firehose a -> STM (Subscription a)
subscribe len f@(Firehose lst) = do
nq <- newTBMQueue len
modifyTVar' lst (nq:)
return (Subscription nq f)
unsubscribe :: Subscription a -> STM ()
unsubscribe (Subscription q (Firehose lst)) = do
closeTBMQueue q
queues <- readTVar lst
nqueues <- filterM (fmap not . isClosedTBMQueue) queues
writeTVar lst nqueues
readEvent :: Subscription a -> STM (Maybe a)
readEvent (Subscription q _) = readTBMQueue q
getQueue :: Subscription a -> TBMQueue a
getQueue (Subscription q _) = q