{-# OPTIONS_GHC -XScopedTypeVariables #-}
{-# OPTIONS_GHC -fno-warn-unused-do-bind #-}

module Control.Distributed.STM.DSTM (TVar, STM, newTVar, readTVar, writeTVar,
             atomic, retry, orElse, throw, catch, 
             module Control.Distributed.STM.Dist,
             module Control.Distributed.STM.NameService,
             startDist, SomeDistTVarException, isDistErrTVar) where

import Control.Concurrent
import Control.Distributed.STM.DebugDSTM
import Control.Distributed.STM.Dist
import Control.Distributed.STM.EnvAddr
import Control.Distributed.STM.NameService -- just for export
import Control.Distributed.STM.RetryVar
import Control.Distributed.STM.STM
import Control.Distributed.STM.TVar
import Control.Distributed.STM.Utils
import Control.Exception hiding (catch, throw)
import qualified Control.Exception as CE (catch, throw)
import Control.Monad
import qualified Data.List
import Network
import Network.Socket hiding (accept)
import Prelude as P hiding (catch)
import System.IO
import System.IO.Unsafe
import System.Mem
import System.Process (ProcessHandle, runCommand, terminateProcess)
import System.Posix

infixl 2 `orElse`

------------------
-- TVar actions --
------------------

-- Version id paired with the initial value of a newly created TVar
-- (used by 'newTVar' when it fills the value cell).
gInitVersionID :: ID
gInitVersionID = 1
 
-- | Create a new TVar holding the supplied value.
--
-- The TVar is built from a fresh id, a value cell tagged with
-- 'gInitVersionID', a lock and an empty wait queue. The transaction state is
-- handed back unchanged.
newTVar   :: Dist a => a -> STM (TVar a)
newTVar initVal = STM $ \stmState -> do
  tVarId   <- uniqueId
  -- replace newMVar by newDebugMVar (with a label such as
  -- "Ref:   TVar "++show tVarId) to enable later MVar inspection
  valueRef <- newMVar (initVal, gInitVersionID)
  lockVar  <- newMVar ()
  waiters  <- newMVar []
  return (Success stmState (TVar valueRef tVarId lockVar waiters))
 
-- |Return the current value stored in a TVar.
--
-- The value is read through 'readIntraTransTVar' so that writes made earlier
-- in the same transaction are seen. The version id that was read, together
-- with the transaction's retry variable, is recorded in the validation log.
readTVar  :: Dist a => TVar a -> STM a
readTVar tVar = STM $ \stmState -> do
  (val, vId) <- readIntraTransTVar tVar (stmCommit stmState)
  let readEntry = Just (vId, stmRetryVar stmState)
      validLogs = bundledValidLogs tVar readEntry (stmValid stmState)
  return (Success (stmState {stmValid = validLogs}) val)

-- Read a TVar inside a running transaction. If the commit log already holds
-- an uncommitted, transaction-local write for this TVar, that written value
-- is returned (with the TVar's current version id); otherwise the TVar is
-- read directly via 'coreReadTVar'.
readIntraTransTVar :: Dist a => TVar a -> [CommitLog] -> IO (a, VersionID)
readIntraTransTVar tVar commitLog =
  case P.lookup env commitLog of
    Just (Left commitActs)
      | Just acts <- P.lookup tId commitActs -> readHost tVar (commitAct acts)
    Just (Right (strVals, _))
      | Just v <- P.lookup tId strVals ->
          -- keep the version id from the real read, substitute the value
          liftM (\current -> (read v, snd current)) (coreReadTVar tVar)
    _ -> coreReadTVar tVar
  where VarLink env tId = tVarToLink tVar

-- Read the current (value, version id) pair of a TVar.
--
-- Local TVar: the pair is read from the value cell while the TVar's lock is
-- held. Link to a remote TVar: a 'RemReadTVar' request is sent to the owning
-- environment and the answer is parsed with 'read'; failures are passed to
-- 'propagateEx'.
coreReadTVar  :: Dist a => TVar a -> IO (a,VersionID)
coreReadTVar (TVar tVarRef _ lock _) = do
  -- read lock to prevent read before finishing remote commit; withMVar
  -- puts the lock back even if the read is aborted by an exception, so the
  -- TVar cannot stay locked forever
  v <- withMVar lock (\_ -> readMVar tVarRef)
  debugStrLn8 ("coreReadTVar Loc "++show v)
  return v
coreReadTVar (LinkTVar (VarLink tEnv tVarId)) = CE.catch (do   
  answer <- remGetMsg tEnv (RemReadTVar tVarId gMyEnv)
  let vv@(v,_) = read answer
  finTVars v
  debugStrLn8 ("coreReadTVar Rem "++show vv)
  return vv
  )(propagateEx "coreReadTVar")

-- Read an uncommitted transaction-local write while keeping the original
-- version id.
--
-- With the TVar locked, the original (value, version) pair is remembered, the
-- supplied commit action is run, and the original pair is swapped back in;
-- the value that the commit action had written is returned together with the
-- original version id.
--
-- The lock is taken with 'bracket_' and the original pair is also restored
-- when the commit action fails, so an exception can neither leave the TVar
-- locked nor leave the uncommitted value visible.
readHost :: forall a . Read a => TVar a -> IO () -> IO (a, VersionID)
            -- explicit forall a extends scope of a into function body
readHost (TVar tVarRef _ lock _) commit =
  bracket_ (takeMVar lock) (putMVar lock ()) $ do
    orig@(_, origVersion) <- readMVar tVarRef
    (modV, _) <- (commit >> swapMVar tVarRef orig)
                   `onException` swapMVar tVarRef orig
    return (modV, origVersion)
readHost (LinkTVar (VarLink _ tId)) commit =
  bracket_ (lockTVarFromId tId) (unLockTVarFromId tId) $ do
    origStr <- readTVarValFromId tId
    let (_::a, origVersion) = read origStr
    (modV, _::VersionID) <- liftM read
      ((commit >> swapTVarValFromId tId origStr)
         `onException` swapTVarValFromId tId origStr)
    return (modV, origVersion)

-- |Write the supplied value into a TVar.
--
-- Nothing is written to the TVar itself here: the write is recorded in the
-- transaction's commit log, and the TVar is entered into the validation log
-- without a read entry.
writeTVar :: Dist a => TVar a -> a -> STM ()
writeTVar tVar v = STM $ \stmState -> do
  let validLogs  = bundledValidLogs tVar Nothing (stmValid stmState)
      commitLogs = bundledCommitLogs tVar v (stmCommit stmState)
      newState   = stmState {stmValid = validLogs, stmCommit = commitLogs}
  debugStrLn8 ("writeTVar "++show (stmCommit newState))
  return (Success newState ())

----------------------
-- STM computations --
----------------------

-- |Perform a series of STM actions atomically.
--
-- The STM action is run against a fresh transaction state. Whatever its
-- outcome, the gathered state is first validated with 'startTrans':
--
-- * exception: if valid, the exception is rethrown in IO, otherwise the
--   transaction is rerun;
-- * retry: if valid, the thread registers for wake-up, suspends on the
--   transaction's retry variable and reruns once resumed; if invalid it
--   reruns immediately;
-- * success: if valid, the writes are committed and the result returned,
--   otherwise the transaction is rerun.
--
-- 'endTrans' is called on every path after validation.
atomic :: Show a => STM a -> IO a
atomic stmAction = do
  aStat ATM
  debugStrLn7 "A"
  iState <- initialState
  debugStrLn7 "I"
  stmResult <- runSTM stmAction iState
  debugStrLn7 "M"
  case stmResult of
    Exception newState e -> do
      debugStrLn7 "E"
      (gState, valid) <- validated newState
      endTrans gState
      if valid
        then CE.throw e -- propagate Exception into IO world
        else atomic stmAction
    Retry newState -> do
      debugStrLn7 "R"
      (gState, valid) <- validated newState
      if valid
        then do
          debugStrLn7 "v"
          retryTrans gState  -- including set auto-link
          endTrans gState
          let retryVar = stmRetryVar gState
          insertRetryVarAction retryVar
          debugStrLn7 ("SUSPEND "++show (stmId gState))
          suspend retryVar
          debugStrLn7 ("RESUME "++show (stmId gState))
          deleteRetryVarAction retryVar
          unRetryTrans gState  -- reset auto-link
        else do
          debugStrLn7 "i"
          endTrans gState
          aStat INV
      atomic stmAction
    Success newState res -> do
      debugStrLn7 "S"
      (gState, valid) <- validated newState
      if valid
        then do
          debugStrLn7 "v"
          commitTrans gState
          endTrans gState
          return res
        else do
          debugStrLn7 "i"
          endTrans gState
          aStat INV
          atomic stmAction
  where
    -- gather the final transaction state and run the validation phase on it
    validated st = do
      let gState = gatherStmState st
      ok <- startTrans gState
      return (gState, ok)

-- Build the starting state of a transaction: a fresh transaction id tagged
-- with this node's environment, a new retry variable and empty logs.
initialState :: IO STMState
initialState = liftM2 freshState uniqueId newRetryVar
  where
    freshState atomicId retryVar = STMState
      { stmId       = (gMyEnv, atomicId)
      , stmRetryVar = retryVar
      , stmValid    = []
      , stmCommit   = []
      }

-- |Retry execution of the current memory transaction because it has seen
-- values in TVars which mean that it should not continue (e.g. the TVars
-- represent a shared buffer that is now empty). The implementation may block
-- the thread until one of the TVars that it has read from has been updated.
retry  :: STM a
retry = STM (return . Retry)

-- |Compose two alternative STM actions. If the first action completes without
-- retrying then it forms the result of the orElse. Otherwise, if the first
-- action retries, then the second action is tried in its place. If both
-- actions retry then the orElse as a whole retries.
--
-- When falling back to the second action, the commit log is reset to what it
-- was before the first action ran; the rest of the first action's state is
-- kept.
orElse :: STM a -> STM a -> STM a
orElse (STM firstAlt) (STM secondAlt) =
  STM $ \st@STMState{stmCommit = commitsBefore} ->
    firstAlt st >>= \firstRes ->
      case firstRes of
        Retry afterFirst -> secondAlt (afterFirst {stmCommit = commitsBefore})
        _                -> return firstRes

----------------
-- Exceptions --
----------------

-- |Throw an exception within an STM action. The current transaction state is
-- carried along with the exception.
throw :: SomeException -> STM a
throw e = STM $ \st -> return (Exception st e)

-- |Exception handling within STM actions. If the action ends with an
-- exception, the handler is run starting from the transaction state the
-- action itself was started with; any other outcome is passed through.
catch :: STM a -> (SomeException -> STM a) -> STM a
catch (STM body) eHandler = STM $ \stmState ->
  body stmState >>= \outcome ->
    case outcome of
      Exception _ e -> let STM recover = eHandler e
                       in recover stmState
      _             -> return outcome


-------------------------
-- Perform STM Actions --
-------------------------

-- copy WriteVals from CommitVals to ValidVals at end of transaction because of
-- possible nested orElse-scopes where locks and validations are preserved
-- while commits are reset using simple push/pop in orElse stack 
-- finally all info is needed in ValidVals to be transmitted to all trans
-- to ensure proper recovery in 3-phase model

-- Merge the commit log into the validation log: every validation entry is
-- passed through 'gatherValidRemVal' together with the full commit log.
gatherStmState :: STMState -> STMState
gatherStmState state =
  state {stmValid = [gatherValidRemVal commits vLog | vLog <- stmValid state]}
  where commits = stmCommit state

-- Attach pending write values to one validation entry. 'Left' entries are
-- returned unchanged; for a 'Right' entry the commit log of the same
-- environment, if any, supplies the write values.
gatherValidRemVal :: [CommitLog] -> ValidLog -> ValidLog
gatherValidRemVal _ vLog@(_, Left _) = vLog
gatherValidRemVal comLogs vLog@(env, Right validRemVals) =
  case P.lookup env comLogs of
    Just (Right (comVals, _)) ->
      (env, Right [addCommitRemVals comVals vrv | vrv <- validRemVals])
    _ -> vLog -- no commit entry for env, or internal error (Left entry)

-- Replace the second component of a validation entry by the commit value
-- recorded for the same TVar id ('Nothing' if there is none).
addCommitRemVals :: [CommitRemVal] -> ValidRemVal -> ValidRemVal
addCommitRemVals comVals (tId, (rVal, _)) = (tId, (rVal, pendingWrite))
  where pendingWrite = P.lookup tId comVals

-------------------
-- Run the validation phase over all entries of the validation log. The list
-- of environments named in the log is passed along to every entry.
startTrans :: STMState -> IO Bool
startTrans state =
  robustFoldValidAct (stmId state) (map fst valLogs) True valLogs
  where valLogs = stmValid state

-- Fold 'doValidAction' over the validation log, threading the validity flag
-- from entry to entry.
--
-- If an entry (or the remainder of the fold) raises a
-- 'SomeDistTVarException', the exception is logged, 'doEndAction' is run for
-- the current entry (ignoring a further 'SomeDistTVarException' from it) and
-- the original exception is handed to 'propagateEx'.
-- NOTE(review): only the entry being handled in each frame gets
-- 'doEndAction' here; presumably the caller's handlers cover the rest --
-- confirm.
robustFoldValidAct :: TransID -> [EnvAddr] -> Bool -> [ValidLog] -> IO Bool
robustFoldValidAct _ _ isValid [] = return isValid
robustFoldValidAct transId envs isValid (vLog:vLogs) = CE.catch (do
  debugStrLn5 ("robustFoldValidAct: x= "++show vLog++"xs= "++show vLogs)
  b <- doValidAction isValid vLog transId envs
  robustFoldValidAct transId envs b vLogs
  )(\(e::SomeDistTVarException) -> do
    debugStrLn1 (">>> robustFoldValidAct -> error: " ++ show e)
    debugStrLn1 (">>> robustFoldValidAct dyn: "++ show (distTVarExEnv e))
    -- best-effort clean-up of the current entry before propagating
    CE.catch (doEndAction transId vLog) 
             (\(_::SomeDistTVarException) -> return ())
    propagateEx "robustFoldValidAct _" e
   )

-- Validate one entry of the validation log.
--
-- 'Left' entry: run all lock actions, then combine the incoming flag with the
-- entry's validation action via '+>>'.
-- 'Right' entry: send 'RemStartTrans' to the entry's environment and AND the
-- parsed answer with the incoming flag.
doValidAction :: Bool -> ValidLog -> TransID -> [EnvAddr] -> IO Bool
doValidAction isValid (_, Left (lockActs, readAct)) _ _ = do
  forM_ lockActs (lockAct . snd)
  isValid +>> validAct readAct
doValidAction isValid (env, Right idVRs) transId transEnvs =
  liftM ((isValid &&) . read)
        (remGetMsg env (RemStartTrans transId idVRs transEnvs))
  -- possible opt: lazy val. msg -> requires skipping recovery on lazy val.

-- Run the commit action for every entry of the commit log, using
-- 'robustMapM_' so a failing entry does not stop the others.
commitTrans :: STMState -> IO ()
commitTrans state = robustMapM_ commitOne (stmCommit state)
  where commitOne = doCommitAction (stmId state)

-- Commit one entry of the commit log.
--
-- 'Left' entry: run every commit action, then every notify action.
-- 'Right' entry: run the registration action and send the 'Com' decision to
-- the entry's environment.
doCommitAction :: TransID -> CommitLog -> IO ()
doCommitAction _ (env, Left idCommits) = do
  debugStrLn5 ("doCommitAction: env= "++show env++"ids= "++show idCommits)
  forM_ idCommits (commitAct . snd)
  forM_ idCommits (notifyAct . snd)
doCommitAction transId (env, Right (_, regAct)) =
  regAct -- register dest env on TVars within values
    >> remPutMsg env (RemContTrans transId Com)

-- Run the retry action for every entry of the validation log, using
-- 'robustMapM_' so a failing entry does not stop the others.
retryTrans :: STMState -> IO ()
retryTrans state = robustMapM_ retryOne (stmValid state)
  where retryOne = doRetryAction (stmId state)

-- Handle the retry decision for one validation entry.
--
-- 'Left' entry: run the entry's wait-queue extension action.
-- 'Right' entry: send the 'Ret' decision to the entry's environment, then
-- insert an auto link or update an existing (retry or auto) link with the
-- retry variables of the entry.
doRetryAction :: TransID -> ValidLog -> IO ()
doRetryAction _ (env, Left (_, valLog)) =
  debugStrLn5 ("doRetryAction: env= "++show env) >> extWaitQAct valLog
doRetryAction transId (env, Right idrs) =
  remPutMsg env (RemContTrans transId Ret) >> insertRetryLinks env idrs

-- Undo the retry bookkeeping for every entry of the validation log.
unRetryTrans :: STMState -> IO ()
unRetryTrans state = forM_ (stmValid state) doUnRetryAction

-- Remove the retry links of one validation entry. 'Left' entries need no
-- work; for 'Right' entries the retry variables are removed from the link to
-- the entry's environment.
doUnRetryAction :: ValidLog -> IO ()
doUnRetryAction vLog = case vLog of
  (_, Left _)       -> return ()
  (env, Right idrs) ->
    deleteRetryLinks env idrs
      >> debugStrLn5 ("makeUnWaitQAction: env= "++show env++"idrs= "++show idrs)

-- Run the end action for every entry of the validation log, using
-- 'robustMapM_' so a failing entry does not stop the others.
endTrans :: STMState -> IO ()
endTrans state = robustMapM_ endOne (stmValid state)
  where endOne = doEndAction (stmId state)

-- End the transaction for one validation entry: run the unlock actions of a
-- 'Left' entry, or send 'End' to the environment of a 'Right' entry.
doEndAction :: TransID -> ValidLog -> IO ()
doEndAction transId (env, entry) = case entry of
  Left stLog -> forM_ (fst stLog) (unLockAct . snd)
  Right _    -> remPutMsg env (RemContTrans transId End)

-- Like 'mapM_', but a 'SomeDistTVarException' raised while processing one
-- element is only logged: the element is skipped and processing continues
-- with the remaining elements, without propagating the exception.
robustMapM_ :: Show a => (a -> IO ()) -> [a] -> IO ()
robustMapM_ _ []      = return ()
robustMapM_ io (x:xs) = do
  outcome <- try (debugStrLn5 ("robustMapM_: x= "++show x++"xs= "++show xs)
                  >> io x)
  case outcome of
    Right () -> return ()
    Left (ex::SomeDistTVarException) -> do
      debugStrLn1 (">>> robustMapM_ -> error: " ++ show ex)
      debugStrLn1 (">>> robustMapM_ dyn: "++ show (distTVarExEnv ex))
  robustMapM_ io xs -- continue w/actions xs in either case, w/o prop.!

---------------------------------
-- Remote Transactional Status --
---------------------------------

-- A remote transaction being served on this node: its id and the channel
-- through which continuation messages ('RemCont') reach its controller.
type DistTransCont = (TransID, Chan RemCont)

-- state of received remote transactions needed for possible recovery
-- (global, process-wide list; entries are added in 'startRemTrans' and
-- removed in 'finishTrans')
gDistTransCont :: MVar [DistTransCont]
-- NOINLINE: presumably to keep the unsafePerformIO-created MVar a single
-- shared instance
{-# NOINLINE gDistTransCont #-}
gDistTransCont = unsafePerformIO (newMVar [])

-- Begin serving a remote transaction on this node: count it on the link to
-- its originating environment, register a message channel for it in
-- 'gDistTransCont' and fork a controller thread ('ctrlTrans'). Any exception
-- escaping the controller is only logged.
startRemTrans :: TransID -> [ValidRemVal] -> 
                 [EnvAddr] -> (Bool -> IO ()) -> IO ()
startRemTrans transId idVRVars transEnvs notifyCaller = do
  updateAutoTrans (+1) transId
  msgChan <- newChan
  modifyMVar_ gDistTransCont (\conts -> return ((transId, msgChan) : conts))
  let controller = ctrlTrans msgChan transId idVRVars transEnvs notifyCaller
  _ <- forkIO (controller `CE.catch` logFailure)
  return ()
  where
    logFailure :: SomeException -> IO ()
    logFailure e = debugStrLn1 ("startRemTrans " ++ show e)

-- Deliver a continuation message to the controller of a remote transaction
-- served on this node. If the transaction is not registered, the message is
-- dropped (only a debug line is written).
contRemTrans :: TransID -> RemCont -> IO ()
contRemTrans transId msg = do
  debugStrLn8 ("<<<   contRemTrans transId "++show transId)
  conts <- readMVar gDistTransCont
  maybe unknownTrans deliver (P.lookup transId conts)
  where
    deliver msgChan = do
      debugStrLn8 ("<<<   contRemTrans : transId =  "++show transId)
      writeChan msgChan msg
      debugStrLn8 ("<<<   contRemTrans : writeChan msg =  "++show msg)
    -- no error, possible duplicate recovery msgs
    unknownTrans =
      debugStrLn8 ("<<<   contRemTrans N RemCont=   "++show msg)

-- Controller of a remote transaction, phase 1: lock all TVars named in the
-- request, validate them and report the result to the caller. If reporting
-- fails, an 'Err' message is queued on the transaction's own channel. The
-- controller then proceeds to phase 2 ('ctrlContTrans').
ctrlTrans :: Chan RemCont -> TransID -> [ValidRemVal] -> 
             [EnvAddr] -> (Bool -> IO ()) -> IO ()
ctrlTrans msgChan transId idVRVars transEnvs notifyCaller = do
  -- start remote transaction (phase 1)
  printValidFromId "ctrlTrans" transId idVRVars
  debugStrLn8 ("<<<   ctrlTrans lock transId "++show transId ++ " idVRVars "++ show idVRVars)
  forM_ idVRVars (lockTVarFromId . fst)
  debugStrLn8 ("###   ctrlTrans locked transId=   "++show transId)
  isValid <- foldr validateValidIds (return True) idVRVars
  notifyCaller isValid `CE.catch` reportFailure
  ctrlContTrans msgChan transId idVRVars transEnvs
  where
    reportFailure :: SomeException -> IO ()
    reportFailure e = do
      debugStrLn1 $ "<<<   ctrlTransFromIds !!! catch=   " ++ show e
      writeChan msgChan Err

-- Controller of a remote transaction, phase 2: wait for the coordinator's
-- decision on the transaction's channel and act on it.
--
-- * 'Com': write the pending values, notify the TVars, go to phase 3.
-- * 'Ret': extend the wait queues, go to phase 3.
-- * 'End': the transaction is not continued; finish it right away.
-- * 'Err': start the election of a new coordinator (with 'End' as the
--   decision to continue with) and wait for the next message.
ctrlContTrans :: Chan RemCont -> TransID -> [ValidRemVal] -> 
                 [EnvAddr] -> IO ()
ctrlContTrans msgChan transId idVRVars transEnvs = do
  -- continue remote transaction (phase 2)
  debugStrLn8 (">   ctrlContTrans readChan ")
  msg <- readChan msgChan
  debugStrLn8 ("<   ctrlContTrans readChan ")
  case msg of
    Com -> do -- decision for Com
      debugStrLn8 ("<<<   ctrlContTrans: Com transId =   "++show transId)
      mapM_ contWriteTVar idVRVars
      mapM_ (notifyFromId.fst) idVRVars
      ctrlEndTrans msgChan transId Com idVRVars transEnvs 
    Ret -> do -- decision for Ret
      debugStrLn8 ("<<<   ctrlContTrans: Ret transId =   "++show transId)
      mapM_ contExtWaitQ idVRVars
      ctrlEndTrans msgChan transId Ret idVRVars transEnvs
    End -> do -- decision for rerun (invalid) trans
      debugStrLn8 ("<<<   ctrlContTrans: End/Invalid transId =   "++show transId)
      finishTrans transId idVRVars
    Err -> do -- error before decision
      debugStrLn8 ("<<<   ctrlContTrans: Err transId =   "++show transId)
      electNewTransCoordinator transId transEnvs End
      ctrlContTrans msgChan transId idVRVars transEnvs

-- Controller of a remote transaction, phase 3: wait for the final message
-- after the decision 'remCont' has been carried out.
--
-- * 'End': finish the transaction.
-- * 'Err': start the election of a new coordinator, passing on the decision
--   already taken, and keep waiting.
-- * a repeat of the decision already taken: ignored, keep waiting.
-- * anything else: calls 'error'.
ctrlEndTrans :: Chan RemCont -> TransID -> RemCont -> 
                [ValidRemVal] -> [EnvAddr] -> IO ()
ctrlEndTrans msgChan transId remCont idVRVars transEnvs = do
  -- finish remote transaction (phase 3)
  debugStrLn8 (">   ctrlEndTrans readChan ")
  msg <- readChan msgChan
  debugStrLn8 ("<   ctrlEndTrans readChan ")
  case msg of
    End -> do -- finish trans ok
      debugStrLn8 ("<<<   ctrlEndTrans: End transId =   "++show transId ++ " idVRVars "++ show idVRVars)
      finishTrans transId idVRVars
      debugStrLn8 ("<<<   ctrlEndTrans: End unlocked transId =  "++show transId)
    Err -> do -- error after decision
      debugStrLn8 ("<<<   ctrlEndTrans: Err transId =   "++show transId)
      debugStrLn8 ("### ctrlEndTrans phase III cont trans w/new initiator")
      electNewTransCoordinator transId transEnvs remCont
      ctrlEndTrans msgChan transId remCont idVRVars transEnvs
      debugStrLn8 ("###   ctrlEndTrans  ok")
    m -> if m == remCont -- no error, possible duplicate recovery msgs
           then ctrlEndTrans msgChan transId m idVRVars transEnvs 
           else error "error ctrlEndTrans"

-- Finish a remote transaction served on this node: unlock its TVars,
-- decrement the open-transaction count on the link to its origin and remove
-- it from 'gDistTransCont'.
finishTrans :: TransID -> [ValidRemVal] -> IO ()
finishTrans trId idVRs = do
  forM_ idVRs (unLockTVarFromId . fst)
  updateAutoTrans (subtract 1) trId
  modifyMVar_ gDistTransCont
              (\conts -> return [c | c@(tid, _) <- conts, tid /= trId])

-- Choose a new coordinator for a transaction whose original coordinator (the
-- environment in the transaction id) is considered broken.
--
-- The original coordinator is removed from the participating environments and
-- the minimum of the remaining ones is taken as the new coordinator. If that
-- is this node, coordination continues locally via 'contTransWNewCoord';
-- otherwise the chosen node is told with 'RemElectedNewCoord'.
-- If this fails with an exception that names an environment
-- ('distTVarExEnv'), the election is repeated without that environment.
--
-- NOTE(review): 'minimum' is partial; if no environment is left after
-- filtering, it raises an error -- confirm this cannot happen.
-- NOTE(review): as written, the last handler appears to be part of the first
-- handler's body, i.e. it guards only the repeated election -- confirm.
electNewTransCoordinator :: TransID -> [EnvAddr] -> RemCont -> IO ()
electNewTransCoordinator transId@(env, _) transEnvs remCont = do
  let upEnvs = P.filter (/= env) transEnvs
  case minimum upEnvs of
    newC | newC == gMyEnv -> contTransWNewCoord transId remCont upEnvs
         | otherwise -> remPutMsg newC 
                                  (RemElectedNewCoord transId remCont upEnvs)
  `CE.catch` -- newCoordinator unavailable itself (SomeDistTVarException)
  \ex -> let eEnv = (/= distTVarExEnv ex)
             upEnvs = P.filter eEnv . P.filter (/= env) $ transEnvs
  in electNewTransCoordinator transId upEnvs remCont
  `CE.catch` -- other exception
  \(e1::SomeException) -> debugStrLn1 ("electNewTransCoordinator other ex"++show e1)

-- Continue coordinating a transaction on this node after it was elected as
-- the new coordinator (recovery). Unless the decision is 'End', the decision
-- is first sent to all other remaining environments; then 'End' is sent to
-- them and finally delivered locally.
contTransWNewCoord :: TransID -> RemCont -> [EnvAddr] -> IO ()
contTransWNewCoord transId remCont upEnvs = do -- recovery
  debugStrLn1 (">>><<< contTransWNewCoord 1 : "++show remCont)
  case remCont of
    End -> return ()
    _   -> do -- continue with trans coordination
      debugStrLn1 (">>><<< contTransWNewCoord 2 : "++show remCont)
      broadcast remCont
  broadcast End
  contRemTrans transId End -- end local
  where
    remEnvs = P.filter (/= gMyEnv) upEnvs
    -- send one continuation message to every other remaining environment
    broadcast cont =
      robustMapM_ (\target -> remPutMsg target (RemContTrans transId cont))
                  remEnvs

-- Fold step for validating remote entries: entries without read information
-- leave the accumulated action untouched; for the others the TVar's version
-- is validated and combined with the accumulated action via '>>+'.
validateValidIds :: ValidRemVal -> IO Bool -> IO Bool
validateValidIds (tId, (readInfo, _)) isValid =
  case readInfo of
    Nothing             -> isValid
    Just (versionId, _) -> validateTVarFromId (tId, versionId) >>+ isValid

-- Write the pending value of an entry to its TVar; entries without a pending
-- value are skipped.
contWriteTVar :: ValidRemVal -> IO ()
contWriteTVar (tId, (_, pending)) =
  case pending of
    Nothing  -> return ()
    Just str -> writeTVarFromId (tId, str)

-- Add the retry variable of an entry to the wait queue of its TVar; entries
-- without read information are skipped.
contExtWaitQ :: ValidRemVal -> IO ()
contExtWaitQ (tId, (readInfo, _)) =
  case readInfo of
    Nothing        -> return ()
    Just (_, rVar) -> extWaitQFromId (tId, rVar)

-----------------------
-- Distributed Stuff --
-----------------------

-- Accept loop of this node: every accepted connection gets its own handle
-- and its own reader thread ('readMsg'). The loop ends, after writing a debug
-- line, as soon as one round raises an exception.
--
-- Each round is guarded with 'try' and the loop recurses outside of the
-- guarded region, so no exception handler frame is left behind per accepted
-- connection.
nodeReceiver :: Socket -> IO () -- new socket for new sender process
nodeReceiver sock = do
  outcome <- try acceptOne
  case outcome of
    Right ()                -> nodeReceiver sock
    Left (e::SomeException) -> debugStrLn1 ("nodeReceiver "++show e)
  where
    acceptOne :: IO ()
    acceptOne = do
      setSocketOption sock NoDelay 1
      time <- getProcessTimes
      debugStrLn1 ("accept: "++show (elapsedTime time))
      (h, hn, p) <- accept sock -- wait for new process msg handle on socket
      debugStrLn4 ("### new connection: " ++ show h ++ " -> name: " ++ show hn ++ " -> port: " ++ show p)
      hSetBuffering h LineBuffering
      forkIO (readMsg h)
      return ()

-- Reader loop for one connection: read a line, parse it as a message and
-- handle it, then repeat. The loop ends, after writing a debug line, as soon
-- as reading, parsing or handling raises an exception.
--
-- Each message is guarded with 'try' and the loop recurses outside of the
-- guarded region; recursing inside the handler scope would leave one handler
-- frame on the stack per received message.
-- NOTE(review): the handle is not closed when the loop ends; replies from
-- asynchronous requests may still be written to it -- confirm before adding
-- an explicit close.
readMsg :: Handle -> IO ()
readMsg h = do
  outcome <- try (hGetLine h >>= \str -> -- wait for new message on handle
                    handleMsg h (read str))
  case outcome of
    Right ()                -> readMsg h
    Left (e::SomeException) -> debugStrLn1 ("readMsg dropLostSocket "++show e)

-- Dispatch one received message. Requests that expect an answer
-- ('RemReadTVar', 'RemStartTrans') get a reply function that writes a line
-- to the connection's handle. After each message its statistics counter is
-- updated via 'aStat'.
handleMsg :: Handle -> STMMessage -> IO ()
handleMsg _ (RemResume retryVarIds) =
  mapM_ resumeFromId retryVarIds >> aStat RES
handleMsg _ (RemAddEnvToAction tVarId env) =
  addEnvToTVarActions tVarId env >> aStat AEA
handleMsg _ (RemDelEnvFromAction tVarId env) =
  delEnvFromTVarActions tVarId env >> aStat DEA
handleMsg _ RemLifeCheck =
  debugStrLn2 ("RemLifeCheck: I'm alive") >> aStat LFC
handleMsg h (RemReadTVar tId destEnv) = -- async
  readTVarFromId tId destEnv (hPutStrLn h) >> aStat RDT
handleMsg h (RemStartTrans transId idVRVars transEnvs) = -- async
  startRemTrans transId idVRVars transEnvs (hPutStrLn h . show) >> aStat STT
handleMsg _ (RemContTrans transId remCont) =
  contRemTrans transId remCont >> aStat CTT
handleMsg _ (RemElectedNewCoord transId remCont oprtlEnvs) =
  contTransWNewCoord transId remCont oprtlEnvs >> aStat ENC

--------------------
-- TVar Lifecheck --
--------------------

-- Bookkeeping for the link to one remote environment; 'lifeCheck' keeps
-- pinging the environment while either field is non-empty/positive.
data AutoLink = AutoLink {autoTrans :: Int,  -- number of open remote trans
                          autoRetry :: [RetryVar]} -- active retry vars
  deriving Show

-- Link state used when an environment is seen for the first time: no open
-- transactions and no retry variables.
gDefaultLink :: AutoLink
gDefaultLink = AutoLink
  { autoTrans = 0
  , autoRetry = []
  }

-- A remote environment together with its link bookkeeping.
type Link  = (EnvAddr, AutoLink)

-- Global, process-wide list of links to remote environments; also used as
-- the lock for all link updates (takeMVar/putMVar around each change).
gLinks :: MVar [Link]
-- NOINLINE: presumably to keep the unsafePerformIO-created MVar a single
-- shared instance
{-# NOINLINE gLinks #-}
gLinks = unsafePerformIO (newMVar [])

-- Delay between two life-check pings while remote transactions are open,
-- as a multiple of 'gMegaMuSec'.
gTransChkIntv :: Int
gTransChkIntv = 1

-- Delay between two life-check pings while only retry variables are
-- registered, as a multiple of 'gMegaMuSec'.
gRetryChkIntv :: Int
gRetryChkIntv = 3

-- One million; used as the factor for 'threadDelay' arguments, which are
-- given in microseconds.
gMegaMuSec :: Int
gMegaMuSec = 10 ^ (6 :: Int)


-- Apply a function to the open-transaction counter of the link to the
-- environment a transaction originates from. If there is no link to that
-- environment yet, one is created from 'gDefaultLink' and a life-check
-- thread is started for it.
updateAutoTrans ::  (Int -> Int) -> TransID -> IO ()
updateAutoTrans f (env, _) = do
  links <- takeMVar gLinks
  let (mine, others) = Data.List.partition ((== env) . fst) links
      bump l = l {autoTrans = f (autoTrans l)}
  case mine of
    [] -> do -- no link to env yet
      putMVar gLinks ((env, bump gDefaultLink) : others)
      forkIO (lifeCheck env) -- start new life check loop
      return ()
    [(e, link)] -> -- existing link, modify link only
      putMVar gLinks ((e, bump link) : others)
    _ -> putMVar gLinks links -- internal error

-- Register the retry variables of the given entries on the link to an
-- environment. If there is no link to that environment yet, one is created
-- from 'gDefaultLink' and a life-check thread is started for it.
insertRetryLinks ::  EnvAddr -> [ValidRemVal] -> IO ()
insertRetryLinks env idVRs = do -- env mapped over other processes in trans
  links <- takeMVar gLinks
  let (mine, others) = Data.List.partition ((== env) . fst) links
      withRetries l  = foldr insertRetryLink l idVRs
  case mine of
    [] -> do -- no link to env yet
      putMVar gLinks ((env, withRetries gDefaultLink) : others)
      forkIO (lifeCheck env) -- start new life check loop
      return ()
    [(e, link)] -> -- existing link, modify link only
      putMVar gLinks ((e, withRetries link) : others)
    _ -> putMVar gLinks links -- internal error

-- Add the retry variable of one entry to a link; entries without read
-- information leave the link unchanged. Duplicates are not filtered:
-- duplicate retryVars in different TVars in same env are possible.
insertRetryLink :: ValidRemVal -> AutoLink -> AutoLink
insertRetryLink (_, (readInfo, _)) link =
  case readInfo of
    Just (_, rVar) -> link {autoRetry = rVar : autoRetry link}
    Nothing        -> link

-- Remove the retry variables of the given entries from the link to an
-- environment. The link itself is kept.
deleteRetryLinks ::  EnvAddr -> [ValidRemVal] -> IO ()
deleteRetryLinks env idVRs = do -- env mapped over other processes in trans
  links <- takeMVar gLinks
  let (mine, others) = Data.List.partition ((== env) . fst) links
  putMVar gLinks $ case mine of
    []          -> others -- no link to env: internal error
    [(e, link)] -> (e, foldr deleteRetryLink link idVRs) : others
    _           -> links  -- internal error

-- Remove the retry variable of one entry from a link, including possible
-- duplicates (the same retryVar registered for different TVars in the same
-- env); entries without read information leave the link unchanged.
deleteRetryLink :: ValidRemVal -> AutoLink -> AutoLink
deleteRetryLink (_, (readInfo, _)) link =
  case readInfo of
    Just (_, rVar) -> link {autoRetry = [rv | rv <- autoRetry link, rv /= rVar]}
    Nothing        -> link

-- Life-check loop for the link to one remote environment.
--
-- As long as the link has open remote transactions, or otherwise registered
-- retry variables, the environment is pinged and the loop sleeps for the
-- matching interval. When the link is idle, a last ping is sent and the link
-- is deleted; when the link no longer exists the loop just ends.
-- Any exception raised in a round (e.g. by the ping) triggers recovery of the
-- transactions and retry variables that depend on the environment, after
-- which the link is deleted and the loop ends.
--
-- Each round is guarded with 'try' and the loop recurses outside of the
-- guarded region, so the number of installed handlers stays constant however
-- long the link lives, and the recovery code runs at most once.
lifeCheck :: EnvAddr -> IO ()
lifeCheck env = do
  outcome <- try checkOnce
  case outcome of
    Right True  -> lifeCheck env
    Right False -> return ()
    Left (e::SomeException) -> do
      debugStrLn9 ("!!! lifeCheck recovery" ++ show e)
      aStat DRP
      recoverBrokenReactiveTrans env
      recoverBrokenInactiveTrans env
      deleteLink
  where
    -- one round; the result tells whether the loop should go on
    checkOnce :: IO Bool
    checkOnce = do
      links <- readMVar gLinks
      case P.lookup env links of
        Nothing -> do
          debugStrLn9 ("lifeCheck Nothing: "++show env)
          return False -- no ping for env
        Just link
          | autoTrans link > 0 -> do
              debugStrLn9 ("lifeCheck trans "++show env)
              ping
              threadDelay (gMegaMuSec * gTransChkIntv)
              return True
          | not (null (autoRetry link)) -> do
              debugStrLn9 ("lifeCheck retry "++show env)
              ping
              threadDelay (gMegaMuSec * gRetryChkIntv)
              return True
          | otherwise -> do -- last ping
              ping
              debugStrLn9 ("lifeCheck -------: "++show env)
              deleteLink
              return False
    ping = remPutMsg env RemLifeCheck
    deleteLink = modifyMVar_ gLinks (return.P.filter ((/= env).fst))

-- Recovery for remote transactions served on this node whose originating
-- environment is the given one: an 'Err' message is queued on the channel of
-- each such transaction.
recoverBrokenReactiveTrans :: EnvAddr -> IO ()
recoverBrokenReactiveTrans env = do
  debugStrLn1 ("<<< !!!   recoverBrokenReactiveTrans: "++show env)
  conts <- readMVar gDistTransCont
  let brokenRemTrans = [c | c@((origin, _), _) <- conts, origin == env]
  forM_ brokenRemTrans (\(_, msgChan) -> writeChan msgChan Err)
  debugStrLn1 ("<<<   recoverBrokenReactiveTrans "++show brokenRemTrans)

-- Recovery for suspended transactions waiting on the given environment: all
-- retry variables registered on the link to that environment are resumed
-- (they would otherwise deadlock on the broken link).
recoverBrokenInactiveTrans :: EnvAddr -> IO ()
recoverBrokenInactiveTrans env = do
  debugStrLn1 ("<<< !!! recoverBrokenInactiveTrans env: "++show env)
  links <- readMVar gLinks
  maybe (return ()) (resumeAll . autoRetry) (P.lookup env links)
  where
    resumeAll retryVars = do
      debugStrLn1 ("<<< recoverBrokenInactiveTrans retryVars: "++show retryVars)
      mapM_ coreResume retryVars

-- Resume a retry variable: a local one directly, a remote one by sending
-- 'RemResume' to its environment. Errors while sending are ignored (a dead
-- process proxy waking up another dead process).
coreResume :: RetryVar -> IO ()
coreResume retryVar = case retryVar of
  RetryVar retryMVar _ -> resumeRetryVarAct retryMVar
  LinkRetryVar (VarLink env retVarId) ->
    remPutMsg env (RemResume [retVarId]) `CE.catch` ignoreAll
  where
    ignoreAll :: SomeException -> IO ()
    ignoreAll _ = return ()

--------------------
-- Initialization --
--------------------

-- |'startDist' enables inter process communication and exception handling and
-- then executes the given main function.
--
-- The name service process started here is terminated when the main function
-- returns and also when it (or the set-up before it) fails with an exception,
-- so a failing node does not leave the process it spawned behind.
startDist :: IO ()  -- ^application main function to be executed. Each main 
                    -- function in the distributed system has to be wrapped
                    -- in a 'startDist' call  
             -> IO ()
startDist nodeMain = CE.catch (do
  serverPid <- startNameService -- may be removed
  (do threadDelay gMegaMuSec
      installHandler sigPIPE Ignore Nothing -- no UNIX signal on closed sockets
      time <- getProcessTimes
      debugStrLn1 ("listenOn: "++show (elapsedTime time))
      seq gMyEnv (forkIO (nodeReceiver gMySocket))
      --                  >> forkIO timedDebugger
      nodeMain
      swapMVar gLinks [] -- delete all links to enable finalizer GC
      performGC)         -- activate all pending finalizers
    `finally` (do terminateProcess serverPid -- may be removed
                  debugStrLn1 ("terminate NameService"))
  )(propagateEx "startDist")

-- Start the name service by running the command "NameServer" and return the
-- handle of the spawned process.
startNameService :: IO ProcessHandle
startNameService =
  debugStrLn1 ("startNameService") >> runCommand "NameServer"