module Service (
          Service, srvContext, srvName, srvId,
          stop, pause, resume,
          changeParam, changeOption,
          addDevice, remDevice, changeTimeout,
          withService,
          poll, xpoll, XPoll(..),
          periodic, periodicSend
          , Command(..), DevCmd(..) -- if test
          )
where

import           Factory
import           Types

import qualified Data.ByteString.Char8 as B
import           Data.Time.Clock
import           Data.List (isPrefixOf)
import           Data.Map (Map)
import qualified Data.Map as Map

import           Control.Concurrent
import           Control.Applicative ((<$>))
import           Control.Monad
import           Prelude hiding (catch)
import           Control.Exception (bracket, finally)

import qualified System.ZMQ as Z

------------------------------------------------------------------------
-- | Generic Service data type;
--   a 'Service' is handed to the application-defined actions
--   that run alongside background services, namely
--   withServer, withPeriodicPub, withSub, withPuller and withDevice.
------------------------------------------------------------------------
data Service = Service
  { srvCtx  :: Z.Context        -- context the service was started with
  , srvName :: String           -- ^ Obtains the service name
  , srvCmd  :: Z.Socket Z.Pub   -- socket on which commands are published
  , srvId   :: ThreadId         -- thread identifier stored with the service
  }

------------------------------------------------------------------------
-- | Obtains the 'Z.Context' from 'Service'
------------------------------------------------------------------------
srvContext :: Service -> Z.Context
srvContext srv = srvCtx srv

------------------------------------------------------------------------
-- Stops a service - used internally only
------------------------------------------------------------------------
stop :: Service -> IO ()
stop srv = sendCmd STOP srv

------------------------------------------------------------------------
-- | Pauses the 'Service'
------------------------------------------------------------------------
pause :: Service -> IO ()
pause srv = sendCmd PAUSE srv

------------------------------------------------------------------------
-- | Resumes the 'Service'
------------------------------------------------------------------------
resume :: Service -> IO ()
resume srv = sendCmd RESUME srv

------------------------------------------------------------------------
-- | Changes the 'Service' control parameter
------------------------------------------------------------------------
changeParam :: Service -> Parameter -> IO ()
changeParam srv newParam = APP newParam `sendCmd` srv

------------------------------------------------------------------------
-- | Changes SocketOption
------------------------------------------------------------------------
changeOption :: Service -> Z.SocketOption -> IO ()
changeOption srv opt = OPT opt `sendCmd` srv

------------------------------------------------------------------------
-- | Adds a 'PollEntry' to a device;
--   the 'Service', of course, must be a device,
--   the command is otherwise ignored.
------------------------------------------------------------------------
addDevice :: Service -> PollEntry -> IO ()
addDevice srv entry = ADD entry `sendDevCmd` srv

------------------------------------------------------------------------
-- | Removes a 'PollEntry' from a device;
--   the 'Service', of course, must be a device,
--   the command is otherwise ignored.
------------------------------------------------------------------------
remDevice :: Service -> Identifier -> IO ()
remDevice srv ident = REM ident `sendDevCmd` srv

------------------------------------------------------------------------
-- | Changes the timeout of a device;
--   the 'Service', of course, must be a device,
--   the command is otherwise ignored.
------------------------------------------------------------------------
changeTimeout :: Service -> Timeout -> IO ()
changeTimeout s t = sendDevCmd (TMO t) s

------------------------------------------------------------------------
-- Service commands;
-- a command travels over the command socket as the text produced
-- by 'show' (see 'sendCmd') and is parsed back by 'readCmd'.
------------------------------------------------------------------------
data Command = STOP | PAUSE | RESUME
             | DEVICE DevCmd        -- device specific commands
             | APP String           -- change control parameter
             | OPT Z.SocketOption   -- change socket option
  deriving (Eq, Show, Read)

