module Network.Mom.Patterns.Broker.Broker (withBroker)
where
import Network.Mom.Patterns.Types
import Network.Mom.Patterns.Streams
import Network.Mom.Patterns.Broker.Common
import qualified Registry as R
import Heartbeat (hbPeriodReached)
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BC
import qualified Data.Conduit as C
import Data.Conduit ((=$), ($$))
import Data.Time.Clock
import Control.Applicative ((<$>))
import Control.Monad.Trans (liftIO)
import Control.Monad (when)
import Prelude hiding (catch)
import Control.Exception (throwIO)
import Control.Concurrent.MVar
-- | Run a broker for the lifetime of the given control action.
--
--   Creates a worker registry parameterised with the heartbeat period
--   @tmo@, stores the current time in an 'MVar' (used by the heartbeat
--   check) and starts two bound router streams, named @"servers"@ and
--   @"clients"@, on the addresses @aServers@ and @aClients@.
--
--   Throws @MDPExc "Heartbeat is mandatory"@ when @tmo <= 0@.
withBroker :: Context -> Service -> Msec -> String -> String ->
              OnError_ -> (Controller -> IO r) -> IO r
withBroker ctx srv tmo aClients aServers onerr ctrl
  | tmo <= 0  = throwIO $ MDPExc "Heartbeat is mandatory"
  | otherwise = do
      reg      <- R.newReg tmo
      now      <- getCurrentTime
      lastBeat <- newMVar now
      let -- NOTE(review): the factor 1000 presumably converts msec to the
          -- unit expected by 'withStreams' -- confirm against its docs.
          pollTmo = 1000 * fromIntegral tmo
          polls   = [ Poll "servers" aServers RouterT Bind [] []
                    , Poll "clients" aClients RouterT Bind [] []
                    ]
      withStreams ctx srv pollTmo polls
                  (handleTmo reg lastBeat tmo) onerr
                  (handleStream reg lastBeat tmo) ctrl
-- | Heartbeat step.
--
--   Asks 'hbPeriodReached' whether the heartbeat period has elapsed;
--   if so, obtains a list of worker identities from 'R.checkWorker'
--   and sends each of them a heartbeat message (identity, empty frame,
--   'mdpW01', 'xHeartBeat') through the @"servers"@ stream.
--   NOTE(review): 'R.checkWorker' presumably returns the workers that
--   are due a heartbeat -- confirm in the Registry module.
handleTmo :: R.Registry -> MVar UTCTime -> Msec -> Streamer -> IO ()
handleTmo r m tmo s = do
  due <- hbPeriodReached m tmo
  when due $ do
    workers <- R.checkWorker r
    mapM_ beat workers
  where
    -- Each heartbeat runs in its own resource scope.
    beat w = C.runResourceT $
               streamList [w, B.empty, mdpW01, xHeartBeat] $$
               passAll s ["servers"]
-- | Stream handler: dispatches on the name of the stream the message
--   arrived on (@"clients"@ or @"servers"@; any other source is
--   ignored) and afterwards always runs the heartbeat step 'handleTmo'.
handleStream :: R.Registry -> MVar UTCTime -> Msec -> StreamSink
handleStream r m x s = do
  case getSource s of
    "clients" -> recvClient r s
    "servers" -> recvWorker r s
    _         -> return ()
  liftIO (handleTmo r m x s)
-- | Handle one request received on the @"clients"@ stream.
--
--   'mdpCRcvReq' yields the client identity and the service name.
--   If the first four bytes of the service name equal 'mmiHdr', the
--   request is answered by the broker itself (MMI); otherwise it is
--   forwarded to a worker obtained from the registry.
--
--   Throws @BrokerExc@ when the registry has no worker for the service
--   and @MMIExc@ when an @mmi.service@ request carries no further frame.
recvClient :: R.Registry -> StreamSink
recvClient r s = do
  (client, sn) <- mdpCRcvReq
  if B.take 4 sn == mmiHdr
    then mmiRequest client sn
    else serviceRequest client sn
  where
    -- Ordinary request: pass it on to a worker, or fail if there is none.
    serviceRequest client sn =
      liftIO (R.getWorker r sn) >>=
        maybe (liftIO (throwIO $ BrokerExc $
                         "No Worker for service " ++ BC.unpack sn))
              (\w -> sendRequest w [client] s)

    -- MMI request: only the name remaining after the four-byte header
    -- that equals 'mmiSrv' is supported; everything else is answered
    -- with 'mmiNimpl'.
    mmiRequest client sn
      | B.drop 4 sn /= mmiSrv =
          C.yield mmiNimpl =$ sendReply sn [client] s
      | otherwise = C.await >>= maybe missingName answer
      where
        missingName = liftIO (throwIO $ MMIExc
                        "No ServiceName in mmi.service request")
        -- The next frame is the service name to look up in the registry.
        answer name = do
          known <- liftIO (R.lookupService r name)
          let verdict = if known then mmiFound else mmiNotFound
          C.yield verdict =$ sendReply sn [client] s
-- | Handle one frame received on the @"servers"@ stream.
--
--   'mdpWRcvRep' classifies the incoming message; each kind maps to one
--   registry operation or, for replies, to 'handleReply'.  Any other
--   frame raises @Ouch "Unexpected Frame from Worker!"@.
recvWorker :: R.Registry -> StreamSink
recvWorker r s = mdpWRcvRep >>= dispatch
  where
    dispatch (WBeat  w)    = liftIO (R.updWorkerHb r w)
    dispatch (WReady w sn) = liftIO (R.insert r w sn)
    dispatch (WReply w is) = handleReply r w is s
    dispatch (WDisc  w)    = liftIO (R.remove r w)
    dispatch _             =
      liftIO (throwIO $ Ouch "Unexpected Frame from Worker!")
-- | Forward a worker's reply to the clients.
--
--   Looks up the service name registered for worker @w@; if found, the
--   reply is sent to the identities @is@ via 'sendReply' and then
--   'R.freeWorker' is called for the worker.  Throws
--   @ServerExc "Unknown Worker"@ when the registry does not know @w@.
handleReply :: R.Registry -> Identity -> [Identity] -> StreamSink
handleReply r w is s = do
  mbService <- liftIO (R.getServiceName r w)
  maybe (liftIO (throwIO $ ServerExc "Unknown Worker")) reply mbService
  where
    -- Free the worker only after the reply has been passed on.
    reply sn = do
      sendReply sn is s
      liftIO (R.freeWorker r w)
-- | Pipe the incoming stream through 'mdpWSndReq' (given worker @w@
--   and the identities @is@) and pass the result to the @"servers"@
--   stream.
sendRequest :: Identity -> [Identity] -> StreamSink
sendRequest w is s = wrapped =$ toServers
  where
    wrapped   = mdpWSndReq w is
    toServers = passAll s ["servers"]
-- | Pipe the incoming stream through 'mdpCSndRep' (given service name
--   @sn@ and the identities @is@) and pass the result to the
--   @"clients"@ stream.
sendReply :: B.ByteString -> [Identity] -> StreamSink
sendReply sn is s = wrapped =$ toClients
  where
    wrapped   = mdpCSndRep sn is
    toClients = passAll s ["clients"]