{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts          #-}
{-# LANGUAGE FlexibleInstances         #-}
{-# LANGUAGE MultiParamTypeClasses     #-}
{-# LANGUAGE RankNTypes                #-}
module Control.Concurrent.NQE.Process
    ( Mailbox(..)
    , Inbox
    , Reply
    , Listen
    , newInbox
    , send
    , receive
    , query
    , receiveMatch
    , receiveMatchSTM
    , mailboxEmpty
    ) where

import           Control.Concurrent.Unique
import           Data.Hashable
import           UnliftIO

-- | STM action to reply to a synchronous request.
type Reply a = a -> STM ()

-- | STM action for an event listener.
type Listen a = a -> STM ()

-- | Mailboxes are used to communicate with processes (actors). A process will
-- usually listen in a loop for events entering its mailbox. A process is its
-- mailbox, so it may be named as the process that it communicates with.
--
-- >>> :m + Control.Monad NQE UnliftIO
-- >>> registry <- newTQueueIO :: IO (TQueue String)
-- >>> let run = receive registry >>= putStrLn . ("Registered: " ++)
-- >>> withAsync run $ \a -> "Bruce Wayne" `send` registry >> wait a
-- Registered: Bruce Wayne
class Eq (mbox msg) => Mailbox mbox msg where
    -- | STM action that responds true if the mailbox is empty. Useful to avoid
    -- blocking on an empty mailbox.
    mailboxEmptySTM :: mbox msg -> STM Bool
    -- | STM action that responds true if the mailbox is full and would block if
    -- a new message is received.
    mailboxFullSTM :: mbox msg -> STM Bool
    -- | STM action to send a message to a mailbox. This is usually called from
    -- a process that wishes to communicate with actor that owns the mailbox.
    sendSTM :: msg -> mbox msg -> STM ()
    -- | STM action to receive a message from a mailbox. This should be called
    -- from the process that owns the mailbox.
    receiveSTM :: mbox msg -> STM msg
    -- | Put a message back in the mailbox so that it is the next one to be
    -- received. Used for pattern matching.
    requeueSTM :: msg -> mbox msg -> STM ()

instance Mailbox TQueue msg where
    mailboxEmptySTM = isEmptyTQueue
    mailboxFullSTM = const $ return False
    sendSTM msg = (`writeTQueue` msg)
    receiveSTM = readTQueue
    requeueSTM msg = (`unGetTQueue` msg)

instance Mailbox TBQueue msg where
    mailboxEmptySTM = isEmptyTBQueue
    mailboxFullSTM = isFullTBQueue
    sendSTM msg = (`writeTBQueue` msg)
    receiveSTM = readTBQueue
    requeueSTM msg = (`unGetTBQueue` msg)

-- | Wrapped 'Mailbox' hiding its implementation.
data Inbox msg = forall mbox. (Mailbox mbox msg) => Inbox !(mbox msg) !Unique

-- | Create a new 'Inbox' with a 'Unique' identifier inside. If you run this
-- function more than once with the same 'Mailbox', its results will be
-- different from the 'Eq' or 'Hashable' point of view.
newInbox :: (MonadIO m, Mailbox mbox msg) => mbox msg -> m (Inbox msg)
newInbox mbox = Inbox mbox <$> liftIO newUnique

instance Eq (Inbox msg) where
    Inbox _ u1 == Inbox _ u2 = u1 == u2

instance Hashable (Inbox msg) where
    hashWithSalt i (Inbox _ u) = hashWithSalt i u
    hash (Inbox _ u) = hash u

instance Mailbox Inbox msg where
    mailboxEmptySTM (Inbox mbox _) = mailboxEmptySTM mbox
    mailboxFullSTM (Inbox mbox _) = mailboxFullSTM mbox
    sendSTM msg (Inbox mbox _) = msg `sendSTM` mbox
    receiveSTM (Inbox mbox _) = receiveSTM mbox
    requeueSTM msg (Inbox mbox _) = msg `requeueSTM` mbox

-- | Send a message to a mailbox.
send :: (MonadIO m, Mailbox mbox msg) => msg -> mbox msg -> m ()
send msg = atomically . sendSTM msg

-- | Receive a message from the mailbox. This function should be called only by
-- the process that owns the malibox.
receive ::
       (MonadIO m, Mailbox mbox msg)
    => mbox msg
    -> m msg
receive mbox = receiveMatch mbox Just

-- | Use a partially-applied message type that takes a `Reply a` as its last
-- argument. This function will create the STM action for the response, send the
-- message to a process and await for the STM action to be fulfilled before
-- responding response. It implements synchronous communication with a process.
--
-- Example:
--
-- >>> :m + NQE UnliftIO
-- >>> data Message = Square Integer (Reply Integer)
-- >>> doubler <- newTQueueIO :: IO (TQueue Message)
-- >>> let proc = receive doubler >>= \(Square i r) -> atomically $ r (i * i)
-- >>> withAsync proc $ \_ -> Square 2 `query` doubler
-- 4
--
-- In this example the @Square@ constructor takes a 'Reply' action as its last
-- argument. It is passed partially-applied to @query@, which adds a new 'Reply'
-- action before sending it to the @doubler@ and then waiting for it. The
-- doubler process will run the @Reply@ action n STM with the reply as its
-- argument. In this case @i * i@.
query ::
       (MonadIO m, Mailbox mbox msg)
    => (Reply a -> msg)
    -> mbox msg
    -> m a
query f mbox = do
    box <- atomically newEmptyTMVar
    f (putTMVar box) `send` mbox
    atomically (takeTMVar box)

-- | Test all the messages in a mailbox against the supplied function and return
-- the output of the function only when it is 'Just'. Will block until a message
-- matches. All messages that did not match are left in the mailbox. Only call
-- from process that owns mailbox.
receiveMatch ::
       (MonadIO m, Mailbox mbox msg) => mbox msg -> (msg -> Maybe a) -> m a
receiveMatch mbox = atomically . receiveMatchSTM mbox

-- | Match a message in the mailbox as an atomic STM action.
receiveMatchSTM ::
       (Mailbox mbox msg)
    => mbox msg
    -> (msg -> Maybe a)
    -> STM a
receiveMatchSTM mbox f = go []
  where
    go acc =
        receiveSTM mbox >>= \msg ->
            case f msg of
                Just x -> do
                    requeueListSTM acc mbox
                    return x
                Nothing -> go (msg : acc)

-- | Check if the mailbox is empty.
mailboxEmpty :: (MonadIO m, Mailbox mbox msg) => mbox msg -> m Bool
mailboxEmpty = atomically . mailboxEmptySTM

-- | Put a message at the start of a mailbox, so that it is the next one read.
requeueListSTM :: (Mailbox mbox msg) => [msg] -> mbox msg -> STM ()
requeueListSTM xs mbox = mapM_ (`requeueSTM` mbox) xs