module Control.Distributed.STM.TVar (TVar (TVar, LinkTVar),
tVarToLink, tVarEnv, isDistErrTVar,
ValidLog, CommitLog,
lockAct, unLockAct, validAct, extWaitQAct, commitAct, notifyAct,
CommitRemVal, ValidRemVal, MaybeRead,
TransID, RemCont (Com, Ret, End, Err),
STMMessage (RemResume,RemAddEnvToAction,RemDelEnvFromAction
,RemLifeCheck,RemReadTVar,RemStartTrans,RemContTrans
,RemElectedNewCoord),
remGetMsg, remPutMsg, bundledValidLogs, bundledCommitLogs,
addEnvToTVarActions, delEnvFromTVarActions,
lockTVarFromId, unLockTVarFromId, readTVarFromId, writeTVarFromId,
validateTVarFromId, notifyFromId, extWaitQFromId,
gTVarActionMap,remLock,remValidate,
readTVarValFromId, swapTVarValFromId) where
import Control.Concurrent
import qualified Control.Exception as CE (catch,throw,SomeException)
import Control.Monad
import qualified Data.List
import qualified Data.Map as DM hiding (map)
import Control.Distributed.STM.DebugBase
import Control.Distributed.STM.Dist
import Control.Distributed.STM.EnvAddr
import Control.Distributed.STM.Utils hiding (insertWith)
import qualified Control.Distributed.STM.Utils as U
import Prelude as P hiding (catch, putStr, putStrLn)
import Control.Distributed.STM.RetryVar
import System.Mem.Weak
import System.IO
import System.IO.Unsafe
-- | A transactional variable: either one hosted in this process ('TVar') or a
-- reference to a variable identified by a 'VarLink' ('LinkTVar').
data TVar a
  = TVar
      (MVar (a, VersionID))  -- current value paired with its version id
      VarID                  -- identifier of this variable
      (MVar ())              -- lock: taken by 'lockAct', released by 'unLockAct'
      (MVar [RetryLog])      -- wait queue of retry logs notified on commit
  | LinkTVar VarLink         -- variable addressed by environment and id
-- | Both constructors are rendered as the parenthesised 'VarLink' of the
-- variable; a hosted 'TVar' uses 'gMyEnv' as the environment of that link.
instance Show (TVar a) where
  show tVar = case tVar of
    TVar _ tId _ _ -> render (VarLink gMyEnv tId)
    LinkTVar link  -> render link
    where
      render link = "(" ++ show link ++ ")"
-- | Parsing always produces a 'LinkTVar': the text is read as a 'VarLink' and
-- every parse result is wrapped in the link constructor.
instance Read (TVar a) where
  readsPrec i str = map wrapLink (readsPrec i str)
    where
      wrapLink (link, rest) = (LinkTVar link, rest)
-- | Environment a variable belongs to: 'gMyEnv' for a hosted 'TVar',
-- otherwise the environment stored in the link.
tVarEnv :: TVar a -> EnvAddr
tVarEnv tVar = case tVar of
  TVar {}                  -> gMyEnv
  LinkTVar (VarLink env _) -> env
-- | Convert any variable into its 'VarLink'. A hosted 'TVar' is addressed by
-- 'gMyEnv' and its own id; a 'LinkTVar' already carries the link.
tVarToLink :: TVar a -> VarLink
tVarToLink tVar = case tVar of
  TVar _ tId _ _ -> VarLink gMyEnv tId
  LinkTVar link  -> link
-- | Validation log entry for one environment (see 'bundledValidLogs', which
-- keys entries by the environment of the TVar).
type ValidLog = (EnvAddr, ValidLogBundle)

-- | Per-environment validation data.
--
-- * 'Left': per-variable lock actions plus the combined validation actions;
--   built when the variable is hosted here or its link points at 'gMyEnv'.
-- * 'Right': plain values describing the variables of another environment.
type ValidLogBundle =
  Either
    ([(VarID, LockActs)], ValidActs)
    [ValidRemVal]
