module Control.Distributed.STM.EnvAddr (ID, STMID, VarID, VersionID, uniqueId,
EnvAddr, gMyEnv, gMySocket, VarLink (VarLink),
connectToRetryEnv, connectToEnvHold, connectAtomicToEnv,
SomeDistTVarException (PropagateDistTVarFail, CommunicationFail,
NodeConnectionFail), propagateEx, distTVarExEnv,
sendTCP, recvTCP, aStat, printTCPStat,
RemMsgIdx (ATM,INV,STT,CTT,ENC,RES,AEA,DEA,LFC,DRP,RDT),
gNameServerPort, connectToNameServer) where
import Control.Concurrent
import Control.Exception
import Data.Dynamic
import Control.Distributed.STM.DebugBase
import Control.Distributed.STM.Utils
import Network
import Network.BSD
import Network.Socket
import Maybe
import Prelude as P hiding (catch, putStr, putStrLn)
import System.IO
import System.IO.Unsafe
type ID = Integer
type STMID = ID
type VarID = ID
type VersionID = ID
gInitID :: ID
gInitID = 1
gFreshIds :: MVar [ID]
gFreshIds = unsafePerformIO (newMVar [gInitID..])
uniqueId :: IO ID
uniqueId = modifyMVar gFreshIds (\(h:t) -> return (t,h))
freeGlobalId :: ID -> IO ()
freeGlobalId n = modifyMVar_ gFreshIds (return . (n:))
gNameServerPort :: PortNumber
gNameServerPort = 60000
gServerPort :: PortNumber
gServerPort = 60001
freshPort :: PortNumber -> IO (PortID, Socket)
freshPort p = catch (do
s <- listenOn (PortNumber p)
return (PortNumber p,s)
)(\(_::SomeException) -> freshPort (p+1))
type IPAddr = String
data EnvAddr = EnvAddr PortID IPAddr
deriving (Show, Read, Eq, Ord)
data VarLink = VarLink EnvAddr
VarID
deriving (Eq, Show, Read)
instance Show PortID where
show (PortNumber p) = show p
show _ = "Show PortID: otherPort"
instance Read PortID where
readsPrec _ str0 =
case reads str0 of
((p,str2):_) -> [(PortNumber (fromInteger p),str2)]
o -> error ("error readsPrec PortID" ++ show o)
instance Eq PortID where
(PortNumber p1)==(PortNumber p2) = p1==p2
_ == _ = False
instance Ord PortID where
(PortNumber p1)<=(PortNumber p2) = p1<=p2
_ <= _ = False
gMyIpAddr :: IPAddr
gMyIpAddr = unsafePerformIO $ do
hName <- getHostName
hEntry <- getHostByName hName
inet_ntoa (hostAddress hEntry)
gMyPort :: PortID
gMySocket :: Socket
(gMyPort,gMySocket) = unsafePerformIO $ do
(port,sock) <- freshPort gServerPort
return (port,sock)
gMyEnv :: EnvAddr
gMyEnv = (EnvAddr $! gMyPort) $! gMyIpAddr
nameServerEnv :: String -> EnvAddr
nameServerEnv = EnvAddr (PortNumber gNameServerPort)
type TCPSndConn = (EnvAddr, Handle)
gTCPSndConn :: MVar [TCPSndConn]
gTCPSndConn = unsafePerformIO (newMVar [])
lookupTCPSndEnv :: Handle -> IO EnvAddr
lookupTCPSndEnv h = do
conns <- readMVar gTCPSndConn
debugStrLn5 ("lookupTCPSndEnv: "++show conns++"\n"++show h++"\n"++show (lookupSndEnv h conns))
return (fromJust (lookupSndEnv h conns))
lookupSndEnv :: Handle -> [TCPSndConn] -> Maybe EnvAddr
lookupSndEnv _ [] = Nothing
lookupSndEnv key ((env, h):conns) | key == h = Just env
| otherwise = lookupSndEnv key conns
connectToEnvHold' :: EnvAddr -> IO Handle
connectToEnvHold' env = catch (do
sndConns <- readMVar gTCPSndConn
case P.lookup env sndConns of
Just h -> return h
Nothing -> do
h <- connectToEnv' env
modifyMVar_ gTCPSndConn (return . ((env, h):))
return h
)(propagateEx "connectToEnvHold'")
type TCPRecConn = (EnvAddr, [Handle])
gTCPRecConn :: MVar [TCPRecConn]
gTCPRecConn = unsafePerformIO (newMVar [])
gTCPTotalRecConn :: MVar [TCPRecConn]
gTCPTotalRecConn = unsafePerformIO (newMVar [])
lookupTCPRecEnv :: Handle -> IO EnvAddr
lookupTCPRecEnv h = do
conns <- readMVar gTCPTotalRecConn
debugStrLn5 ("lookupTCPRecEnv: "++show conns++"\n"++show h++"\n"++show (lookupRecEnv h conns))
case lookupRecEnv h conns of
Nothing -> return gMyEnv
Just env -> return env
lookupRecEnv :: Handle -> [TCPRecConn] -> Maybe EnvAddr
lookupRecEnv _ [] = Nothing
lookupRecEnv key ((_, []):conns) = lookupRecEnv key conns
lookupRecEnv key ((env, (h:hs)):conns)
| key == h = Just env
| otherwise = lookupRecEnv key ((env, hs):conns)
connectAtomicToEnv' :: EnvAddr -> (Handle -> IO String) -> IO String
connectAtomicToEnv' env tcp = do
debugStrLn3 ("connectAtomicToEnv ->: " ++ show env)
h <- allocTcpHandle env
debugStrLn3 ("connectAtomicToEnv -: " ++ show h)
answer <- tcp h
debugStrLn3 ("connectAtomicToEnv --: " ++ show answer)
freeTcpHandle h env
debugStrLn3 ("connectAtomicToEnv <-: ")
return answer
freeTcpHandle :: Handle -> EnvAddr -> IO ()
freeTcpHandle h env = do
tcpConns <- takeMVar gTCPRecConn
case P.lookup env tcpConns of
Just hs -> putMVar gTCPRecConn (replaceListElems (h:hs) env tcpConns)
Nothing -> putMVar gTCPRecConn ((env,[h]):tcpConns)
allocTcpHandle :: EnvAddr -> IO Handle
allocTcpHandle env = catch (do
conns <- takeMVar gTCPRecConn
case P.lookup env conns of
Just [] -> do
putMVar gTCPRecConn conns
h <- connectToEnv' env
modifyMVar_ gTCPTotalRecConn (return . (updateListElems (h:) [h] env))
return h
Just (h:hs) -> do
putMVar gTCPRecConn (replaceListElems hs env conns)
return h
Nothing -> do
putMVar gTCPRecConn conns
h <- connectToEnv' env
modifyMVar_ gTCPTotalRecConn (return . ((env,[h]):))
return h
)(propagateEx "allocTcpHandle")
connectToEnv' :: EnvAddr -> IO Handle
connectToEnv' env@(EnvAddr pid ip) = catch (do
h <- connectTo ip pid
hSetBuffering h LineBuffering
return h
)(\e -> debugStrLn1 ("connectToEnv' " ++ show e) >> throw (NodeConnectionFail "connectToEnv' " env e))
connectToNameServer' :: String -> IO Handle
connectToNameServer' = connectToEnv'.nameServerEnv
data RemMsgIdx = ATM|INV|STT|CTT|ENC|RES|AEA|DEA|LFC|DRP|RDT
deriving (Eq, Ord, Show)
type STMMsgStat = (RemMsgIdx, Int)
gSTMMsgStat :: MVar [STMMsgStat]
gSTMMsgStat = unsafePerformIO (newMVar ((ATM, 0):(INV,0):[]))
aStat :: RemMsgIdx -> IO ()
aStat msgIdx = modifyMVar_ gSTMMsgStat (return . (insertWith (\_ x ->x+1)) msgIdx 1)
data TCPStat = TCPStat {
conEnv :: Int,
conExclEnv :: Int,
conRetryVar :: Int,
conNS :: Int
}
gTCPStat :: MVar TCPStat
gTCPStat = unsafePerformIO (newMVar (TCPStat{conEnv = 0,
conExclEnv = 0,
conRetryVar = 0,
conNS = 0}))
connectToEnvHold :: EnvAddr -> IO Handle
connectToEnvHold env = do
modifyMVar_ gTCPStat (\ips -> return ips{conEnv=(conEnv ips)+1})
connectToEnvHold' env
connectAtomicToEnv :: EnvAddr -> (Handle -> IO String) -> IO String
connectAtomicToEnv env tcp = do
modifyMVar_ gTCPStat (\ips -> return ips{conExclEnv=(conExclEnv ips)+1})
connectAtomicToEnv' env tcp
connectToRetryEnv :: EnvAddr -> IO Handle
connectToRetryEnv env = do
modifyMVar_ gTCPStat (\ips -> return ips{conRetryVar=(conRetryVar ips)+1})
connectToEnvHold' env
connectToNameServer :: String -> IO Handle
connectToNameServer ns = do
modifyMVar_ gTCPStat (\ips -> return ips{conNS=(conNS ips)+1})
connectToNameServer' ns
printTCPStat :: String -> IO ()
printTCPStat s = do
ips <- readMVar gTCPStat
sndConns <- readMVar gTCPSndConn
recConns <- readMVar gTCPRecConn
msgs <- readMVar gSTMMsgStat
printStat s ips sndConns recConns msgs
printStat :: String -> TCPStat -> [TCPSndConn] -> [TCPRecConn] -> [STMMsgStat]
-> IO ()
printStat s ips sndConns recConns msgs = do
let l = foldr (+) 1 (map (length.snd) recConns)
takeMVar gDebugLock
putStrLn s
putStrLn ("--> TCP Verbindungen: " ++ showJustify (length sndConns) 5)
putStrLn ("<-> TCP Verbindungen: " ++ showJustify l 5)
putStrLn ("--> TCP Nachrichten: " ++ showJustify (conEnv ips) 5)
putStrLn ("<-> TCP Nachrichten: " ++ showJustify (conExclEnv ips) 5)
putStrLn ("|-> TCP Nachrichten: " ++ showJustify (conRetryVar ips) 5)
printMsgs msgs
putMVar gDebugLock ()
printMsgs :: [STMMsgStat] -> IO ()
printMsgs [] = return ()
printMsgs msgs = do
putStrLn (">-> Transaktionen: " ++ (showJustify.snd.head) msgs 5)
putStrLn ("-X- Transaktionen: " ++ (showJustify.snd.head.tail) msgs 5)
putStrLn ("<-- TCP Nachrichten: " ++ showJustify (foldr (+) 0 (map snd ((tail.tail) msgs))) 5 ++ "\n")
mapM_ printMsg ((tail.tail) msgs)
printMsg :: (RemMsgIdx, Int) -> IO ()
printMsg (msg, count) = putStrLn (show msg ++ ": " ++ showJustify count 5)
data SomeDistTVarException = PropagateDistTVarFail String SomeDistTVarException
| CommunicationFail String EnvAddr SomeException
| NodeConnectionFail String EnvAddr SomeException
deriving Typeable
instance Exception SomeDistTVarException
instance Show SomeDistTVarException where
show (PropagateDistTVarFail loc ex) =
"PropagateDistTVarFail " ++ loc ++ "\n" ++ show ex
show (CommunicationFail loc env ex) = formEx "CommunicationFail" loc env ex
show (NodeConnectionFail loc env ex) = formEx "NodeConnectionFail" loc env ex
formEx :: String -> String -> EnvAddr -> SomeException -> String
formEx err loc env cause = "Message : " ++ err
++ "\nLocation: " ++ loc
++ "\nProcess : " ++ show env
++ "\nSysError: " ++ show cause ++ "\n"
propagateEx :: String -> SomeDistTVarException -> IO a
propagateEx loc e = throw (PropagateDistTVarFail (loc ++ " -> ") e)
distTVarExEnv :: SomeDistTVarException -> EnvAddr
distTVarExEnv eDist = case eDist of
(PropagateDistTVarFail _ e) -> distTVarExEnv e
(CommunicationFail _ env _) -> env
(NodeConnectionFail _ env _) -> env
gSendLock :: MVar ()
gSendLock = unsafePerformIO (newMVar ())
sendTCP :: Show a => a -> Handle -> IO ()
sendTCP msg h = catch (do
takeMVar gSendLock
hPutStrLn h (show msg)
hFlush h
putMVar gSendLock ()
)(\e -> do
debugStrLn1 $ "sendTCP " ++ show e
putMVar gSendLock ()
env <- lookupTCPSndEnv h
throw (CommunicationFail ("sendTCP: " ++ show msg ++ show h) env e))
recvTCP :: Show a => a -> Handle -> IO String
recvTCP msg h = catch (do
hPutStrLn h (show msg)
hFlush h
hGetLine h
)(\e -> do
debugStrLn1 $ "recvTCP " ++ show e
env <- lookupTCPRecEnv h
throw (CommunicationFail ("recvTCP: " ++ show msg ++ show h) env e))