{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts          #-}
{-# LANGUAGE FlexibleInstances         #-}
{-# LANGUAGE MultiParamTypeClasses     #-}
{-# LANGUAGE RankNTypes                #-}
{-|
Module      : Control.Concurrent.NQE.Process
Copyright   : No rights reserved
License     : UNLICENSE
Maintainer  : xenog@protonmail.com
Stability   : experimental
Portability : POSIX

This is the core of the NQE library. It is composed of code to deal with
processes and mailboxes. Processes represent concurrent threads that receive
messages via a mailbox, also referred to as a channel. NQE is inspired by
Erlang/OTP and it stands for “Not Quite Erlang”. A process is analogous to an
actor in Scala, or an object in the original (Alan Kay) sense of the word. To
implement synchronous communication NQE makes use of 'STM' actions embedded in
asynchronous messages.
-}
module Control.Concurrent.NQE.Process where

import           Control.Concurrent.Unique
import           Data.Function
import           Data.Hashable
import           Numeric.Natural
import           UnliftIO

-- | Handler that consumes one event of type @event@ as an 'STM' action.
type Listen event = event -> STM ()

-- | Send-only handle to a channel. The concrete channel type is hidden behind
-- an existential; only its 'OutChan' interface is available. The 'Unique' is
-- what the 'Eq' and 'Hashable' instances use to identify the mailbox.
data Mailbox msg
    = forall chan. (OutChan chan) =>
      Mailbox !(chan msg) !Unique

-- | Handle to a channel that supports both sending and receiving messages.
-- The concrete channel type is hidden behind an existential; the 'Unique' is
-- what the 'Eq' instance compares.
data Inbox msg
    = forall chan. (OutChan chan, InChan chan) =>
      Inbox !(chan msg) !Unique

-- | Two mailboxes are equal exactly when their 'Unique' tags are equal; the
-- wrapped channel itself is not inspected.
instance Eq (Mailbox msg) where
    Mailbox _ tagA == Mailbox _ tagB = tagA == tagB

-- | Two inboxes are equal exactly when their 'Unique' tags are equal; the
-- wrapped channel itself is not inspected.
instance Eq (Inbox msg) where
    Inbox _ tagA == Inbox _ tagB = tagA == tagB

-- | Handle for a process: the 'Async' of its thread paired with the 'Mailbox'
-- through which messages are sent to it.
data Process msg = Process
    { getProcessAsync   :: Async ()
      -- ^ 'Async' handle of the process thread.
    , getProcessMailbox :: Mailbox msg
      -- ^ Send-only 'Mailbox' of the process.
    }
    deriving (Eq)

-- | Operations a channel type must provide to be usable as the receiving side
-- of an 'Inbox'.
class InChan chan where
    -- | 'True' when no messages are queued.
    mailboxEmptySTM :: chan msg -> STM Bool
    -- | Take the next message from the channel.
    receiveSTM :: chan msg -> STM msg
    -- | Push a message back so that it is the next one to be received.
    requeueSTM :: msg -> chan msg -> STM ()

-- | Operations a channel type must provide to be usable as the sending side
-- of a 'Mailbox'.
class OutChan chan where
    -- | 'True' when a bounded channel has reached capacity. Unbounded channels
    -- always answer 'False'.
    mailboxFullSTM :: chan msg -> STM Bool
    -- | Enqueue a message on the channel.
    sendSTM :: msg -> chan msg -> STM ()

-- | Unbounded 'TQueue' as a receiving channel.
instance InChan TQueue where
    mailboxEmptySTM queue = isEmptyTQueue queue
    receiveSTM queue = readTQueue queue
    -- 'unGetTQueue' takes the queue first, so the arguments are swapped.
    requeueSTM msg queue = unGetTQueue queue msg

-- | Unbounded 'TQueue' as a sending channel; it is never reported as full.
instance OutChan TQueue where
    mailboxFullSTM = const (return False)
    -- 'writeTQueue' takes the queue first, so the arguments are swapped.
    sendSTM msg queue = writeTQueue queue msg

-- | Bounded 'TBQueue' as a receiving channel.
instance InChan TBQueue where
    mailboxEmptySTM queue = isEmptyTBQueue queue
    receiveSTM queue = readTBQueue queue
    -- 'unGetTBQueue' takes the queue first, so the arguments are swapped.
    requeueSTM msg queue = unGetTBQueue queue msg

-- | Bounded 'TBQueue' as a sending channel; fullness is reported by
-- 'isFullTBQueue'.
instance OutChan TBQueue where
    mailboxFullSTM queue = isFullTBQueue queue
    -- 'writeTBQueue' takes the queue first, so the arguments are swapped.
    sendSTM msg queue = writeTBQueue queue msg

-- | A 'Mailbox' forwards both operations to the channel it wraps.
instance OutChan Mailbox where
    mailboxFullSTM (Mailbox chan _) = mailboxFullSTM chan
    sendSTM msg (Mailbox chan _) = sendSTM msg chan

-- | An 'Inbox' forwards every receiving operation to the channel it wraps.
instance InChan Inbox where
    mailboxEmptySTM (Inbox chan _) = mailboxEmptySTM chan
    receiveSTM (Inbox chan _) = receiveSTM chan
    requeueSTM msg (Inbox chan _) = requeueSTM msg chan

-- | Sending to an 'Inbox' goes through its send-only 'Mailbox' view, which
-- wraps the same underlying channel.
instance OutChan Inbox where
    mailboxFullSTM = mailboxFullSTM . inboxToMailbox
    sendSTM msg = sendSTM msg . inboxToMailbox

-- | Sending to a 'Process' means sending to its 'Mailbox'; the 'Async' handle
-- is not involved.
instance OutChan Process where
    mailboxFullSTM = mailboxFullSTM . getProcessMailbox
    sendSTM msg = sendSTM msg . getProcessMailbox

-- | A 'Process' hashes as its 'Mailbox'; the 'Async' handle does not
-- contribute to the hash.
instance Hashable (Process msg) where
    hashWithSalt salt = hashWithSalt salt . getProcessMailbox
    hash = hash . getProcessMailbox

-- | A 'Mailbox' hashes as its 'Unique' tag, mirroring how its 'Eq' instance
-- compares mailboxes.
instance Hashable (Mailbox msg) where
    hashWithSalt salt (Mailbox _ tag) = hashWithSalt salt tag
    hash (Mailbox _ tag) = hash tag

-- | Produce the send-only 'Mailbox' view of an 'Inbox'. The result wraps the
-- same channel and carries the same 'Unique' as the inbox.
inboxToMailbox :: Inbox msg -> Mailbox msg
inboxToMailbox inbox =
    case inbox of
        Inbox chan tag -> Mailbox chan tag

-- | Wrap an existing channel in an 'Inbox', tagging it with a freshly
-- generated 'Unique'.
wrapChannel ::
       (MonadIO m, InChan mbox, OutChan mbox) => mbox msg -> m (Inbox msg)
wrapChannel chan = do
    tag <- liftIO newUnique
    return (Inbox chan tag)

-- | Create an 'Inbox' backed by a new unbounded 'TQueue'.
newInbox :: MonadIO m => m (Inbox msg)
newInbox = do
    queue <- newTQueueIO
    wrapChannel queue

-- | Create an 'Inbox' backed by a new 'TBQueue' whose capacity is the given
-- number of queued messages.
newBoundedInbox :: MonadIO m => Natural -> m (Inbox msg)
newBoundedInbox capacity = do
    queue <- newTBQueueIO (fromIntegral capacity)
    wrapChannel queue

-- | Send a message to a channel, running 'sendSTM' as its own transaction.
send :: (MonadIO m, OutChan mbox) => msg -> mbox msg -> m ()
send msg mbox = atomically (sendSTM msg mbox)

-- | Receive the next message from a channel. This is a match that accepts
-- every message.
receive :: (InChan mbox, MonadIO m) => mbox msg -> m msg
receive mbox = atomically (receiveMatchSTM mbox Just)

-- | Send a request to a channel and wait for the response. The supplied
-- function builds the @request@ from a 'Listen' callback created here; the
-- call returns once that callback has been invoked with a response.
query ::
       (MonadIO m, OutChan mbox)
    => (Listen response -> request)
    -> mbox request
    -> m response
query f m = do
    -- Empty slot that the receiver fills through the 'Listen' callback.
    reply <- newEmptyTMVarIO
    send (f (putTMVar reply)) m
    atomically (takeTMVar reply)

-- | Like 'query', but give up after @u@ microseconds, returning 'Nothing' if
-- no response arrived in time.
queryU ::
       (MonadUnliftIO m, OutChan mbox)
    => Int
    -> (Listen response -> request)
    -> mbox request
    -> m (Maybe response)
queryU u f = timeout u . query f

-- | Like 'query', but give up after @s@ seconds, returning 'Nothing' if no
-- response arrived in time.
queryS ::
       (MonadUnliftIO m, OutChan mbox)
    => Int
    -> (Listen response -> request)
    -> mbox request
    -> m (Maybe response)
queryS s = queryU micros
  where
    -- Seconds converted to the microseconds expected by 'queryU'.
    micros = s * 1000 * 1000

-- | Return the first message in the channel for which the supplied function
-- yields 'Just'. Blocks until such a message is found; messages that do not
-- match stay in the channel.
receiveMatch :: (MonadIO m, InChan mbox) => mbox msg -> (msg -> Maybe a) -> m a
receiveMatch mbox f = atomically (receiveMatchSTM mbox f)

-- | 'receiveMatch' with a timeout of @u@ microseconds. Yields 'Nothing' when
-- the timeout is reached before a match.
receiveMatchU ::
       (MonadUnliftIO m, InChan mbox)
    => Int
    -> mbox msg
    -> (msg -> Maybe a)
    -> m (Maybe a)
receiveMatchU u mbox = timeout u . receiveMatch mbox

-- | 'receiveMatch' with a timeout of @s@ seconds. Yields 'Nothing' when the
-- timeout is reached before a match.
receiveMatchS ::
       (MonadUnliftIO m, InChan mbox)
    => Int
    -> mbox msg
    -> (msg -> Maybe a)
    -> m (Maybe a)
receiveMatchS s = receiveMatchU micros
  where
    -- Seconds converted to the microseconds expected by 'receiveMatchU'.
    micros = s * 1000 * 1000

-- | 'STM' action that takes messages from the channel until the supplied
-- function yields 'Just' for one of them, then returns that result. Messages
-- taken before the match are pushed back onto the channel before returning.
receiveMatchSTM :: InChan mbox => mbox msg -> (msg -> Maybe a) -> STM a
receiveMatchSTM mbox f = scan []
  where
    -- @skipped@ holds the rejected messages, most recently received first,
    -- which is the order 'requeueListSTM' needs to restore them so that the
    -- earliest rejected message is received next.
    scan skipped = do
        msg <- receiveSTM mbox
        case f msg of
            Nothing -> scan (msg : skipped)
            Just hit -> do
                requeueListSTM skipped mbox
                return hit

-- | Report whether the channel has no queued messages, running
-- 'mailboxEmptySTM' as its own transaction.
mailboxEmpty :: (MonadIO m, InChan mbox) => mbox msg -> m Bool
mailboxEmpty mbox = atomically (mailboxEmptySTM mbox)

-- | Requeue every message of a list, in list order, at the front of a
-- channel. Because each requeue goes to the front, the last element of the
-- list ends up being the next message received.
requeueListSTM :: InChan mbox => [msg] -> mbox msg -> STM ()
requeueListSTM xs mbox = sequence_ [requeueSTM x mbox | x <- xs]

-- | Start a process in the background on a fresh inbox and hand its 'Process'
-- handle to the given function. The background process is stopped once that
-- function returns. The process is linked, so its exceptions are re-thrown in
-- the current thread.
withProcess ::
       MonadUnliftIO m => (Inbox msg -> m ()) -> (Process msg -> m a) -> m a
withProcess p f = do
    (inbox, mailbox) <- newMailbox
    withAsync (p inbox) $ \worker -> do
        link worker
        f (Process worker mailbox)

-- | Start a process in the background on a fresh inbox and return its
-- 'Process' handle. The process is linked, so its exceptions are re-thrown in
-- the current thread.
process :: MonadUnliftIO m => (Inbox msg -> m ()) -> m (Process msg)
process p = do
    (inbox, mailbox) <- newMailbox
    worker <- async (p inbox)
    link worker
    return (Process worker mailbox)

-- | Create an unbounded 'Inbox' together with the send-only 'Mailbox' view of
-- that same inbox.
--
-- Only 'MonadIO' is required: the body just calls 'newInbox' and the pure
-- 'inboxToMailbox'. Every 'MonadUnliftIO' monad is also a 'MonadIO' monad, so
-- existing callers are unaffected.
newMailbox :: MonadIO m => m (Inbox msg, Mailbox msg)
newMailbox = do
    i <- newInbox
    return (i, inboxToMailbox i)