-- | Pair of actions that acquire and release the lock of one variable.
data LockActs = LockActs
  { lockAct   :: IO ()  -- acquire the variable's lock
  , unLockAct :: IO ()  -- release the variable's lock
  } deriving Show
-- | Validation-phase actions; 'updValidLog' chains the actions of several
-- variables of the same environment into one value of this type.
data ValidActs = ValidActs
  { validAct    :: IO Bool  -- check that the versions read are still current
  , extWaitQAct :: IO ()    -- add the retry variable to the wait queue(s)
  } deriving Show
-- | Validation data for one variable of another environment, keyed by its id.
type ValidRemVal = (VarID, MaybeReadWrite)
-- | Read information plus an optional string. The code visible here only ever
-- builds the second component as 'Nothing'; presumably it carries a serialized
-- written value -- TODO confirm against the callers.
type MaybeReadWrite = (MaybeRead, Maybe String)
-- | Version that was read and the retry variable to enqueue; 'Nothing' means
-- the variable needs no version check (its validation always succeeds).
type MaybeRead = Maybe (VersionID, RetryVar)
-- | Add the validation data of one variable to the log, merging it via
-- 'updValidLog' into the entry of the variable's environment.
bundledValidLogs :: TVar a -> MaybeRead -> [ValidLog] -> [ValidLog]
bundledValidLogs tVar versIdRetryVar validLog =
  U.insertWith
    updValidLog
    (tVarEnv tVar)
    (singletonValidLog tVar versIdRetryVar)
    validLog
-- | Build the validation bundle for a single variable.
--
-- * Hosted 'TVar': lock and validate directly on its MVars.
-- * 'LinkTVar' pointing at 'gMyEnv': go through the id-based actions.
-- * 'LinkTVar' of another environment: only describe the variable as data.
--
-- Without read information ('Nothing') validation always succeeds and nothing
-- is added to the wait queue.
singletonValidLog :: TVar a -> MaybeRead -> ValidLogBundle
singletonValidLog tVar mRead =
  case (tVar, mRead) of
    (TVar ref tId lock waitQ, Just (versionId, retryVar)) ->
      Left
        ( [(tId, mvarLock lock)]
        , ValidActs
            { validAct = do
                current <- readMVar ref
                return (snd current == versionId)
            , extWaitQAct =
                modifyMVar_ waitQ
                  (\queue -> return (bundledRetryLogs retryVar queue))
            }
        )
    (TVar _ tId lock _, Nothing) ->
      Left ([(tId, mvarLock lock)], alwaysValid)
    (LinkTVar (VarLink env tId), Just (versionId, retryVar))
      | env == gMyEnv ->
          Left
            ( [(tId, idLock tId)]
            , ValidActs
                { validAct    = validateTVarFromId (tId, versionId)
                , extWaitQAct = extWaitQFromId (tId, retryVar)
                }
            )
      | otherwise ->
          Right [(tId, (Just (versionId, retryVar), Nothing))]
    (LinkTVar (VarLink env tId), Nothing)
      | env == gMyEnv -> Left ([(tId, idLock tId)], alwaysValid)
      | otherwise     -> Right [(tId, (Nothing, Nothing))]
  where
    -- Lock actions working directly on the lock MVar of a hosted variable.
    mvarLock lock =
      LockActs {lockAct = takeMVar lock, unLockAct = putMVar lock ()}
    -- Lock actions that look the variable up by id.
    idLock tId =
      LockActs {lockAct = lockTVarFromId tId, unLockAct = unLockTVarFromId tId}
    -- Validation that trivially succeeds and leaves the wait queue alone.
    alwaysValid =
      ValidActs {validAct = return True, extWaitQAct = return ()}
