module Control.Distributed.STM.DSTM (TVar, STM, newTVar, readTVar, writeTVar,
atomic, retry, orElse, throw, catch,
module Control.Distributed.STM.Dist,
module Control.Distributed.STM.NameService,
startDist, SomeDistTVarException, isDistErrTVar) where
import Control.Concurrent
import Control.Distributed.STM.DebugDSTM
import Control.Distributed.STM.Dist
import Control.Distributed.STM.EnvAddr
import Control.Distributed.STM.NameService
import Control.Distributed.STM.RetryVar
import Control.Distributed.STM.STM
import Control.Distributed.STM.TVar
import Control.Distributed.STM.Utils
import Control.Exception hiding (catch, throw)
import qualified Control.Exception as CE (catch, throw)
import Control.Monad
import qualified Data.List
import Network
import Network.Socket hiding (accept)
import Prelude as P hiding (catch)
import System.IO
import System.IO.Unsafe
import System.Mem
import System.Process (ProcessHandle, runCommand, terminateProcess)
import System.Posix
infixl 2 `orElse`
gInitVersionID :: ID
gInitVersionID = 1
newTVar :: Dist a => a -> STM (TVar a)
newTVar v = STM (\stmState -> do
newID <- uniqueId
newRef <- newMVar (v, gInitVersionID)
newLock <- newMVar ()
waitQ <- newMVar []
let tVar = (TVar newRef newID newLock waitQ)
return (Success stmState tVar))
readTVar :: Dist a => TVar a -> STM a
readTVar tVar = STM (\stmState -> do
(val, vId) <- readIntraTransTVar tVar (stmCommit stmState)
let newState = stmState
{stmValid = bundledValidLogs tVar
(Just (vId, (stmRetryVar stmState)))
(stmValid stmState)
}
return (Success newState val))
readIntraTransTVar :: Dist a => TVar a -> [CommitLog] -> IO (a, VersionID)
readIntraTransTVar tVar commitLog =
case P.lookup env commitLog of
Just (Left commitActs) ->
case P.lookup tId commitActs of
Just acts -> readHost tVar $ commitAct acts
Nothing -> coreReadTVar tVar
Just (Right (strVals, _)) ->
case P.lookup tId strVals of
Just v -> coreReadTVar tVar >>= return . ((,) (read v)) . snd
Nothing -> coreReadTVar tVar
Nothing -> coreReadTVar tVar
where VarLink env tId = tVarToLink tVar
coreReadTVar :: Dist a => TVar a -> IO (a,VersionID)
coreReadTVar (TVar tVarRef _ lock _) = do
takeMVar lock
v <- readMVar tVarRef
putMVar lock ()
debugStrLn8 ("coreReadTVar Loc "++show v)
return v
coreReadTVar (LinkTVar (VarLink tEnv tVarId)) = CE.catch (do
answer <- remGetMsg tEnv (RemReadTVar tVarId gMyEnv)
let vv@(v,_) = read answer
finTVars v
debugStrLn8 ("coreReadTVar Rem "++show vv)
return vv
)(propagateEx "coreReadTVar")
readHost :: forall a . Read a => TVar a -> IO () -> IO (a, VersionID)
readHost (TVar tVarRef _ lock _) commit = do
takeMVar lock
orig@(_, origVersion) <- readMVar tVarRef
commit
(modV, _) <- swapMVar tVarRef orig
putMVar lock ()
return (modV, origVersion)
readHost (LinkTVar (VarLink _ tId)) commit = do
lockTVarFromId tId
origStr <- readTVarValFromId tId
let (_::a, origVersion) = read origStr
commit
(modV, _::VersionID) <- liftM read (swapTVarValFromId tId origStr)
unLockTVarFromId tId
return (modV, origVersion)
writeTVar :: Dist a => TVar a -> a -> STM ()
writeTVar tVar v = STM (\stmState ->
let newState = stmState{
stmValid = bundledValidLogs tVar Nothing (stmValid stmState),
stmCommit = bundledCommitLogs tVar v (stmCommit stmState)
}
in debugStrLn8 ("writeTVar "++show (stmCommit newState))>>return (Success newState ()))
atomic :: Show a => STM a -> IO a
atomic stmAction = do
aStat ATM
debugStrLn7 ("A")
iState <- initialState
debugStrLn7 ("I")
stmResult <- runSTM stmAction iState
debugStrLn7 ("M")
case stmResult of
Exception newState e -> do
debugStrLn7 ("E")
let gState = gatherStmState newState
valid <- startTrans gState
if valid
then do
endTrans gState
CE.throw e
else do
endTrans gState
atomic stmAction
Retry newState -> do
debugStrLn7 ("R")
let gState = gatherStmState newState
valid <- startTrans gState
if valid
then do
debugStrLn7 ("v")
retryTrans gState
endTrans gState
let retryVar = stmRetryVar gState
insertRetryVarAction retryVar
debugStrLn7 ("SUSPEND "++show (stmId gState))
suspend retryVar
debugStrLn7 ("RESUME "++show (stmId gState))
deleteRetryVarAction retryVar
unRetryTrans gState
else do
debugStrLn7 ("i")
endTrans gState
aStat INV
atomic stmAction
Success newState res -> do
debugStrLn7 ("S")
let gState = gatherStmState newState
valid <- startTrans gState
if valid
then do
debugStrLn7 ("v")
commitTrans gState
endTrans gState
return res
else do
debugStrLn7 ("i")
endTrans gState
aStat INV
atomic stmAction
initialState :: IO STMState
initialState = do
atomicId <- uniqueId
retryVar <- newRetryVar
return (STMState {stmId = (gMyEnv, atomicId),
stmRetryVar = retryVar,
stmValid = [],
stmCommit = []})
retry :: STM a
retry = STM (\state -> return (Retry state))
orElse :: STM a -> STM a -> STM a
orElse (STM stm1) (STM stm2) =
STM (\(stmState@STMState{stmCommit = savedCont}) -> do
stm1Res <- stm1 stmState
case stm1Res of
Retry newState -> stm2 newState{stmCommit = savedCont}
_ -> return stm1Res)
throw :: SomeException -> STM a
throw e = STM (\state -> return (Exception state e))
catch :: STM a -> (SomeException -> STM a) -> STM a
catch (STM stm) eHandler = STM (\stmState -> do
res <- stm stmState
case res of
Exception _ e -> do
let (STM stmEx) = eHandler e
stmEx stmState
_ -> return res)
gatherStmState :: STMState -> STMState
gatherStmState state = state {stmValid =
map (gatherValidRemVal (stmCommit state)) (stmValid state)}
gatherValidRemVal :: [CommitLog] -> ValidLog -> ValidLog
gatherValidRemVal _ (env, Left stActs) = (env, Left stActs)
gatherValidRemVal comLogs vLog@(env, Right validRemVals) =
case P.lookup env comLogs of
Nothing -> vLog
Just (Right (v, _)) -> (env, Right(map (addCommitRemVals v) validRemVals))
_ -> vLog
addCommitRemVals :: [CommitRemVal] -> ValidRemVal -> ValidRemVal
addCommitRemVals comVals (tId,(rVal, _)) = (tId, (rVal, P.lookup tId comVals))
startTrans :: STMState -> IO Bool
startTrans state = do
robustFoldValidAct (stmId state) (fst.unzip $ valLogs) True valLogs
where valLogs = stmValid state
robustFoldValidAct :: TransID -> [EnvAddr] -> Bool -> [ValidLog] -> IO Bool
robustFoldValidAct _ _ isValid [] = return isValid
robustFoldValidAct transId envs isValid (vLog:vLogs) = CE.catch (do
debugStrLn5 ("robustFoldValidAct: x= "++show vLog++"xs= "++show vLogs)
b <- doValidAction isValid vLog transId envs
robustFoldValidAct transId envs b vLogs
)(\(e::SomeDistTVarException) -> do
debugStrLn1 (">>> robustFoldValidAct -> error: " ++ show e)
debugStrLn1 (">>> robustFoldValidAct dyn: "++ show (distTVarExEnv e))
CE.catch (doEndAction transId vLog)
(\(_::SomeDistTVarException) -> return ())
propagateEx "robustFoldValidAct _" e
)
doValidAction :: Bool -> ValidLog -> TransID -> [EnvAddr] -> IO Bool
doValidAction isValid (_, Left (lockActs, readAct)) _ _ = do
mapM_ (lockAct.snd) lockActs
isValid +>> validAct readAct
doValidAction isValid (env, (Right idVRs)) transId transEnvs =
remGetMsg env (RemStartTrans transId idVRs transEnvs)
>>= return . (isValid &&) . read
commitTrans :: STMState -> IO ()
commitTrans state = robustMapM_ (doCommitAction (stmId state)) (stmCommit state)
doCommitAction :: TransID -> CommitLog -> IO ()
doCommitAction _ (env, Left idCommits) = do
debugStrLn5 ("doCommitAction: env= "++show env++"ids= "++show idCommits)
mapM_ (commitAct.snd) idCommits
mapM_ (notifyAct.snd) idCommits
doCommitAction transId (env, Right (_, regAct)) = do
regAct
remPutMsg env (RemContTrans transId Com)
retryTrans :: STMState -> IO ()
retryTrans state = robustMapM_ (doRetryAction (stmId state)) (stmValid state)
doRetryAction :: TransID -> ValidLog -> IO ()
doRetryAction _ (env, Left (_, valLog)) = do
debugStrLn5 ("doRetryAction: env= "++show env)
extWaitQAct valLog
doRetryAction transId (env, Right idrs) = do
remPutMsg env (RemContTrans transId Ret)
insertRetryLinks env idrs
unRetryTrans :: STMState -> IO ()
unRetryTrans state = mapM_ doUnRetryAction (stmValid state)
doUnRetryAction :: ValidLog -> IO ()
doUnRetryAction (_, Left _) = return ()
doUnRetryAction (env, Right idrs) = do
deleteRetryLinks env idrs
debugStrLn5 ("makeUnWaitQAction: env= "++show env++"idrs= "++show idrs)
endTrans :: STMState -> IO ()
endTrans state = robustMapM_ (doEndAction (stmId state)) (stmValid state)
doEndAction :: TransID -> ValidLog -> IO ()
doEndAction _ (_, Left stLog) = do
mapM_ (unLockAct.snd) (fst stLog)
doEndAction transId (env, Right _) = do
remPutMsg env (RemContTrans transId End)
robustMapM_ :: Show a => (a -> IO ()) -> [a] -> IO ()
robustMapM_ _ [] = return ()
robustMapM_ io (x:xs) = CE.catch (do
debugStrLn5 ("robustMapM_: x= "++show x++"xs= "++show xs)
io x
robustMapM_ io xs
)(\(ex::SomeDistTVarException) -> do
debugStrLn1 (">>> robustMapM_ -> error: " ++ show ex)
debugStrLn1 (">>> robustMapM_ dyn: "++ show (distTVarExEnv ex))
robustMapM_ io xs
)
type DistTransCont = (TransID, Chan RemCont)
gDistTransCont :: MVar [DistTransCont]
gDistTransCont = unsafePerformIO (newMVar [])
startRemTrans :: TransID -> [ValidRemVal] ->
[EnvAddr] -> (Bool -> IO ()) -> IO ()
startRemTrans transId idVRVars transEnvs notifyCaller = do
updateAutoTrans (+1) transId
msgChan <- newChan
modifyMVar_ gDistTransCont (return . ((transId, msgChan):))
forkIO (CE.catch
(ctrlTrans msgChan transId idVRVars transEnvs notifyCaller)
(\(e::SomeException) -> debugStrLn1 ("startRemTrans " ++ show e)))
return ()
contRemTrans :: TransID -> RemCont -> IO ()
contRemTrans transId msg = do
debugStrLn8 ("<<< contRemTrans transId "++show transId)
conts <- readMVar gDistTransCont
case P.lookup transId conts of
Just msgChan -> do
debugStrLn8 ("<<< contRemTrans : transId = "++show transId)
writeChan msgChan msg
debugStrLn8 ("<<< contRemTrans : writeChan msg = "++show msg)
Nothing ->
debugStrLn8 ("<<< contRemTrans N RemCont= "++show msg)
ctrlTrans :: Chan RemCont -> TransID -> [ValidRemVal] ->
[EnvAddr] -> (Bool -> IO ()) -> IO ()
ctrlTrans msgChan transId idVRVars transEnvs notifyCaller = do
printValidFromId "ctrlTrans" transId idVRVars
debugStrLn8 ("<<< ctrlTrans lock transId "++show transId ++ " idVRVars "++ show idVRVars)
mapM_ (lockTVarFromId.fst) idVRVars
debugStrLn8 ("### ctrlTrans locked transId= "++show transId)
isValid <- foldr validateValidIds (return True) idVRVars
CE.catch (notifyCaller isValid)
(\(e::SomeException) -> do
debugStrLn1 $ "<<< ctrlTransFromIds !!! catch= " ++ show e
writeChan msgChan Err
)
ctrlContTrans msgChan transId idVRVars transEnvs
ctrlContTrans :: Chan RemCont -> TransID -> [ValidRemVal] ->
[EnvAddr] -> IO ()
ctrlContTrans msgChan transId idVRVars transEnvs = do
debugStrLn8 ("> ctrlContTrans readChan ")
msg <- readChan msgChan
debugStrLn8 ("< ctrlContTrans readChan ")
case msg of
Com -> do
debugStrLn8 ("<<< ctrlContTrans: Com transId = "++show transId)
mapM_ contWriteTVar idVRVars
mapM_ (notifyFromId.fst) idVRVars
ctrlEndTrans msgChan transId Com idVRVars transEnvs
Ret -> do
debugStrLn8 ("<<< ctrlContTrans: Ret transId = "++show transId)
mapM_ contExtWaitQ idVRVars
ctrlEndTrans msgChan transId Ret idVRVars transEnvs
End -> do
debugStrLn8 ("<<< ctrlContTrans: End/Invalid transId = "++show transId)
finishTrans transId idVRVars
Err -> do
debugStrLn8 ("<<< ctrlContTrans: Err transId = "++show transId)
electNewTransCoordinator transId transEnvs End
ctrlContTrans msgChan transId idVRVars transEnvs
ctrlEndTrans :: Chan RemCont -> TransID -> RemCont ->
[ValidRemVal] -> [EnvAddr] -> IO ()
ctrlEndTrans msgChan transId remCont idVRVars transEnvs = do
debugStrLn8 ("> ctrlEndTrans readChan ")
msg <- readChan msgChan
debugStrLn8 ("< ctrlEndTrans readChan ")
case msg of
End -> do
debugStrLn8 ("<<< ctrlEndTrans: End transId = "++show transId ++ " idVRVars "++ show idVRVars)
finishTrans transId idVRVars
debugStrLn8 ("<<< ctrlEndTrans: End unlocked transId = "++show transId)
Err -> do
debugStrLn8 ("<<< ctrlEndTrans: Err transId = "++show transId)
debugStrLn8 ("### ctrlEndTrans phase III cont trans w/new initiator")
electNewTransCoordinator transId transEnvs remCont
ctrlEndTrans msgChan transId remCont idVRVars transEnvs
debugStrLn8 ("### ctrlEndTrans ok")
m -> if m == remCont
then ctrlEndTrans msgChan transId m idVRVars transEnvs
else error "error ctrlEndTrans"
finishTrans :: TransID -> [ValidRemVal] -> IO ()
finishTrans trId idVRs = do
mapM_ (unLockTVarFromId.fst) idVRs
updateAutoTrans (+ (1)) trId
modifyMVar_ gDistTransCont (return . P.filter ((/= trId).fst))
electNewTransCoordinator :: TransID -> [EnvAddr] -> RemCont -> IO ()
electNewTransCoordinator transId@(env, _) transEnvs remCont = do
let upEnvs = P.filter (/= env) transEnvs
case minimum upEnvs of
newC | newC == gMyEnv -> contTransWNewCoord transId remCont upEnvs
| otherwise -> remPutMsg newC
(RemElectedNewCoord transId remCont upEnvs)
`CE.catch`
\ex -> let eEnv = (/= distTVarExEnv ex)
upEnvs = P.filter eEnv . P.filter (/= env) $ transEnvs
in electNewTransCoordinator transId upEnvs remCont
`CE.catch`
\(e1::SomeException) -> debugStrLn1 ("electNewTransCoordinator other ex"++show e1)
contTransWNewCoord :: TransID -> RemCont -> [EnvAddr] -> IO ()
contTransWNewCoord transId remCont upEnvs = do
debugStrLn1 (">>><<< contTransWNewCoord 1 : "++show remCont)
let remEnvs = P.filter (/= gMyEnv) upEnvs
case remCont of
End -> return ()
_ -> do
debugStrLn1 (">>><<< contTransWNewCoord 2 : "++show remCont)
robustMapM_ (flip remPutMsg (RemContTrans transId remCont)) remEnvs
robustMapM_ (flip remPutMsg (RemContTrans transId End)) remEnvs
contRemTrans transId End
validateValidIds :: ValidRemVal -> IO Bool -> IO Bool
validateValidIds (_, (Nothing, _)) isValid = isValid
validateValidIds (tId, (Just (versionId, _), _)) isValid =
validateTVarFromId (tId, versionId) >>+ isValid
contWriteTVar :: ValidRemVal -> IO ()
contWriteTVar (_, (_, Nothing)) = return ()
contWriteTVar (tId, (_, Just str)) = writeTVarFromId (tId, str)
contExtWaitQ :: ValidRemVal -> IO ()
contExtWaitQ (_, (Nothing, _)) = return ()
contExtWaitQ (tId, (Just (_, rVar), _)) = extWaitQFromId (tId, rVar)
nodeReceiver :: Socket -> IO ()
nodeReceiver sock = CE.catch (do
setSocketOption sock NoDelay 1
time <- getProcessTimes
debugStrLn1 ("accept: "++show (elapsedTime time))
(h, hn, p) <- accept sock
debugStrLn4 ("### new connection: " ++ show h ++ " -> name: " ++ show hn ++ " -> port: " ++ show p)
hSetBuffering h LineBuffering
forkIO (readMsg h)
nodeReceiver sock
)(\(e::SomeException) -> debugStrLn1 ("nodeReceiver "++show e))
readMsg :: Handle -> IO ()
readMsg h = CE.catch (do
str <- hGetLine h
handleMsg h (read str)
readMsg h
)(\(e::SomeException) -> debugStrLn1 ("readMsg dropLostSocket "++show e))
handleMsg :: Handle -> STMMessage -> IO ()
handleMsg h msg =
case msg of
RemResume retryVarIds ->
mapM_ resumeFromId retryVarIds >> aStat RES
RemAddEnvToAction tVarId env ->
addEnvToTVarActions tVarId env >> aStat AEA
RemDelEnvFromAction tVarId env ->
delEnvFromTVarActions tVarId env >> aStat DEA
RemLifeCheck ->
debugStrLn2 ("RemLifeCheck: I'm alive") >> aStat LFC
RemReadTVar tId destEnv ->
readTVarFromId tId destEnv (hPutStrLn h) >> aStat RDT
RemStartTrans transId idVRVars transEnvs ->
startRemTrans transId idVRVars transEnvs (hPutStrLn h . show)
>> aStat STT
RemContTrans transId remCont ->
contRemTrans transId remCont >> aStat CTT
RemElectedNewCoord transId remCont oprtlEnvs ->
contTransWNewCoord transId remCont oprtlEnvs >> aStat ENC
data AutoLink = AutoLink {autoTrans :: Int,
autoRetry :: [RetryVar]}
deriving Show
gDefaultLink :: AutoLink
gDefaultLink = AutoLink {autoTrans = 0, autoRetry = []}
type Link = (EnvAddr, AutoLink)
gLinks :: MVar [Link]
gLinks = unsafePerformIO (newMVar [])
gTransChkIntv, gRetryChkIntv :: Int
gTransChkIntv = 1
gRetryChkIntv = 3
gMegaMuSec :: Int
gMegaMuSec = 1000000
updateAutoTrans :: (Int -> Int) -> TransID -> IO ()
updateAutoTrans f (env, _) = do
links <- takeMVar gLinks
case Data.List.partition ((== env).fst) links of
([], otherLinks) -> do
putMVar gLinks ((env, gDefaultLink{autoTrans = f (autoTrans gDefaultLink)}):otherLinks)
forkIO (lifeCheck env)
return ()
([(e, link)], otherLinks) -> do
putMVar gLinks ((e, link{autoTrans = f (autoTrans link)}):otherLinks)
_ -> putMVar gLinks links
insertRetryLinks :: EnvAddr -> [ValidRemVal] -> IO ()
insertRetryLinks env idVRs = do
links <- takeMVar gLinks
case Data.List.partition ((== env).fst) links of
([], otherLinks) -> do
putMVar gLinks ((env,retrylink gDefaultLink):otherLinks)
forkIO (lifeCheck env)
return ()
([(e, link)], otherLinks) -> do
putMVar gLinks ((e, retrylink link):otherLinks)
_ -> putMVar gLinks links
where retrylink l = foldr insertRetryLink l idVRs
insertRetryLink :: ValidRemVal -> AutoLink -> AutoLink
insertRetryLink (_, (Just (_, rVar), _)) link@AutoLink{autoRetry=rVars} =
link{autoRetry=rVar:rVars}
insertRetryLink _ link = link
deleteRetryLinks :: EnvAddr -> [ValidRemVal] -> IO ()
deleteRetryLinks env idVRs = do
links <- takeMVar gLinks
case Data.List.partition ((== env).fst) links of
([], otherLinks) -> do
putMVar gLinks otherLinks
([(e, link)], otherLinks) ->
putMVar gLinks ((e, foldr deleteRetryLink link idVRs):otherLinks)
_ -> putMVar gLinks links
deleteRetryLink :: ValidRemVal -> AutoLink -> AutoLink
deleteRetryLink (_, (Just (_, rVar), _)) link@AutoLink{autoRetry=rVars} =
link{autoRetry = P.filter (/= rVar) rVars}
deleteRetryLink _ link = link
lifeCheck :: EnvAddr -> IO ()
lifeCheck env = CE.catch (do
links <- readMVar gLinks
case P.lookup env links of
Nothing -> do
debugStrLn9 ("lifeCheck Nothing: "++show env)
return ()
Just link | autoTrans link > 0 -> do
debugStrLn9 ("lifeCheck trans "++show env)
ping
threadDelay (gMegaMuSec * gTransChkIntv)
lifeCheck env
| autoRetry link /= [] -> do
debugStrLn9 ("lifeCheck retry "++show env)
ping
threadDelay (gMegaMuSec * gRetryChkIntv)
lifeCheck env
| otherwise -> do
ping
debugStrLn9 ("lifeCheck -------: "++show env)
deleteLink
)(\(e::SomeException) -> do
debugStrLn9 ("!!! lifeCheck recovery" ++ show e)
aStat DRP
recoverBrokenReactiveTrans env
recoverBrokenInactiveTrans env
deleteLink
)
where
ping = remPutMsg env RemLifeCheck
deleteLink = modifyMVar_ gLinks (return.P.filter ((/= env).fst))
recoverBrokenReactiveTrans :: EnvAddr -> IO ()
recoverBrokenReactiveTrans env = do
debugStrLn1 ("<<< !!! recoverBrokenReactiveTrans: "++show env)
conts <- readMVar gDistTransCont
let brokenRemTrans = P.filter ((== env) . fst . fst) conts
mapM_ ((flip writeChan Err).snd) brokenRemTrans
debugStrLn1 ("<<< recoverBrokenReactiveTrans "++show brokenRemTrans)
recoverBrokenInactiveTrans :: EnvAddr -> IO ()
recoverBrokenInactiveTrans env = do
debugStrLn1 ("<<< !!! recoverBrokenInactiveTrans env: "++show env)
links <- readMVar gLinks
case P.lookup env links of
Nothing -> return ()
Just AutoLink{autoRetry=retryVars} -> do
debugStrLn1 ("<<< recoverBrokenInactiveTrans retryVars: "++show retryVars)
mapM_ coreResume retryVars
coreResume :: RetryVar -> IO ()
coreResume (RetryVar retryMVar _) = resumeRetryVarAct retryMVar
coreResume (LinkRetryVar (VarLink env retVarId)) = CE.catch (
remPutMsg env (RemResume [retVarId])
) (\(_::SomeException) -> return ())
startDist :: IO ()
-> IO ()
startDist nodeMain = CE.catch (do
serverPid <- startNameService
threadDelay gMegaMuSec
installHandler sigPIPE Ignore Nothing
time <- getProcessTimes
debugStrLn1 ("listenOn: "++show (elapsedTime time))
seq gMyEnv (forkIO (nodeReceiver gMySocket))
nodeMain
swapMVar gLinks []
performGC
terminateProcess serverPid
debugStrLn1 ("terminate NameService")
)(propagateEx "startDist")
startNameService :: IO ProcessHandle
startNameService = do
debugStrLn1 ("startNameService")
runCommand "NameServer"