------------------------------------------------------------------------
-- Device-specific commands
------------------------------------------------------------------------
data DevCmd = ADD PollEntry     -- add a poll entry to the device
            | REM Identifier    -- remove the entry with this identifier
            | TMO Z.Timeout     -- change the poll timeout of the device
  deriving (Eq, Show, Read)

------------------------------------------------------------------------
-- Send a command:
-- the command is rendered with 'show' and sent
-- through the command socket of the service.
------------------------------------------------------------------------
sendCmd :: Command -> Service -> IO ()
sendCmd c s = Z.send (srvCmd s) (B.pack $ show c) []

------------------------------------------------------------------------
-- Send device-specific command (wrapped into 'DEVICE')
------------------------------------------------------------------------
sendDevCmd :: DevCmd -> Service -> IO ()
sendDevCmd d = sendCmd (DEVICE d)

------------------------------------------------------------------------
-- Parse a command string
-- - "STOP", "PAUSE" and "RESUME" are matched literally;
-- - strings starting with "APP", "OPT" or "DEVICE" are parsed
--   with the derived 'Read' instance of 'Command';
-- - everything else - including strings with one of these prefixes
--   that do not parse - results in 'Left' with an error message.
-- The function is total: a malformed message never raises the
-- "no parse" error of 'read' in the thread evaluating the result.
------------------------------------------------------------------------
readCmd :: String -> Either String Command
readCmd s = case s of
  "STOP"   -> Right STOP
  "PAUSE"  -> Right PAUSE
  "RESUME" -> Right RESUME
  x | any (`isPrefixOf` x) ["APP", "OPT", "DEVICE"] -> parse x
    | otherwise                                     -> noCmd x
  where
    noCmd x = Left $ "No Command: " ++ x
    -- Same acceptance rule as 'read' (exactly one complete parse,
    -- nothing but white space left over), but failure is a 'Left'.
    parse x = case [c | (c, rest) <- reads x, ("", "") <- lex rest] of
                [c] -> Right c
                _   -> noCmd x

------------------------------------------------------------------------
-- The work horse behind "with*" services
-- - starts the service in a separate thread and
--   waits until it is ready
-- - executes the control action
-- - stops the service and waits for its
--   termination
------------------------------------------------------------------------
-- Arguments:
-- - the 0MQ context
-- - the service name
-- - the initial control parameter
-- - the service itself; it receives the context, the service name,
--   the address of the command socket, the control parameter and
--   an action it has to run to signal that it is ready
-- - the control action; its result is the result of 'withService'
-- The command socket is a 'Z.Pub' socket bound to an "inproc" address
-- made unique by means of 'mkUniqueId'.
------------------------------------------------------------------------
withService :: Z.Context -> String -> String ->
               (Z.Context -> String -> String -> String -> IO () -> IO ()) ->
               (Service -> IO a) -> IO a
withService ctx name param service action = do
  running <- newEmptyMVar
  ready   <- newEmptyMVar
  Z.withSocket ctx Z.Pub $ \cmd -> do
    sn <- ("inproc://srv_" ++) <$> show <$> mkUniqueId
    Z.bind cmd sn
    -- release: send STOP and wait until the service thread has terminated
    bracket (start sn cmd ready running)
            (\s -> stop s >> takeMVar running)
            (doAction ready)
  where start sn cmd ready m = do
          let imReady = putMVar ready ()
          -- 'running' is filled when the service ends,
          -- whether normally or by an exception
          tid <- forkIO $ finally (service ctx name sn param imReady)
                                  (putMVar m ())
          return $ Service ctx name cmd tid
        -- the control action starts only after the service signalled 'ready'
        doAction ready srv = takeMVar ready >>= \_ -> action srv

