{-# OPTIONS_GHC -XScopedTypeVariables -XDeriveDataTypeable #-}
{-# OPTIONS_HADDOCK hide #-}

module Control.Distributed.STM.EnvAddr (ID, STMID, VarID, VersionID, uniqueId,
                EnvAddr, gMyEnv, gMySocket, VarLink (VarLink),
                connectToRetryEnv, connectToEnvHold, connectAtomicToEnv,
                SomeDistTVarException (PropagateDistTVarFail, CommunicationFail,
                NodeConnectionFail), propagateEx, distTVarExEnv,
                sendTCP, recvTCP, aStat, printTCPStat,
                RemMsgIdx (ATM,INV,STT,CTT,ENC,RES,AEA,DEA,LFC,DRP,RDT),
                gNameServerPort, connectToNameServer) where

import Control.Concurrent
import Control.Exception
import Data.Dynamic
import Control.Distributed.STM.DebugBase
import Control.Distributed.STM.Utils
import Network
import Network.BSD
import Network.Socket
import Maybe
import Prelude as P hiding (catch, putStr, putStrLn)
import System.IO
import System.IO.Unsafe

-----------------
-- Identifiers --
-----------------

type ID         = Integer
type STMID      = ID
type VarID      = ID
type VersionID  = ID

---------------------------------------
-- Unique IDs from common pool for   --
-- STM, TVar, TVar-Version, RetryVar --
---------------------------------------

gInitID :: ID
gInitID = 1

gFreshIds :: MVar [ID]
{-# NOINLINE gFreshIds #-}
gFreshIds  = unsafePerformIO (newMVar [gInitID..])

uniqueId :: IO ID
uniqueId = modifyMVar gFreshIds (\(h:t) -> return (t,h))

freeGlobalId :: ID -> IO ()
freeGlobalId n = modifyMVar_ gFreshIds (return . (n:))

-------------------------
-- Network Environment --
-------------------------

gNameServerPort :: PortNumber
gNameServerPort = 60000

gServerPort :: PortNumber
gServerPort = 60001

freshPort :: PortNumber -> IO (PortID, Socket)
freshPort p = catch (do
                     s <- listenOn (PortNumber p)
                     return (PortNumber p,s)
                     )(\(_::SomeException) -> freshPort (p+1))

type IPAddr  = String

data EnvAddr = EnvAddr PortID IPAddr
  deriving (Show, Read, Eq, Ord)

data VarLink = VarLink EnvAddr -- host TVar node
                       VarID   -- TVar Id on host TVar node
  deriving (Eq, Show, Read)

instance Show PortID where
 show (PortNumber p) = show p
 show _      = "Show PortID: otherPort" -- internal error

instance Read PortID where
 readsPrec _ str0 =
     case reads str0 of
       ((p,str2):_) -> [(PortNumber (fromInteger p),str2)]
       o -> error ("error readsPrec PortID" ++ show o)

instance Eq PortID where
 (PortNumber p1)==(PortNumber p2) = p1==p2
 _ == _ = False -- error

instance Ord PortID where
 (PortNumber p1)<=(PortNumber p2) = p1<=p2
 _ <= _ = False -- error

gMyIpAddr :: IPAddr
{-# NOINLINE gMyIpAddr #-}
gMyIpAddr = unsafePerformIO $ do
  hName <- getHostName
  hEntry <- getHostByName hName
  inet_ntoa (hostAddress hEntry)

gMyPort :: PortID
{-# NOINLINE gMyPort #-}
gMySocket :: Socket
{-# NOINLINE gMySocket #-}
(gMyPort,gMySocket) = unsafePerformIO $ do
  (port,sock) <- freshPort gServerPort
  return (port,sock)

gMyEnv :: EnvAddr -- initialize environment
gMyEnv = (EnvAddr $! gMyPort) $! gMyIpAddr

nameServerEnv :: String -> EnvAddr
nameServerEnv = EnvAddr (PortNumber gNameServerPort)

-----------------
-- Post Office --
-----------------

type TCPSndConn = (EnvAddr, Handle)

gTCPSndConn :: MVar [TCPSndConn]
{-# NOINLINE gTCPSndConn #-}
gTCPSndConn = unsafePerformIO (newMVar [])

lookupTCPSndEnv :: Handle -> IO EnvAddr
lookupTCPSndEnv h = do
  conns <- readMVar gTCPSndConn
  debugStrLn5 ("lookupTCPSndEnv: "++show conns++"\n"++show h++"\n"++show (lookupSndEnv h conns))
  return (fromJust (lookupSndEnv h conns))

lookupSndEnv :: Handle -> [TCPSndConn] -> Maybe EnvAddr
lookupSndEnv _ [] = Nothing
lookupSndEnv key ((env, h):conns) | key == h = Just env
                                  | otherwise = lookupSndEnv key conns

connectToEnvHold' :: EnvAddr -> IO Handle
connectToEnvHold' env = catch (do
  sndConns <- readMVar gTCPSndConn
  case P.lookup env sndConns of
    Just h  -> return h
    Nothing -> do -- no tcp yet, establish (one and only) unidir. conn.
      h <- connectToEnv' env
      modifyMVar_ gTCPSndConn (return . ((env, h):))
      return h
  )(propagateEx "connectToEnvHold'")

type TCPRecConn = (EnvAddr, [Handle])

gTCPRecConn :: MVar [TCPRecConn]
{-# NOINLINE gTCPRecConn #-}
gTCPRecConn = unsafePerformIO (newMVar [])

gTCPTotalRecConn :: MVar [TCPRecConn]
{-# NOINLINE gTCPTotalRecConn #-}
gTCPTotalRecConn = unsafePerformIO (newMVar [])

-- all open connections to determine env in case of broken connection
lookupTCPRecEnv :: Handle -> IO EnvAddr
lookupTCPRecEnv h = do
  conns <- readMVar gTCPTotalRecConn
  debugStrLn5 ("lookupTCPRecEnv: "++show conns++"\n"++show h++"\n"++show (lookupRecEnv h conns))
  case lookupRecEnv h conns of
    Nothing  -> return gMyEnv -- internal error
    Just env -> return env

lookupRecEnv :: Handle -> [TCPRecConn] -> Maybe EnvAddr
lookupRecEnv _ []                = Nothing
lookupRecEnv key ((_, []):conns) = lookupRecEnv key conns
lookupRecEnv key ((env, (h:hs)):conns) 
                      | key == h  = Just env
                      | otherwise = lookupRecEnv key ((env, hs):conns)

connectAtomicToEnv' :: EnvAddr -> (Handle -> IO String) -> IO String
connectAtomicToEnv' env tcp = do
  debugStrLn3 ("connectAtomicToEnv ->: " ++ show env)
  h <- allocTcpHandle env
  debugStrLn3 ("connectAtomicToEnv -: " ++ show h)
  answer <- tcp h -- execute 2-way tcp comm. atomically w/rspct to handle
  debugStrLn3 ("connectAtomicToEnv --: " ++ show answer)
  freeTcpHandle h env
  debugStrLn3 ("connectAtomicToEnv <-: ")
  return answer

freeTcpHandle :: Handle -> EnvAddr -> IO ()
freeTcpHandle h env = do
  tcpConns <- takeMVar gTCPRecConn
  case P.lookup env tcpConns of
    Just hs -> putMVar gTCPRecConn (replaceListElems (h:hs) env tcpConns)
    Nothing -> putMVar gTCPRecConn ((env,[h]):tcpConns)


allocTcpHandle :: EnvAddr -> IO Handle
allocTcpHandle env = catch (do
  conns <- takeMVar gTCPRecConn
  case P.lookup env conns of
    Just []     -> do -- no tcp available -> get new tcp
                   putMVar gTCPRecConn conns
                   h <- connectToEnv' env
                   modifyMVar_ gTCPTotalRecConn (return . (updateListElems (h:) [h] env))
                   return h
    Just (h:hs) -> do -- get head of available tcp
                   putMVar gTCPRecConn (replaceListElems hs env conns)
                   return h
    Nothing     -> do -- no tcp yet -> get first new tcp
                   putMVar gTCPRecConn conns
                   h <- connectToEnv' env
                   modifyMVar_ gTCPTotalRecConn (return . ((env,[h]):))
                   return h
  )(propagateEx "allocTcpHandle")

connectToEnv' :: EnvAddr -> IO Handle
connectToEnv' env@(EnvAddr pid ip) = catch (do 
  h <- connectTo ip pid
  hSetBuffering h LineBuffering
  return h
  )(\e -> debugStrLn1 ("connectToEnv' " ++ show e) >> throw (NodeConnectionFail "connectToEnv' " env e))

connectToNameServer' :: String -> IO Handle
connectToNameServer' = connectToEnv'.nameServerEnv

--------------------
-- TCP Statistics --
--------------------

data RemMsgIdx = ATM|INV|STT|CTT|ENC|RES|AEA|DEA|LFC|DRP|RDT
  deriving (Eq, Ord, Show)

type STMMsgStat = (RemMsgIdx, Int)

gSTMMsgStat :: MVar [STMMsgStat]
{-# NOINLINE gSTMMsgStat #-}
gSTMMsgStat = unsafePerformIO (newMVar ((ATM, 0):(INV,0):[]))

aStat :: RemMsgIdx -> IO ()
aStat msgIdx = modifyMVar_ gSTMMsgStat (return . (insertWith (\_ x ->x+1)) msgIdx 1)

data TCPStat = TCPStat {
                conEnv      :: Int,
                conExclEnv  :: Int,
                conRetryVar :: Int,
                conNS       :: Int
                }

gTCPStat :: MVar TCPStat
{-# NOINLINE gTCPStat #-}
gTCPStat = unsafePerformIO (newMVar (TCPStat{conEnv      = 0, 
                                             conExclEnv  = 0,
                                             conRetryVar = 0,
                                             conNS       = 0}))

connectToEnvHold :: EnvAddr -> IO Handle
connectToEnvHold env = do
  modifyMVar_ gTCPStat (\ips -> return ips{conEnv=(conEnv ips)+1})
  connectToEnvHold' env

connectAtomicToEnv :: EnvAddr -> (Handle -> IO String) -> IO String
connectAtomicToEnv env tcp = do
  modifyMVar_ gTCPStat (\ips -> return ips{conExclEnv=(conExclEnv ips)+1})
  connectAtomicToEnv' env tcp

connectToRetryEnv :: EnvAddr -> IO Handle
connectToRetryEnv env = do
  modifyMVar_ gTCPStat (\ips -> return ips{conRetryVar=(conRetryVar ips)+1})
  connectToEnvHold' env

connectToNameServer :: String -> IO Handle
connectToNameServer ns = do
  modifyMVar_ gTCPStat (\ips -> return ips{conNS=(conNS ips)+1})
  connectToNameServer' ns

printTCPStat :: String -> IO ()
printTCPStat s = do
  ips <- readMVar gTCPStat
  sndConns <- readMVar gTCPSndConn
  recConns <- readMVar gTCPRecConn
  msgs <- readMVar gSTMMsgStat
  printStat s ips sndConns recConns msgs

printStat :: String -> TCPStat -> [TCPSndConn] -> [TCPRecConn] -> [STMMsgStat] 
             -> IO ()
printStat s ips sndConns recConns msgs = do
  let l = foldr (+) 1 (map (length.snd) recConns)
  takeMVar gDebugLock
  putStrLn s
  putStrLn ("--> TCP Verbindungen: " ++ showJustify (length sndConns) 5)
  putStrLn ("<-> TCP Verbindungen: " ++ showJustify l 5)
  putStrLn ("--> TCP Nachrichten:  " ++ showJustify (conEnv ips) 5)
  putStrLn ("<-> TCP Nachrichten:  " ++ showJustify (conExclEnv ips) 5)
  putStrLn ("|-> TCP Nachrichten:  " ++ showJustify (conRetryVar ips) 5)
  printMsgs msgs
  putMVar gDebugLock ()

printMsgs :: [STMMsgStat] -> IO ()
printMsgs [] = return ()
printMsgs msgs = do
  putStrLn (">-> Transaktionen:    " ++ (showJustify.snd.head) msgs 5)
  putStrLn ("-X- Transaktionen:    " ++ (showJustify.snd.head.tail) msgs 5)
  putStrLn ("<-- TCP Nachrichten:  " ++ showJustify (foldr (+) 0 (map snd ((tail.tail) msgs))) 5 ++ "\n")
  mapM_ printMsg ((tail.tail) msgs)

printMsg :: (RemMsgIdx, Int) -> IO ()
printMsg (msg, count) = putStrLn (show msg ++ ": " ++ showJustify count 5)

----------------
-- Robustness --
----------------

-- |'SomeDistTVarException' is the abstract exception type which is thrown
-- by the DSTM library when either 'readTVar' or 'writeTVar' is called on an
-- unreachable TVar.
-- A TVar becomes unreachable when the process hosting the TVar becomes
-- unreachable. 
-- An atomic transaction using a TVar which becomes unreachable during the 
-- execution of 'atomic' may either execute completely (without the unreachable
-- TVar(s)) or execute not at all depending on transaction states. In either
-- case an exception of type 'SomeDistTVarException' is raised.
data SomeDistTVarException = PropagateDistTVarFail String SomeDistTVarException
                           | CommunicationFail String EnvAddr SomeException
                           | NodeConnectionFail String EnvAddr SomeException
  deriving Typeable

instance Exception SomeDistTVarException

instance Show SomeDistTVarException where
  show (PropagateDistTVarFail loc ex) = 
    "PropagateDistTVarFail " ++ loc ++ "\n" ++ show ex
  show (CommunicationFail loc env ex)  = formEx "CommunicationFail" loc env ex
  show (NodeConnectionFail loc env ex) = formEx "NodeConnectionFail" loc env ex

formEx :: String -> String -> EnvAddr -> SomeException -> String
formEx err loc env cause = "Message : " ++ err
                        ++ "\nLocation: " ++ loc
                        ++ "\nProcess : " ++ show env
                        ++ "\nSysError: " ++ show cause ++ "\n"

propagateEx :: String -> SomeDistTVarException -> IO a
propagateEx loc e = throw (PropagateDistTVarFail (loc ++ " -> ") e)

distTVarExEnv :: SomeDistTVarException -> EnvAddr
distTVarExEnv eDist = case eDist of
  (PropagateDistTVarFail _ e)  -> distTVarExEnv e
  (CommunicationFail _ env _)  -> env
  (NodeConnectionFail _ env _) -> env

--------------------------
-- Robust Remote Access --
--------------------------

gSendLock :: MVar ()
{-# NOINLINE gSendLock #-}
gSendLock  = unsafePerformIO (newMVar ())

sendTCP :: Show a => a -> Handle -> IO ()
sendTCP msg h = catch (do
  takeMVar gSendLock
  hPutStrLn h (show msg) 
  hFlush h
  putMVar gSendLock ()
  )(\e -> do
    debugStrLn1 $ "sendTCP " ++ show e
    putMVar gSendLock ()
    env <- lookupTCPSndEnv h
    throw (CommunicationFail ("sendTCP: " ++ show msg ++ show h) env e))

recvTCP :: Show a => a -> Handle -> IO String
recvTCP msg h = catch (do 
  hPutStrLn h (show msg) 
  hFlush h 
  hGetLine h
  )(\e -> do
    debugStrLn1 $ "recvTCP " ++ show e
    env <- lookupTCPRecEnv h
    throw (CommunicationFail ("recvTCP: " ++ show msg ++ show h) env e))