-- | Merge a singleton bundle (first argument) into an existing bundle of the
-- same shape. 'Left' bundles gain the lock entry and have their validation
-- actions chained; 'Right' bundles merge per variable via 'mergeRVal'.
-- Any other combination keeps the existing bundle unchanged.
updValidLog :: ValidLogBundle -> ValidLogBundle -> ValidLogBundle
updValidLog new old =
  case (new, old) of
    (Left ([(tId, lock)], val), Left (locks, vals)) ->
      Left
        ( U.insertWith const tId lock locks
        , ValidActs
            { validAct    = validAct val >>+ validAct vals
            , extWaitQAct = extWaitQAct val >> extWaitQAct vals
            }
        )
    (Right [(tId, rwVal)], Right rwVals) ->
      Right (U.insertWith mergeRVal tId rwVal rwVals)
    _ -> old
-- | Combine two entries for the same variable: the first argument wins when it
-- carries read information, otherwise the second argument is kept as is.
mergeRVal :: MaybeReadWrite -> MaybeReadWrite -> MaybeReadWrite
mergeRVal new old =
  case new of
    (Nothing, _) -> old
    (Just r, w)  -> (Just r, w)
-- | Commit log entry for one environment (see 'bundledCommitLogs', which keys
-- entries by the environment of the TVar).
type CommitLog = (EnvAddr, CommitLogBundle)
-- | 'Left': per-variable commit/notify actions, built when the variable is
-- hosted here or its link points at 'gMyEnv'. 'Right': shown values for the
-- variables of another environment together with a registration action.
type CommitLogBundle = Either [(VarID, CommitActs)] ([CommitRemVal], RegIO)
-- | Commit-phase actions for one variable.
data CommitActs = CommitActs
  { commitAct :: IO ()  -- store the new value in the variable
  , notifyAct :: IO ()  -- notify the retry logs waiting on the variable
  } deriving Show
-- | Variable id paired with the new value rendered by 'show'.
type CommitRemVal = (VarID, String)
-- | Registration action accumulated in a 'Right' commit bundle ('regTVars'
-- calls chained by 'updCommit').
type RegIO = IO ()
-- | Add the commit data of one written variable to the log, merging it via
-- 'updCommit' into the entry of the variable's environment.
bundledCommitLogs :: Dist a => TVar a -> a -> [CommitLog] -> [CommitLog]
bundledCommitLogs tVar value commitLog =
  U.insertWith
    updCommit
    (tVarEnv tVar)
    (singletonCommitLog tVar value)
    commitLog
-- | Build the commit bundle for writing @value@ to a single variable.
--
-- * Hosted 'TVar': replace the value, increment the version and notify the
--   queued retry logs (the queue is emptied first).
-- * 'LinkTVar' pointing at 'gMyEnv': go through the id-based actions with the
--   shown value.
-- * 'LinkTVar' of another environment: ship the shown value plus the
--   'regTVars' action for that environment.
singletonCommitLog :: Dist a => TVar a -> a -> CommitLogBundle
singletonCommitLog tVar value =
  case tVar of
    TVar ref tId _ waitQ ->
      Left
        [ ( tId
          , CommitActs
              { commitAct =
                  modifyMVar_ ref (\old -> return (value, snd old + 1))
              , notifyAct = do
                  waiting <- swapMVar waitQ []
                  mapM_ coreNotify waiting
              }
          )
        ]
    LinkTVar (VarLink env tId)
      | env == gMyEnv ->
          Left
            [ ( tId
              , CommitActs
                  { commitAct = writeTVarFromId (tId, show value)
                  , notifyAct = notifyFromId tId
                  }
              )
            ]
      | otherwise ->
          Right ([(tId, show value)], regTVars env value)
-- | Merge a singleton commit bundle (first argument) into an existing bundle
-- of the same shape. For 'Right' bundles the new registration action runs
-- after the already collected ones. Any other combination keeps the existing
-- bundle unchanged.
updCommit :: CommitLogBundle -> CommitLogBundle -> CommitLogBundle
updCommit new old =
  case (new, old) of
    (Left [(tId, comNfyAct)], Left comNfys) ->
      Left (U.insertWith const tId comNfyAct comNfys)
    (Right ([(tId, val)], reg), Right (idStrs, regs)) ->
      Right (U.insertWith const tId val idStrs, regs >> reg)
    _ -> old
