module Ros.Graph.Slave (RosSlave(..), runSlave, requestTopicClient,
cleanupNode) where
import Control.Applicative
import Control.Concurrent (killThread, forkIO, threadDelay, MVar, putMVar,
isEmptyMVar, readMVar, modifyMVar_)
import Control.Concurrent.SSem (SSem)
import qualified Control.Concurrent.SSem as Sem
import Control.Monad.IO.Class (liftIO)
import qualified Data.ByteString.UTF8 ()
import qualified Data.ByteString.Lazy.UTF8 as BLU
import Snap.Http.Server (simpleHttpServe)
import Snap.Http.Server.Config (defaultConfig, setPort, Config, ConfigLog(..),
setVerbose, setAccessLog, setErrorLog)
import Snap.Core (Snap, readRequestBody, writeLBS, getResponse, putResponse,
setContentLength)
import Network.Socket hiding (Stream)
import qualified Network.Socket as Net
import Network.XmlRpc.Internals (Value)
import Network.XmlRpc.Server (handleCall, methods, fun)
import Network.XmlRpc.Client (remote)
#ifndef mingw32_HOST_OS
import System.Posix.Process (getProcessID)
#endif
import System.Process (readProcess)
import Ros.Internal.RosTypes
import Ros.Topic.Stats (PubStats(PubStats), SubStats(SubStats))
import Ros.Graph.Master
class RosSlave a where
getMaster :: a -> URI
getNodeName :: a -> String
getNodeURI :: a -> MVar URI
getSubscriptions :: a -> IO [(TopicName, TopicType, [(URI, SubStats)])]
getPublications :: a -> IO [(TopicName, TopicType, [(URI, PubStats)])]
publisherUpdate :: a -> TopicName -> [URI] -> IO ()
getTopicPortTCP :: a -> TopicName -> Maybe Int
setShutdownAction :: a -> IO () -> IO ()
stopNode :: a -> IO ()
#ifdef mingw32_HOST_OS
getProcessID :: IO Int
getProcessID = return 42
#endif
cleanupNode :: RosSlave n => n -> IO ()
cleanupNode n = do pubs <- getPublications n
subs <- getSubscriptions n
nuri <- readMVar (getNodeURI n)
let nname = getNodeName n
master = getMaster n
stop f (tname,_,_) = f nname tname nuri
mapM_ (stop (unregisterPublisher master)) pubs
mapM_ (stop (unregisterSubscriber master)) subs
stopNode n
type RpcResult a = IO (Int, String, a)
mkPublishStats :: (TopicName, a, [(URI, PubStats)]) ->
(TopicName, Int, [(Int, Int, Int, Bool)])
mkPublishStats (n, _, pstats) = (n, 0, map formatStats pstats)
where formatStats (_, (PubStats bytesSent numSent conn)) =
(0, bytesSent, numSent, conn)
mkSubStats :: (TopicName, a, [(URI, SubStats)]) ->
(String, Int, [(Int, Int, Int, Bool)])
mkSubStats (n, _, sstats) = (n, 0, map formatStats sstats)
where formatStats (_, (SubStats bytesReceived conn)) =
(0, bytesReceived, 1, conn)
getBusStats :: (RosSlave a) => a -> CallerID ->
RpcResult ([(String,Int,[(Int,Int,Int,Bool)])],
[(String,Int,[(Int,Int,Int,Bool)])],
(Int,Int,Int))
getBusStats n _ = do
publishStats <- map (mkPublishStats) <$> getPublications n
subscribeStats <- map (mkSubStats) <$> getSubscriptions n
let serviceStats = (0,0,0)
return (1, "", (publishStats, subscribeStats, serviceStats))
getBusInfo :: (RosSlave a) => a -> CallerID ->
RpcResult [(Int,String,String,String,String)]
getBusInfo n _ = do
pubs <- concatMap formatPubs <$> getPublications n
subs <- concatMap formatSubs <$> getSubscriptions n
return (1, "", pubs ++ subs)
where formatPubs (tname, _, stats) =
map (\(uri,_) -> (0, uri, "o", "TCPROS", tname)) stats
formatSubs (tname, _, stats) =
map (\(uri,_) -> (0, uri, "i", "TCPROS", tname)) stats
getMaster' :: RosSlave a => a -> CallerID -> IO (Int, String, URI)
getMaster' n _ = return (1, "", getMaster n)
shutdown' :: RosSlave a => a -> SSem -> CallerID -> IO (Int, String, Bool)
shutdown' n q _ = stopNode n >> Sem.signal q >> return (1, "", True)
getPid' :: CallerID -> RpcResult Int
getPid' _ = do pid <- getProcessID
return (1, "", fromEnum pid)
getSubscriptions' :: RosSlave a => a -> CallerID -> RpcResult [(String, String)]
getSubscriptions' node _ = do
subs <- map (\(n,t,_) -> (n,t)) <$> getSubscriptions node
return (1, "", subs)
getPublications' :: RosSlave a => a -> CallerID -> RpcResult [(String, String)]
getPublications' node _ = do
pubs <- map (\(n,t,_) -> (n,t)) <$> getPublications node
return (1, "", pubs)
paramUpdate' :: RosSlave a => a -> CallerID -> String -> Value -> RpcResult Bool
paramUpdate' _n _ _paramKey _paramVal = do
putStrLn "paramUpdate not implemented!"
return (1, "", True)
pubUpdate :: RosSlave a => a -> CallerID -> TopicName -> [URI] -> RpcResult Int
pubUpdate n _ topic publishers = do publisherUpdate n topic publishers
return (1, "", 1)
myName :: RosSlave a => a -> IO String
myName n = extractName `fmap` readMVar (getNodeURI n)
where extractName uri = takeWhile (/=':') $ drop 7 uri
requestTopic :: RosSlave a => a -> CallerID -> TopicName -> [[Value]] ->
RpcResult (String,String,Int)
requestTopic n _ topic _protocols =
case getTopicPortTCP n topic of
Just p -> do
host <- myName n
return (1, "", ("TCPROS",host,p))
Nothing -> return (0, "Unknown topic", ("TCPROS", "", 0))
requestTopicClient :: URI -> CallerID -> TopicName -> [[String]] ->
RpcResult (String,String,Int)
requestTopicClient = flip remote "requestTopic"
slaveRPC :: (RosSlave a) => a -> SSem -> String -> IO BLU.ByteString
slaveRPC n =
handleCall . dispatch
where dispatch q = methods [ ("getBusStats", fun (getBusStats n))
, ("getBusInfo", fun (getBusInfo n))
, ("getMasterUri", fun (getMaster' n))
, ("shutdown", fun (shutdown' n q))
, ("getPid", fun getPid')
, ("getSubscriptions", fun (getSubscriptions' n))
, ("getPublications", fun (getPublications' n))
, ("paramUpdate", fun (paramUpdate' n))
, ("publisherUpdate", fun (pubUpdate n))
, ("requestTopic", fun (requestTopic n)) ]
simpleServe :: Int -> Snap () -> IO ()
simpleServe port handler = simpleHttpServe conf handler
where conf :: Config Snap ()
conf = setAccessLog ConfigNoLog .
setErrorLog ConfigNoLog .
setVerbose False .
setPort port $
defaultConfig
findFreePort :: IO Int
findFreePort = do s <- socket AF_INET Net.Stream defaultProtocol
bindSocket s (SockAddrInet aNY_PORT iNADDR_ANY)
port <- fromInteger . toInteger <$> socketPort s
sClose s
return port
runSlave :: RosSlave a => a -> IO (IO (), Int)
runSlave n = do quitNow <- Sem.new 0
port <- findFreePort
let myUri = getNodeURI n
myPort = ":" ++ show port
myURIEmpty <- isEmptyMVar myUri
if myURIEmpty
then do myIP <- init <$> readProcess "hostname" [] ""
putMVar myUri $! "http://"++myIP++myPort
else modifyMVar_ myUri ((return $!) . (++myPort))
t <- forkIO $ simpleServe port (rpc (slaveRPC n quitNow))
let wait = do Sem.wait quitNow
threadDelay 1000000
stopNode n
killThread t
return (wait, port)
where rpc f = do body <- BLU.toString <$> readRequestBody 4096
response <- liftIO $ f body
writeLBS response
let len = fromIntegral $ BLU.length response
putResponse . setContentLength len =<< getResponse