{-# OPTIONS_GHC -fno-warn-missing-fields #-}
-----------------------------------------------------------------------------
--
-- Module      :  Control.Concurrent.Network.Master
-- Copyright   :  (C) 2010, Paul Sonkoly
-- License     :  BSD style
--
-- Maintainer  :  Paul Sonkoly
-- Stability   :  provisional
-- Portability :
--
-- | The master process doesn't do any work except
-- managing the slaves. The master process is handled entirely by the library
-- in 'initMaster'.
--
-----------------------------------------------------------------------------

module Control.Concurrent.Network.Master
    (
    -- * Functions
      initMaster
    ) where

import Control.Concurrent.Network.Protocol
import Network
import System.IO
import Data.Binary
import Data.ByteString.Lazy as DBL
import Data.Map
import Control.Monad
import Control.Monad.Loops
import Control.Concurrent
import System.Log.Logger

-- One named network variable as stored in the master's registry.
data MMVar = MMV
    { dat  :: ByteString
      -- ^ Serialised payload; only read by the packet handlers while
      -- 'empt' is 'False'.
    , empt :: Bool
      -- ^ 'True' while the variable holds no value (on creation and after
      -- a take), 'False' after a put.
    }

-- Per-slave context handed to the packet handlers.
data Master = M
    { registry  :: MVar (Map String MMVar)
      -- ^ Registry of all variables, keyed by name; the same 'MVar' is
      -- shared by every slave thread.
    , slaveid   :: Int
      -- ^ Identifier of the slave served with this context; sent back on
      -- a slave-id request.
    , numslaves :: Int
      -- ^ Total number of slaves; sent back on a number-of-slaves request.
    }

-- | The master process. Only returns when all slaves have
-- closed connection.
--
-- Listens on TCP port 9999 and forks @n@ threads; each thread accepts
-- exactly one connection and serves packets on it until end of file.
-- The call blocks until every one of those threads has finished,
-- whether it ended normally or with an exception.
--
-- NOTE(review): the listening socket is not closed when this returns, so
-- a second call in the same process will presumably fail to bind the port.
initMaster :: Int -> IO ()
initMaster n = do
    debugM rootLoggerName "Initialise master"
    sck  <- listenOn $ PortNumber 9999
    slid <- newMVar 0
    reg  <- newMVar $ fromList []
    -- Each slave thread puts one unit here when it ends.
    done <- newEmptyMVar

    -- create n threads
    replicateM_ n $ do
        -- Filled with the connection handle once 'accept' succeeds, so
        -- the finaliser can close it.
        conn <- newEmptyMVar
        tid  <- forkFinally (serveSlave sck slid reg n conn)
                            (finishSlave conn done)
        infoM rootLoggerName $ "thread " ++ show tid ++ " is waiting for connections.."

    -- The main thread blocks (no busy loop) until all n threads signalled.
    debugM rootLoggerName "Waiting for internal threads.."
    replicateM_ n $ takeMVar done

-- Body of one slave thread: allocate the next slave id, accept a single
-- connection and dispatch packets from it until the peer closes.
serveSlave :: Socket -> MVar Int -> MVar (Map String MMVar) -> Int
           -> MVar Handle -> IO ()
serveSlave sck slid reg n conn = do
    slaveid' <- modifyMVar slid $ \i -> return (i + 1, i)
    let mymcntx = M { registry = reg, slaveid = slaveid', numslaves = n }
    (hdl, hname, pnum) <- accept sck
    putMVar conn hdl
    infoM rootLoggerName $ hname ++ " connected from port " ++ show pnum
    hSetBuffering hdl NoBuffering
    -- EOF is checked before each read, so a peer that disconnects without
    -- sending anything ends the loop instead of failing inside the read.
    whileM_ (fmap not $ hIsEOF hdl) $ do
        p <- readProtoId hdl
        debugM rootLoggerName $ (show slaveid') ++ "==> " ++ show p
        handlePacket mymcntx hdl p
        debugM rootLoggerName $ (show slaveid') ++ "<== " ++ show p

-- Finaliser of a slave thread: log a failure, close the connection if one
-- was accepted, then tell 'initMaster' that this thread is finished.
finishSlave :: Show e => MVar Handle -> MVar () -> Either e a -> IO ()
finishSlave conn done outcome = do
    either (\e -> errorM rootLoggerName $ "slave thread failed: " ++ show e)
           (const $ return ())
           outcome
    tryTakeMVar conn >>= maybe (return ()) hClose
    putMVar done ()

-- Serve one request whose protocol id has already been read from the
-- handle. Remaining arguments are read from, and replies written to, the
-- same handle. The registry is only touched through 'modifyMVar',
-- 'modifyMVar_' and 'withMVar', so an exception raised while it is held
-- puts the previous contents back.
--
-- NOTE(review): there is no catch-all equation; if 'ProtoId' has further
-- constructors, such a packet raises a pattern-match failure, which ends
-- that slave's thread.
handlePacket :: Master -> Handle -> ProtoId -> IO ()

-- new 'NVar'
handlePacket mcntx hdl NNV = do
    name <- readBinary hdl
    -- we don't check whether it'd been already created: an existing entry
    -- with the same name is replaced by an empty one.
    modifyMVar_ (registry mcntx) $
        return . insert name (MMV { dat = DBL.empty, empt = True })

-- put 'NVar': retry until the variable exists and is empty, store the
-- value, then acknowledge.
handlePacket mcntx hdl PNV = do
    name <- readBinary hdl
    val  <- readByteString hdl
    untilM_ yield $ modifyMVar (registry mcntx) $ \reg ->
        return $ case Data.Map.lookup name reg of
            Just var | empt var ->
                (insert name (MMV { dat = val, empt = False }) reg, True)
            _ -> (reg, False)
    writeProtoId hdl PNV

-- take 'NVar': retry until the variable exists and is full, send its
-- value and mark it empty.
handlePacket mcntx hdl TNV = do
    name <- readBinary hdl
    untilM_ yield $ modifyMVar (registry mcntx) $ \reg ->
        case Data.Map.lookup name reg of
            Just var | not (empt var) -> do
                -- If this write fails, the registry is restored unchanged,
                -- so the value stays available.
                writeByteString hdl $ dat var
                return (insert name (var { empt = True }) reg, True)
            _ -> return (reg, False)

-- poll with op: retry until the variable exists, is full and the
-- comparison below holds, then acknowledge.
handlePacket mcntx hdl PWO = do
    name <- readBinary hdl
    op   <- readBinary hdl
    val  <- readByteString hdl
    untilM_ yield $ withMVar (registry mcntx) $ \reg ->
        return $ case Data.Map.lookup name reg of
            -- NOTE(review): the wait ends when the stored value DIFFERS
            -- under EQOP and when it is EQUAL under NEQOP. This reads
            -- inverted against the operator names; confirm against the
            -- slave-side API before changing it.
            Just var | not (empt var) ->
                (dat var /= val && op == EQOP) || (dat var == val && op == NEQOP)
            _ -> False
    writeProtoId hdl PWO

-- slave id
handlePacket mcntx hdl SID = writeBinary hdl $ slaveid mcntx

-- number of slaves
handlePacket mcntx hdl NSL = writeBinary hdl $ numslaves mcntx

-- print message
handlePacket _ hdl PMS = readBinary hdl >>= System.IO.putStrLn