{-# LANGUAGE FlexibleContexts           #-}
{-|
Module      : Control.Concurrent.NQE.Publisher
Copyright   : No rights reserved
License     : UNLICENSE
Maintainer  : xenog@protonmail.com
Stability   : experimental
Portability : POSIX

A publisher is a process that forwards messages to subscribers. NQE publishers
are simple, and do not implement filtering directly, although that can be done
on the 'STM' 'Listen' actions that forward messages to subscribers.

If a subscriber has been added to a publisher using the 'subscribe' function, it
needs to be removed later using 'unsubscribe' when it is no longer needed, or
the publisher will continue calling its 'Listen' action in the future, likely
causing memory leaks.
-}
module Control.Concurrent.NQE.Publisher
    ( Subscriber
    , PublisherMessage(..)
    , Publisher
    , withSubscription
    , subscribe
    , unsubscribe
    , withPublisher
    , publisher
    , publisherProcess
    , publish
    , publishSTM
    ) where

import           Control.Concurrent.NQE.Process
import           Control.Concurrent.Unique
import           Control.Monad.Reader
import           Data.Function
import           Data.Hashable
import           Data.List
import           UnliftIO

-- | Handle of a subscriber to a publisher. Should be kept in order to
-- 'unsubscribe' later.
--
-- Pairs the subscriber's 'Listen' action with a 'Unique' identifier. The
-- 'Eq' and 'Hashable' instances in this module look only at the identifier.
data Subscriber msg = Subscriber (Listen msg) Unique

-- | Two subscribers are equal exactly when their 'Unique' identifiers are
-- equal; the 'Listen' action takes no part in the comparison.
instance Eq (Subscriber msg) where
    Subscriber _ a == Subscriber _ b = a == b

-- | Hash a subscriber by its 'Unique' identifier only, which keeps hashing
-- consistent with the 'Eq' instance.
instance Hashable (Subscriber msg) where
    hashWithSalt salt (Subscriber _ ident) = hashWithSalt salt ident

-- | Messages that a publisher will take.
data PublisherMessage msg
    = Subscribe !(Listen msg) !(Listen (Subscriber msg))
      -- ^ Register the first 'Listen' action as a new subscriber. The
      -- resulting 'Subscriber' handle is passed to the second 'Listen'
      -- action as the reply.
    | Unsubscribe !(Subscriber msg)
      -- ^ Remove a previously registered subscriber.
    | Event msg
      -- ^ Message to hand to the 'Listen' action of every current
      -- subscriber.

-- | Alias for a publisher process: a 'Process' that is sent
-- 'PublisherMessage' values carrying events of type @msg@.
type Publisher msg = Process (PublisherMessage msg)

-- | Send an event to a publisher. The message is wrapped in 'Event', which
-- the publisher hands to every current subscriber.
publish :: MonadIO m => msg -> Publisher msg -> m ()
publish = send . Event

-- | Send an event to a publisher from within an 'STM' transaction. Like
-- 'publish', the message is wrapped in 'Event' before being sent.
publishSTM :: msg -> Publisher msg -> STM ()
publishSTM = sendSTM . Event

-- | Create a mailbox, subscribe it to a publisher and pass it to the supplied
-- function. The subscription is ended when the function returns.
withSubscription ::
       MonadUnliftIO m => Publisher msg -> (Inbox msg -> m a) -> m a
withSubscription pub f = do
    inbox <- newInbox
    let -- Events from the publisher are delivered into the fresh inbox.
        sub = subscribe pub (`sendSTM` inbox)
        unsub = unsubscribe pub
    -- 'bracket' pairs the subscription with its matching 'unsubscribe'.
    bracket sub unsub $ \_ -> f inbox

-- | 'Listen' to events from a publisher. Sends a 'Subscribe' request and
-- returns the 'Subscriber' handle that the publisher replies with; keep it in
-- order to 'unsubscribe' later.
subscribe :: MonadIO m => Publisher msg -> Listen msg -> m (Subscriber msg)
subscribe pub sub = Subscribe sub `query` pub

-- | Stop listening to events from a publisher. Must provide the 'Subscriber'
-- that was returned from the corresponding 'subscribe' action.
unsubscribe :: MonadIO m => Publisher msg -> Subscriber msg -> m ()
unsubscribe pub sub = Unsubscribe sub `send` pub

-- | Start a publisher in the background and pass it to a function. The
-- publisher will be stopped when the function returns.
withPublisher :: MonadUnliftIO m => (Publisher msg -> m a) -> m a
withPublisher = withProcess publisherProcess

-- | Start a publisher in the background, running 'publisherProcess', and
-- return its handle.
publisher :: MonadUnliftIO m => m (Publisher msg)
publisher = process publisherProcess

-- | Start a publisher in the current thread. Begins with an empty subscriber
-- list and then repeatedly receives a message from the inbox and dispatches
-- it; the loop itself never finishes.
publisherProcess :: MonadUnliftIO m => Inbox (PublisherMessage msg) -> m ()
publisherProcess inbox = newTVarIO [] >>= runReaderT go
  where
    -- The subscriber list is reached through the reader environment.
    go = forever $ receive inbox >>= publisherMessage

-- | Internal function to dispatch a publisher message. The list of current
-- subscribers is kept in the 'TVar' obtained from the reader environment.
--
-- * 'Subscribe': create a fresh 'Subscriber', add it to the list and reply
--   with it.
-- * 'Unsubscribe': remove the given 'Subscriber' from the list.
-- * 'Event': run the 'Listen' action of every subscriber on the event.
publisherMessage ::
       (MonadIO m, MonadReader (TVar [Subscriber msg]) m)
    => PublisherMessage msg
    -> m ()
publisherMessage (Subscribe sub r) = do
    box <- ask
    u <- liftIO newUnique
    let s = Subscriber sub u
    -- Register and reply in one transaction, so the reply is only visible
    -- together with the updated subscriber list.
    atomically $ do
        -- Strict update: do not leave a chain of unevaluated modifications
        -- inside the TVar.
        modifyTVar' box (`union` [s])
        r s
publisherMessage (Unsubscribe sub) = do
    box <- ask
    atomically $ modifyTVar' box (delete sub)
publisherMessage (Event event) = do
    box <- ask
    -- All subscribers are notified within a single transaction.
    atomically $ do
        subs <- readTVar box
        forM_ subs $ \(Subscriber sub _) -> sub event