{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
module Control.Concurrent.NQE.Process where
import Control.Concurrent.Unique
import Data.Function
import Data.Hashable
import Numeric.Natural
import UnliftIO
type Listen a = a -> STM ()
data Mailbox msg =
forall mbox. (OutChan mbox) =>
Mailbox !(mbox msg)
!Unique
data Inbox msg =
forall mbox. (OutChan mbox, InChan mbox) =>
Inbox !(mbox msg)
!Unique
instance Eq (Mailbox msg) where
== :: Mailbox msg -> Mailbox msg -> Bool
(==) = Unique -> Unique -> Bool
forall a. Eq a => a -> a -> Bool
(==) (Unique -> Unique -> Bool)
-> (Mailbox msg -> Unique) -> Mailbox msg -> Mailbox msg -> Bool
forall b c a. (b -> b -> c) -> (a -> b) -> a -> a -> c
`on` Mailbox msg -> Unique
forall msg. Mailbox msg -> Unique
f
where
f :: Mailbox msg -> Unique
f (Mailbox mbox msg
_ Unique
u) = Unique
u
instance Eq (Inbox msg) where
== :: Inbox msg -> Inbox msg -> Bool
(==) = Unique -> Unique -> Bool
forall a. Eq a => a -> a -> Bool
(==) (Unique -> Unique -> Bool)
-> (Inbox msg -> Unique) -> Inbox msg -> Inbox msg -> Bool
forall b c a. (b -> b -> c) -> (a -> b) -> a -> a -> c
`on` Inbox msg -> Unique
forall msg. Inbox msg -> Unique
f
where
f :: Inbox msg -> Unique
f (Inbox mbox msg
_ Unique
u) = Unique
u
data Process msg = Process
{ Process msg -> Async ()
getProcessAsync :: Async ()
, Process msg -> Mailbox msg
getProcessMailbox :: Mailbox msg
} deriving Process msg -> Process msg -> Bool
(Process msg -> Process msg -> Bool)
-> (Process msg -> Process msg -> Bool) -> Eq (Process msg)
forall msg. Process msg -> Process msg -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: Process msg -> Process msg -> Bool
$c/= :: forall msg. Process msg -> Process msg -> Bool
== :: Process msg -> Process msg -> Bool
$c== :: forall msg. Process msg -> Process msg -> Bool
Eq
class InChan mbox where
mailboxEmptySTM :: mbox msg -> STM Bool
receiveSTM :: mbox msg -> STM msg
requeueSTM :: msg -> mbox msg -> STM ()
class OutChan mbox where
mailboxFullSTM :: mbox msg -> STM Bool
sendSTM :: msg -> mbox msg -> STM ()
instance InChan TQueue where
mailboxEmptySTM :: TQueue msg -> STM Bool
mailboxEmptySTM = TQueue msg -> STM Bool
forall msg. TQueue msg -> STM Bool
isEmptyTQueue
receiveSTM :: TQueue msg -> STM msg
receiveSTM = TQueue msg -> STM msg
forall msg. TQueue msg -> STM msg
readTQueue
requeueSTM :: msg -> TQueue msg -> STM ()
requeueSTM msg
msg = (TQueue msg -> msg -> STM ()
forall a. TQueue a -> a -> STM ()
`unGetTQueue` msg
msg)
instance OutChan TQueue where
mailboxFullSTM :: TQueue msg -> STM Bool
mailboxFullSTM TQueue msg
_ = Bool -> STM Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
sendSTM :: msg -> TQueue msg -> STM ()
sendSTM msg
msg = (TQueue msg -> msg -> STM ()
forall a. TQueue a -> a -> STM ()
`writeTQueue` msg
msg)
instance InChan TBQueue where
mailboxEmptySTM :: TBQueue msg -> STM Bool
mailboxEmptySTM = TBQueue msg -> STM Bool
forall msg. TBQueue msg -> STM Bool
isEmptyTBQueue
receiveSTM :: TBQueue msg -> STM msg
receiveSTM = TBQueue msg -> STM msg
forall msg. TBQueue msg -> STM msg
readTBQueue
requeueSTM :: msg -> TBQueue msg -> STM ()
requeueSTM msg
msg = (TBQueue msg -> msg -> STM ()
forall a. TBQueue a -> a -> STM ()
`unGetTBQueue` msg
msg)
instance OutChan TBQueue where
mailboxFullSTM :: TBQueue msg -> STM Bool
mailboxFullSTM = TBQueue msg -> STM Bool
forall msg. TBQueue msg -> STM Bool
isFullTBQueue
sendSTM :: msg -> TBQueue msg -> STM ()
sendSTM msg
msg = (TBQueue msg -> msg -> STM ()
forall a. TBQueue a -> a -> STM ()
`writeTBQueue` msg
msg)
instance OutChan Mailbox where
mailboxFullSTM :: Mailbox msg -> STM Bool
mailboxFullSTM (Mailbox mbox msg
mbox Unique
_) = mbox msg -> STM Bool
forall (mbox :: * -> *) msg. OutChan mbox => mbox msg -> STM Bool
mailboxFullSTM mbox msg
mbox
sendSTM :: msg -> Mailbox msg -> STM ()
sendSTM msg
msg (Mailbox mbox msg
mbox Unique
_) = msg
msg msg -> mbox msg -> STM ()
forall (mbox :: * -> *) msg.
OutChan mbox =>
msg -> mbox msg -> STM ()
`sendSTM` mbox msg
mbox
instance InChan Inbox where
mailboxEmptySTM :: Inbox msg -> STM Bool
mailboxEmptySTM (Inbox mbox msg
mbox Unique
_) = mbox msg -> STM Bool
forall (mbox :: * -> *) msg. InChan mbox => mbox msg -> STM Bool
mailboxEmptySTM mbox msg
mbox
receiveSTM :: Inbox msg -> STM msg
receiveSTM (Inbox mbox msg
mbox Unique
_) = mbox msg -> STM msg
forall (mbox :: * -> *) msg. InChan mbox => mbox msg -> STM msg
receiveSTM mbox msg
mbox
requeueSTM :: msg -> Inbox msg -> STM ()
requeueSTM msg
msg (Inbox mbox msg
mbox Unique
_) = msg
msg msg -> mbox msg -> STM ()
forall (mbox :: * -> *) msg.
InChan mbox =>
msg -> mbox msg -> STM ()
`requeueSTM` mbox msg
mbox
instance OutChan Inbox where
mailboxFullSTM :: Inbox msg -> STM Bool
mailboxFullSTM (Inbox mbox msg
mbox Unique
_) = mbox msg -> STM Bool
forall (mbox :: * -> *) msg. OutChan mbox => mbox msg -> STM Bool
mailboxFullSTM mbox msg
mbox
sendSTM :: msg -> Inbox msg -> STM ()
sendSTM msg
msg (Inbox mbox msg
mbox Unique
_) = msg
msg msg -> mbox msg -> STM ()
forall (mbox :: * -> *) msg.
OutChan mbox =>
msg -> mbox msg -> STM ()
`sendSTM` mbox msg
mbox
instance OutChan Process where
mailboxFullSTM :: Process msg -> STM Bool
mailboxFullSTM (Process Async ()
_ Mailbox msg
mbox) = Mailbox msg -> STM Bool
forall (mbox :: * -> *) msg. OutChan mbox => mbox msg -> STM Bool
mailboxFullSTM Mailbox msg
mbox
sendSTM :: msg -> Process msg -> STM ()
sendSTM msg
msg (Process Async ()
_ Mailbox msg
mbox) = msg
msg msg -> Mailbox msg -> STM ()
forall (mbox :: * -> *) msg.
OutChan mbox =>
msg -> mbox msg -> STM ()
`sendSTM` Mailbox msg
mbox
instance Hashable (Process msg) where
hashWithSalt :: Int -> Process msg -> Int
hashWithSalt Int
i (Process Async ()
_ Mailbox msg
m) = Int -> Mailbox msg -> Int
forall a. Hashable a => Int -> a -> Int
hashWithSalt Int
i Mailbox msg
m
hash :: Process msg -> Int
hash (Process Async ()
_ Mailbox msg
m) = Mailbox msg -> Int
forall a. Hashable a => a -> Int
hash Mailbox msg
m
instance Hashable (Mailbox msg) where
hashWithSalt :: Int -> Mailbox msg -> Int
hashWithSalt Int
i (Mailbox mbox msg
_ Unique
u) = Int -> Unique -> Int
forall a. Hashable a => Int -> a -> Int
hashWithSalt Int
i Unique
u
hash :: Mailbox msg -> Int
hash (Mailbox mbox msg
_ Unique
u) = Unique -> Int
forall a. Hashable a => a -> Int
hash Unique
u
inboxToMailbox :: Inbox msg -> Mailbox msg
inboxToMailbox :: Inbox msg -> Mailbox msg
inboxToMailbox (Inbox mbox msg
m Unique
u) = mbox msg -> Unique -> Mailbox msg
forall msg (mbox :: * -> *).
OutChan mbox =>
mbox msg -> Unique -> Mailbox msg
Mailbox mbox msg
m Unique
u
wrapChannel ::
(MonadIO m, InChan mbox, OutChan mbox) => mbox msg -> m (Inbox msg)
wrapChannel :: mbox msg -> m (Inbox msg)
wrapChannel mbox msg
mbox = mbox msg -> Unique -> Inbox msg
forall msg (mbox :: * -> *).
(OutChan mbox, InChan mbox) =>
mbox msg -> Unique -> Inbox msg
Inbox mbox msg
mbox (Unique -> Inbox msg) -> m Unique -> m (Inbox msg)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IO Unique -> m Unique
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO Unique
newUnique
newInbox :: MonadIO m => m (Inbox msg)
newInbox :: m (Inbox msg)
newInbox = m (TQueue msg)
forall (m :: * -> *) a. MonadIO m => m (TQueue a)
newTQueueIO m (TQueue msg) -> (TQueue msg -> m (Inbox msg)) -> m (Inbox msg)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \TQueue msg
c -> TQueue msg -> m (Inbox msg)
forall (m :: * -> *) (mbox :: * -> *) msg.
(MonadIO m, InChan mbox, OutChan mbox) =>
mbox msg -> m (Inbox msg)
wrapChannel TQueue msg
c
newBoundedInbox :: MonadIO m => Natural -> m (Inbox msg)
newBoundedInbox :: Natural -> m (Inbox msg)
newBoundedInbox Natural
i = Natural -> m (TBQueue msg)
forall (m :: * -> *) a. MonadIO m => Natural -> m (TBQueue a)
newTBQueueIO (Natural -> Natural
forall a b. (Integral a, Num b) => a -> b
fromIntegral Natural
i) m (TBQueue msg) -> (TBQueue msg -> m (Inbox msg)) -> m (Inbox msg)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \TBQueue msg
c -> TBQueue msg -> m (Inbox msg)
forall (m :: * -> *) (mbox :: * -> *) msg.
(MonadIO m, InChan mbox, OutChan mbox) =>
mbox msg -> m (Inbox msg)
wrapChannel TBQueue msg
c
send :: (MonadIO m, OutChan mbox) => msg -> mbox msg -> m ()
send :: msg -> mbox msg -> m ()
send msg
msg = STM () -> m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> m ()) -> (mbox msg -> STM ()) -> mbox msg -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. msg -> mbox msg -> STM ()
forall (mbox :: * -> *) msg.
OutChan mbox =>
msg -> mbox msg -> STM ()
sendSTM msg
msg
receive :: (InChan mbox, MonadIO m) => mbox msg -> m msg
receive :: mbox msg -> m msg
receive mbox msg
mbox = mbox msg -> (msg -> Maybe msg) -> m msg
forall (m :: * -> *) (mbox :: * -> *) msg a.
(MonadIO m, InChan mbox) =>
mbox msg -> (msg -> Maybe a) -> m a
receiveMatch mbox msg
mbox msg -> Maybe msg
forall a. a -> Maybe a
Just
query ::
(MonadIO m, OutChan mbox)
=> (Listen response -> request)
-> mbox request
-> m response
query :: (Listen response -> request) -> mbox request -> m response
query Listen response -> request
f mbox request
m = do
TMVar response
r <- m (TMVar response)
forall (m :: * -> *) a. MonadIO m => m (TMVar a)
newEmptyTMVarIO
Listen response -> request
f (TMVar response -> Listen response
forall a. TMVar a -> a -> STM ()
putTMVar TMVar response
r) request -> mbox request -> m ()
forall (m :: * -> *) (mbox :: * -> *) msg.
(MonadIO m, OutChan mbox) =>
msg -> mbox msg -> m ()
`send` mbox request
m
STM response -> m response
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM response -> m response) -> STM response -> m response
forall a b. (a -> b) -> a -> b
$ TMVar response -> STM response
forall a. TMVar a -> STM a
takeTMVar TMVar response
r
queryU ::
(MonadUnliftIO m, OutChan mbox)
=> Int
-> (Listen response -> request)
-> mbox request
-> m (Maybe response)
queryU :: Int
-> (Listen response -> request)
-> mbox request
-> m (Maybe response)
queryU Int
u Listen response -> request
f mbox request
m = Int -> m response -> m (Maybe response)
forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> m a -> m (Maybe a)
timeout Int
u ((Listen response -> request) -> mbox request -> m response
forall (m :: * -> *) (mbox :: * -> *) response request.
(MonadIO m, OutChan mbox) =>
(Listen response -> request) -> mbox request -> m response
query Listen response -> request
f mbox request
m)
queryS ::
(MonadUnliftIO m, OutChan mbox)
=> Int
-> (Listen response -> request)
-> mbox request
-> m (Maybe response)
queryS :: Int
-> (Listen response -> request)
-> mbox request
-> m (Maybe response)
queryS Int
s Listen response -> request
f mbox request
m = Int -> m response -> m (Maybe response)
forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> m a -> m (Maybe a)
timeout (Int
s Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000) ((Listen response -> request) -> mbox request -> m response
forall (m :: * -> *) (mbox :: * -> *) response request.
(MonadIO m, OutChan mbox) =>
(Listen response -> request) -> mbox request -> m response
query Listen response -> request
f mbox request
m)
receiveMatch :: (MonadIO m, InChan mbox) => mbox msg -> (msg -> Maybe a) -> m a
receiveMatch :: mbox msg -> (msg -> Maybe a) -> m a
receiveMatch mbox msg
mbox = STM a -> m a
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM a -> m a)
-> ((msg -> Maybe a) -> STM a) -> (msg -> Maybe a) -> m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. mbox msg -> (msg -> Maybe a) -> STM a
forall (mbox :: * -> *) msg a.
InChan mbox =>
mbox msg -> (msg -> Maybe a) -> STM a
receiveMatchSTM mbox msg
mbox
receiveMatchU ::
(MonadUnliftIO m, InChan mbox)
=> Int
-> mbox msg
-> (msg -> Maybe a)
-> m (Maybe a)
receiveMatchU :: Int -> mbox msg -> (msg -> Maybe a) -> m (Maybe a)
receiveMatchU Int
u mbox msg
mbox msg -> Maybe a
f = Int -> m a -> m (Maybe a)
forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> m a -> m (Maybe a)
timeout Int
u (m a -> m (Maybe a)) -> m a -> m (Maybe a)
forall a b. (a -> b) -> a -> b
$ mbox msg -> (msg -> Maybe a) -> m a
forall (m :: * -> *) (mbox :: * -> *) msg a.
(MonadIO m, InChan mbox) =>
mbox msg -> (msg -> Maybe a) -> m a
receiveMatch mbox msg
mbox msg -> Maybe a
f
receiveMatchS ::
(MonadUnliftIO m, InChan mbox)
=> Int
-> mbox msg
-> (msg -> Maybe a)
-> m (Maybe a)
receiveMatchS :: Int -> mbox msg -> (msg -> Maybe a) -> m (Maybe a)
receiveMatchS Int
s mbox msg
mbox msg -> Maybe a
f = Int -> m a -> m (Maybe a)
forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> m a -> m (Maybe a)
timeout (Int
s Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000) (m a -> m (Maybe a)) -> m a -> m (Maybe a)
forall a b. (a -> b) -> a -> b
$ mbox msg -> (msg -> Maybe a) -> m a
forall (m :: * -> *) (mbox :: * -> *) msg a.
(MonadIO m, InChan mbox) =>
mbox msg -> (msg -> Maybe a) -> m a
receiveMatch mbox msg
mbox msg -> Maybe a
f
receiveMatchSTM :: InChan mbox => mbox msg -> (msg -> Maybe a) -> STM a
receiveMatchSTM :: mbox msg -> (msg -> Maybe a) -> STM a
receiveMatchSTM mbox msg
mbox msg -> Maybe a
f = [msg] -> STM a
go []
where
go :: [msg] -> STM a
go [msg]
acc =
mbox msg -> STM msg
forall (mbox :: * -> *) msg. InChan mbox => mbox msg -> STM msg
receiveSTM mbox msg
mbox STM msg -> (msg -> STM a) -> STM a
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \msg
msg ->
case msg -> Maybe a
f msg
msg of
Just a
x -> do
[msg] -> mbox msg -> STM ()
forall (mbox :: * -> *) msg.
InChan mbox =>
[msg] -> mbox msg -> STM ()
requeueListSTM [msg]
acc mbox msg
mbox
a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x
Maybe a
Nothing -> [msg] -> STM a
go (msg
msg msg -> [msg] -> [msg]
forall a. a -> [a] -> [a]
: [msg]
acc)
mailboxEmpty :: (MonadIO m, InChan mbox) => mbox msg -> m Bool
mailboxEmpty :: mbox msg -> m Bool
mailboxEmpty = STM Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM Bool -> m Bool)
-> (mbox msg -> STM Bool) -> mbox msg -> m Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. mbox msg -> STM Bool
forall (mbox :: * -> *) msg. InChan mbox => mbox msg -> STM Bool
mailboxEmptySTM
requeueListSTM :: InChan mbox => [msg] -> mbox msg -> STM ()
requeueListSTM :: [msg] -> mbox msg -> STM ()
requeueListSTM [msg]
xs mbox msg
mbox = (msg -> STM ()) -> [msg] -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (msg -> mbox msg -> STM ()
forall (mbox :: * -> *) msg.
InChan mbox =>
msg -> mbox msg -> STM ()
`requeueSTM` mbox msg
mbox) [msg]
xs
withProcess ::
MonadUnliftIO m => (Inbox msg -> m ()) -> (Process msg -> m a) -> m a
withProcess :: (Inbox msg -> m ()) -> (Process msg -> m a) -> m a
withProcess Inbox msg -> m ()
p Process msg -> m a
f = do
(Inbox msg
i, Mailbox msg
m) <- m (Inbox msg, Mailbox msg)
forall (m :: * -> *) msg.
MonadUnliftIO m =>
m (Inbox msg, Mailbox msg)
newMailbox
m () -> (Async () -> m a) -> m a
forall (m :: * -> *) a b.
MonadUnliftIO m =>
m a -> (Async a -> m b) -> m b
withAsync (Inbox msg -> m ()
p Inbox msg
i) (\Async ()
a -> Async () -> m ()
forall (m :: * -> *) a. MonadIO m => Async a -> m ()
link Async ()
a m () -> m a -> m a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Process msg -> m a
f (Async () -> Mailbox msg -> Process msg
forall msg. Async () -> Mailbox msg -> Process msg
Process Async ()
a Mailbox msg
m))
process :: MonadUnliftIO m => (Inbox msg -> m ()) -> m (Process msg)
process :: (Inbox msg -> m ()) -> m (Process msg)
process Inbox msg -> m ()
p = do
(Inbox msg
i, Mailbox msg
m) <- m (Inbox msg, Mailbox msg)
forall (m :: * -> *) msg.
MonadUnliftIO m =>
m (Inbox msg, Mailbox msg)
newMailbox
Async ()
a <- m () -> m (Async ())
forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (Async a)
async (m () -> m (Async ())) -> m () -> m (Async ())
forall a b. (a -> b) -> a -> b
$ Inbox msg -> m ()
p Inbox msg
i
Async () -> m ()
forall (m :: * -> *) a. MonadIO m => Async a -> m ()
link Async ()
a
Process msg -> m (Process msg)
forall (m :: * -> *) a. Monad m => a -> m a
return (Async () -> Mailbox msg -> Process msg
forall msg. Async () -> Mailbox msg -> Process msg
Process Async ()
a Mailbox msg
m)
newMailbox :: MonadUnliftIO m => m (Inbox msg, Mailbox msg)
newMailbox :: m (Inbox msg, Mailbox msg)
newMailbox = do
Inbox msg
i <- m (Inbox msg)
forall (m :: * -> *) msg. MonadIO m => m (Inbox msg)
newInbox
let m :: Mailbox msg
m = Inbox msg -> Mailbox msg
forall msg. Inbox msg -> Mailbox msg
inboxToMailbox Inbox msg
i
(Inbox msg, Mailbox msg) -> m (Inbox msg, Mailbox msg)
forall (m :: * -> *) a. Monad m => a -> m a
return (Inbox msg
i, Mailbox msg
m)