-- | Type-erased operations on one hosted TVar. Values cross this interface as
-- strings produced by 'show' and consumed by 'read' (see 'insertTVarActions').
data TVarActions = TVarActions
  { remRead     :: EnvAddr -> IO String    -- shown (value, version); registers the value for the given environment
  , remWrite    :: String -> IO ()         -- store a shown value and increment the version
  , remReadVal  :: IO String               -- shown (value, version), taken without the lock
  , remSwapVal  :: String -> IO String     -- replace the (value, version) pair, returning the old one shown
  , remValidate :: VersionID -> IO String  -- shown Bool: is the given version still current?
  , remLock     :: IO ()                   -- take the variable's lock
  , remUnLock   :: IO ()                   -- release the variable's lock
  , remExtWaitQ :: RetryVar -> IO ()       -- add a retry variable to the wait queue
  , remNotify   :: IO ()                   -- empty the wait queue and notify its entries
  }
-- | Process-wide registry of the TVars hosted here that are known to other
-- environments: for each variable id its type-erased actions and the list of
-- environments registered for it.
--
-- The MVar is created with 'unsafePerformIO', so the binding must never be
-- inlined: without the NOINLINE pragma GHC may duplicate the 'newMVar' call
-- and different use sites would then see different, unrelated maps.
{-# NOINLINE gTVarActionMap #-}
gTVarActionMap :: MVar (DM.Map VarID (TVarActions, [EnvAddr]))
gTVarActionMap = unsafePerformIO (newMVar DM.empty)
-- | Register a hosted TVar in 'gTVarActionMap' on behalf of @fstDestEnv@.
--
-- If the variable is already listed, only the environment is added to its
-- list and the stored actions are kept; otherwise a fresh 'TVarActions'
-- record closing over the variable's MVars is inserted. A 'LinkTVar' is
-- ignored.
insertTVarActions :: Dist a => TVar a -> EnvAddr -> IO ()
insertTVarActions (TVar tVarRef tVarId tVarLock waitQ) fstDestEnv =
  modifyMVar_ gTVarActionMap (return . register)
  where
    register = DM.insertWith keepActions tVarId (actions, [fstDestEnv])

    -- Existing entry: keep its actions, just remember one more environment.
    keepActions _ (acts, envs) = (acts, fstDestEnv : envs)

    actions =
      TVarActions
        { remRead     = readFor
        , remWrite    = writeFrom
        , remReadVal  = liftM show (readMVar tVarRef)
        , remSwapVal  = \str -> liftM show (swapMVar tVarRef (read str))
        , remValidate = \versionId -> do
            (_, vId) <- readMVar tVarRef
            return (show (versionId == vId))
        , remLock     = takeMVar tVarLock
        , remUnLock   = putMVar tVarLock ()
        , remExtWaitQ = \retryVar ->
            modifyMVar_ waitQ
              (\queue -> return (bundledRetryLogs retryVar queue))
        , remNotify   = swapMVar waitQ [] >>= mapM_ coreNotify
        }

    -- Read the (value, version) pair under the lock, then register the value
    -- for the requesting environment before answering.
    readFor destEnv = do
      takeMVar tVarLock
      v'@(v, _) <- readMVar tVarRef
      putMVar tVarLock ()
      regTVars destEnv v
      debugStrLn8 ("###>>> remRead "++show v')
      return (show v')

    -- Parse the new value, store it with an incremented version, then run
    -- 'finTVars' on it.
    writeFrom str = do
      debugStrLn8 ("###>>> remWrite "++show str)
      let v = read str
      modifyMVar_ tVarRef (\old -> return (v, snd old + 1))
      finTVars v
insertTVarActions (LinkTVar _) _ = return ()
-- | Wake up one waiting retry log. A 'Left' entry carries the resume action,
-- which is run directly. A 'Right' entry names retry variables in another
-- environment: a 'RemResume' message is sent from a forked thread and any
-- exception raised while sending is only written to the debug output.
coreNotify :: RetryLog -> IO ()
coreNotify (env, target) =
  case target of
    Left resumeAct -> resumeAct
    Right retryVarIds -> do
      _ <- forkIO (CE.catch (sendResume retryVarIds) logFailure)
      return ()
  where
    sendResume ids = remPutRetryEnvLine env (RemResume ids)

    logFailure :: CE.SomeException -> IO ()
    logFailure e = debugStrLn1 ("coreNotify "++show e)
-- | Record that @env@ holds a reference to the hosted TVar @tVarId@.
--
-- The variable must already be listed in 'gTVarActionMap'. If it is not, the
-- call fails immediately with an error and the map is left unchanged. The
-- previous implementation handed the error value to 'DM.insertWith', which
-- stored it lazily as the map entry: the call itself succeeded and the error
-- only surfaced later, at whichever unrelated lookup touched that entry.
addEnvToTVarActions :: VarID -> EnvAddr -> IO ()
addEnvToTVarActions tVarId env =
  modifyMVar_ gTVarActionMap $ \actionMap ->
    if DM.member tVarId actionMap
      then return (DM.adjust addEnv tVarId actionMap)
      -- modifyMVar_ puts the untouched map back when this throws.
      else error ("error addEnvToTVarActions: TVarId "++
                  show tVarId ++" not listed")
  where
    addEnv (acts, envs) = (acts, env : envs)
-- | Remove one registration of @env@ for the hosted TVar @tVarId@; the whole
-- entry is dropped from 'gTVarActionMap' once no environment is left.
--
-- Fails with an error if the variable is not listed. The update runs inside
-- 'modifyMVar_', so the map is put back into the MVar even when that error
-- (or any other exception) is raised. The previous take/put sequence left
-- the MVar empty on the error path, blocking every later access to the map.
delEnvFromTVarActions :: VarID -> EnvAddr -> IO ()
delEnvFromTVarActions tVarId env =
  modifyMVar_ gTVarActionMap $ \oldMap ->
    case DM.lookup tVarId oldMap of
      Nothing ->
        error ("error trying to remove non-existing TVar: "++show tVarId)
      Just (acts, envs) ->
        return $ case Data.List.delete env envs of
          -- Only the first occurrence of env is removed, so repeated
          -- registrations need the same number of removals.
          []      -> DM.delete tVarId oldMap
          newEnvs -> DM.insert tVarId (acts, newEnvs) oldMap
-- | Run the registered read action of @tVarId@ for @destEnv@ and pass the
-- shown result to @sendReply@. Exceptions go to 'distInstanceError'.
readTVarFromId :: VarID -> EnvAddr -> (String -> IO ()) -> IO ()
readTVarFromId tVarId destEnv sendReply =
  CE.catch serveRead (distInstanceError "readTVarFromId")
  where
    serveRead = do
      actionMap <- readMVar gTVarActionMap
      let (actions, _) = actionMap DM.! tVarId
      reply <- remRead actions destEnv
      sendReply reply
-- | Run the registered write action of the variable with the shown value.
-- Exceptions go to 'distInstanceError'.
writeTVarFromId :: (VarID, String) -> IO ()
writeTVarFromId (tVarId, str) =
  CE.catch applyWrite (distInstanceError "writeTVarFromId")
  where
    applyWrite = do
      actionMap <- readMVar gTVarActionMap
      let (actions, _) = actionMap DM.! tVarId
      remWrite actions str
-- | Shown content of the registered variable @tVarId@, obtained through its
-- 'remReadVal' action. On an exception 'distInstanceError' is run first; the
-- handler is written to yield the empty string afterwards.
readTVarValFromId :: VarID -> IO String
readTVarValFromId tVarId = CE.catch fetch onFailure
  where
    fetch = do
      actionMap <- readMVar gTVarActionMap
      let (actions, _) = actionMap DM.! tVarId
      remReadVal actions

    onFailure :: CE.SomeException -> IO String
    onFailure e = distInstanceError "readTVarValFromId" e >> return ""
-- | Replace the content of the registered variable @tVarId@ by the shown
-- value @v'@ through its 'remSwapVal' action and return the old content
-- shown. On an exception 'distInstanceError' is run first; the handler is
-- written to yield the empty string afterwards.
swapTVarValFromId :: VarID -> String -> IO String
swapTVarValFromId tVarId v' = CE.catch exchange onFailure
  where
    exchange = do
      actionMap <- readMVar gTVarActionMap
      let (actions, _) = actionMap DM.! tVarId
      remSwapVal actions v'

    onFailure :: CE.SomeException -> IO String
    onFailure e = distInstanceError "swapTVarValFromId" e >> return ""
-- | Take the lock of the registered variable @tVarId@.
-- Exceptions go to 'distInstanceError'.
lockTVarFromId :: VarID -> IO ()
lockTVarFromId tVarId =
  CE.catch acquire (distInstanceError "lockTVarFromId")
  where
    acquire = do
      actionMap <- readMVar gTVarActionMap
      let (actions, _) = actionMap DM.! tVarId
      remLock actions
-- | Release the lock of the registered variable @tVarId@.
-- Exceptions go to 'distInstanceError'.
unLockTVarFromId :: VarID -> IO ()
unLockTVarFromId tVarId =
  CE.catch release (distInstanceError "unLockTVarFromId")
  where
    release = do
      actionMap <- readMVar gTVarActionMap
      let (actions, _) = actionMap DM.! tVarId
      remUnLock actions
-- | Ask the registered variable whether @versionId@ is still its current
-- version; the shown answer of 'remValidate' is parsed back into a 'Bool'.
-- On an exception 'distInstanceError' is run first; the handler is written
-- to yield 'False' afterwards.
validateTVarFromId :: (VarID, VersionID) -> IO Bool
validateTVarFromId (tVarId, versionId) = CE.catch check onFailure
  where
    check = do
      actionMap <- readMVar gTVarActionMap
      let (actions, _) = actionMap DM.! tVarId
      liftM read (remValidate actions versionId)

    onFailure :: CE.SomeException -> IO Bool
    onFailure e = distInstanceError "validateTVarFromId" e >> return False
-- | Add @retryVar@ to the wait queue of the registered variable.
-- Exceptions go to 'distInstanceError'.
extWaitQFromId :: (VarID, RetryVar) -> IO ()
extWaitQFromId (tVarId, retryVar) =
  CE.catch enqueue (distInstanceError "extWaitQFromId")
  where
    enqueue = do
      actionMap <- readMVar gTVarActionMap
      let (actions, _) = actionMap DM.! tVarId
      remExtWaitQ actions retryVar
-- | Notify everything waiting on the registered variable @tVarId@ from a
-- forked thread. Exceptions inside that thread are only written to the debug
-- output; exceptions raised before the fork go to 'distInstanceError'.
notifyFromId :: VarID -> IO ()
notifyFromId tVarId =
  CE.catch spawnNotify (distInstanceError "notifyFromId")
  where
    spawnNotify = do
      actionMap <- readMVar gTVarActionMap
      -- Lazy binding: the lookup itself is only evaluated in the new thread.
      let (actions, _) = actionMap DM.! tVarId
      _ <- forkIO (CE.catch (remNotify actions) logFailure)
      return ()

    logFailure :: CE.SomeException -> IO ()
    logFailure e = debugStrLn1 ("notifyFromId " ++ show e)
-- | Shared exception handler of the id-based TVar operations: print a hint
-- naming the failing operation @s@, then throw the exception again.
distInstanceError :: String -> CE.SomeException -> IO ()
distInstanceError s e = putStrLn hint >> CE.throw e
  where
    hint = "\nDSTM Program Error in " ++ s ++
           "\nPossibly missing Dist instance for TVar data type"
-- | Send a message to another environment without waiting for an answer:
-- obtain a handle via 'connectToEnvHold' and write the message with
-- 'sendTCP'. Exceptions are passed to 'propagateEx'.
remPutMsg :: Show a => EnvAddr -> a -> IO ()
remPutMsg env msg = CE.catch deliver (propagateEx "remPutMsg")
  where
    deliver = connectToEnvHold env >>= sendTCP msg
-- | Send a message to another environment and return the string produced by
-- 'recvTCP', run through 'connectAtomicToEnv'. Exceptions are passed to
-- 'propagateEx'.
remGetMsg :: Show a => EnvAddr -> a -> IO String
remGetMsg env msg = CE.catch request (propagateEx "remGetMsg")
  where
    request = connectAtomicToEnv env (recvTCP msg)
-- | Identifier of a transaction: an environment address paired with an
-- 'STMID' (presumably the environment that started the transaction and its
-- local transaction number -- TODO confirm).
type TransID = (EnvAddr, STMID)
-- | Continuation tag carried by 'RemContTrans' and 'RemElectedNewCoord'
-- messages.
data RemCont
  = Com
  | Ret
  | End
  | Err
  deriving (Show, Read, Eq)
-- | Messages exchanged between environments. They travel as text, hence the
-- derived 'Show' and 'Read' instances.
data STMMessage
  = RemResume [VarID]                               -- resume the given retry variables
  | RemAddEnvToAction VarID EnvAddr                 -- register an environment for a TVar
  | RemDelEnvFromAction VarID EnvAddr               -- unregister an environment for a TVar
  | RemLifeCheck
  | RemReadTVar VarID EnvAddr
  | RemStartTrans TransID [ValidRemVal] [EnvAddr]
  | RemContTrans TransID RemCont
  | RemElectedNewCoord TransID RemCont [EnvAddr]
  deriving (Show, Read)
-- | TVars can themselves be stored in TVars and sent to other environments.
--
-- 'finTVars' attaches a finalizer to a 'LinkTVar' that sends
-- 'RemDelEnvFromAction' to the link's environment (see 'finalizeTVar');
-- hosted variables need nothing. 'regTVars' registers @destEnv@ for the
-- variable: directly in the action map for a hosted 'TVar', or with a
-- 'RemAddEnvToAction' message to the link's environment for a 'LinkTVar'.
-- In both link cases exceptions are only written to the debug output.
instance Dist a => Dist (TVar a) where
  finTVars tVar = case tVar of
    TVar {} -> return ()
    LinkTVar linkTVar ->
      CE.catch
        (addFinalizer tVar (finalizeTVar linkTVar))
        (reportFailure linkTVar)
    where
      reportFailure :: VarLink -> CE.SomeException -> IO ()
      reportFailure link e =
        debugStrLn1 ("### Error finTVars: " ++ show link ++ show e)

  regTVars destEnv tVar = case tVar of
    TVar {} -> insertTVarActions tVar destEnv
    LinkTVar (VarLink tEnv tVarId) ->
      CE.catch
        (remPutMsg tEnv (RemAddEnvToAction tVarId destEnv))
        (reportFailure tEnv)
    where
      reportFailure :: EnvAddr -> CE.SomeException -> IO ()
      reportFailure tEnv e =
        debugStrLn1 ("### Error regTVars: " ++ show tEnv ++ " -> destEnv: " ++ show destEnv ++ show e)
-- | Finalizer of a linked TVar: send 'RemDelEnvFromAction' for the variable,
-- naming 'gMyEnv', to the environment stored in the link.
finalizeTVar :: VarLink -> IO ()
finalizeTVar (VarLink tEnv tVarId) =
  remPutMsg tEnv (RemDelEnvFromAction tVarId gMyEnv)
-- | Does the exception name the environment that the given TVar belongs to?
isDistErrTVar :: SomeDistTVarException -> TVar a -> Bool
isDistErrTVar e tVar = failedEnv == tVarEnv tVar
  where
    failedEnv = distTVarExEnv e