{-# LANGUAGE Strict #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS -Wno-unused-do-bind #-}
module Foreign.Erlang.Connection
( Connection(closeConnection)
, newConnection
, sendControlMessage
) where
import Control.Monad
import Control.Concurrent.STM
import qualified Control.Concurrent.Async as A
import Util.BufferedIOx
import Util.IOExtra
import Foreign.Erlang.NodeState
import Foreign.Erlang.Term
import Foreign.Erlang.ControlMessage
import Foreign.Erlang.Mailbox
-- | Handle for one established connection.
--
-- The module's export list exposes only 'closeConnection'; the constructor
-- and 'sendQueue' stay internal, so code outside this module enqueues
-- messages through 'sendControlMessage'.
data Connection = MkConnection
    { sendQueue       :: TQueue ControlMessage
      -- ^ Outgoing control messages; 'sendLoop' reads from this queue.
    , closeConnection :: IO ()
      -- ^ Action that cancels the sender and receiver threads started by
      -- 'newConnection'.
    }
-- | Start a connection on top of an already opened buffered socket.
--
-- Creates the outgoing queue, forks one thread running 'sendLoop' and one
-- running 'recvLoop', registers the resulting 'Connection' in the node state
-- under @name@, and forks a watcher that cleans up once the receiver ends.
--
-- * @sock@      - buffered I/O handle used for both directions
-- * @nodeState@ - registry the connection is added to and later removed from
-- * @name@      - key under which the connection is registered
--
-- Returns the registered 'Connection'.
newConnection :: (MonadLoggerIO m, MonadMask m, MonadBaseControl IO m, BufferedIOx s)
              => s
              -> NodeState Pid Term Mailbox Connection
              -> Term
              -> m Connection
newConnection sock nodeState name = do
    queue <- liftIO newTQueueIO
    forkTransmitter queue
  where
    -- Fork the sender and receiver threads, then register the connection.
    forkTransmitter queue = do
        sender   <- async newSender
        receiver <- async newReceiver
        registerConnection sender receiver
      where
        newSender   = sendLoop sock queue
        newReceiver = recvLoop sock queue nodeState

        -- Publish the connection in the node state and start the watcher.
        registerConnection sender receiver = do
            let connection = MkConnection queue stopTransmitter
            logInfoStr (printf "putConnectionForNode %s" (show name))
            liftIO (putConnectionForNode nodeState name connection)
            -- The watcher's Async handle is deliberately discarded (the
            -- module disables the unused-do-bind warning for this).
            async awaitStopAndCleanup
            return connection
          where
            -- Exposed as 'closeConnection': cancel both threads concurrently.
            stopTransmitter =
                void (A.concurrently (A.cancel sender) (A.cancel receiver))

            -- Wait until the receiver finishes (normally or with an
            -- exception), then stop the sender, close the socket and
            -- unregister the connection.
            awaitStopAndCleanup = do
                (_ :: Either SomeException ()) <- waitCatch receiver
                cancel sender
                tryAndLogAll (liftIO (closeBuffered sock))
                logInfoStr (printf "removeConnectionForNode %s" (show name))
                liftIO (removeConnectionForNode nodeState name)
-- | Enqueue a control message on the connection's outgoing queue.
--
-- This only writes to the 'TQueue'; the actual socket write is done by the
-- thread running 'sendLoop'.
sendControlMessage :: MonadIO m => ControlMessage -> Connection -> m ()
sendControlMessage controlMessage MkConnection{sendQueue = queue} =
    liftIO (atomically (writeTQueue queue controlMessage))
-- | Sender loop: repeatedly take the next control message from the queue and
-- write it to the socket with 'runPutBuffered'.
--
-- Every iteration is wrapped in 'tryAndLogIO' and the loop runs under
-- 'forever'. NOTE(review): presumably 'tryAndLogIO' logs and swallows IO
-- exceptions so a failed write does not end the loop - confirm in
-- "Util.IOExtra".
sendLoop :: (MonadCatch m, MonadLoggerIO m, BufferedIOx s)
         => s
         -> TQueue ControlMessage
         -> m ()
sendLoop sock sendQueue = forever (tryAndLogIO send)
  where
    -- Blocks in STM until a message is available, then writes it out.
    send = liftIO $ do
        controlMessage <- atomically (readTQueue sendQueue)
        runPutBuffered sock controlMessage
-- | Receiver loop: repeatedly decode one control message from the socket
-- with 'runGetBuffered' and dispatch it.
--
-- Any exception raised while receiving or dispatching is logged together
-- with the node state and then rethrown, which ends the loop.
--
-- * @sock@      - buffered I/O handle to read from
-- * @sendQueue@ - outgoing queue, used to answer @TICK@ with a @TICK@
-- * @nodeState@ - registry used to look up target mailboxes
recvLoop :: (MonadLoggerIO m, MonadCatch m, BufferedIOx s, MonadMask m)
         => s
         -> TQueue ControlMessage
         -> NodeState Pid Term Mailbox Connection
         -> m ()
recvLoop sock sendQueue nodeState =
    forever (recv `catchAll` logAndRethrow)
  where
    logAndRethrow x = do
        logErrorShow (x, nodeState)
        throwM x

    recv = runGetBuffered sock >>= liftIO . deliver

    -- Run the action on the mailbox registered for the pid; do nothing when
    -- no mailbox is found.
    withPidMailbox :: Pid -> (Mailbox -> IO ()) -> IO ()
    withPidMailbox pid action =
        getMailboxForPid nodeState pid >>= mapM_ action

    -- Same as 'withPidMailbox', but the mailbox is looked up by name.
    withNamedMailbox :: Term -> (Mailbox -> IO ()) -> IO ()
    withNamedMailbox regName action =
        getMailboxForName nodeState regName >>= mapM_ action

    -- Route a single incoming control message.
    deliver :: ControlMessage -> IO ()
    deliver controlMessage =
        case controlMessage of
            -- A received TICK is answered by queueing a TICK for sending.
            TICK ->
                atomically (writeTQueue sendQueue TICK)
            LINK fromPid toPid ->
                withPidMailbox toPid (`deliverLink` fromPid)
            SEND toPid message ->
                withPidMailbox toPid (`deliverSend` message)
            EXIT fromPid toPid reason ->
                withPidMailbox toPid (\mb -> deliverExit mb fromPid reason)
            UNLINK fromPid toPid ->
                withPidMailbox toPid (`deliverUnlink` fromPid)
            -- NODE_LINK is accepted and ignored.
            NODE_LINK ->
                return ()
            REG_SEND fromPid toName message ->
                withNamedMailbox toName (\mb -> deliverRegSend mb fromPid message)
            GROUP_LEADER fromPid toPid ->
                withPidMailbox toPid (`deliverGroupLeader` fromPid)
            EXIT2 fromPid toPid reason ->
                withPidMailbox toPid (\mb -> deliverExit2 mb fromPid reason)