{-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE RankNTypes #-} module Control.Concurrent.NQE.Process where import Control.Monad import UnliftIO type Reply a = a -> STM () type Listen a = a -> STM () class Mailbox mbox where mailboxEmptySTM :: mbox msg -> STM Bool sendSTM :: msg -> mbox msg -> STM () receiveSTM :: mbox msg -> STM msg requeueMsg :: msg -> mbox msg -> STM () instance Mailbox TQueue where mailboxEmptySTM = isEmptyTQueue sendSTM msg = (`writeTQueue` msg) receiveSTM = readTQueue requeueMsg msg = (`unGetTQueue` msg) instance Mailbox TBQueue where mailboxEmptySTM = isEmptyTBQueue sendSTM msg = (`writeTBQueue` msg) receiveSTM = readTBQueue requeueMsg msg = (`unGetTBQueue` msg) data Inbox msg = forall mbox. (Mailbox mbox) => Inbox (mbox msg) instance Mailbox Inbox where mailboxEmptySTM (Inbox mbox) = mailboxEmptySTM mbox sendSTM msg (Inbox mbox) = msg `sendSTM` mbox receiveSTM (Inbox mbox) = receiveSTM mbox requeueMsg msg (Inbox mbox) = msg `requeueMsg` mbox mailboxEmpty :: (MonadIO m, Mailbox mbox) => mbox msg -> m Bool mailboxEmpty = atomically . mailboxEmptySTM send :: (MonadIO m, Mailbox mbox) => msg -> mbox msg -> m () send msg = atomically . sendSTM msg requeue :: (Mailbox mbox) => [msg] -> mbox msg -> STM () requeue xs mbox = mapM_ (`requeueMsg` mbox) xs extractMsg :: (Mailbox mbox) => [(msg -> Maybe a, a -> b)] -> mbox msg -> STM b extractMsg hs mbox = do msg <- receiveSTM mbox go [] msg hs where go acc msg [] = do msg' <- receiveSTM mbox go (msg : acc) msg' hs go acc msg ((f, action):fs) = case f msg of Just x -> do requeue acc mbox return $ action x Nothing -> go acc msg fs query :: (MonadIO m, Mailbox mbox) => (Reply b -> msg) -> mbox msg -> m b query f mbox = do box <- atomically newEmptyTMVar f (putTMVar box) `send` mbox atomically (takeTMVar box) dispatch :: (MonadIO m, Mailbox mbox) => [(msg -> Maybe a, a -> m b)] -- ^ action to dispatch -> mbox msg -- ^ mailbox to read from -> m b dispatch hs = join . atomically . extractMsg hs dispatchSTM :: (Mailbox mbox) => [msg -> Maybe a] -> mbox msg -> STM a dispatchSTM = extractMsg . map (\x -> (x, id)) receive :: (MonadIO m, Mailbox mbox) => mbox msg -> m msg receive = dispatch [(Just, return)] receiveMatch :: (MonadIO m, Mailbox mbox) => mbox msg -> (msg -> Maybe a) -> m a receiveMatch mbox f = dispatch [(f, return)] mbox receiveMatchSTM :: (Mailbox mbox) => mbox msg -> (msg -> Maybe a) -> STM a receiveMatchSTM mbox f = dispatchSTM [f] mbox