{-# LANGUAGE FlexibleContexts #-}
module Control.Concurrent.NQE.PubSub
( Publisher
, publisher
, subscribe
, unsubscribe
, withPubSub
) where
import Control.Applicative
import Control.Concurrent.NQE.Process
import Control.Monad.Reader
import Data.List
import UnliftIO
-- | Control messages accepted by a 'Publisher'.
--
-- Each constructor carries the 'Inbox' that should be added to, or removed
-- from, the publisher's list of subscribers.
data ControlMsg msg
    = Subscribe (Inbox msg)   -- ^ start delivering events to this inbox
    | Unsubscribe (Inbox msg) -- ^ stop delivering events to this inbox
-- | Handle of a publisher process: the 'Inbox' on which it receives
-- 'ControlMsg' values for events of the given type.
type Publisher event = Inbox (ControlMsg event)
-- | Run an action with a temporary subscription to a publisher.
--
-- The second argument produces a mailbox, which is wrapped with 'newInbox'
-- and subscribed to the publisher. The resulting 'Inbox' is handed to the
-- callback. Acquisition and release go through 'bracket', with
-- 'unsubscribe' as the release action.
withPubSub ::
       (MonadUnliftIO m, Mailbox mbox event)
    => Publisher event
    -> m (mbox event)
    -> (Inbox event -> m a)
    -> m a
withPubSub pub mkMailbox handler = bracket register release handler
  where
    -- Build the inbox and announce it to the publisher.
    register = do
        mailbox <- mkMailbox
        inbox <- newInbox mailbox
        subscribe pub inbox
        pure inbox
    -- Withdraw the subscription created by 'register'.
    release inbox = unsubscribe pub inbox
-- | Send a 'Subscribe' control message carrying the given inbox to the
-- publisher.
subscribe :: MonadIO m => Publisher event -> Inbox event -> m ()
subscribe pub inbox = send (Subscribe inbox) pub
-- | Send an 'Unsubscribe' control message carrying the given inbox to the
-- publisher.
unsubscribe :: MonadIO m => Publisher event -> Inbox event -> m ()
unsubscribe pub inbox = send (Unsubscribe inbox) pub
-- | Publisher loop.
--
-- Starts with an empty subscriber list held in a 'TVar' and then loops
-- forever. Each iteration atomically takes either a control message from
-- the publisher's own inbox or an event from the supplied 'STM' source, and
-- passes it to 'process'. The control-message branch is the left operand of
-- '<|>', so it is tried first.
publisher :: MonadIO m => Publisher event -> STM event -> m ()
publisher pub events = newTVarIO [] >>= runReaderT loop
  where
    loop = forever $ atomically nextInput >>= process
    -- Control message ('Left') or event to broadcast ('Right').
    nextInput = (Left <$> receiveSTM pub) <|> (Right <$> events)
-- | Handle one input of the publisher loop against the subscriber list held
-- in the reader environment.
--
-- * @Left (Subscribe sub)@ adds @sub@ to the list unless it is already
--   present ('union').
-- * @Left (Unsubscribe sub)@ removes the first occurrence of @sub@
--   ('delete').
-- * @Right event@ sends the event to every subscriber whose mailbox is not
--   reported full by 'mailboxFullSTM'; full mailboxes are skipped, so the
--   event is dropped for those subscribers.
--
-- Each input is handled in a single STM transaction.
process ::
       (MonadIO m, MonadReader (TVar [Inbox event]) m)
    => Either (ControlMsg event) event
    -> m ()
process incoming = do
    box <- ask
    atomically $
        case incoming of
            -- The strict modifyTVar' evaluates each update (to WHNF) when it
            -- is written instead of storing an unevaluated thunk that is
            -- only forced when the next event reads the list.
            Left (Subscribe sub) -> modifyTVar' box (`union` [sub])
            Left (Unsubscribe sub) -> modifyTVar' box (delete sub)
            Right event -> readTVar box >>= mapM_ (deliver event)
  where
    -- Send to one subscriber, skipping it when its mailbox is full.
    deliver event sub = do
        full <- mailboxFullSTM sub
        unless full $ event `sendSTM` sub