{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
module Control.Concurrent.NQE.Process where
import Control.Monad
import UnliftIO
type Reply a = a -> STM ()
type Listen a = a -> STM ()
class Mailbox mbox where
mailboxEmptySTM :: mbox msg -> STM Bool
sendSTM :: msg -> mbox msg -> STM ()
receiveSTM :: mbox msg -> STM msg
requeueMsg :: msg -> mbox msg -> STM ()
instance Mailbox TQueue where
mailboxEmptySTM = isEmptyTQueue
sendSTM msg = (`writeTQueue` msg)
receiveSTM = readTQueue
requeueMsg msg = (`unGetTQueue` msg)
instance Mailbox TBQueue where
mailboxEmptySTM = isEmptyTBQueue
sendSTM msg = (`writeTBQueue` msg)
receiveSTM = readTBQueue
requeueMsg msg = (`unGetTBQueue` msg)
data Inbox msg =
forall mbox. (Mailbox mbox) =>
Inbox (mbox msg)
instance Mailbox Inbox where
mailboxEmptySTM (Inbox mbox) = mailboxEmptySTM mbox
sendSTM msg (Inbox mbox) = msg `sendSTM` mbox
receiveSTM (Inbox mbox) = receiveSTM mbox
requeueMsg msg (Inbox mbox) = msg `requeueMsg` mbox
mailboxEmpty :: (MonadIO m, Mailbox mbox) => mbox msg -> m Bool
mailboxEmpty = atomically . mailboxEmptySTM
send :: (MonadIO m, Mailbox mbox) => msg -> mbox msg -> m ()
send msg = atomically . sendSTM msg
requeue :: (Mailbox mbox) => [msg] -> mbox msg -> STM ()
requeue xs mbox = mapM_ (`requeueMsg` mbox) xs
extractMsg ::
(Mailbox mbox)
=> [(msg -> Maybe a, a -> b)]
-> mbox msg
-> STM b
extractMsg hs mbox = do
msg <- receiveSTM mbox
go [] msg hs
where
go acc msg [] = do
msg' <- receiveSTM mbox
go (msg : acc) msg' hs
go acc msg ((f, action):fs) =
case f msg of
Just x -> do
requeue acc mbox
return $ action x
Nothing -> go acc msg fs
query ::
(MonadIO m, Mailbox mbox)
=> (Reply b -> msg)
-> mbox msg
-> m b
query f mbox = do
box <- atomically newEmptyTMVar
f (putTMVar box) `send` mbox
atomically (takeTMVar box)
dispatch ::
(MonadIO m, Mailbox mbox)
=> [(msg -> Maybe a, a -> m b)]
-> mbox msg
-> m b
dispatch hs = join . atomically . extractMsg hs
dispatchSTM :: (Mailbox mbox) => [msg -> Maybe a] -> mbox msg -> STM a
dispatchSTM = extractMsg . map (\x -> (x, id))
receive ::
(MonadIO m, Mailbox mbox)
=> mbox msg
-> m msg
receive = dispatch [(Just, return)]
receiveMatch :: (MonadIO m, Mailbox mbox) => mbox msg -> (msg -> Maybe a) -> m a
receiveMatch mbox f = dispatch [(f, return)] mbox
receiveMatchSTM :: (Mailbox mbox) => mbox msg -> (msg -> Maybe a) -> STM a
receiveMatchSTM mbox f = dispatchSTM [f] mbox