module Network.Mom.Patterns.Broker.Broker (withBroker)
where
import Network.Mom.Patterns.Types
import Network.Mom.Patterns.Streams
import Network.Mom.Patterns.Broker.Common
import qualified Registry as R
import Heartbeat (hbPeriodReached)
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BC
import qualified Data.Conduit as C
import Data.Conduit ((=$), ($$))
import Data.Time.Clock
import Control.Applicative ((<$>))
import Control.Monad.Trans (liftIO)
import Control.Monad (when)
import Prelude hiding (catch)
import Control.Exception (throwIO, finally)
import Control.Concurrent.MVar
import System.IO.Unsafe
withBroker :: Context -> Service -> Msec -> String -> String ->
OnError_ -> (Controller -> IO r) -> IO r
withBroker ctx srv tmo aClients aServers onerr ctrl | tmo <= 0 =
throwIO $ MDPExc "Heartbeat is mandatory"
| otherwise =
lockBroker $ \_ -> do
R.clean
R.setHbPeriod tmo
t <- getCurrentTime
m <- newMVar t
withStreams ctx srv (1000 * fromIntegral tmo)
[Poll "servers" aServers RouterT Bind [] [],
Poll "clients" aClients RouterT Bind [] []]
(handleTmo m tmo) onerr
(handleStream m tmo) ctrl
handleTmo :: MVar UTCTime -> Msec -> Streamer -> IO ()
handleTmo m tmo s = hbPeriodReached m tmo >>= \x ->
when x $ R.checkWorker >>= mapM_ sndHb
where hbM i = [i, B.empty, mdpW01, xHeartBeat]
sndHb i = C.runResourceT $ streamList (hbM i) $$
passAll s ["servers"]
handleStream :: MVar UTCTime -> Msec -> StreamSink
handleStream m x s =
let action | getSource s == "clients" = recvClient s
| getSource s == "servers" = recvWorker s
| otherwise = return ()
in action >> liftIO (handleTmo m x s)
recvClient :: StreamSink
recvClient s = mdpCRcvReq >>= uncurry go
where go i sn | hdr sn == mmiHdr = handleMMI i sn
| otherwise = handleReq i sn
handleReq i sn = do
mbW <- liftIO $ R.getWorker sn
case mbW of
Nothing -> noWorker sn
Just w -> sendRequest w [i] s
handleMMI i sn | srvc sn /= mmiSrv = C.yield mmiNimpl =$
sendReply sn [i] s
| otherwise = do
mbX <- C.await
case mbX of
Nothing -> liftIO (throwIO $ MMIExc
"No ServiceName in mmi.service request")
Just x -> do
m <- bool2MMI <$> liftIO (R.lookupService x)
C.yield m =$ sendReply sn [i] s
bool2MMI True = mmiFound
bool2MMI False = mmiNotFound
hdr = B.take 4
srvc = B.drop 4
noWorker sn = liftIO (throwIO $ BrokerExc $
"No Worker for service " ++ BC.unpack sn)
recvWorker :: StreamSink
recvWorker s = do
f <- mdpWRcvRep
case f of
WBeat w -> liftIO (R.updWorkerHb w)
WReady w sn -> liftIO $ R.insert w sn
WReply w is -> handleReply w is s
WDisc w -> liftIO $ R.remove w
_ -> liftIO (throwIO $ Ouch "Unexpected Frame from Worker!")
handleReply :: Identity -> [Identity] -> StreamSink
handleReply w is s = do
mbS <- liftIO $ R.getServiceName w
case mbS of
Nothing -> liftIO (throwIO $ ServerExc "Unknown Worker")
Just sn -> sendReply sn is s >> liftIO (R.freeWorker w)
sendRequest :: Identity -> [Identity] -> StreamSink
sendRequest w is s = mdpWSndReq w is =$ passAll s ["servers"]
sendReply :: B.ByteString -> [Identity] -> StreamSink
sendReply sn is s = mdpCSndRep sn is =$ passAll s ["clients"]
_brk :: MVar ()
_brk = unsafePerformIO $ newMVar ()
lockBroker :: (() -> IO b) -> IO b
lockBroker act = do
mb_ <- tryTakeMVar _brk
case mb_ of
Nothing -> throwIO $ SingleBrokerExc "Another broker is running!"
Just _ -> finally (act ()) (putMVar _brk ())