{-# LANGUAGE RecordWildCards, DeriveDataTypeable, ExistentialQuantification #-} module Control.TSession ( atomicTransactionPart , createTransaction , readVal , safeLoadVal , writeVal , setStatus , getStatus , getReadSet , commit , finishSession , TouchedValue(..) , TouchedValues(..) , TransactionBase(..) , Trans(..) , TSessionId , TSessionIdGenerator(..) ) where import Control.Monad.State import Control.Exception (catch, Exception, SomeException, throw) import Prelude hiding (catch) import Control.Concurrent (forkIO, threadDelay) import Control.Concurrent.MVar import Control.Applicative ((<$>)) import Data.Maybe import Data.Typeable (Typeable) import Data.Foldable (foldlM) import Data.Time import Data.IORef (IORef, newIORef, readIORef, atomicModifyIORef) import qualified Data.Map.Lazy as Map type TSessionId = Integer type Timeout = UTCTime data TransactionBase k v status = TransactionBase { _trBaseName :: String , _trBaseLoadFun :: k -> IO (Maybe v) , _trBaseSaveFun :: k -> v -> IO () , _trBaseTouchedValues :: MVar (Map.Map k [TouchedValue v status]) , _tsessions :: TSessions k v status , _tsessionIdGenerator :: TSessionIdGenerator , _tsessioninitStatus :: status , _tsessionTimeout :: NominalDiffTime , _tsessionTimeoutIndex :: IORef (Map.Map TSessionId Timeout) } type TSessions k v status = MVar (Map.Map TSessionId (TSession k v status)) newtype TSession k v status = TSession { _tsessionVar :: MVar (TSessionVar k v status) } data TSessionVar k v status = TSessionVar { _tsessionVarTimeout :: Timeout , _tsessionVarReadSet :: Map.Map k (TouchedValue v status) , _tsessionVarWriteSet :: Map.Map k (TouchedValue v status) , _tsessionVarStatus :: status } data TSessionState k v status = TSessionState { _tsessionBase :: TransactionBase k v status , _tsessionId :: TSessionId , _tsessionReadSet :: Map.Map k (TouchedValue v status) , _tsessionWriteSet :: Map.Map k (TouchedValue v status) , _tsessionCommit :: Bool , _tsessionFinish :: Bool , _tsessionStatus :: status , _trBaseTouchedValuesCopy :: Map.Map k [TouchedValue v status] } data TSessionIdGenerator = forall g. TSessionIdGenerator { _tsessionIdGeneratorRef :: IORef g , _tsessionIdGeneratorGenFun :: g -> (TSessionId, g) } data TouchedValues v status = TouchedValues { _touchedValuesAll :: [TouchedValue v status] , _touchedValuesAfterLastLoad :: [TouchedValue v status] , _touchedValuesWithoutMe :: [TouchedValue v status] } deriving Show data TouchedValue v status = ValueCommitted TSessionId v | ValueRead TSessionId v status | ValueWritten TSessionId v status deriving (Show, Ord, Eq) getVal :: TouchedValue v status -> v getVal (ValueCommitted _ v) = v getVal (ValueRead _ v _) = v getVal (ValueWritten _ v _) = v getId :: TouchedValue v status -> TSessionId getId (ValueCommitted id _) = id getId (ValueRead id _ _) = id getId (ValueWritten id _ _) = id manageTouchedValues :: Ord k => Map.Map k [TouchedValue v status] -> TSessionId -> Map.Map k (TouchedValue v status) -> Map.Map k [TouchedValue v status] manageTouchedValues touchedVals tId touchedValuesToMerge = Map.foldrWithKey f touchedVals touchedValuesToMerge where f :: Ord k => k -> TouchedValue v status -> Map.Map k [TouchedValue v status] -> Map.Map k [TouchedValue v status] f k touchedValToUpdate touchedVals = Map.alter g k touchedVals where -- g :: Maybe [TouchedValue v status] -> Maybe [TouchedValue v status] g Nothing = Just [touchedValToUpdate] g (Just touchedVals) = Just $ touchedValToUpdate:(filter touchedValWithId touchedVals) touchedValWithId :: TouchedValue v status -> Bool touchedValWithId (ValueRead tId' _ _) = tId' /= tId touchedValWithId (ValueWritten tId' _ _) = tId' /= tId touchedValWithId _ = True createTransaction :: Ord k => String -- ^ TransactionBase name -> (g -> (TSessionId, g), g) -- ^ Tuple of ID generation function -- and generator state -> status -- ^ Initial status of new tsession -> (k -> IO (Maybe v)) -- ^ Function for loading from database -> (k -> v -> IO ()) -- ^ Function for storing to database -> NominalDiffTime -- ^ Timeout in seconds -> IO (TransactionBase k v status) createTransaction name idFun_initransactionIdState iStatus loadFun saveFun timeout = do let (idFun, initransactionIdState) = idFun_initransactionIdState idRef <- newIORef initransactionIdState let idGenerator = TSessionIdGenerator idRef idFun statusRef <- newMVar Map.empty touchedValsRef <- newMVar Map.empty timeoutIndex <- newIORef Map.empty let tr = TransactionBase { _trBaseName = name , _tsessionIdGenerator = idGenerator , _tsessions = statusRef , _tsessioninitStatus = iStatus , _trBaseLoadFun = loadFun , _trBaseSaveFun = saveFun , _tsessionTimeout = timeout , _trBaseTouchedValues = touchedValsRef , _tsessionTimeoutIndex = timeoutIndex } forkIO $ timeouthandler tr return tr -- The STM monad itself -- newtype STM a = STM {runSTM :: StateT StmState IO (STMResult a)} data TransactionResult a = Success a | Exception TransactionException data TransactionException = TransactionException String deriving (Show, Eq, Typeable) instance Exception TransactionException newtype Trans k v status a = Trans { runTransactionPart :: StateT (TSessionState k v status) IO (TransactionResult a) } instance Monad (Trans k v status) where return = Trans . return . Success x >>= f = Trans $ do res <- runTransactionPart x case res of Success a -> runTransactionPart $ f a Exception e -> return $ Exception e readVal :: Ord k => k -> Trans k v status (Maybe v, TouchedValues v status) readVal k = Trans $ do state@TSessionState{..} <- get let lookup_touchedVals = fromMaybe [] $ Map.lookup k _trBaseTouchedValuesCopy let tuchedVals = TouchedValues { _touchedValuesAll = lookup_touchedVals , _touchedValuesAfterLastLoad = takeWhile filterTId lookup_touchedVals , _touchedValuesWithoutMe = filter filterTId lookup_touchedVals } where filterTId :: TouchedValue v status -> Bool filterTId tVal = (getId tVal) /= _tsessionId -- first look in write set: case Map.lookup k _tsessionWriteSet of Just (ValueWritten _ v _) -> do put $ state { _tsessionReadSet = Map.insert k (ValueRead _tsessionId v _tsessionStatus) _tsessionReadSet } return $ Success (Just v, tuchedVals) Nothing -> do -- then look in read set: case Map.lookup k _tsessionReadSet of Just (ValueRead _ v _) -> do return $ Success (Just v, tuchedVals) _ -> do -- then look in touched vals: case lookupValueCommitted lookup_touchedVals of Just v -> do put $ state { _tsessionReadSet = Map.insert k (ValueRead _tsessionId v _tsessionStatus) _tsessionReadSet } return $ Success (Just v, tuchedVals) Nothing -> do -- finally load from DB: let tr@TransactionBase{..} = _tsessionBase vM <- liftIO $ _trBaseLoadFun k case vM of Just v -> do put $ state { _tsessionReadSet = Map.insert k (ValueRead _tsessionId v _tsessionStatus) _tsessionReadSet } return $ Success (Just v, tuchedVals) Nothing -> return $ Success (Nothing, tuchedVals) lookupValueCommitted :: [TouchedValue v status] -> Maybe v lookupValueCommitted [] = Nothing lookupValueCommitted ((ValueCommitted _ v):_) = Just v lookupValueCommitted (_:xs) = lookupValueCommitted xs safeLoadVal :: Ord k => k -> Trans k v status (Maybe v) safeLoadVal k = Trans $ do state@TSessionState{..} <- get let lookup_touchedVals = fromMaybe [] $ Map.lookup k _trBaseTouchedValuesCopy case lookupValueCommitted lookup_touchedVals of Just v -> do put $ state { _tsessionReadSet = Map.insert k (ValueRead _tsessionId v _tsessionStatus) _tsessionReadSet } return $ Success $ Just v Nothing -> do -- finally load from DB: let tr@TransactionBase{..} = _tsessionBase vM <- liftIO $ _trBaseLoadFun k case vM of Just v -> do put $ state { _tsessionReadSet = Map.insert k (ValueRead _tsessionId v _tsessionStatus) _tsessionReadSet } return $ Success $ Just v Nothing -> return $ Success Nothing writeVal :: Ord k => k -> v -> Trans k v status () writeVal k v = Trans $ do state@TSessionState{..} <- get put $ state { _tsessionWriteSet = Map.insert k (ValueWritten _tsessionId v _tsessionStatus) _tsessionWriteSet } return $ Success () setStatus :: Eq status => status -> Trans k v status () setStatus status = Trans $ do state@TSessionState{..} <- get when (_tsessionStatus /= status) $ do let transactionReadSet = Map.map (\(ValueRead id v _) -> (ValueRead id v status)) _tsessionReadSet let transactionWriteSet = Map.map (\(ValueWritten id v _) -> (ValueWritten id v status)) _tsessionWriteSet -- _tsessionReadSet :: Map.Map k (TouchedValue v status) put $ state { _tsessionStatus = status , _tsessionWriteSet = transactionWriteSet , _tsessionReadSet = transactionReadSet } return $ Success () getStatus :: Trans k v status status getStatus = Trans $ do state@TSessionState{..} <- get return $ Success _tsessionStatus commit :: Trans k v status () commit = Trans $ do state <- get -- liftIO $ putStrLn "Commit called" put $ state { _tsessionCommit = True } return $ Success () finishSession :: Trans k v status () finishSession = Trans $ do state <- get put $ state { _tsessionFinish = True } return $ Success () getReadSet :: Ord k => Trans k v status (Map.Map k [TouchedValue v status]) getReadSet = Trans $ do state@TSessionState{..} <- get let a = Map.foldrWithKey f Map.empty _tsessionReadSet where -- f :: Ord k -- => k -- -> (TouchedValue v status) -- -> Map.Map k [TouchedValue v status] -- -> Map.Map k [TouchedValue v status] f k _ readSetMap = let lookup_touchedVals = fromMaybe [] $ Map.lookup k _trBaseTouchedValuesCopy in let touchedReadValsAfterMyLastCommit = filter filterReadVal {- $ takeWhile filterTId -} lookup_touchedVals in Map.insert k touchedReadValsAfterMyLastCommit readSetMap filterTId :: TouchedValue v status -> Bool filterTId (ValueRead tId _ _) = tId /= _tsessionId filterTId (ValueWritten tId _ _) = tId /= _tsessionId filterTId _ = True filterReadVal :: TouchedValue v status -> Bool filterReadVal (ValueRead _ _ _) = False filterReadVal _ = True return $ Success a atomicTransactionPart :: (Ord k, Eq v, MonadIO m) => (TransactionBase k v status -> m TSessionId) -> TransactionBase k v status -> Trans k v status a -> m a atomicTransactionPart cookieFun tr@TransactionBase{..} code = do tId <- cookieFun tr --liftIO $ print tId tSessions <- liftIO $ readMVar _tsessions let tsM = findSession tId tSessions case tsM of Nothing -> do -- status was deleted after timeout or new transaction tSessions <- liftIO $ takeMVar _tsessions -- search again with lock: let pageVarM = findSession tId tSessions case pageVarM of Nothing -> do -- create new transaction instance: trVar <- liftIO $ newEmptyMVar let tSession = TSession trVar liftIO $ putMVar _tsessions $ Map.insert tId tSession tSessions globalTouchedVals <- liftIO $ readMVar _trBaseTouchedValues let state = TSessionState tr tId Map.empty Map.empty False False _tsessioninitStatus globalTouchedVals runcode tSession Map.empty Map.empty _tsessioninitStatus tId state Just _ -> do -- fallback liftIO $ putMVar _tsessions tSessions atomicTransactionPart cookieFun tr code Just tSession@TSession{..} -> do _trVar@TSessionVar{..} <- liftIO $ takeMVar _tsessionVar globalTouchedVals <- liftIO $ readMVar _trBaseTouchedValues let state = TSessionState tr tId _tsessionVarReadSet _tsessionVarWriteSet False False _tsessionVarStatus globalTouchedVals runcode tSession _tsessionVarReadSet _tsessionVarWriteSet _tsessionVarStatus tId state where -- _tsessionVar already taken ! runcode _ts@TSession{..} oldReadSet oldWriteSet oldStatus tId state = do (transactionResult, _state'@TSessionState{..}) <- liftIO $ catch (runStateT (runTransactionPart code) state) (\e -> return (Exception (TransactionException (show (e :: SomeException))), state)) timeout <- liftIO $ addUTCTime _tsessionTimeout <$> getCurrentTime liftIO $ updateTimeout tr tId (Just timeout) case transactionResult of Exception e -> do liftIO $ putMVar _tsessionVar $ TSessionVar timeout oldReadSet oldWriteSet oldStatus Control.Exception.throw e Success a -> do -- check if values from read-set are saved in the mean time: let newValsInReadSet = Map.difference _tsessionReadSet oldReadSet -- liftIO $ print newValsInReadSet -- lookupValueCommitted -- when ((not.Map.null) newValsInReadSet) $ -- liftIO $ threadDelay $ 3 * 1000 * 1000 globalTouchedVals <- liftIO $ takeMVar _trBaseTouchedValues -- liftIO $ putStrLn $ "newValsInReadSet: " ++ show newValsInReadSet -- check if values from read-set are saved in the mean time: let invalid = or $ map f (Map.toList newValsInReadSet) where -- f :: (k, TouchedValue v status) -> Bool f (k, touchedValRS) = case Map.lookup k globalTouchedVals of Nothing -> False Just touchedVal -> case lookupValueCommitted touchedVal of Nothing -> False Just v -> getVal touchedValRS /= v if invalid then do -- rollback liftIO $ putMVar _trBaseTouchedValues globalTouchedVals liftIO $ putMVar _tsessionVar $ TSessionVar timeout oldReadSet oldWriteSet oldStatus atomicTransactionPart cookieFun tr code else do -- not invalid if _tsessionCommit then do -- write write-set to data base: liftIO $ Map.traverseWithKey (\k (ValueWritten _ v _) -> _trBaseSaveFun k v) _tsessionWriteSet let globalTouchedVals' = Map.foldrWithKey turnWrittenToComitted globalTouchedVals _tsessionWriteSet liftIO $ putMVar _trBaseTouchedValues globalTouchedVals' else do -- no commit let unionReadWriteSet = -- if val in write and read set, use val from -- write set (argument order of union): Map.union _tsessionWriteSet _tsessionReadSet let globalTouchedVals' = manageTouchedValues globalTouchedVals tId unionReadWriteSet liftIO $ putMVar _trBaseTouchedValues globalTouchedVals' liftIO $ putMVar _tsessionVar $ TSessionVar timeout _tsessionReadSet _tsessionWriteSet _tsessionStatus when _tsessionFinish $ liftIO $ removeSessions tr Nothing [_tsessionId] return a where filterValueWritten :: TouchedValue v status -> Bool filterValueWritten (ValueWritten _ _ _) = False filterValueWritten _ = True turnWrittenToComitted :: Ord k => k -> TouchedValue v status -> Map.Map k [TouchedValue v status] -> Map.Map k [TouchedValue v status] turnWrittenToComitted k touchedValToUpdate touchedVals = Map.alter (g touchedValToUpdate) k touchedVals where g _ Nothing = Nothing -- if tuchedVal == ValueWritten, insert a corresponding ValueCommitted to -- list and filter out all vals with my id and the ValueCommitted g (ValueWritten id v _status) (Just touchedVals) = Just $ (ValueCommitted id v) : filter touchedValWithIdAndCommitted touchedVals g _ (Just touchedVals) = Just touchedVals touchedValWithIdAndCommitted :: TouchedValue v status -> Bool touchedValWithIdAndCommitted (ValueCommitted _ _) = False touchedValWithIdAndCommitted _ = True timeouthandler :: Ord k => TransactionBase k v status -> IO () timeouthandler tr@TransactionBase{..} = do threadDelay $ 1000 * 1000 * (round _tsessionTimeout ) `div` 10 timeoutIndex <- readIORef _tsessionTimeoutIndex now <- getCurrentTime -- find all timedout IDs: let timedOutSessionIds = Map.foldlWithKey (f now) [] timeoutIndex where -- foldlWithKey :: (a -> k -> b -> a) -> a -> Map k b -> a f :: Timeout -> [TSessionId] -> TSessionId -> Timeout -> [TSessionId] f now ids id timeout = if round (diffUTCTime timeout now) <= 0 then id:ids else ids when ((not.null) timedOutSessionIds) $ do -- putStrLn $ "Timed out IDs: " ++ show timedOutSessionIds removeSessions tr (Just now) timedOutSessionIds timeouthandler tr removeSessions :: Ord k => TransactionBase k v status -> Maybe UTCTime -> [TSessionId] -> IO () removeSessions tr@TransactionBase{..} nowM trIdsToRemove = do tSessions <- takeMVar _tsessions -- delete given Instances and collect TouchedValues that are to delete: (touchedValsToDelete, tSessions') <- foldlM (removeSessionAndFindTVtoDel nowM) ([], tSessions) trIdsToRemove putMVar _tsessions tSessions' -- when ((not.null) touchedValsToDelete) $ -- putStrLn $ "TouchedVals to delete: " ++ show touchedValsToDelete -- :: Map.Map k (TouchedValue v status) touchedVals <- takeMVar _trBaseTouchedValues let touchedVals' = cleanupTouchedValues touchedVals touchedValsToDelete putMVar _trBaseTouchedValues touchedVals' -- when ((not.Map.null) touchedVals') $ -- putStrLn $ "New touched vals: " ++ show touchedVals' -- :: Map.Map k (TouchedValue v status) where removeSessionAndFindTVtoDel :: Ord k => Maybe UTCTime -> ([(k, TouchedValue v status)], Map.Map TSessionId (TSession k v status)) -> TSessionId -> IO ([(k, TouchedValue v status)], Map.Map TSessionId (TSession k v status)) removeSessionAndFindTVtoDel nowM (touchedValsToDelete, tSessions) tId = do let tSessionM = Map.lookup tId tSessions case tSessionM of Just tSession@TSession{..}-> do -- try to take MVar. If taken, don't delete. -- if not taken, check timeout again and delete. tSessionVarM <- tryTakeMVar _tsessionVar case tSessionVarM of Just tSessionVar@TSessionVar{..} -> do if nowM == Nothing || round (diffUTCTime _tsessionVarTimeout (fromJust nowM)) <= 0 then do -- delete entry in Timeout Index: updateTimeout tr tId Nothing -- putStrLn $ "timeout: " ++ show tId -- look up and delete transaction instance: let (tSessionM, tSessions') = Map.updateLookupWithKey (\k v -> Nothing) tId tSessions let touchedValsToDelete' = case tSessionM of -- if instance available, collect read and write -- set for later deletion in touched values: Just tSession@TSession{..} -> Map.toList $ Map.union _tsessionVarReadSet _tsessionVarWriteSet Nothing -> [] return ( touchedValsToDelete' ++ touchedValsToDelete , tSessions' ) else return (touchedValsToDelete, tSessions) Nothing -> return (touchedValsToDelete, tSessions) Nothing -> return (touchedValsToDelete, tSessions) cleanupTouchedValues :: Ord k => Map.Map k [TouchedValue v status] -- ^ global Touched Values (input) -> [(k, TouchedValue v status)] -- ^ list of Touched Vals to remove -> Map.Map k [TouchedValue v status] -- ^ global Touched Values (output) cleanupTouchedValues touchedVals touchedValsToDelete = foldl cleanupTouchedValue touchedVals touchedValsToDelete cleanupTouchedValue :: Ord k => Map.Map k [TouchedValue v status] -- ^ global Touched Values (input) -> (k, TouchedValue v status) -- ^ Pair of key and TV to remove -> Map.Map k [TouchedValue v status] -- ^ global Touched Values (output) cleanupTouchedValue touchedVals (k, touchedVal) = Map.alter (delIdInList (getId touchedVal)) k touchedVals delIdInList :: TSessionId -- ^ ID to remove -> Maybe [TouchedValue v status] -- ^ Touched Value List (input) -> Maybe [TouchedValue v status] -- ^ Touched Value List (output) delIdInList _ Nothing = Nothing delIdInList tId (Just valList) = let newValList = filter filterTId valList in if isEmptyOrValueCommitted newValList then Nothing else Just newValList where filterTId (ValueRead tId' _ _) = tId /= tId' filterTId (ValueWritten tId' _ _) = tId /= tId' filterTId _ = True isEmptyOrValueCommitted [] = True isEmptyOrValueCommitted [(ValueCommitted _ _)] = True isEmptyOrValueCommitted _ = False updateTimeout :: TransactionBase k v status -> TSessionId -> Maybe Timeout -> IO () updateTimeout _tr@TransactionBase{..} tId timeout = do atomicModifyIORef _tsessionTimeoutIndex (\x -> (Map.alter (\_ -> timeout) tId x, ())) -- [(TSessionId, Timeout, Messagebox k v, MVar ([(k, v)], status))] findSession :: TSessionId -> Map.Map TSessionId (TSession k v status) -> Maybe (TSession k v status) findSession = Map.lookup