{-# OPTIONS -fglasgow-exts -XUndecidableInstances #-}
{- | A version of @Data.TCache@ using @TMVar@s instead of @TVar@s.
See @Control.Concurrent.STM.TMVar@.
-}
module Data.TCache.TMVar (

 IResource(..)    -- class interface to be implemented for the object by the user

,Resources(..)    -- data definition used to communicate object inserts and deletes to the cache
,resources        -- empty resources

,getTMVars        -- :: (IResource a)=> [a]        -- the list of resources to be retrieved
                  --    -> STM [Maybe (TMVar a)]   -- the transactional variables

,releaseTMVars    -- :: (IResource a)=> [a]        -- the resources whose TMVars were captured by getTMVars
                  --    -> STM ()                  -- (it was documented but missing from the export list)

,getTMVarsIO      -- :: (IResource a)=> [a] -> IO [TMVar a]

,withSTMResources -- :: (IResource a)=> [a]             -- list of resources to retrieve
                  --    -> ([Maybe a]-> Resources a x)  -- the function to apply, which returns a Resources structure
                  --    -> STM x                        -- return value within the STM monad

,withResources    -- :: (IResource a)=> [a]        -- list of resources to be retrieved
                  --    -> ([Maybe a]-> [a])       -- function that gets the retrieved resources
                  --    -> IO ()                   -- and returns a list of objects to be inserted/modified

,withResource     -- :: (IResource a)=> a          -- same as withResources, but for a single object
                  --    -> (Maybe a-> a)
                  --    -> IO ()

,getResources     -- :: (IResource a)=> [a]        -- resources [a] are read from cache and returned
                  --    -> IO [Maybe a]

,getResource      -- :: (IResource a)=> a          -- to retrieve one object instead of a list
                  --    -> IO (Maybe a)

,deleteResources  -- :: (IResource a)=> [a]-> IO() -- delete the list of resources from cache and from persistent storage
,deleteResource   -- :: (IResource a)=> a-> IO()   -- delete the resource from cache and from persistent storage

-- cache handling
,Cache            -- :: IORef (Ht a, Integer)      -- the cache definition: hash table and last sync time

,setCache         -- :: (Ht a, Integer) -> IO()    -- set the cache.
                  -- this is useful for hot loaded modules that will use an existing cache

,newCache         -- :: IO (Ht a, Integer)         -- newCache creates a new cache

,refcache         -- :: Cache a                    -- the reference to the cache (see data definition below)

,syncCache        -- :: (IResource a) =>Cache a -> IO()
                  -- force the atomic write of all the cache objects into permanent storage
                  -- useful for termination

-- start the thread that cleans and writes on the persistent storage through syncCache
,clearSyncCacheProc -- :: (IResource a) =>Cache a                -- the cache reference
                    --    -> Int                                 -- number of seconds between checks
                    --    -> (Integer-> Integer-> Integer-> Bool) -- the user-defined check-for-cleanup-from-cache
                    --                                           -- for each object (when True, the object is
                    --                                           -- removed from cache)
                    --    -> Int                                 -- the max number of objects in the cache;
                    --                                           -- if more, the cleanup starts
                    --    -> IO ThreadId                         -- identifier of the thread created

-- the default check procedure
,defaultCheck     -- :: Integer   -- current time in seconds
                  --    -> Integer -- last access time for a given object
                  --    -> Integer -- last cache synchronization (with the persistent storage)
                  --    -> Bool    -- return True for all the elems not accessed since
                  --               -- half the time between now and the last sync

-- auxiliary
,readFileStrict   -- :: String -> IO String        -- strict file read, needed for the default file persistence

)
where

import GHC.Conc
import Control.Concurrent.STM.TMVar
import Control.Monad(when)
import Data.HashTable as H
import Data.IORef
import System.IO.Unsafe
import System.Time
import Data.Maybe(catMaybes,mapMaybe)
import Debug.Trace
import Data.TCache.IResource
import Control.Exception(handle,assert)
import Data.List (nubBy,deleteFirstsBy )

-- Tracing helper: evaluates to its first argument after emitting the second
-- argument through 'trace'. Intended for infix use: @expr `debug` "message"@.
debug a b= trace b a

-- One cache entry: the transactional variable holding the object, the last
-- access time and the last modification time.
type Block a= (TMVar a,AccessTime,ModifTime)

-- The hash table that maps a resource key (see 'keyResource') to its entry.
type Ht a= HashTable String (Block a)

-- The cache reference: contains the hash table and the last sync time.
type Cache a= IORef (Ht a, Integer)

-- How 'takeBlocks' must treat an object it had to read from storage:
--   AddToHash   - register it in the cache with the current access time
--   NoAddToHash - return it without registering it
--   MaxTime     - register it with 'infinite' access and modification times
data CheckBlockFlags= AddToHash | NoAddToHash | MaxTime

-- | set the cache.
-- this is useful for hot loaded modules that will update an existing cache
setCache :: (Ht a, Integer) -> IO()
setCache ch= writeIORef refcache ch

-- 'refcache' is a global mutable variable created with 'unsafePerformIO'.
-- It must never be inlined: an inlined copy would run 'newCache' again and
-- silently create a second, independent cache.
{-# NOINLINE refcache #-}
-- | The cache holder, established by default.
-- It initially holds the cache produced by 'newCache' and can be replaced with 'setCache'.
refcache :: Cache a
refcache = unsafePerformIO $ do
    c <- newCache
    newIORef c

-- | newCache creates a new cache: an empty hash table keyed by 'String'
-- (hashed with 'hashString') paired with a last-sync time of 0.
newCache :: IO (Ht a, Integer)
newCache = do
    c <- H.new (==) hashString
    return (c,0)

-- | getTMVars returns the TMVars that wrap the resources for which the keys are given.
-- It returns Nothing for a resource that is neither in the cache nor returned by 'readResource'.
-- These TMVars can be used in explicit user constructed atomic blocks.
-- Additionally, the TMVars remain in the cache and can be accessed and updated by the rest
-- of the TCache methods.
-- Objects that had to be read from storage are registered with 'infinite' access and
-- modification times (the MaxTime flag); the intent is that their content is written every
-- time the cache is synchronized with the storage until releaseTMVars is called.
getTMVars :: (IResource a)
          => [a]                      -- ^ the list of resources to be retrieved
          -> STM [Maybe (TMVar a)]    -- ^ the transactional variables
getTMVars rs= do
    (cache,_) <- unsafeIOToSTM $ readIORef refcache
    takeBlocks rs cache MaxTime

-- | releaseTMVars permits the TMVars captured by getTMVars to be released, so they can be
-- discarded when not used.
-- Use this when you no longer need to use them directly in atomic blocks.
-- The given values are stored into the cache through 'releaseBlocks'.
releaseTMVars :: (IResource a)=> [a]-> STM ()
releaseTMVars rs= do
    (cache,_) <- unsafeIOToSTM $ readIORef refcache
    releaseBlocks rs cache

-- | getTMVarsIO does not search for a TMVar in the cache like getTMVars. Instead, getTMVarsIO creates a list of
-- TMVars with the content given in the list of resources, adds these TMVars to the cache and returns them.
-- The content of the TMVars is written every time the cache is synchronized with the storage,
-- until releaseTMVars is called.
getTMVarsIO :: (IResource a)=> [a] -> IO [TMVar a]
getTMVarsIO rs = do
    vars       <- mapM newTMVarIO rs
    (table, _) <- readIORef refcache
    mapM_ (register table) (zip vars rs)
    return vars
  where
    -- store the fresh TMVar under the key of its resource, with 'infinite'
    -- access and modification times
    register table (var, res) =
        H.update table (keyResource res) (var, infinite, infinite)

-- | This is the main function for the *Resources primitives; all the rest derive from it.
-- The requested resources are looked up in the cache (or read from storage when they are not
-- cached) and handed to the user function as a list of @Maybe@ values, in the same order as
-- the request. Nothing means that the resource was found neither in the cache nor in storage.
--
-- The result of the user function is interpreted as follows:
--
--  * @Retry@: the whole transaction is retried.
--
--  * @Resources toAdd toDelete toReturn@ (positional reading; the constructor itself is declared
--    in "Data.TCache.IResource"):
--
--      - the first component holds the resources to be created or updated in the cache;
--        if a resource doesn't exist, it is created.
--
--      - the second component holds the resources to be deleted, both from the cache and
--        from the persistent storage.
--
--      - the third component is the value returned by the call.
withSTMResources :: (IResource a)
                 => [a]                           -- ^ the list of resources to be retrieved
                 -> ([Maybe a]-> Resources a x)   -- ^ the function that processes the resources found and returns a Resources structure
                 -> STM x                         -- ^ the return value in the STM monad
withSTMResources rs f= do
    (cache,_) <- unsafeIOToSTM $ readIORef refcache
    mtrs <- takeBlocks rs cache NoAddToHash
    mrs  <- mapM mreadTMVar mtrs
    case f mrs of
        Retry -> retry
        Resources as ds r -> do
            -- NOTE(review): these IO actions run through unsafeIOToSTM, so they are
            -- not undone if the transaction is later retried.
            unsafeIOToSTM $ do
                delListFromHash cache $ map keyResource ds
                mapM_ delResource ds
            releaseBlocks as cache
            return r
  where
    -- The content is read, not taken. Taking would leave every cached TMVar
    -- that is not written back through 'releaseBlocks' empty (for example all
    -- the ones touched by 'getResources'), and any later access to that key, as
    -- well as 'save', would block forever. 'releaseBlocks' copes with a full
    -- TMVar by swapping the content.
    mreadTMVar (Just tvar)= do
        r <- readTMVar tvar
        return $ Just r
    mreadTMVar Nothing    = return Nothing

-- | Update of a single object in the cache.
-- Same as 'withResources', but for one object only: the function receives the current
-- value (Nothing when the object is neither cached nor stored) and returns the new one.
withResource:: (IResource a)=> a-> (Maybe a-> a)-> IO ()
withResource r f= withResources [r] (map f)

-- | To atomically add/modify many objects in the cache.
-- The function receives the retrieved resources and returns the list of objects to be
-- inserted/modified.
withResources:: (IResource a)
             => [a]                 -- ^ list of resources to be retrieved
             -> ([Maybe a]-> [a])   -- ^ function that processes the retrieved resources
             -> IO ()
withResources rs f= atomically $ withSTMResources rs f1 >> return ()
  where
    f1 mrs= Resources (f mrs) [] ()

-- | To read a resource from the cache.
-- The result is forced to weak head normal form before it is returned.
getResource :: (IResource a)=> a -> IO (Maybe a)
getResource r= do
    mrs <- getResources [r]
    case mrs of
        (mr:_) -> return $! mr
        []     -> return Nothing   -- not expected: one result is produced per requested resource

-- | To read a list of resources from the cache if they exist.
-- Resources that are not cached are read from storage; nothing is added, modified or deleted.
getResources:: (IResource a)
            => [a]             -- ^ the resources to be read
            -> IO [Maybe a]    -- ^ the result, in the same order
getResources rs= atomically $ withSTMResources rs f1
  where
    f1 mrs= Resources [] [] mrs

-- | delete the resource from cache and from persistent storage
deleteResource :: (IResource a)=> a -> IO ()
deleteResource r= deleteResources [r]

-- | delete the list of resources from cache and from persistent storage.
-- Only the resources that were actually found are deleted.
deleteResources :: (IResource a)=> [a] -> IO ()
deleteResources rs= atomically $ withSTMResources rs f1
  where
    f1 mrs = Resources [] (catMaybes mrs) ()

-- Get the TMVar of every requested resource, one result per resource and in the same order.
-- A cached resource yields its cached TMVar. Otherwise the resource is read from storage
-- with 'readResource'; if it is found, a fresh TMVar is created and the flag decides whether
-- (and with which times) it is registered in the cache. Nothing is returned when the
-- resource is found neither in the cache nor in storage.
takeBlocks :: (IResource a)=> [a] -> Ht a -> CheckBlockFlags -> STM [Maybe (TMVar a)]
takeBlocks rs cache addToHash= mapM (checkBlock cache addToHash) rs
  where
    checkBlock :: IResource a => Ht a -> CheckBlockFlags -> a-> STM(Maybe (TMVar a))
    checkBlock cache flags r = do
        c <- unsafeIOToSTM $ H.lookup cache keyr
        case c of
            Nothing -> do
                mr <- unsafeIOToSTM $ readResource r
                case mr of
                    Nothing -> return Nothing
                    Just r2 -> do
                        tvr <- newTMVar r2
                        case flags of
                            NoAddToHash -> return $ Just tvr
                            AddToHash   -> do
                                ti <- unsafeIOToSTM timeInteger
                                unsafeIOToSTM $ H.update cache keyr (tvr, ti, 0) -- accessed, not modified
                                return $ Just tvr
                            MaxTime     -> do
                                -- NOTE(review): 'infinite' comes from Data.TCache.IResource;
                                -- presumably a time later than any real one, so that 'save'
                                -- always writes the entry - confirm.
                                unsafeIOToSTM $ H.update cache keyr (tvr, infinite, infinite)
                                return $ Just tvr
            Just (tvr,_,_) -> return $ Just tvr
      where
        keyr= keyResource r

-- Store the given values in the cache.
-- A resource that is not cached gets a fresh TMVar, registered with the current time as both
-- access and modification time. For a cached resource the content of its TMVar is replaced
-- (whether the TMVar is currently empty or full) and both times are set to the maximum of
-- the current time and the previous modification time.
releaseBlocks :: (IResource a)=> [a] -> Ht a -> STM ()
releaseBlocks rs cache = mapM_ checkBlock rs
  where
    checkBlock r = do
        c <- unsafeIOToSTM $ H.lookup cache keyr
        case c of
            Nothing -> do
                tvr <- newTMVar r
                ti  <- unsafeIOToSTM timeInteger
                unsafeIOToSTM $ H.update cache keyr (tvr, ti, ti) -- accessed and modified
            Just (tvr,_,tm) -> do
                ti <- unsafeIOToSTM timeInteger
                let t= max ti tm
                -- a plain putTMVar would block on a full TMVar, so fall back to swapping
                done <- tryPutTMVar tvr r
                case done of
                    False -> do swapTMVar tvr r; return ()
                    True  -> return ()
                unsafeIOToSTM $ H.update cache keyr (tvr,t,t)
      where
        keyr= keyResource r

-- Current clock time, in whole seconds (the first field of 'TOD').
timeInteger :: IO Integer
timeInteger= do
    TOD t _ <- getClockTime
    return t

-- Remove every key of the list from the hash table.
delListFromHash :: HashTable k v -> [k] -> IO ()
delListFromHash hash l= mapM_ (H.delete hash) l

-- Insert or replace every (key, value) pair of the list in the hash table.
updateListToHash :: HashTable k v -> [(k,v)] -> IO ()
updateListToHash hash kv= mapM_ update1 kv
  where
    update1 (k,v)= H.update hash k v

-----------------------clear, sync cache-------------

-- | Start the thread that cleans and writes on the persistent storage.
-- Otherwise, clearSyncCache must be invoked explicitly or no persistence will exist.
-- The thread loops forever: it sleeps the given number of seconds and then runs clearSyncCache.
clearSyncCacheProc :: (IResource a)
                   => Cache a                                   -- ^ the cache reference
                   -> Int                                       -- ^ number of seconds between checks
                   -> (Integer -> Integer-> Integer-> Bool)     -- ^ the user-defined check-for-cleanup-from-cache for each object
                                                                -- (when this function returns True, the object is removed from cache)
                   -> Int                                       -- ^ the max number of objects in the cache; if more, the cleanup starts
                   -> IO ThreadId                               -- ^ identifier of the thread created
clearSyncCacheProc refcache time check sizeObjects= forkIO clear
  where
    clear = do
        threadDelay $ (fromIntegral $ time * 1000000)   -- threadDelay counts microseconds
        clearSyncCache refcache time check sizeObjects
        clear

-- | Force the atomic write of all the cached objects into permanent storage.
-- Useful for termination.
syncCache :: (IResource a)=> Cache a -> IO ()
syncCache refcache = do
    (cache,t1) <- readIORef refcache
    -- The new sync time is taken before the table is snapshotted and saved, so that
    -- anything modified from here on carries a modification time >= t2 and is
    -- picked up by the next save.
    t2   <- timeInteger
    list <- toList cache
    atomically $ save list t1
    writeIORef refcache (cache, t2)
    --print $ "write to persistent storage finished: "++ show (length list)++ " objects"

-- | Saves the unsaved elems of the cache.
-- Deletes some elems of the cache when the number of elems > sizeObjects.
-- The deletion depends on the check criteria.
-- defaultCheck is the one implemented
clearSyncCache ::(IResource a)
               => Cache a                                   -- the cache reference
               -> Int                                       -- seconds between checks (not used by this function)
               -> (Integer -> Integer-> Integer-> Bool)     -- check now lastAccess lastSync: True = drop from cache
               -> Int                                       -- max number of objects in the cache before cleanup starts
               -> IO ()
clearSyncCache refcache time check sizeObjects= do
    (cache,lastSync) <- readIORef refcache
    -- best effort: an exception is printed and this round is abandoned
    -- (the stored sync time is then left untouched)
    handle (\e-> do{print e;return ()})$ do
        -- The new sync time is taken BEFORE the snapshot and the save, as a
        -- conservative lower bound. If it were taken afterwards, an object
        -- modified between the save and the timestamp, in an earlier second,
        -- would have a modification time below the recorded sync time and
        -- would never be written.
        t     <- timeInteger
        elems <- toList cache
        let size= length elems
        atomically $ save elems lastSync
        when (size > sizeObjects) (filtercache t cache lastSync elems)
        writeIORef refcache (cache, t)
  where
    -- delete elems from the cache according to the check criteria
    filtercache t cache lastSync elems= mapM_ evict elems
      where
        evict (k,(_,lastAccess,_))=
            when (check t lastAccess lastSync) (H.delete cache k)

-- | To drop from the cache all the elems not accessed since half the time between now and the last sync.
-- This is the default check procedure.
defaultCheck:: Integer     -- ^ current time in seconds
            -> Integer     -- ^ last access time for a given object
            -> Integer     -- ^ last cache synchronization (with the persistent storage)
            -> Bool        -- ^ True for all the elems not accessed since
                           --   half the time between now and the last sync
defaultCheck now lastAccess lastSync= lastAccess <= halftime
  where
    -- the instant halfway between the last sync and now
    halftime= now - (now - lastSync) `div` 2

-- Write to persistent storage every element of the list whose modification time is not
-- earlier than the given last-save time. The current content of the TMVar is what is written.
save:: (IResource a) => [(String, Block a)]-> Integer-> STM ()
save list lastSave= mapM_ save1 list   --`debug` ("saving "++ (show $ length list))
  where
    save1 :: IResource a =>(String, Block a) -> STM()
    save1 (_, (tvr,_,modTime))=
        when (modTime >= lastSave) $ do   --`debug` ("modTime="++show modTime++"lastSave="++show lastSave)
            r <- readTMVar tvr
            -- NOTE(review): the write runs through unsafeIOToSTM, so it may be
            -- repeated if the transaction is re-executed.
            unsafeIOToSTM $! writeResource r   --`debug` ("saved " ++ keyResource r)