{-# LANGUAGE Strict              #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE TypeFamilies        #-}
{-# OPTIONS -Wno-unused-do-bind #-}

module Foreign.Erlang.Connection
    ( Connection(closeConnection)
    , newConnection
    , sendControlMessage
    ) where

import           Control.Monad
import           Control.Concurrent.STM
import qualified Control.Concurrent.Async      as A
import           Util.BufferedIOx
import           Util.IOExtra
import           Foreign.Erlang.NodeState
import           Foreign.Erlang.Term
import           Foreign.Erlang.ControlMessage
import           Foreign.Erlang.Mailbox

--------------------------------------------------------------------------------
data Connection = MkConnection { Connection -> TQueue ControlMessage
sendQueue       :: TQueue ControlMessage
                               , Connection -> IO ()
closeConnection :: IO ()
                               }

--------------------------------------------------------------------------------
newConnection :: (MonadLoggerIO m, MonadMask m, MonadBaseControl IO m, BufferedIOx s)
              => s
              -> NodeState Pid Term Mailbox Connection
              -> Term
              -> m Connection
newConnection :: s -> NodeState Pid Term Mailbox Connection -> Term -> m Connection
newConnection s
sock NodeState Pid Term Mailbox Connection
nodeState Term
name = do
    TQueue ControlMessage
sendQueue <- IO (TQueue ControlMessage) -> m (TQueue ControlMessage)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO (TQueue ControlMessage)
forall a. IO (TQueue a)
newTQueueIO
    TQueue ControlMessage -> m Connection
forkTransmitter TQueue ControlMessage
sendQueue
  where
    forkTransmitter :: TQueue ControlMessage -> m Connection
forkTransmitter TQueue ControlMessage
sendQueue = do
        Async (StM m ())
s <- m () -> m (Async (StM m ()))
forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (Async (StM m a))
async m ()
newSender
        Async (StM m ())
r <- m () -> m (Async (StM m ()))
forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (Async (StM m a))
async m ()
newReceiver
        Async (StM m ()) -> Async (StM m ()) -> m Connection
registerConnection Async (StM m ())
s Async (StM m ())
r
      where
        newSender :: m ()
newSender = s -> TQueue ControlMessage -> m ()
forall (m :: * -> *) s.
(MonadCatch m, MonadLoggerIO m, BufferedIOx s) =>
s -> TQueue ControlMessage -> m ()
sendLoop s
sock TQueue ControlMessage
sendQueue
        newReceiver :: m ()
newReceiver = s
-> TQueue ControlMessage
-> NodeState Pid Term Mailbox Connection
-> m ()
forall (m :: * -> *) s.
(MonadLoggerIO m, MonadCatch m, BufferedIOx s, MonadMask m) =>
s
-> TQueue ControlMessage
-> NodeState Pid Term Mailbox Connection
-> m ()
recvLoop s
sock TQueue ControlMessage
sendQueue NodeState Pid Term Mailbox Connection
nodeState
        registerConnection :: Async (StM m ()) -> Async (StM m ()) -> m Connection
registerConnection Async (StM m ())
s Async (StM m ())
r = do
            let connection :: Connection
connection = TQueue ControlMessage -> IO () -> Connection
MkConnection TQueue ControlMessage
sendQueue IO ()
stopTransmitter
            String -> m ()
forall (m :: * -> *).
(HasCallStack, MonadLogger m) =>
String -> m ()
logInfoStr (String -> String -> String
forall r. PrintfType r => String -> r
printf String
"putConnectionForNode %s" (Term -> String
forall a. Show a => a -> String
show Term
name))
            IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (NodeState Pid Term Mailbox Connection
-> Term -> Connection -> IO ()
forall n p mb c. Ord n => NodeState p n mb c -> n -> c -> IO ()
putConnectionForNode NodeState Pid Term Mailbox Connection
nodeState Term
name Connection
connection)
            m () -> m (Async (StM m ()))
forall (m :: * -> *) a.
MonadBaseControl IO m =>
m a -> m (Async (StM m a))
async m ()
awaitStopAndCleanup
            Connection -> m Connection
forall (m :: * -> *) a. Monad m => a -> m a
return Connection
connection
          where
            stopTransmitter :: IO ()
stopTransmitter = IO ((), ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO () -> IO () -> IO ((), ())
forall a b. IO a -> IO b -> IO (a, b)
A.concurrently (Async (StM m ()) -> IO ()
forall a. Async a -> IO ()
A.cancel Async (StM m ())
s) (Async (StM m ()) -> IO ()
forall a. Async a -> IO ()
A.cancel Async (StM m ())
r))
            awaitStopAndCleanup :: m ()
awaitStopAndCleanup = do
                (Either SomeException ()
_ :: Either SomeException ()) <- Async (StM m ()) -> m (Either SomeException ())
forall (m :: * -> *) a.
MonadBaseControl IO m =>
Async (StM m a) -> m (Either SomeException a)
waitCatch Async (StM m ())
r
                Async (StM m ()) -> m ()
forall (m :: * -> *) a. MonadBase IO m => Async a -> m ()
cancel Async (StM m ())
s
                m () -> m (Maybe ())
forall a (m :: * -> *).
(HasCallStack, MonadCatch m, MonadLogger m) =>
m a -> m (Maybe a)
tryAndLogAll (IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (s -> IO ()
forall a (m :: * -> *). (BufferedIOx a, MonadIO m) => a -> m ()
closeBuffered s
sock))
                String -> m ()
forall (m :: * -> *).
(HasCallStack, MonadLogger m) =>
String -> m ()
logInfoStr (String -> String -> String
forall r. PrintfType r => String -> r
printf String
"removeConnectionForNode %s" (Term -> String
forall a. Show a => a -> String
show Term
name))
                IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (NodeState Pid Term Mailbox Connection -> Term -> IO ()
forall n p mb c. Ord n => NodeState p n mb c -> n -> IO ()
removeConnectionForNode NodeState Pid Term Mailbox Connection
nodeState Term
name)

-- liftIO $
-- getConnectionForNode nodeState name >>=
--     mapM_ closeConnection
sendControlMessage :: MonadIO m => ControlMessage -> Connection -> m ()
sendControlMessage :: ControlMessage -> Connection -> m ()
sendControlMessage ControlMessage
controlMessage MkConnection{TQueue ControlMessage
sendQueue :: TQueue ControlMessage
sendQueue :: Connection -> TQueue ControlMessage
sendQueue} =
    IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$
        STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TQueue ControlMessage -> ControlMessage -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue ControlMessage
sendQueue ControlMessage
controlMessage

sendLoop :: (MonadCatch m, MonadLoggerIO m, BufferedIOx s)
         => s
         -> TQueue ControlMessage
         -> m ()
sendLoop :: s -> TQueue ControlMessage -> m ()
sendLoop s
sock TQueue ControlMessage
sendQueue =
    m (Maybe ()) -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m (Maybe ())
forall (m :: * -> *) a.
(HasCallStack, MonadCatch m, MonadLogger m) =>
m a -> m (Maybe a)
tryAndLogIO m ()
send)
  where
    send :: m ()
send = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
        ControlMessage
controlMessage <- STM ControlMessage -> IO ControlMessage
forall a. STM a -> IO a
atomically (STM ControlMessage -> IO ControlMessage)
-> STM ControlMessage -> IO ControlMessage
forall a b. (a -> b) -> a -> b
$ TQueue ControlMessage -> STM ControlMessage
forall a. TQueue a -> STM a
readTQueue TQueue ControlMessage
sendQueue
        s -> ControlMessage -> IO ()
forall (m :: * -> *) s a.
(MonadIO m, BufferedIOx s, Binary a) =>
s -> a -> m ()
runPutBuffered s
sock ControlMessage
controlMessage

recvLoop :: (MonadLoggerIO m, MonadCatch m, BufferedIOx s, MonadMask m)
         => s
         -> TQueue ControlMessage
         -> NodeState Pid Term Mailbox Connection
         -> m ()
recvLoop :: s
-> TQueue ControlMessage
-> NodeState Pid Term Mailbox Connection
-> m ()
recvLoop s
sock TQueue ControlMessage
sendQueue NodeState Pid Term Mailbox Connection
nodeState =
    m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m ()
recv m () -> (SomeException -> m ()) -> m ()
forall (m :: * -> *) a.
MonadCatch m =>
m a -> (SomeException -> m a) -> m a
`catchAll`
                 (\SomeException
x -> do
                      (SomeException, NodeState Pid Term Mailbox Connection) -> m ()
forall s (m :: * -> *).
(HasCallStack, Show s, MonadLogger m) =>
s -> m ()
logErrorShow (SomeException
x, NodeState Pid Term Mailbox Connection
nodeState)
                      SomeException -> m ()
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM SomeException
x))
  where
    recv :: m ()