------------------------------------------------------------------------
-- Poll on a command socket and the service socket
-- - the poller must consist of exactly two entries:
--   the command socket and the service socket (in this order);
-- - while paused, only commands are processed;
-- - otherwise, a pending command takes precedence
--   over input on the service socket;
-- - 'rcv' is called with the current control parameter
--   whenever the service socket has input.
------------------------------------------------------------------------
poll :: Bool -> [Z.Poll] -> (String -> IO ()) -> String -> IO ()
poll paused poller rcv param
  | paused    = handleCmd paused poller rcv param
  | otherwise = do
      [c, s] <- Z.poll poller (-1)
      case c of
        Z.S _ Z.In -> handleCmd paused poller rcv param
        _          ->
          case s of
            Z.S _ Z.In -> rcv param >> poll paused poller rcv param
            _          ->              poll paused poller rcv param

------------------------------------------------------------------------
-- Handle a message received on the command socket
-- - STOP ends the loop;
-- - PAUSE, RESUME and APP continue polling with the new state;
-- - OPT sets the option on the service socket;
-- - unparsable messages and device commands are ignored.
------------------------------------------------------------------------
handleCmd :: Bool -> [Z.Poll] -> (String -> IO ()) -> String -> IO ()
handleCmd paused poller@[Z.S sock _, _] rcv param = do
  x <- Z.receive sock []
  case readCmd $ B.unpack x of
    Left  _   -> poll paused poller rcv param -- ignore
    Right cmd -> case cmd of
                   STOP   -> return ()
                   PAUSE  -> poll True   poller rcv param
                   RESUME -> poll False  poller rcv param
                   APP p  -> poll paused poller rcv p
                   OPT o  -> changeOpt poller o >>
                             poll paused poller rcv param
                   _      -> poll paused poller rcv param -- ignore
handleCmd _ _ _ _ = ouch "invalid poller in 'handleCmd'!"

------------------------------------------------------------------------
-- Change a socket option
-- (on the socket of the second poll entry, i.e. the service socket;
--  pollers of any other shape are left untouched)
------------------------------------------------------------------------
changeOpt :: [Z.Poll] -> Z.SocketOption -> IO ()
changeOpt (_:Z.S s _:_) o = Z.setOption s o
changeOpt _             _ = return ()

------------------------------------------------------------------------
-- XPoll descriptor for device services
------------------------------------------------------------------------
data XPoll = XPoll {
               xpCtx  :: Z.Context,               -- context used to open new entries
               xpTmo  :: Z.Timeout,               -- timeout passed to 'Z.poll'
               xpMap  :: Map Identifier Z.Poll,   -- poll entries by identifier
               xpIds  :: [Identifier],            -- identifiers, parallel to the
                                                  -- tail of 'xpPoll'
               xpPoll :: [Z.Poll]                 -- command socket first,
                                                  -- device sockets after it
             }

