{-| A subscription based messaging system, with non-blocking bounded write. > fh <- atomically newFirehose > -- the following line will not block, even though nobody subscribed > atomically (mapM_ (writeEvent fh) [1..100]) > -- let's subscribe a single client > sub <- atomically (subscribe 10 fh) > forkIO (forever (atomically (readEvent sub) >>= print)) > atomically $ writeEvent fh 1 -} module Control.Concurrent.STM.Firehose (Firehose, Subscription, newFirehose, writeEvent, subscribe, unsubscribe, readEvent, getQueue) where import Control.Concurrent.STM import Control.Concurrent.STM.TBMQueue import Control.Monad (filterM) newtype Firehose a = Firehose (TVar [TBMQueue a]) data Subscription a = Subscription (TBMQueue a) (Firehose a) -- | Creates a new 'Firehose' item. newFirehose :: STM (Firehose a) newFirehose = Firehose `fmap` newTVar [] -- | Sends a piece of data in the fire hose. writeEvent :: Firehose a -> a -> STM () writeEvent (Firehose lst) element = readTVar lst >>= mapM_ (`tryWriteTBMQueue` element) -- | Get a subscription from the fire hose, that will be used to -- read events. subscribe :: Int -- ^ Number of elements buffered. If set too high, it will increase memory usage. If set too low, activity spikes will result in message loss. -> Firehose a -> STM (Subscription a) subscribe len f@(Firehose lst) = do nq <- newTBMQueue len modifyTVar' lst (nq:) return (Subscription nq f) -- | Unsubscribe from the fire hose. Subsequent calls to 'readEvent' will -- return 'Nothing'. This runs in O(n), where n is the current number of -- subscriptions. Please contact the maintainer if you need better -- performance. unsubscribe :: Subscription a -> STM () unsubscribe (Subscription q (Firehose lst)) = do closeTBMQueue q queues <- readTVar lst nqueues <- filterM (fmap not . isClosedTBMQueue) queues writeTVar lst nqueues -- | Read an event from a 'Subscription'. This will return 'Nothing' if the -- firehose is shut, or the subscription removed. readEvent :: Subscription a -> STM (Maybe a) readEvent (Subscription q _) = readTBMQueue q -- | Gets the underlying queue from a subscription. getQueue :: Subscription a -> TBMQueue a getQueue (Subscription q _) = q