recv = s -> m ControlMessage
forall (m :: * -> *) s a.
(MonadIO m, BufferedIOx s, Binary a, MonadMask m, MonadLogger m) =>
s -> m a
runGetBuffered s
sock m ControlMessage -> (ControlMessage -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ())
-> (ControlMessage -> IO ()) -> ControlMessage -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ControlMessage -> IO ()
deliver
    deliver :: ControlMessage -> IO ()
deliver ControlMessage
controlMessage =
        case ControlMessage
controlMessage of
            ControlMessage
TICK -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TQueue ControlMessage -> ControlMessage -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue ControlMessage
sendQueue ControlMessage
TICK
            LINK Pid
fromPid Pid
toPid -> do
                Maybe Mailbox
mailbox <- NodeState Pid Term Mailbox Connection -> Pid -> IO (Maybe Mailbox)
forall p n mb c. Ord p => NodeState p n mb c -> p -> IO (Maybe mb)
getMailboxForPid NodeState Pid Term Mailbox Connection
nodeState Pid
toPid
                (Mailbox -> IO ()) -> Maybe Mailbox -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Mailbox -> Pid -> IO ()
`deliverLink` Pid
fromPid) Maybe Mailbox
mailbox
            SEND Pid
toPid Term
message -> do
                Maybe Mailbox
