module Control.Distributed.STM.RetryVar
(RetryLog, RetryVar (RetryVar, LinkRetryVar), newRetryVar,
suspend, resumeRetryVarAct, resumeFromId, bundledRetryLogs,
insertRetryVarAction, deleteRetryVarAction,
printGRetryVarActionMap, remPutRetryEnvLine) where
import Control.Concurrent
import qualified Control.Exception as CE (catch)
import qualified Data.Map as M hiding (map)
import Control.Distributed.STM.DebugBase
import Control.Distributed.STM.EnvAddr
import qualified Control.Distributed.STM.Utils as U
import Prelude as P
import System.IO.Unsafe
data RetryVar = RetryVar (MVar ())
VarID
| LinkRetryVar VarLink
deriving Eq
instance Show RetryVar where
show (RetryVar _ tId) = show (LinkRetryVar (VarLink gMyEnv tId))
show (LinkRetryVar v) = "(" ++ show v ++ ")"
instance Read RetryVar where
readsPrec i str = map (\(x,s) -> (LinkRetryVar x,s)) (readsPrec i str)
newRetryVar :: IO RetryVar
newRetryVar = do
retryMVar <- newEmptyMVar
newID <- uniqueId
return (RetryVar retryMVar newID)
suspend :: RetryVar -> IO ()
suspend (RetryVar retryMVar _) = takeMVar retryMVar
suspend (LinkRetryVar _) = return ()
resumeRetryVarAct :: MVar () -> IO ()
resumeRetryVarAct retryMVar = tryPutMVar retryMVar () >> return ()
type RetryLog = (EnvAddr, RetryLogBundle)
type RetryLogBundle = Either (IO ()) [VarID]
bundledRetryLogs :: RetryVar -> [RetryLog] -> [RetryLog]
bundledRetryLogs retryVar queue =
U.insertWith updRetryLog (retryVarEnv retryVar) singletonRLog queue
where singletonRLog = singletonRetryLog retryVar
singletonRetryLog :: RetryVar -> RetryLogBundle
singletonRetryLog (RetryVar retryMVar _) = Left (resumeRetryVarAct retryMVar)
singletonRetryLog (LinkRetryVar (VarLink env retryVarId))
| env == gMyEnv = Left (resumeFromId retryVarId)
| otherwise = Right [retryVarId]
updRetryLog :: RetryLogBundle -> RetryLogBundle -> RetryLogBundle
updRetryLog (Left resume) (Left resumes) = Left (resumes >> resume)
updRetryLog (Right [rVarId]) (Right rVarIds) = Right (rVarId:rVarIds)
updRetryLog _ y = y
retryVarEnv :: RetryVar -> EnvAddr
retryVarEnv (RetryVar _ _) = gMyEnv
retryVarEnv (LinkRetryVar (VarLink env _)) = env
type RetryVarAction = IO ()
gRetryVarActionMap :: MVar (M.Map VarID RetryVarAction)
gRetryVarActionMap = unsafePerformIO (newMVar M.empty)
printGRetryVarActionMap :: VarID -> IO ()
printGRetryVarActionMap tVarId = do
gMap <- readMVar gRetryVarActionMap
case M.lookup tVarId gMap of
Nothing -> debugStrLn2 ("tVarID non existing: " ++ show tVarId)
Just _ -> debugStrLn2 ("tVarID existing " ++ show tVarId)
debugStrLn2 ("---")
insertRetryVarAction :: RetryVar -> IO ()
insertRetryVarAction (RetryVar retryMVar retryVarId) =
modifyMVar_ gRetryVarActionMap $
return . M.insertWith (flip const) retryVarId (resumeRetryVarAct retryMVar)
insertRetryVarAction (LinkRetryVar _) = return ()
deleteRetryVarAction :: RetryVar -> IO ()
deleteRetryVarAction (RetryVar _ retryVarId) =
modifyMVar_ gRetryVarActionMap $ return . M.delete retryVarId
deleteRetryVarAction (LinkRetryVar _) = return ()
resumeFromId :: VarID -> IO ()
resumeFromId retryVarId = do
gMap <- readMVar gRetryVarActionMap
case M.lookup retryVarId gMap of
Just act -> act
Nothing -> return ()
remPutRetryEnvLine :: Show a => EnvAddr -> a -> IO ()
remPutRetryEnvLine env msg = CE.catch (do
h <- connectToRetryEnv env
sendTCP msg h
)(propagateEx "remPutRetryEnvLine")