{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
module Control.Concurrent.NQE.Process where
import Control.Concurrent.Unique
import Data.Function
import Data.Hashable
import Numeric.Natural
import UnliftIO
type Listen a = a -> STM ()
data Mailbox msg =
forall mbox. (OutChan mbox) =>
Mailbox !(mbox msg)
!Unique
data Inbox msg =
forall mbox. (OutChan mbox, InChan mbox) =>
Inbox !(mbox msg)
!Unique
instance Eq (Mailbox msg) where
(==) = (==) `on` f
where
f (Mailbox _ u) = u
instance Eq (Inbox msg) where
(==) = (==) `on` f
where
f (Inbox _ u) = u
data Process msg = Process
{ getProcessAsync :: Async ()
, getProcessMailbox :: Mailbox msg
} deriving Eq
class InChan mbox where
mailboxEmptySTM :: mbox msg -> STM Bool
receiveSTM :: mbox msg -> STM msg
requeueSTM :: msg -> mbox msg -> STM ()
class OutChan mbox where
mailboxFullSTM :: mbox msg -> STM Bool
sendSTM :: msg -> mbox msg -> STM ()
instance InChan TQueue where
mailboxEmptySTM = isEmptyTQueue
receiveSTM = readTQueue
requeueSTM msg = (`unGetTQueue` msg)
instance OutChan TQueue where
mailboxFullSTM _ = return False
sendSTM msg = (`writeTQueue` msg)
instance InChan TBQueue where
mailboxEmptySTM = isEmptyTBQueue
receiveSTM = readTBQueue
requeueSTM msg = (`unGetTBQueue` msg)
instance OutChan TBQueue where
mailboxFullSTM = isFullTBQueue
sendSTM msg = (`writeTBQueue` msg)
instance OutChan Mailbox where
mailboxFullSTM (Mailbox mbox _) = mailboxFullSTM mbox
sendSTM msg (Mailbox mbox _) = msg `sendSTM` mbox
instance InChan Inbox where
mailboxEmptySTM (Inbox mbox _) = mailboxEmptySTM mbox
receiveSTM (Inbox mbox _) = receiveSTM mbox
requeueSTM msg (Inbox mbox _) = msg `requeueSTM` mbox
instance OutChan Inbox where
mailboxFullSTM (Inbox mbox _) = mailboxFullSTM mbox
sendSTM msg (Inbox mbox _) = msg `sendSTM` mbox
instance OutChan Process where
mailboxFullSTM (Process _ mbox) = mailboxFullSTM mbox
sendSTM msg (Process _ mbox) = msg `sendSTM` mbox
instance Hashable (Process msg) where
hashWithSalt i (Process _ m) = hashWithSalt i m
hash (Process _ m) = hash m
instance Hashable (Mailbox msg) where
hashWithSalt i (Mailbox _ u) = hashWithSalt i u
hash (Mailbox _ u) = hash u
inboxToMailbox :: Inbox msg -> Mailbox msg
inboxToMailbox (Inbox m u) = Mailbox m u
wrapChannel ::
(MonadIO m, InChan mbox, OutChan mbox) => mbox msg -> m (Inbox msg)
wrapChannel mbox = Inbox mbox <$> liftIO newUnique
newInbox :: MonadIO m => m (Inbox msg)
newInbox = newTQueueIO >>= \c -> wrapChannel c
newBoundedInbox :: MonadIO m => Natural -> m (Inbox msg)
newBoundedInbox i = newTBQueueIO (fromIntegral i) >>= \c -> wrapChannel c
send :: (MonadIO m, OutChan mbox) => msg -> mbox msg -> m ()
send msg = atomically . sendSTM msg
receive :: (InChan mbox, MonadIO m) => mbox msg -> m msg
receive mbox = receiveMatch mbox Just
query ::
(MonadIO m, OutChan mbox)
=> (Listen response -> request)
-> mbox request
-> m response
query f m = do
r <- newEmptyTMVarIO
f (putTMVar r) `send` m
atomically $ takeTMVar r
queryU ::
(MonadUnliftIO m, OutChan mbox)
=> Int
-> (Listen response -> request)
-> mbox request
-> m (Maybe response)
queryU u f m = timeout u (query f m)
queryS ::
(MonadUnliftIO m, OutChan mbox)
=> Int
-> (Listen response -> request)
-> mbox request
-> m (Maybe response)
queryS s f m = timeout (s * 1000 * 1000) (query f m)
receiveMatch :: (MonadIO m, InChan mbox) => mbox msg -> (msg -> Maybe a) -> m a
receiveMatch mbox = atomically . receiveMatchSTM mbox
receiveMatchU ::
(MonadUnliftIO m, InChan mbox)
=> Int
-> mbox msg
-> (msg -> Maybe a)
-> m (Maybe a)
receiveMatchU u mbox f = timeout u $ receiveMatch mbox f
receiveMatchS ::
(MonadUnliftIO m, InChan mbox)
=> Int
-> mbox msg
-> (msg -> Maybe a)
-> m (Maybe a)
receiveMatchS s mbox f = timeout (s * 1000 * 1000) $ receiveMatch mbox f
receiveMatchSTM :: InChan mbox => mbox msg -> (msg -> Maybe a) -> STM a
receiveMatchSTM mbox f = go []
where
go acc =
receiveSTM mbox >>= \msg ->
case f msg of
Just x -> do
requeueListSTM acc mbox
return x
Nothing -> go (msg : acc)
mailboxEmpty :: (MonadIO m, InChan mbox) => mbox msg -> m Bool
mailboxEmpty = atomically . mailboxEmptySTM
requeueListSTM :: InChan mbox => [msg] -> mbox msg -> STM ()
requeueListSTM xs mbox = mapM_ (`requeueSTM` mbox) xs
withProcess ::
MonadUnliftIO m => (Inbox msg -> m ()) -> (Process msg -> m a) -> m a
withProcess p f = do
(i, m) <- newMailbox
withAsync (p i) (\a -> link a >> f (Process a m))
process :: MonadUnliftIO m => (Inbox msg -> m ()) -> m (Process msg)
process p = do
(i, m) <- newMailbox
a <- async $ p i
link a
return (Process a m)
newMailbox :: MonadUnliftIO m => m (Inbox msg, Mailbox msg)
newMailbox = do
i <- newInbox
let m = inboxToMailbox i
return (i, m)