mailbox <- NodeState Pid Term Mailbox Connection -> Pid -> IO (Maybe Mailbox)
forall p n mb c. Ord p => NodeState p n mb c -> p -> IO (Maybe mb)
getMailboxForPid NodeState Pid Term Mailbox Connection
nodeState Pid
toPid
                (Mailbox -> IO ()) -> Maybe Mailbox -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Mailbox -> Term -> IO ()
`deliverSend` Term
message) Maybe Mailbox
mailbox
            EXIT Pid
fromPid Pid
toPid Term
reason -> do
                Maybe Mailbox
mailbox <- NodeState Pid Term Mailbox Connection -> Pid -> IO (Maybe Mailbox)
forall p n mb c. Ord p => NodeState p n mb c -> p -> IO (Maybe mb)
getMailboxForPid NodeState Pid Term Mailbox Connection
nodeState Pid
toPid
                (Mailbox -> IO ()) -> Maybe Mailbox -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\Mailbox
mb -> Mailbox -> Pid -> Term -> IO ()
deliverExit Mailbox
mb Pid
fromPid Term
reason) Maybe Mailbox
mailbox
            UNLINK Pid
fromPid Pid
toPid -> do
                Maybe Mailbox
mailbox <- NodeState Pid Term Mailbox Connection -> Pid -> IO (Maybe Mailbox)
forall p n mb c. Ord p => NodeState p n mb c -> p -> IO (Maybe mb)
getMailboxForPid NodeState Pid Term Mailbox Connection
nodeState Pid
toPid
                (Mailbox -> IO ()) -> Maybe Mailbox -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Mailbox -> Pid -> IO ()
`deliverUnlink` Pid
fromPid) Maybe Mailbox
mailbox
            ControlMessage
NODE_LINK ->
                -- FIXME
                () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
            REG_SEND Pid
fromPid Term
toName Term
message -> do
                Maybe Mailbox
mailbox <- NodeState Pid Term Mailbox Connection -> Term -> IO (Maybe Mailbox)
forall n p mb c. Ord n => NodeState p n mb c -> n -> IO (Maybe mb)
getMailboxForName NodeState Pid Term Mailbox Connection
nodeState Term
toName
                (Mailbox -> IO ()) -> Maybe Mailbox -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\Mailbox
mb -> Mailbox -> Pid -> Term -> IO ()
deliverRegSend Mailbox
mb Pid
fromPid Term
message) Maybe Mailbox
mailbox
            GROUP_LEADER Pid
fromPid Pid
toPid -> do
                Maybe Mailbox
mailbox <- NodeState Pid Term Mailbox Connection -> Pid -> IO (Maybe Mailbox)
forall p n mb c. Ord p => NodeState p n mb c -> p -> IO (Maybe mb)
getMailboxForPid NodeState Pid Term Mailbox Connection
nodeState Pid
toPid
                (Mailbox -> IO ()) -> Maybe Mailbox -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (Mailbox -> Pid -> IO ()
`deliverGroupLeader` Pid
fromPid) Maybe Mailbox
mailbox
            EXIT2 Pid
fromPid Pid
toPid Term
reason -> do
                Maybe Mailbox
mailbox <- NodeState Pid Term Mailbox Connection -> Pid -> IO (Maybe Mailbox)
forall p n mb c. Ord p => NodeState p n mb c -> p -> IO (Maybe mb)
getMailboxForPid NodeState Pid Term Mailbox Connection
nodeState Pid
toPid
                (Mailbox -> IO ()) -> Maybe Mailbox -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\Mailbox
mb -> Mailbox -> Pid -> Term -> IO ()
deliverExit2 Mailbox
mb Pid
fromPid Term
reason) Maybe Mailbox
mailbox