{-# LANGUAGE FlexibleContexts #-} module Control.Concurrent.NQE.PubSub ( Publisher , publisher , boundedPublisher , withPubSub , withBoundedPubSub ) where import Control.Applicative import Control.Concurrent.NQE.Process import Control.Monad.Reader import Data.List import UnliftIO data ControlMsg ch msg = Subscribe (ch msg) | Unsubscribe (ch msg) data Incoming ch msg = Control (ControlMsg ch msg) | Event msg type Publisher mbox ch msg = mbox (ControlMsg ch msg) withPubSub :: (MonadUnliftIO m, Mailbox mbox) => Publisher mbox TQueue msg -> (TQueue msg -> m a) -> m a withPubSub pub f = bracket subscribe unsubscribe action where subscribe = do mbox <- newTQueueIO Subscribe mbox `send` pub return mbox unsubscribe mbox = Unsubscribe mbox `send` pub action mbox = f mbox withBoundedPubSub :: (MonadUnliftIO m, Mailbox mbox) => Int -> Publisher mbox TBQueue msg -> (TBQueue msg -> m a) -> m a withBoundedPubSub bound pub f = bracket subscribe unsubscribe action where subscribe = do mbox <- newTBQueueIO bound Subscribe mbox `send` pub return mbox unsubscribe mbox = Unsubscribe mbox `send` pub action mbox = f mbox publisher :: ( MonadIO m , Mailbox mbox , Mailbox events , Mailbox ch , Eq (ch msg) ) => Publisher mbox ch msg -> events msg -> m () publisher pub events = do box <- newTVarIO [] runReaderT go box where go = forever $ do incoming <- atomically $ Control <$> receiveSTM pub <|> Event <$> receiveSTM events process incoming boundedPublisher :: (MonadIO m, Mailbox mbox, Mailbox events) => Publisher mbox TBQueue msg -> events msg -> m () boundedPublisher pub events = do box <- newTVarIO [] runReaderT go box where go = forever $ do incoming <- atomically $ Control <$> receiveSTM pub <|> Event <$> receiveSTM events processBound incoming processBound :: (MonadIO m, MonadReader (TVar [TBQueue msg]) m) => Incoming TBQueue msg -> m () processBound (Control (Subscribe mbox)) = do box <- ask atomically $ do subscribers <- readTVar box when (mbox `notElem` subscribers) $ writeTVar box (mbox : subscribers) processBound (Control (Unsubscribe mbox)) = do box <- ask atomically (modifyTVar box (delete mbox)) processBound (Event event) = ask >>= \box -> atomically $ readTVar box >>= \subs -> forM_ subs $ \sub -> isFullTBQueue sub >>= \full -> when (not full) (event `sendSTM` sub) process :: (Eq (ch msg), Mailbox ch, MonadIO m, MonadReader (TVar [ch msg]) m) => Incoming ch msg -> m () process (Control (Subscribe mbox)) = do box <- ask atomically $ do subscribers <- readTVar box when (mbox `notElem` subscribers) $ writeTVar box (mbox : subscribers) process (Control (Unsubscribe mbox)) = do box <- ask atomically (modifyTVar box (delete mbox)) process (Event event) = do box <- ask readTVarIO box >>= mapM_ (send event)