module Network.RPCA.Port ( Port , Callback , new , addService ) where import Control.Concurrent import Control.Concurrent.STM import Control.Exception import Control.Monad import Data.Binary.Strict.Get import Data.Maybe (isJust, isNothing, fromJust) import Data.Either (either) import Data.Int import Foreign.C.Types import qualified Data.ByteString as BS import qualified Data.Map as Map import qualified Data.Sequence as Seq import Network.Socket hiding (send, sendTo, recv, recvFrom) import Network.Socket.ByteString import Network.RPCA.Structs import Network.RPCA.Util import qualified Network.RPCA.Connection as C type Callback = Outboundrequest -> BS.ByteString -> (Inboundreply -> BS.ByteString -> STM ()) -> IO () data Port = Port { portsock :: Socket , portnextid :: TVar Int64 , portconnections :: TVar (Map.Map Int64 C.Connection) , portservices :: TVar (Map.Map String Callback) , portacceptorthread :: ThreadId } new :: Int -> IO Port new portno = do s <- socket AF_INET Stream 0 setSocketOption s ReuseAddr 1 let sockaddr = SockAddrInet (PortNum $ htons $ fromIntegral portno) 0 bindSocket s sockaddr listen s 1 emptyServices <- atomically $ newTVar Map.empty emptyConnections <- atomically $ newTVar Map.empty initId <- atomically $ newTVar 1 myid <- myThreadId let port = Port s initId emptyConnections emptyServices myid threadid <- forkIO $ accepting port return $ port { portacceptorthread = threadid } -- | Add a service to a port addService :: Port -- ^ the port to which the service is added -> String -- ^ the name of the service -> Callback -- ^ the handler callback for this service -> STM () addService port name cb = do updateTVar (Map.insert name cb) $ portservices port return () accepting :: Port -> IO () accepting port = do (newsock, addr) <- accept $ portsock port setSocketOption newsock NoDelay 1 id <- atomically $ updateTVar ((+) 1) (portnextid port) print $ "Connection from " ++ show addr conn <- atomically (do conn <- C.new newsock (closeAction port id) conns <- readTVar $ portconnections port writeTVar (portconnections port) $ Map.insert id conn conns return conn) C.forkThreads conn (readAction port newsock id) accepting port readAction :: Port -> Socket -> Int64 -> IO () readAction port sock id = do (abytes, payload) <- C.readPacket sock let a = either (const Nothing) Just $ outboundrequestDeserialiseBS abytes healthservice = a >>= outboundrequest_probe >>= return . healthprobe_service rpcservice = a >>= outboundrequest_rpc >>= return . rpcrequest_service servicename = healthservice `mplus` rpcservice when (isJust servicename) (do let Just serv = servicename mservice <- atomically $ readTVar (portservices port) >>= return . Map.lookup serv case mservice of Nothing -> do Just conn <- atomically $ readTVar (portconnections port) >>= return . Map.lookup id let replyCode = fromIntegral $ fromEnum ErrServiceUnknown nqReplyRPC conn (fromJust a) $ rpcreplyEmpty { rpcreply_reply_code = replyCode } Just callback -> callback (fromJust a) payload $ handleReply port id) readAction port sock id -- | Used to send payloadless replies to RPCs which are handled within the Port -- layer (probably because it's an error) nqReplyRPC :: C.Connection -- ^ the connection to send to -> Outboundrequest -- ^ the request to which we are replying -> Rpcreply -- ^ the header of the reply -> IO () nqReplyRPC conn obrq reply = do let ibreply = inboundreplyEmpty { inboundreply_id = outboundrequest_id obrq , inboundreply_rpc = Just reply } ibbytes = inboundreplySerialiseBS ibreply header = C.buildHeader (ibbytes, BS.empty) atomically (do updateTVar (\s -> ibbytes Seq.<| header Seq.<| s) $ C.connoutq conn) return () -- | A callback which is passed to the service. It takes the reply header -- and the payload and sends it to the connection, if it still exists handleReply :: Port -- ^ the port which the request came in on -> Int64 -- ^ the id of the connection -> Inboundreply -- ^ the reply header -> BS.ByteString --- ^ ... and payload -> STM () handleReply port id ibreply payload = do let ibbytes = inboundreplySerialiseBS ibreply header = C.buildHeader (ibbytes, payload) conns <- readTVar $ portconnections port let mconn = Map.lookup id conns case mconn of Nothing -> return () Just conn -> do q <- readTVar $ C.connoutq conn writeTVar (C.connoutq conn) $ payload Seq.<| ibbytes Seq.<| header Seq.<| q return () -- | This is called when a connection is closed closeAction :: Port -- ^ the port which owns the connection -> Int64 -- ^ the id of the connection -> IO () closeAction port id = atomically (do updateTVar (Map.delete id) $ portconnections port return ())