{- Copyright (C) 2009-2010 Andrejs Sisojevs All rights reserved. For license and copyright information, see the file COPYRIGHT -} -------------------------------------------------------------------------- -------------------------------------------------------------------------- -- | This module wraps STM PCC with IO. -- Some locks are added to balance heavyweight transactions, to reduce \"waste -- efforts\". module Control.Concurrent.PriorityChansConverger.PriorityChansConverger where import Control.Concurrent import Control.Concurrent.MVar import Control.Concurrent.PriorityChansConverger.Commons import Control.Concurrent.PriorityChansConverger.PriorityChansConvergerSTM import Control.Concurrent.ConcurrentUISupport import Control.Concurrent.STM import Control.Concurrent.STM.TChan.ExtsCommons import Control.Exception import Control.Monad import Data.IORef import Data.List import qualified Data.Map as M import Data.Map (Map) import Data.MyHelpers import System.IO __locksObserverEnabled = False ---------------------------------------- -- * PriorityChansConverger ADT and it's administration data PCC_Lock = PCC_Lock | PCC_IOInterruptableRead | PCC_IOInterruptableWrite deriving (Eq, Ord, Show) -- | Wrapper around @'PriorityChansConvergerSTM'@. Added locks. @'Show'@ instance -- is defined using @'unsafePerformIO'@. data Ord k => PriorityChansConverger k e = PriorityChansConverger { pccSTM :: PriorityChansConvergerSTM k e , pccLocks :: Map PCC_Lock (MVar ()) } reportIfPossible :: Ord k => PriorityChansConverger k e -> String -> IO () reportIfPossible pcc s = case pccConcurrentUISupport $ pccSTM pcc of Nothing -> return () Just cuis -> cuisWrite cuis s maybeObserveExceptions :: Ord k => PriorityChansConverger k e -> FunctionName -> IO a -> IO a maybeObserveExceptions pcc f_name io_a = case pccConcurrentUISupport $ pccSTM pcc of Nothing -> io_a Just cuis -> reportExceptionIfAny_2 cuis f_name io_a -- | Wrapper around @'newPriorityChansConverger_wCUIS'@ constructor. -- -- @newPriorityChansConverger = 'newPriorityChansConverger_wCUIS' Nothing@ newPriorityChansConverger :: Ord k => Map k (StartPriority, ChanCapacity) -> IO (PriorityChansConverger k e) newPriorityChansConverger = newPriorityChansConverger_wCUIS Nothing -- | Wrapper around @'newPriorityChansConvergerSTM_wCUIS'@ constructor. -- With an option to enable utility @'ConcurrentUISupport'@ -- (which is useful for debugging) newPriorityChansConverger_wCUIS :: Ord k => Maybe ConcurrentUISupport -> Map k (StartPriority, ChanCapacity) -> IO (PriorityChansConverger k e) newPriorityChansConverger_wCUIS mb_cuis chan_priors_map = do pcc <- let constr = atomically $ newPriorityChansConvergerSTM_wCUIS mb_cuis chan_priors_map in case mb_cuis of Nothing -> constr Just cuis -> reportExceptionIfAny_2 cuis f_name constr l <- newMVar () return PriorityChansConverger { pccSTM = pcc , pccLocks = M.fromList [(PCC_Lock, l)] } where f_name = "newPriorityChansConverger_wCUIS" type FunctionName = String type LockUserName = FunctionName -- | Sort the locks and lock them. -- -- WARNING !!! Asynchronous exceptions may cause loss of locks! lockPCC :: Ord k => LockUserName -> [PCC_Lock] -> PriorityChansConverger k e -> IO () lockPCC locks_user_f_name locks pcc = mapM_ (\ l -> case M.lookup l (pccLocks pcc) of Nothing -> error ("An error occurred in 'lockPCC': no lock for key '" ++ show l ++ "' exists.") Just l_mv -> takeMVar l_mv >> when __locksObserverEnabled (reportIfPossible pcc ("Lock '" ++ show l ++ "' taken by function '" ++ locks_user_f_name ++ "'.")) ) (map head $ group $ sort locks) -- | Reverse sort the locks and unlock them. -- -- WARNING !!! Asynchronous exceptions may cause loss of locks! unlockPCC :: Ord k => LockUserName -> [PCC_Lock] -> PriorityChansConverger k e -> IO () unlockPCC locks_user_f_name locks pcc = mapM_ (\ l -> case M.lookup l (pccLocks pcc) of Nothing -> error ("An error occurred in 'unlockPCC': no lock for key '" ++ show l ++ "' exists.") Just l_mv -> do s <- tryPutMVar l_mv () when __locksObserverEnabled $ reportIfPossible pcc $ case s of True -> "Lock '" ++ show l ++ "' released by function '" ++ locks_user_f_name ++ "'." False -> "Function '" ++ locks_user_f_name ++ "' wanted to release the lock '" ++ show l ++ "', but it appered to be already released." ) (map head $ group $ reverse $ sort locks) -- | Locking IO action wrapper. -- -- WARNING !!! Asynchronous exceptions may cause loss of locks! withLockedDo :: Ord k => LockUserName -> [PCC_Lock] -> PriorityChansConverger k e -> IO a -> IO a withLockedDo locks_user_f_name locks pcc act = block $ do lockPCC locks_user_f_name locks pcc a <- finally (unblock act) (unlockPCC locks_user_f_name locks pcc) return a --------------------------------------------------------------------------------------------------------------------------- -- * PriorityChansConverger mutation --------------------------------------------------------------------------------------------------------------------------- -- | Wrapper around @'isOfStructPCC_STM'@. Used locks: none. isOfStructPCC :: Ord k => Map k (StartPriority, ChanCapacity) -> PriorityChansConverger k e -> IO (PCC_ContainsGivenSpecification, NothingElseIsInStruct) isOfStructPCC mp pcc = maybeObserveExceptions pcc f_name $ atomically $ isOfStructPCC_STM mp (pccSTM pcc) where f_name = "isOfStructPCC" -- | Wrapper around @'mutatePCC_STM'@. Used locks: @['PCC_Lock']@ mutatePCC :: Ord k => Map k (StartPriority, ChanCapacity) -> PriorityChansConverger k e -> IO (PriorityChansConverger k e) mutatePCC chan_priors_map pcc = withLockedDo f_name [PCC_Lock] pcc $ do new_pcc_stm <- maybeObserveExceptions pcc f_name $ atomically $ mutatePCC_STM chan_priors_map (pccSTM pcc) return pcc { pccSTM = new_pcc_stm } where f_name = "mutatePCC" --------------------------------------------------------------------------------------------------------------------------- -- * PriorityChansConverger I/O --------------------------------------------------------------------------------------------------------------------------- -- | Wrapper around @'writeInPCC_STM'@. Used locks: if blocking, -- then @[]@ else @['PCC_Lock']@. -- -- If blocking is enabled, may block, when there is no free space -- in @'PriorityChan'@. writeInPCC :: Ord k => PermitToBlock -> k -> e -> PriorityChansConverger k e -> IO (Maybe FailureReasonWPCC) writeInPCC block_dowe k e pcc = maybe_lock $ maybeObserveExceptions pcc f_name $ atomically $ writeInPCC_STM block_dowe k e (pccSTM pcc) where maybe_lock = case block_dowe of { True -> id; False -> withLockedDo f_name [PCC_Lock] pcc } f_name = "writeInPCC" -- | Wrapper around @'interruptableWriteInPCC_STM'@. Used locks: @[]@ -- -- Blocking write. -- -- Do not put the control of the @'STM' 'InterruptShouldWe'@ under @'PCC_Lock'@. -- Keep there @False@, if you don't want to get @Left ()@ in return. -- Put there @True@, whenever you want to stop waiting. stmInterruptableWriteInPCC :: Ord k => STM InterruptShouldWe -> k -> e -> PriorityChansConverger k e -> IO (Either () (Maybe FailureReasonWPCC)) stmInterruptableWriteInPCC stm_interrupter k e pcc = maybeObserveExceptions pcc f_name $ atomically $ interruptableWriteInPCC_STM stm_interrupter k e (pccSTM pcc) where f_name = "stmInterruptableWriteInPCC" -- | Wrapper around @'stmInterruptableWriteInPCC'@. Used locks: @['PCC_IOInterruptableWrite']@ -- -- Blocking read. -- -- Creates a @'TVar' Bool@ to control the interruptability of write operation. -- Spawns additional thread, which cyclicly reads given @Chan@ and checks, -- if it signals terminating condition - if so, then using "interrupter" @TVar@ -- stops trying to write in PCC. Temporary thread gets terminated at the end. interruptableWriteInPCC :: Ord k => (Chan a, a -> InterruptShouldWe) -> k -> e -> PriorityChansConverger k e -> IO (Either () (Maybe FailureReasonWPCC)) interruptableWriteInPCC (interrupter_chan, interrupter_cond) k e pcc = withLockedDo f_name [PCC_IOInterruptableWrite] pcc $ maybeObserveExceptions pcc f_name $ do interrupter_tv <- atomically $ newTVar False tmp_thread_interrupter <- newIORef False _ <- forkIO $ let _cycle = do stop <- readIORef tmp_thread_interrupter case stop of True -> return () False -> do _interrupt <- interrupter_cond `liftM` readChan interrupter_chan case _interrupt of False -> _cycle True -> atomically $ writeTVar interrupter_tv True in _cycle r <- stmInterruptableWriteInPCC (readTVar interrupter_tv) k e pcc writeIORef tmp_thread_interrupter True return r where f_name = "interruptableWriteInPCC" -- | Wrapper around @'readFromPCC_STM'@. Used locks: if blocking, then @[]@ else @['PCC_Lock']@. readFromPCC :: Ord k => PermitToBlock -> PriorityChansConverger k e -> IO (Maybe (k, e)) readFromPCC block_dowe pcc = maybe_lock $ maybeObserveExceptions pcc f_name $ atomically $ readFromPCC_STM block_dowe (pccSTM pcc) where maybe_lock = case block_dowe of { True -> id; False -> withLockedDo f_name [PCC_Lock] pcc } f_name = "readFromPCC" -- | Wrapper around @'interruptableReadFromPCC_STM'@. Used locks: @[]@ -- -- Blocking read. stmInterruptableReadFromPCC :: Ord k => STM InterruptShouldWe -> PriorityChansConverger k e -> IO (Either () (k, e)) stmInterruptableReadFromPCC stm_interrupter pcc = maybeObserveExceptions pcc f_name $ atomically $ interruptableReadFromPCC_STM stm_interrupter (pccSTM pcc) where f_name = "stmInterruptableReadFromPCC" -- | Wrapper around @'stmInterruptableReadFromPCC'@. Used locks: @['PCC_IOInterruptableRead']@ -- -- Blocking read. -- -- Creates a @'TVar' Bool@ to control the interruptability of read operation. -- Spawns additional thread, which cyclicly reads given @Chan@ and checks, -- if it signals terminating condition - if so, then using "interrupter" @TVar@ -- stops trying to write in PCC. Temporary thread gets terminated at the end. interruptableReadFromPCC :: Ord k => (Chan a, a -> InterruptShouldWe) -> PriorityChansConverger k e -> IO (Either () (k, e)) interruptableReadFromPCC (interrupter_chan, interrupter_cond) pcc = withLockedDo f_name [PCC_IOInterruptableRead] pcc $ maybeObserveExceptions pcc f_name $ do interrupter_tv <- atomically $ newTVar False tmp_thread_interrupter <- newIORef False _ <- forkIO $ let _cycle = do stop <- readIORef tmp_thread_interrupter case stop of True -> return () False -> do _interrupt <- interrupter_cond `liftM` readChan interrupter_chan case _interrupt of False -> _cycle True -> atomically $ writeTVar interrupter_tv True in _cycle r <- stmInterruptableReadFromPCC (readTVar interrupter_tv) pcc writeIORef tmp_thread_interrupter True return r where f_name = "interruptableReadFromPCC" -- | Wrapper around @'flushPCC2List_STM'@. Used locks: @['PCC_Lock']@ -- -- Lazy (doublecheck that). flushPCC2List :: Ord k => PriorityChansConverger k e -> IO [(k, e)] flushPCC2List pcc = withLockedDo f_name [PCC_Lock] pcc $ maybeObserveExceptions pcc f_name $ atomically $ flushPCC2List_STM (pccSTM pcc) where f_name = "flushPCC2List" -- | Wrapper around @flushPCC2List_STM'@ -- (see "Control.Concurrent.PriorityChansConverger.PriorityChansConverger"). -- Used locks: @['PCC_Lock']@ -- -- Strict. Should be a bit faster than @'flushPCC2List'@. flushPCC2List' :: Ord k => PriorityChansConverger k e -> IO [(k, e)] flushPCC2List' pcc = withLockedDo f_name [PCC_Lock] pcc $ maybeObserveExceptions pcc f_name $ atomically $ flushPCC2List_STM' (pccSTM pcc) where f_name = "flushPCC2List\'" -- | Wrapper around @fastFlushPCC2List_STM'@ -- (see "Control.Concurrent.PriorityChansConverger.PriorityChansConverger"). -- Used locks: @['PCC_Lock']@ -- -- Strict. This is a fast flush, it doesn't deal with priorities. fastFlushPCC2List' :: Ord k => PriorityChansConverger k e -> IO [(k, e)] fastFlushPCC2List' pcc = withLockedDo f_name [PCC_Lock] pcc $ maybeObserveExceptions pcc f_name $ atomically $ fastFlushPCC2List_STM' (pccSTM pcc) where f_name = "fastFlushPCC2List\'" -- | Wrapper around @'isEmptyPCC_STM'@. Used locks: none isEmptyPCC :: Ord k => PriorityChansConverger k e -> IO Bool isEmptyPCC pcc = maybeObserveExceptions pcc f_name $ atomically $ isEmptyPCC_STM (pccSTM pcc) where f_name = "isEmptyPCC" -- | Wrapper around @'freeSpaceInPCCInput_STM'@. Used locks: none freeSpaceInPCCInput :: Ord k => PriorityChansConverger k e -> k -> IO (Maybe ChanContentAmountMeasure) freeSpaceInPCCInput pcc k = maybeObserveExceptions pcc f_name $ atomically $ freeSpaceInPCCInput_STM (pccSTM pcc) k where f_name = "freeSpaceInPCCInput" -- | Wrapper around @filterOutPCCElements_STM'@ -- (see "Control.Concurrent.PriorityChansConverger.PriorityChansConverger"). -- Used locks: @['PCC_Lock']@ -- -- Strict. -- -- Take everything, applu filter, return what's left in the PCC, -- return what's taken out. filterOutPCCElements' :: Ord k => PriorityChansConverger k e -> (e -> TakeElementOutShouldWe) -> IO [(k, e)] filterOutPCCElements' pcc p = withLockedDo f_name [PCC_Lock] pcc $ maybeObserveExceptions pcc f_name $ atomically $ filterOutPCCElements_STM' (pccSTM pcc) p where f_name = "filterOutPCCElements\'" --------------------------------------------------------------------------------------------------------------------------- -- * PriorityChansConverger representation --------------------------------------------------------------------------------------------------------------------------- -- | Wrapper around @'showPCC'@. Used locks: none showPCC :: (Ord k, Show k, Show e) => PriorityChansConverger k e -> IO String showPCC pcc = maybeObserveExceptions pcc f_name $ atomically $ showPCC_STM (pccSTM pcc) where f_name = "showPCC"