------------------------------------------------------------------------
-- Remove a poll entry
-- - the identifier is removed from the map;
-- - identifiers and device poll entries are filtered pairwise
--   (the two lists are parallel), the command socket at the head
--   of the poll list always stays in place;
-- - a descriptor without any poll entry does not cause a
--   pattern match failure: only map and identifiers are cleaned up.
------------------------------------------------------------------------
xpDelete :: Identifier -> XPoll -> XPoll
xpDelete i xp =
  case xpPoll xp of
    []           -> xp {xpMap = cleaned,
                        xpIds = filter (/= i) (xpIds xp)}
    (cmd : devs) ->
      let (ids', devs') = unzip . filter ((/= i) . fst) $
                                  zip (xpIds xp) devs
       in xp {xpMap  = cleaned,
              xpIds  = ids',
              xpPoll = cmd : devs'}
  where cleaned = Map.delete i $ xpMap xp

------------------------------------------------------------------------
-- Polling for device-based services:
-- - a command socket
-- - and a set of device sockets
-- The descriptor is re-read from the MVar on every iteration.
-- While paused, only commands are processed; otherwise a pending
-- command takes precedence; then 'rcv' is called for the first
-- device socket with input; if no socket has input,
-- 'ontmo' is called.
------------------------------------------------------------------------
xpoll :: Bool -> MVar XPoll -> (String -> IO ()) ->
         (Identifier -> Z.Poll -> String -> IO ()) -> String -> IO ()
xpoll paused mxp ontmo rcv param
  | paused    = handleCmdX paused mxp ontmo rcv param
  | otherwise = do
      xp     <- readMVar mxp
      (c:ss) <- Z.poll (xpPoll xp) (xpTmo xp)
      case c of
        Z.S _ Z.In -> handleCmdX paused mxp ontmo rcv param
        _          -> go (xpIds xp) ss
  where go _      []     = ontmo param >> xpoll paused mxp ontmo rcv param
        go (i:is) (s:ss) =
          case s of
            Z.S _ Z.In -> rcv i s param >> xpoll paused mxp ontmo rcv param
            _          -> go is ss
        -- more poll entries than identifiers
        go _      _      = ouch "Invalid xpoll entries"

------------------------------------------------------------------------
-- Handle
-- messages received through the command socket
-- of a device service
-- - STOP ends the loop;
-- - PAUSE, RESUME and APP continue polling with the new state;
-- - OPT is currently not applied to devices;
-- - DEVICE updates the poll descriptor and continues polling;
--   the paused state is preserved, i.e. a device command
--   does not implicitly resume a paused device;
-- - unparsable messages are reported on stdout and skipped.
------------------------------------------------------------------------
handleCmdX :: Bool -> MVar XPoll -> (String -> IO ()) ->
              (Identifier -> Z.Poll -> String -> IO ()) -> String -> IO ()
handleCmdX paused mxp ontmo rcv param = do
  xp <- readMVar mxp
  case xpPoll xp of
    (Z.S sock _ : _) -> do
      x <- Z.receive sock []
      case readCmd $ B.unpack x of
        Left e    -> do putStrLn $ e ++ ": " ++ B.unpack x
                        xpoll paused mxp ontmo rcv param
        Right cmd -> case cmd of
          STOP     -> return ()
          PAUSE    -> xpoll True   mxp ontmo rcv param
          RESUME   -> xpoll False  mxp ontmo rcv param
          APP p    -> xpoll paused mxp ontmo rcv p
          OPT _    -> xpoll paused mxp ontmo rcv param -- opt!
          DEVICE d -> do
            -- Work on the current contents of the MVar: 'xp' was read
            -- before the blocking receive and may be stale by now.
            modifyMVar_ mxp (handleDevCmd d)
            xpoll paused mxp ontmo rcv param
    _ -> ouch "invalid poller in 'handleCmdX'!"

------------------------------------------------------------------------
-- Handle a device command
-- - TMO: replaces the poll timeout;
-- - REM: closes the socket registered under the identifier and
--        removes the entry; unknown identifiers are ignored;
-- - ADD: opens the entry by means of 'access' and registers it
--        directly behind the command socket; the socket is opened
--        only if the descriptor has a command socket, so that
--        nothing is opened that would not be registered.
-- NOTE(review): ADD with an identifier that is already registered
--               replaces the map entry, but keeps the old poll entry
--               and identifier in the lists - confirm whether callers
--               rely on this.
------------------------------------------------------------------------
handleDevCmd :: DevCmd -> XPoll -> IO XPoll
handleDevCmd d xp =
  case d of
    TMO t -> return xp {xpTmo = t}
    REM i ->
      case Map.lookup i (xpMap xp) of
        Just (Z.S s _) -> safeClose s >> return (xpDelete i xp)
        _              -> return xp
    ADD p ->
      case xpPoll xp of
        (c:ss) -> do
          s <- access (xpCtx xp)
                      (pollType p)
                      (pollLink p)
                      (pollOs   p)
                      (pollAdd  p)
                      (pollSub  p)
          let i = pollId p
          return xp {xpPoll = c:s:ss,
                     xpIds  = i:xpIds xp,
                     xpMap  = Map.insert i s $ xpMap xp}
        _      -> return xp

------------------------------------------------------------------------
-- Publish periodically
-- - the first release point is the current time;
-- - 'send' is called with the current control parameter
--   once per period, unless the service is paused;
-- - the command socket is checked once per period.
------------------------------------------------------------------------
periodicSend :: Bool -> Z.Timeout -> Z.Socket Z.Sub ->
                (String -> IO ()) -> String -> IO ()
periodicSend paused period cmd send param = do
  release <- getCurrentTime
  periodicSend_ paused period release cmd send param

------------------------------------------------------------------------
-- One round of the periodic loop:
-- send (unless paused), then look for commands
------------------------------------------------------------------------
periodicSend_ :: Bool -> Z.Timeout -> UTCTime -> Z.Socket Z.Sub ->
                 (String -> IO ()) -> String -> IO ()
periodicSend_ paused period release cmd send param
  | paused    = handleCmdSnd True period release cmd send param
  | otherwise = send param >>
                handleCmdSnd paused period release cmd send param

------------------------------------------------------------------------
-- Poll on a publisher's command socket
-- - the socket is polled without blocking (timeout 0);
-- - with or without a pending command, the loop waits for the
--   next release point before it continues - or, on STOP, ends;
-- - OPT, DEVICE and unparsable messages are ignored.
------------------------------------------------------------------------
handleCmdSnd :: Bool -> Z.Timeout -> UTCTime -> Z.Socket Z.Sub ->
                (String -> IO ()) -> String -> IO ()
handleCmdSnd paused period release sock send param = do
  [Z.S _ evt] <- Z.poll [Z.S sock Z.In] 0
  case evt of
    Z.In -> do
      x        <- Z.receive sock []
      release' <- waitNext period release
      case readCmd $ B.unpack x of
        Left  _   -> periodicSend_ paused period release' sock send param
        Right cmd -> case cmd of
          STOP   -> return ()
          PAUSE  -> periodicSend_ True   period release' sock send param
          RESUME -> periodicSend_ False  period release' sock send param
          APP p  -> periodicSend_ paused period release' sock send p
          _      -> periodicSend_ paused period release' sock send param
    _    -> do
      release' <- waitNext period release
      periodicSend_ paused period release' sock send param

------------------------------------------------------------------------
-- Doing something periodically:
-- runs the action, waits for the next release point
-- and starts over; the loop does not terminate by itself.
------------------------------------------------------------------------
periodic :: Z.Timeout -> IO () -> IO ()
periodic period act = getCurrentTime >>= go
  where go release = act >> waitNext period release >>= go

------------------------------------------------------------------------
-- Wait for the next release point
-- - the next release point is the last one plus the period
--   (in microseconds);
-- - if it is already reached (with a tolerance of one microsecond),
--   the current time is returned immediately;
-- - otherwise the thread sleeps for the remaining time and
--   the time after waking up is returned.
-- The result is the new release point for the following round.
------------------------------------------------------------------------
waitNext :: Z.Timeout -> UTCTime -> IO UTCTime
waitNext period release = do
  now <- getCurrentTime
  let next = release `timeAdd` period
  if (now `timeAdd` 1) >= next
    then return now
    else do
      let sleepTime = next `diffUTCTime` now
      threadDelay (nominal2mu sleepTime)
      getCurrentTime

------------------------------------------------------------------------
-- Some time processing helpers
------------------------------------------------------------------------

-- Shift a point in time by a number of microseconds
timeAdd :: UTCTime -> Z.Timeout -> UTCTime
timeAdd base offset = addUTCTime (mu2nominal offset) base

-- Microseconds to a time difference (in seconds)
mu2nominal :: Z.Timeout -> NominalDiffTime
mu2nominal micros = seconds
  where seconds :: NominalDiffTime
        seconds = fromIntegral micros / 1000000

-- Time difference (in seconds) to microseconds, rounded up
nominal2mu :: NominalDiffTime -> Int
nominal2mu dt = ceiling (dt * perSecond)
  where perSecond = fromIntegral (1000000 :: Int)