module Ros.Node.RosTcp (subStream, runServer, runServers, callServiceWithMaster) where
import Control.Applicative ((<$>))
import Control.Arrow (first)
import Control.Concurrent (forkIO, killThread, newEmptyMVar, takeMVar, putMVar)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TVar
import qualified Control.Exception as E
import Control.Monad.Reader
import Data.Binary.Put (runPut, putWord32le)
import Data.Binary.Get (runGet, getWord32le)
import Data.ByteString.Lazy (ByteString)
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import Network.BSD (getHostByName, hostAddress)
import Network.Socket hiding (send, sendTo, recv, recvFrom, Stream, ServiceName)
import qualified Network.Socket as Sock
import Network.Socket.ByteString
import Prelude hiding (getContents)
import System.IO (IOMode(ReadMode), hClose)
import Text.URI (parseURI, uriRegName, uriPort)
import Ros.Node.BinaryIter (streamIn, getServiceResult)
import Ros.Internal.Msg.MsgInfo
import Ros.Internal.Msg.SrvInfo
import Ros.Internal.RosBinary
import Ros.Internal.RosTypes
import Ros.Internal.Util.RingChan
import Ros.Internal.Util.AppConfig (Config, debug, forkConfig)
import Ros.Topic (Topic(..))
import Ros.Node.ConnectionHeader
import Ros.Graph.Slave (requestTopicClient)
import Ros.Graph.Master (lookupService)
import Data.Maybe (fromMaybe)
import Ros.Service.ServiceTypes
import Control.Monad.Except
import System.IO.Error (tryIOError)
serviceClient :: RingChan ByteString -> Socket -> IO ()
serviceClient c s = forever $ do bs <- readChan c
sendBS s bs
sendBS :: Socket -> ByteString -> IO ()
sendBS sock bs =
let len = runPut $
putWord32le . fromIntegral $
BL.length bs
in
sendAll sock (BL.toStrict $ BL.append len bs)
recvAll :: Socket -> Int -> IO B.ByteString
recvAll s = flip go []
where go len acc = do bs <- recv s len
if B.length bs < len
then go (len B.length bs) (bs:acc)
else return $ B.concat (reverse (bs:acc))
negotiatePub :: String -> String -> Socket -> IO ()
negotiatePub ttype md5 sock =
do headerLength <- runGet (fromIntegral <$> getWord32le) <$>
BL.fromChunks . (:[]) <$> recvAll sock 4
headerBytes <- recvAll sock headerLength
let connHeader = parseHeader headerBytes
wildCard = case lookup "type" connHeader of
Just t | t == "*" -> True
| t == ttype -> False
| otherwise -> error $
"Disagreeing Topic types: " ++
"publisher expected "++ttype++
", but client asked for "++t
Nothing -> error $ "Client did not include the "++
"topic type in its "++
"connection request."
when (not wildCard)
(case lookup "md5sum" connHeader of
Just s | s == md5 -> return ()
| otherwise -> error "Disagreement on Topic type MD5"
Nothing -> error $ "Client did not include MD5 sum "++
"in its request.")
case lookup "tcp_nodelay" connHeader of
Just "1" -> setSocketOption sock NoDelay 0
_ -> return ()
sendAll sock . genHeader $
[("md5sum",md5), ("type",ttype), ("callerid","roshask")]
acceptClients :: Socket -> TVar [(Config (), RingChan ByteString)] ->
(Socket -> IO ()) -> IO (RingChan ByteString) -> Config ()
acceptClients sock clients negotiate mkBuffer = forever acceptClient
where acceptClient = do (client,_) <- liftIO $ accept sock
debug "Accepted client socket"
liftIO $ negotiate client
chan <- liftIO mkBuffer
let cleanup1 =
do debug "Closing client socket"
liftIO $
shutdown client ShutdownBoth `E.catch`
\(_::E.SomeException) -> return ()
r <- ask
t <- liftIO . forkIO $
serviceClient chan client `E.catch`
\(_::E.SomeException) -> runReaderT cleanup1 r
let cleanup2 = cleanup1 >>
(liftIO $ killThread t)
liftIO . atomically $
readTVar clients >>=
writeTVar clients . ((cleanup2,chan) :)
pubStream :: RosBinary a
=> Topic IO a -> TVar [(b, RingChan ByteString)] -> Config ()
pubStream t0 clients = liftIO $ go 0 t0
where go !n t = do (x, t') <- runTopic t
let bytes = runPut $ putMsg n x
cs <- readTVarIO clients
mapM_ (flip writeChan bytes . snd) cs
go (n+1) t'
pubStreamIO :: RosBinary a => IO (TVar [(b, RingChan ByteString)] -> Config (),
a -> IO ())
pubStreamIO = do m <- newEmptyMVar
let feed clients =
let go !n = do x <- takeMVar m
let bytes = runPut $ putMsg n x
cs <- readTVarIO clients
mapM_ (flip writeChan bytes . snd) cs
go (n+1)
in liftIO $ go 0
return (feed, putMVar m)
negotiateSub :: Socket -> String -> String -> String -> IO ()
negotiateSub sock tname ttype md5 =
do sendAll sock $ genHeader [ ("callerid", "roshask"), ("topic", tname)
, ("md5sum", md5), ("type", ttype)
, ("tcp_nodelay", "1") ]
responseLength <- runGet (fromIntegral <$> getWord32le) <$>
BL.fromChunks . (:[]) <$> recvAll sock 4
headerBytes <- recvAll sock responseLength
let connHeader = parseHeader headerBytes
case lookup "type" connHeader of
Just t | t == ttype -> return ()
| otherwise -> error $ "Disagreeing Topic types: " ++
"subscriber expected "++ttype++
", but server replied with "++t
Nothing -> error $ "Server did not include the topic type "++
"in its response."
case lookup "md5sum" connHeader of
Just s | s == md5 -> return ()
| otherwise -> error "Disagreement on Topic type MD5"
Nothing -> error "Server did not include MD5 sum in its response."
setSocketOption sock KeepAlive 1
subStream :: forall a. (RosBinary a, MsgInfo a) =>
URI -> String -> (Int -> IO ()) -> Config (Topic IO a)
subStream target tname _updateStats =
do debug $ "Opening stream to " ++target++" for "++tname
h <- liftIO $
do response <- requestTopicClient target "/roshask" tname
[["TCPROS"]]
let port = case response of
(1,_,("TCPROS",_,port')) -> fromIntegral port'
_ -> error $ "Couldn't get publisher's port for "++
tname++" from node "++target
sock <- socket AF_INET Sock.Stream defaultProtocol
ip <- hostAddress <$> getHostByName host
connect sock $ SockAddrInet port ip
let md5 = sourceMD5 (undefined::a)
ttype = msgTypeName (undefined::a)
negotiateSub sock tname ttype md5
socketToHandle sock ReadMode
debug $ "Streaming "++tname++" from "++target
return $ streamIn h
where host = parseHost target
parseHost :: URI -> String
parseHost target = case parseURI target of
Just u -> fromMaybe
(error $ "Couldn't parse hostname "++ "from "++target)
(uriRegName u)
Nothing -> error $ "Couldn't parse URI "++target
parseHostAndPort :: URI -> Either ServiceResponseExcept (String, PortNumber)
parseHostAndPort target = do
uri <- maybeToEither (ConnectExcept $ "Could not parse URI "++target) $ parseURI target
host <- maybeToEither (ConnectExcept $ "Could not parse hostname from "++target) $ uriRegName uri
port <- maybeToEither (ConnectExcept $ "Could not parse port from "++target) $ uriPort uri
return (host, fromIntegral port)
maybeToEither :: a -> Maybe b -> Either a b
maybeToEither left m = case m of
Just x -> Right x
Nothing -> Left left
callServiceWithMaster :: forall a b. (RosBinary a, SrvInfo a, RosBinary b, SrvInfo b) =>
URI -> ServiceName -> a -> IO (Either ServiceResponseExcept b)
callServiceWithMaster rosMaster serviceName message = runExceptT $ do
checkServicesMatch message (undefined::b)
(code, statusMessage, serviceUrl) <- lookupService rosMaster callerID serviceName
checkLookupServiceCode code statusMessage
(host, port) <- ExceptT . return $ parseHostAndPort serviceUrl
let makeSocket = socket AF_INET Sock.Stream defaultProtocol
closeSocket sock = liftIO $ sClose sock
withSocket sock = do
ioErrorToExceptT ConnectExcept "Problem connecting to server. Got exception : " $ do
ip <- hostAddress <$> getHostByName host
connect sock $ SockAddrInet port ip
let reqMd5 = srvMD5 message
reqServiceType = srvTypeName message
negotiateService sock serviceName reqServiceType reqMd5
let bytes = runPut $ putMsg 0 message
ioErrorToExceptT SendRequestExcept "Problem sending request. Got exception: " $
sendBS sock bytes
liftIO $ socketToHandle sock ReadMode
handle <- bracketOnErrorME (liftIO makeSocket) closeSocket withSocket
result <- getServiceResult handle
liftIO $ hClose handle
return result
where
callerID = "roshask"
checkLookupServiceCode 1 _ = return ()
checkLookupServiceCode code statusMessage =
throwError $ MasterExcept
("lookupService failed, code: " ++ show code ++ ", statusMessage: " ++ statusMessage)
checkServicesMatch x y =
unless match $
error "Request and response type do not match"
where
match = srvMD5 x == srvMD5 y && srvTypeName x == srvTypeName y
bracketOnErrorME :: MonadError e m => m a -> (a -> m b) -> (a -> m c) -> m c
bracketOnErrorME before after thing = do
a <- before
let handler e = after a >> throwError e
catchError (thing a) handler
catchConvertIO :: (String -> a) -> IO b -> IO (Either a b)
catchConvertIO excep action = do
err <- tryIOError action
return $ case err of
Left e -> Left . excep $ show e
Right r -> Right r
ioErrorToExceptT :: (String -> e) -> String -> IO a -> ExceptT e IO a
ioErrorToExceptT except msg acc =
ExceptT . catchConvertIO (\m -> except $ msg ++ m) $ acc
negotiateService :: Socket -> String -> String -> String -> ExceptT ServiceResponseExcept IO ()
negotiateService sock serviceName serviceType md5 = do
headerBytes <- liftIO $
do sendAll sock $ genHeader [ ("callerid", "roshask"), ("service", serviceName)
, ("md5sum", md5), ("type", serviceType) ]
responseLength <- runGet (fromIntegral <$> getWord32le) <$>
BL.fromChunks . (:[]) <$> recvAll sock 4
recvAll sock responseLength
let connHeader = parseHeader headerBytes
case lookup "error" connHeader of
Nothing -> return ()
Just _ -> throwError . ConHeadExcept $
"Connection header from server has error, connection header is: " ++ show connHeader
mkPubNegotiator :: MsgInfo a => a -> Socket -> IO ()
mkPubNegotiator x = negotiatePub (msgTypeName x) (sourceMD5 x)
runServerAux :: (Socket -> IO ()) ->
(TVar [(Config (), RingChan ByteString)] -> Config ()) ->
(URI -> Int -> IO ()) -> Int -> Config (Config (), Int)
runServerAux negotiate pubAction _updateStats bufferSize =
do r <- ask
liftIO . withSocketsDo $ runReaderT go r
where go = do sock <- liftIO $ socket AF_INET Sock.Stream defaultProtocol
liftIO $ bindSocket sock (SockAddrInet aNY_PORT iNADDR_ANY)
port <- liftIO (fromInteger . toInteger <$> socketPort sock)
liftIO $ listen sock 5
clients <- liftIO $ newTVarIO []
let mkBuffer = newRingChan bufferSize
acceptThread <- forkConfig $
acceptClients sock clients negotiate mkBuffer
pubThread <- forkConfig $ pubAction clients
let cleanup = liftIO (atomically (readTVar clients)) >>=
sequence_ . map fst >>
liftIO (shutdown sock ShutdownBoth >>
killThread acceptThread >>
killThread pubThread)
return (cleanup, port)
runServer :: forall a. (RosBinary a, MsgInfo a) =>
Topic IO a -> (URI -> Int -> IO ()) -> Int ->
Config (Config (), Int)
runServer stream = runServerAux (mkPubNegotiator (undefined::a))
(pubStream stream)
data MsgInfoRcd = MsgInfoRcd { _md5, _typeName :: String }
data Feeder = Feeder MsgInfoRcd
Int
(URI -> Int -> IO ())
(TVar [(Config (), RingChan ByteString)] -> Config ())
feedTopic :: forall a. (MsgInfo a, RosBinary a) =>
(URI -> Int -> IO ()) -> Int -> IO (Feeder, a -> IO ())
feedTopic updateStats bufSize =
do (feed,pub) <- pubStreamIO
let f = Feeder info bufSize updateStats feed
return (f, pub)
where info = mkInfo (undefined::a)
mkInfo x = MsgInfoRcd (msgTypeName x) (sourceMD5 x)
runServers :: [Feeder] -> Config (Config (), [Int])
runServers = return . first sequence_ . unzip <=< mapM feed
where feed (Feeder (MsgInfoRcd md5 typeName) bufSize stats push) =
let pub = negotiatePub typeName md5
in runServerAux pub push stats bufSize