{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts          #-}
{-# LANGUAGE FlexibleInstances         #-}
{-# LANGUAGE MultiParamTypeClasses     #-}
{-# LANGUAGE RankNTypes                #-}
{-|
Module      : Control.Concurrent.NQE.Process
Copyright   : No rights reserved
License     : UNLICENSE
Maintainer  : xenog@protonmail.com
Stability   : experimental
Portability : POSIX

This is the core of the NQE library. It is composed of code to deal with
processes and mailboxes. Processes represent concurrent threads that receive
messages via a mailbox, also referred to as a channel. NQE is inspired by
Erlang/OTP and it stands for “Not Quite Erlang”. A process is analogous to an
actor in Scala, or an object in the original (Alan Kay) sense of the word. To
implement synchronous communication NQE makes use of 'STM' actions embedded in
asynchronous messages.
-}
module Control.Concurrent.NQE.Process where

import           Control.Concurrent.Unique
import           Data.Function
import           Data.Hashable
import           Numeric.Natural
import           UnliftIO

-- | 'STM' callback that receives a value and does something with it. 'query'
-- embeds a 'Listen' in the request it sends so that the receiver can deliver
-- its response.
type Listen a = a -> STM ()

-- | Channel that only allows messages to be sent to it. It wraps any channel
-- type with an 'OutChan' instance, hiding the concrete type, and pairs it with
-- a 'Unique' tag that serves as the identity of the mailbox for equality and
-- hashing.
data Mailbox msg =
    forall mbox. (OutChan mbox) =>
                 Mailbox !(mbox msg)
                         !Unique

-- | Channel that allows both sending and receiving messages. It wraps any
-- channel type that is both an 'OutChan' and an 'InChan', hiding the concrete
-- type, and pairs it with a 'Unique' tag that serves as the identity of the
-- inbox for equality.
data Inbox msg =
    forall mbox. (OutChan mbox, InChan mbox) =>
                 Inbox !(mbox msg)
                       !Unique

-- | Mailboxes are compared by their 'Unique' tag only; the wrapped channel is
-- existentially typed and is never inspected.
instance Eq (Mailbox msg) where
    (==) = (==) `on` f
      where
        -- Project the identity tag out of the mailbox.
        f (Mailbox _ u) = u

-- | Inboxes are compared by their 'Unique' tag only; the wrapped channel is
-- existentially typed and is never inspected.
instance Eq (Inbox msg) where
    (==) = (==) `on` f
      where
        -- Project the identity tag out of the inbox.
        f (Inbox _ u) = u

-- | 'Async' handle and 'Mailbox' for a process.
data Process msg = Process
    { getProcessAsync   :: Async ()
      -- ^ Handle of the background thread running the process.
    , getProcessMailbox :: Mailbox msg
      -- ^ Send-only channel used to deliver messages to the process.
    } deriving Eq

-- | Class for implementation of an 'Inbox': channels that messages can be
-- read from.
class InChan mbox where
    -- | Is the channel empty, i.e. are there no messages queued?
    mailboxEmptySTM :: mbox msg -> STM Bool
    -- | Receive the next message from the channel.
    receiveSTM :: mbox msg -> STM msg
    -- | Put a message in the mailbox such that it is received next.
    requeueSTM :: msg -> mbox msg -> STM ()

-- | Class for implementation of a 'Mailbox': channels that messages can be
-- written to.
class OutChan mbox where
    -- | Is this bounded channel full? Always 'False' for unbounded channels.
    mailboxFullSTM :: mbox msg -> STM Bool
    -- | Send a message to this channel. The message comes first so that a
    -- partially applied @sendSTM msg@ can be composed over channels.
    sendSTM :: msg -> mbox msg -> STM ()

-- | Unbounded 'TQueue' used as the receiving end of a channel.
instance InChan TQueue where
    mailboxEmptySTM = isEmptyTQueue
    receiveSTM = readTQueue
    -- The class takes the message first, so flip the queue operation.
    requeueSTM msg = (`unGetTQueue` msg)

-- | Unbounded 'TQueue' used as the sending end of a channel.
instance OutChan TQueue where
    -- An unbounded queue is never reported as full.
    mailboxFullSTM _ = return False
    -- The class takes the message first, so flip the queue operation.
    sendSTM msg = (`writeTQueue` msg)

-- | Bounded 'TBQueue' used as the receiving end of a channel.
instance InChan TBQueue where
    mailboxEmptySTM = isEmptyTBQueue
    receiveSTM = readTBQueue
    -- The class takes the message first, so flip the queue operation.
    requeueSTM msg = (`unGetTBQueue` msg)

-- | Bounded 'TBQueue' used as the sending end of a channel.
instance OutChan TBQueue where
    mailboxFullSTM = isFullTBQueue
    -- The class takes the message first, so flip the queue operation.
    sendSTM msg = (`writeTBQueue` msg)

-- | A 'Mailbox' forwards every sending operation to the channel it wraps; the
-- 'Unique' tag plays no part.
instance OutChan Mailbox where
    mailboxFullSTM (Mailbox mbox _) = mailboxFullSTM mbox
    sendSTM msg (Mailbox mbox _) = msg `sendSTM` mbox

-- | An 'Inbox' forwards every receiving operation to the channel it wraps; the
-- 'Unique' tag plays no part.
instance InChan Inbox where
    mailboxEmptySTM (Inbox mbox _) = mailboxEmptySTM mbox
    receiveSTM (Inbox mbox _) = receiveSTM mbox
    requeueSTM msg (Inbox mbox _) = msg `requeueSTM` mbox

-- | An 'Inbox' forwards every sending operation to the channel it wraps; the
-- 'Unique' tag plays no part.
instance OutChan Inbox where
    mailboxFullSTM (Inbox mbox _) = mailboxFullSTM mbox
    sendSTM msg (Inbox mbox _) = msg `sendSTM` mbox

-- | Sending to a 'Process' sends to its 'Mailbox'; the 'Async' handle is not
-- consulted.
instance OutChan Process where
    mailboxFullSTM (Process _ mbox) = mailboxFullSTM mbox
    sendSTM msg (Process _ mbox) = msg `sendSTM` mbox

-- | A 'Process' hashes as its 'Mailbox'; the 'Async' handle does not
-- contribute to the hash.
instance Hashable (Process msg) where
    hashWithSalt i (Process _ m) = hashWithSalt i m
    hash (Process _ m) = hash m

-- | A 'Mailbox' hashes as its 'Unique' tag, matching the 'Eq' instance, which
-- also compares the tag only.
instance Hashable (Mailbox msg) where
    hashWithSalt i (Mailbox _ u) = hashWithSalt i u
    hash (Mailbox _ u) = hash u

-- | Get a send-only 'Mailbox' for an 'Inbox'. The mailbox shares the inbox's
-- underlying channel and its 'Unique' tag.
inboxToMailbox :: Inbox msg -> Mailbox msg
inboxToMailbox (Inbox m u) = Mailbox m u

-- | Wrap a channel in an 'Inbox', tagging it with a freshly created 'Unique'.
wrapChannel ::
       (MonadIO m, InChan mbox, OutChan mbox) => mbox msg -> m (Inbox msg)
wrapChannel mbox = Inbox mbox <$> liftIO newUnique

-- | Create an unbounded 'Inbox' backed by a new 'TQueue'.
newInbox :: MonadIO m => m (Inbox msg)
newInbox = newTQueueIO >>= wrapChannel

-- | 'Inbox' with upper bound on number of allowed queued messages, backed by a
-- new 'TBQueue'.
newBoundedInbox :: MonadIO m => Natural -> m (Inbox msg)
newBoundedInbox i =
    -- 'fromIntegral' adapts the bound to the numeric type that 'newTBQueueIO'
    -- expects.
    newTBQueueIO (fromIntegral i) >>= wrapChannel

-- | Send a message to a channel, running 'sendSTM' as its own transaction.
send :: (MonadIO m, OutChan mbox) => msg -> mbox msg -> m ()
send msg = atomically . sendSTM msg

-- | Receive a message from a channel. Implemented as 'receiveMatch' with a
-- matcher that accepts every message.
receive :: (InChan mbox, MonadIO m) => mbox msg -> m msg
receive mbox = receiveMatch mbox Just

-- | Send request to channel and wait for a response. The @request@ is built by
-- this function: the supplied constructor is given a 'Listen' action that
-- stores the response, and the call returns once that response has been
-- stored.
query ::
       (MonadIO m, OutChan mbox)
    => (Listen response -> request)
    -> mbox request
    -> m response
query f m = do
    -- One-shot slot through which the response comes back.
    r <- newEmptyTMVarIO
    f (putTMVar r) `send` m
    -- Wait until the slot has been filled.
    atomically $ takeTMVar r

-- | Do a 'query' but timeout after @u@ microseconds. Return 'Nothing' if
-- timeout reached.
queryU ::
       (MonadUnliftIO m, OutChan mbox)
    => Int
    -> (Listen response -> request)
    -> mbox request
    -> m (Maybe response)
queryU u f m = timeout u (query f m)

-- | Do a 'query' but timeout after @s@ seconds. Return 'Nothing' if
-- timeout reached.
queryS ::
       (MonadUnliftIO m, OutChan mbox)
    => Int
    -> (Listen response -> request)
    -> mbox request
    -> m (Maybe response)
queryS s f m =
    -- 'timeout' is given the limit in microseconds.
    timeout (s * 1000 * 1000) (query f m)

-- | Test all messages in a channel against the supplied function and return the
-- first matching message. Will block until a match is found. Messages that do
-- not match remain in the channel. Runs 'receiveMatchSTM' as a single
-- transaction.
receiveMatch :: (MonadIO m, InChan mbox) => mbox msg -> (msg -> Maybe a) -> m a
receiveMatch mbox = atomically . receiveMatchSTM mbox

-- | Like 'receiveMatch' but with a timeout set at @u@ microseconds. Returns
-- 'Nothing' if timeout is reached.
receiveMatchU ::
       (MonadUnliftIO m, InChan mbox)
    => Int
    -> mbox msg
    -> (msg -> Maybe a)
    -> m (Maybe a)
receiveMatchU u mbox f = timeout u $ receiveMatch mbox f

-- | Like 'receiveMatch' but with a timeout set at @s@ seconds. Returns
-- 'Nothing' if timeout is reached.
receiveMatchS ::
       (MonadUnliftIO m, InChan mbox)
    => Int
    -> mbox msg
    -> (msg -> Maybe a)
    -> m (Maybe a)
receiveMatchS s mbox f =
    -- 'timeout' is given the limit in microseconds.
    timeout (s * 1000 * 1000) $ receiveMatch mbox f

-- | Match a message in the channel as an atomic 'STM' action. Messages are
-- received one at a time and tested with the supplied function; the result of
-- the first match is returned, and every message rejected before it is put
-- back in the channel.
receiveMatchSTM :: InChan mbox => mbox msg -> (msg -> Maybe a) -> STM a
receiveMatchSTM mbox f = go []
  where
    -- @acc@ holds the rejected messages, most recently received first.
    go acc =
        receiveSTM mbox >>= \msg ->
            case f msg of
                Just x -> do
                    -- 'requeueListSTM' makes the last list element the next
                    -- one to be received; that element is the oldest rejected
                    -- message, so the original order is restored.
                    requeueListSTM acc mbox
                    return x
                Nothing -> go (msg : acc)

-- | Check if the channel is empty, running 'mailboxEmptySTM' as its own
-- transaction.
mailboxEmpty :: (MonadIO m, InChan mbox) => mbox msg -> m Bool
mailboxEmpty = atomically . mailboxEmptySTM

-- | Put a list of messages at the start of a channel, so that the last element
-- of the list is the next message to be received. Each element is requeued in
-- list order with 'requeueSTM', so later elements end up ahead of earlier
-- ones.
requeueListSTM :: InChan mbox => [msg] -> mbox msg -> STM ()
requeueListSTM xs mbox = mapM_ (`requeueSTM` mbox) xs

-- | Run a process in the background and pass it to a function. Stop the
-- background process once the function returns. Background process exceptions
-- are re-thrown in the current thread.
--
-- The process action receives the 'Inbox' of a newly created channel; the
-- function receives a 'Process' holding the 'Async' handle and the matching
-- 'Mailbox'.
withProcess ::
       MonadUnliftIO m => (Inbox msg -> m ()) -> (Process msg -> m a) -> m a
withProcess p f = do
    (i, m) <- newMailbox
    -- 'link' the background thread before handing the handle to the caller.
    withAsync (p i) (\a -> link a >> f (Process a m))

-- | Run a process in the background and return the 'Process' handle. Background
-- process exceptions are re-thrown in the current thread.
--
-- The process action receives the 'Inbox' of a newly created channel; the
-- returned 'Process' holds the 'Async' handle and the matching 'Mailbox'.
process :: MonadUnliftIO m => (Inbox msg -> m ()) -> m (Process msg)
process p = do
    (i, m) <- newMailbox
    a <- async $ p i
    link a
    return (Process a m)

-- | Create an unbounded inbox and corresponding mailbox. Both refer to the
-- same underlying channel; the mailbox is obtained with 'inboxToMailbox'.
newMailbox :: MonadUnliftIO m => m (Inbox msg, Mailbox msg)
newMailbox = do
    i <- newInbox
    let m = inboxToMailbox i
    return (i, m)