{-# LANGUAGE FlexibleContexts #-}
module Control.Concurrent.NQE.PubSub
    ( Publisher
    , publisher
    , subscribe
    , unsubscribe
    , withPubSub
    ) where

import           Control.Applicative
import           Control.Concurrent.NQE.Process
import           Control.Monad.Reader
import           Data.List
import           UnliftIO

-- | Subscribe or unsubscribe from an event publisher.
data ControlMsg event
    = Subscribe (Inbox event)
    | Unsubscribe (Inbox event)

-- | Wrapper for a 'Mailbox' that can receive 'ControlMsg'.
type Publisher msg = Inbox (ControlMsg msg)

-- | Subscribe an 'Inbox' to events from a 'Publisher'. Unsubscribe when action
-- finishes. Pass a mailbox constructor.
withPubSub ::
       (MonadUnliftIO m, Mailbox mbox event)
    => Publisher event
    -> m (mbox event)         -- ^ mailbox creator for this subscription
    -> (Inbox event -> m a)   -- ^ unsubscribe when this action ends
    -> m a
withPubSub pub sub = bracket acquire (unsubscribe pub)
  where
    acquire = do
        i <- newInbox =<< sub
        subscribe pub i
        return i

-- | Subscribe an 'Inbox' to a 'Publisher' generating events.
subscribe :: MonadIO m => Publisher event -> Inbox event -> m ()
subscribe pub sub = Subscribe sub `send` pub

-- | Unsubscribe an 'Inbox' from a 'Publisher' events.
unsubscribe :: MonadIO m => Publisher event -> Inbox event -> m ()
unsubscribe pub sub = Unsubscribe sub `send` pub

-- | Start a publisher that will receive events from an 'STM' action, and
-- subscription requests from a 'Publisher' mailbox. Events will be forwarded
-- atomically to all subscribers. Full mailboxes will not receive events.
publisher :: MonadIO m => Publisher event -> STM event -> m ()
publisher pub events = do
    box <- newTVarIO []
    runReaderT go box
  where
    go =
        forever $ do
            incoming <-
                atomically $
                Left <$> receiveSTM pub <|> Right <$> events
            process incoming

process ::
       (MonadIO m, MonadReader (TVar [Inbox event]) m)
    => Either (ControlMsg event) event
    -> m ()
process (Left (Subscribe sub)) = do
    box <- ask
    atomically (modifyTVar box (`union` [sub]))

process (Left (Unsubscribe sub)) = do
    box <- ask
    atomically (modifyTVar box (delete sub))

process (Right event) = do
    box <- ask
    atomically $ do
        subs <- readTVar box
        forM_ subs $ \sub -> do
            full <- mailboxFullSTM sub
            unless full $ event `sendSTM` sub