module Control.Concurrent.NQE.Process where
import Control.Concurrent
import Control.Concurrent.Async.Lifted.Safe
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Trans.Control
type Reply a = a -> STM ()
type Listen a = a -> STM ()
class Mailbox mbox where
mailboxEmptySTM :: mbox msg -> STM Bool
sendSTM :: msg -> mbox msg -> STM ()
receiveSTM :: mbox msg -> STM msg
requeueMsg :: msg -> mbox msg -> STM ()
instance Mailbox TQueue where
mailboxEmptySTM = isEmptyTQueue
sendSTM msg = (`writeTQueue` msg)
receiveSTM = readTQueue
requeueMsg msg = (`unGetTQueue` msg)
instance Mailbox TBQueue where
mailboxEmptySTM = isEmptyTBQueue
sendSTM msg = (`writeTBQueue` msg)
receiveSTM = readTBQueue
requeueMsg msg = (`unGetTBQueue` msg)
data Inbox msg =
forall mbox. (Mailbox mbox) =>
Inbox (mbox msg)
instance Mailbox Inbox where
mailboxEmptySTM (Inbox mbox) = mailboxEmptySTM mbox
sendSTM msg (Inbox mbox) = msg `sendSTM` mbox
receiveSTM (Inbox mbox) = receiveSTM mbox
requeueMsg msg (Inbox mbox) = msg `requeueMsg` mbox
mailboxEmpty :: (MonadIO m, Mailbox mbox) => mbox msg -> m Bool
mailboxEmpty = liftIO . atomically . mailboxEmptySTM
send :: (MonadIO m, Mailbox mbox) => msg -> mbox msg -> m ()
send msg = liftIO . atomically . sendSTM msg
requeue :: (Mailbox mbox) => [msg] -> mbox msg -> STM ()
requeue xs mbox = mapM_ (`requeueMsg` mbox) xs
extractMsg ::
(Mailbox mbox)
=> [(msg -> Maybe a, a -> b)]
-> mbox msg
-> STM b
extractMsg hs mbox = do
msg <- receiveSTM mbox
go [] msg hs
where
go acc msg [] = do
msg' <- receiveSTM mbox
go (msg : acc) msg' hs
go acc msg ((f, action):fs) =
case f msg of
Just x -> do
requeue acc mbox
return $ action x
Nothing -> go acc msg fs
query ::
(MonadIO m, Mailbox mbox)
=> (Reply b -> msg)
-> mbox msg
-> m b
query f mbox = do
box <- liftIO $ atomically newEmptyTMVar
f (putTMVar box) `send` mbox
liftIO . atomically $ takeTMVar box
dispatch ::
(MonadIO m, Mailbox mbox)
=> [(msg -> Maybe a, a -> IO b)]
-> mbox msg
-> m b
dispatch hs = liftIO . join . atomically . extractMsg hs
dispatchSTM :: (Mailbox mbox) => [msg -> Maybe a] -> mbox msg -> STM a
dispatchSTM = extractMsg . map (\x -> (x, id))
receive ::
(MonadIO m, Mailbox mbox)
=> mbox msg
-> m msg
receive = dispatch [(Just, return)]
receiveMatch :: (MonadIO m, Mailbox mbox) => mbox msg -> (msg -> Maybe a) -> m a
receiveMatch mbox f = dispatch [(f, return)] mbox
receiveMatchSTM :: (Mailbox mbox) => mbox msg -> (msg -> Maybe a) -> STM a
receiveMatchSTM mbox f = dispatchSTM [f] mbox
timeout ::
forall m b. (MonadIO m, MonadBaseControl IO m, Forall (Pure m))
=> Int
-> m b
-> m (Maybe b)
timeout n action =
race (liftIO $ threadDelay n) action >>= \case
Left () -> return Nothing
Right r -> return $ Just r