module Network.Mom.Patterns.Broker.Server (withServer)
where
import Prelude hiding (catch)
import Control.Exception (throwIO, finally)
import Control.Monad.Trans (liftIO)
import Control.Monad (when)
import Control.Concurrent.MVar
import Data.Conduit ((=$), (=$=), ($$))
import qualified Data.Conduit as C
import qualified Data.ByteString as B
import Network.Mom.Patterns.Types
import Network.Mom.Patterns.Streams
import Network.Mom.Patterns.Broker.Common
import Heartbeat
import Data.Time.Clock
-- | Run a server that talks to a broker over a single stream named @client@.
--
-- The stream is declared as a 'DealerT' poll entry that connects to the given
-- address and is handed to 'withStreams' together with 'handleTmo' as the
-- timeout handler. Inside the 'withStreams' scope, 'mdpWConnect' is sent for
-- the service, the control action is run, and 'mdpWDisconnect' is sent
-- afterwards, whether the action returned or threw.
--
-- Frames arriving on @client@ are read with 'mdpWRcvReq':
--
--   * a request is piped through the application conduit and 'mdpWSndRep'
--     back to @client@;
--
--   * a request or a heartbeat pushes the broker deadline forward;
--
--   * a disconnect raises 'BrokerExc';
--
--   * any other frame raises 'Ouch'.
--
-- Throws 'MDPExc' when the heartbeat period is zero or negative.
withServer :: Context      -- ^ passed through to 'withStreams'
           -> Service      -- ^ service name; also used for the connect message
           -> Msec         -- ^ heartbeat period; must be greater than zero
           -> String       -- ^ address the @client@ stream connects to
           -> OnError_     -- ^ error handler passed through to 'withStreams'
           -> Conduit_     -- ^ application conduit applied to each request
           -> (Control a)  -- ^ action run while the server is active
           -> IO a
withServer ctx srv tmo add onErr serve act = do
  when (tmo <= 0) $ throwIO (MDPExc "Heartbeat is mandatory")
  start    <- getCurrentTime
  lastSent <- newMVar start
  deadline <- newMVar (expiryFrom start)
  -- NOTE(review): the factor 1000 presumably converts milliseconds into the
  -- unit 'withStreams' expects for its timeout -- confirm against its docs.
  withStreams ctx srv (1000 * fromIntegral tmo)
              [Poll "client" add DealerT Connect [] []]
              (handleTmo lastSent deadline tmo)
              onErr
              (dispatch lastSent deadline) $ \ctl ->
    (send ctl ["client"] (mdpWConnect srv) >> act ctl)
      `finally` send ctl ["client"] mdpWDisconnect
  where
    -- Point in time, counted from the given instant, after which the broker
    -- is considered to have missed its heartbeat.
    expiryFrom now = now `timeAdd` (tolerance * tmo)

    -- Replace the stored deadline with one computed from the current time.
    refresh expiry =
      modifyMVar_ expiry (const (fmap expiryFrom getCurrentTime))

    -- Only input whose source is "client" is processed; anything else is
    -- ignored.
    dispatch sent expiry s =
      when (getSource s == "client") (serveFrame sent expiry s)

    -- Receive one frame, react to it, then run the heartbeat check.
    serveFrame sent expiry s = do
      frame <- mdpWRcvReq
      react frame
      liftIO (handleTmo sent expiry tmo s)
      where
        react (WRequest ids) = do
          liftIO (refresh expiry)
          serve =$= mdpWSndRep ids =$ passAll s ["client"]
        react (WBeat _) = liftIO (refresh expiry)
        react (WDisc _) = liftIO (throwIO (BrokerExc "Broker disconnects"))
        react _         = liftIO (throwIO (Ouch "Unknown frame from Broker!"))
-- | Heartbeat housekeeping for the @client@ stream.
--
-- First asks 'hbPeriodReached' (with the first 'MVar' and the period) whether
-- a heartbeat is due and, if so, streams the frames
-- @[B.empty, mdpW01, xHeartBeat]@ to @client@. Then compares the current time
-- with the deadline stored in the second 'MVar' and throws 'BrokerExc' when
-- that deadline has passed.
--
-- NOTE(review): whether 'hbPeriodReached' also updates the first 'MVar' is not
-- visible here -- confirm in the Heartbeat module.
handleTmo :: MVar UTCTime -> MVar UTCTime -> Msec -> Streamer -> IO ()
handleTmo m h tmo s = do
  due <- hbPeriodReached m tmo
  when due sendHeartbeat
  overdue <- deadlinePassed
  when overdue $ throwIO (BrokerExc "Missing heartbeat")
  where
    -- Push one heartbeat message to the "client" stream.
    sendHeartbeat =
      C.runResourceT
        (streamList [B.empty, mdpW01, xHeartBeat] $$ passAll s ["client"])

    -- True when the current time is strictly later than the stored deadline.
    deadlinePassed = do
      now    <- getCurrentTime
      expiry <- readMVar h
      return (now > expiry)