module Control.Concurrent.PriorityChansConverger.PriorityChansConverger where
import Control.Concurrent
import Control.Concurrent.MVar
import Control.Concurrent.PriorityChansConverger.Commons
import Control.Concurrent.PriorityChansConverger.PriorityChansConvergerSTM
import Control.Concurrent.ConcurrentUISupport
import Control.Concurrent.STM
import Control.Concurrent.STM.TChan.ExtsCommons
import Control.Exception
import Control.Monad
import Data.IORef
import Data.List
import qualified Data.Map as M
import Data.Map (Map)
import Data.MyHelpers
import System.IO
-- | Compile-time switch for lock tracing.  When 'True', 'lockPCC' and
-- 'unlockPCC' report every lock they take or release through
-- 'reportIfPossible'.
__locksObserverEnabled :: Bool
__locksObserverEnabled = False
-- | Keys of the mutexes kept in the 'pccLocks' map.
--
-- The derived 'Ord' instance matters: 'lockPCC' takes locks in ascending
-- order of these keys and 'unlockPCC' releases them in descending order.
data PCC_Lock
    = PCC_Lock
      -- ^ Used by the locking operations other than the two below
      -- (mutation, flushing, filtering, non-blocking read/write).
    | PCC_IOInterruptableRead
      -- ^ Used by 'interruptableReadFromPCC'.
    | PCC_IOInterruptableWrite
      -- ^ Used by 'interruptableWriteInPCC'.
    deriving (Eq, Ord, Show)
-- | IO-level wrapper around a 'PriorityChansConvergerSTM': the STM structure
-- itself plus a set of 'MVar' mutexes used by the locking operations of this
-- module.
data Ord k => PriorityChansConverger k e = PriorityChansConverger
    { pccSTM   :: PriorityChansConvergerSTM k e
      -- ^ The underlying STM structure on which all operations act.
    , pccLocks :: Map PCC_Lock (MVar ())
      -- ^ One mutex per lock key; a full 'MVar' means the lock is free.
    }
-- | Write a message through the converger's 'ConcurrentUISupport', if it has
-- one; otherwise do nothing.
reportIfPossible :: Ord k => PriorityChansConverger k e -> String -> IO ()
reportIfPossible pcc msg = maybe (return ()) write_msg mb_ui
  where
    mb_ui        = pccConcurrentUISupport (pccSTM pcc)
    write_msg ui = cuisWrite ui msg
-- | Run an action, routing it through 'reportExceptionIfAny_2' (labelled with
-- the given function name) when the converger carries a
-- 'ConcurrentUISupport'.  Without one, the action is run as is.
maybeObserveExceptions :: Ord k => PriorityChansConverger k e -> FunctionName -> IO a -> IO a
maybeObserveExceptions pcc f_name io_a =
    maybe io_a observed (pccConcurrentUISupport (pccSTM pcc))
  where
    observed ui = reportExceptionIfAny_2 ui f_name io_a
-- | Build a converger without UI support.  Equivalent to
-- 'newPriorityChansConverger_wCUIS' called with 'Nothing'.
newPriorityChansConverger :: Ord k => Map k (StartPriority, ChanCapacity) -> IO (PriorityChansConverger k e)
newPriorityChansConverger chan_priors_map =
    newPriorityChansConverger_wCUIS Nothing chan_priors_map
-- | Build a converger, optionally attached to a 'ConcurrentUISupport'.
--
-- The STM structure is created by 'newPriorityChansConvergerSTM_wCUIS'; when
-- UI support is given, that construction is wrapped in
-- 'reportExceptionIfAny_2'.
--
-- A fresh, free mutex is created for /every/ 'PCC_Lock' key.  Previously only
-- 'PCC_Lock' was registered, so 'interruptableReadFromPCC' and
-- 'interruptableWriteInPCC' (which lock 'PCC_IOInterruptableRead' and
-- 'PCC_IOInterruptableWrite') always failed in 'lockPCC' with
-- \"no lock for key ... exists\".
newPriorityChansConverger_wCUIS :: Ord k => Maybe ConcurrentUISupport -> Map k (StartPriority, ChanCapacity) -> IO (PriorityChansConverger k e)
newPriorityChansConverger_wCUIS mb_cuis chan_priors_map = do
    let constr = atomically $ newPriorityChansConvergerSTM_wCUIS mb_cuis chan_priors_map
    pcc <- case mb_cuis of
        Nothing   -> constr
        Just cuis -> reportExceptionIfAny_2 cuis f_name constr
    lock_entries <- forM all_lock_keys $ \ key -> do
        mv <- newMVar ()
        return (key, mv)
    return PriorityChansConverger {
               pccSTM   = pcc
             , pccLocks = M.fromList lock_entries
             }
  where
    f_name = "newPriorityChansConverger_wCUIS"
    -- Every key that 'lockPCC' may be asked for by this module.
    all_lock_keys = [PCC_Lock, PCC_IOInterruptableRead, PCC_IOInterruptableWrite]
-- | Name of a function of this module; used to label exception reports.
type FunctionName = String
-- | Name of the function that takes or releases a lock; it appears in the
-- lock-trace messages of 'lockPCC' and 'unlockPCC'.
type LockUserName = FunctionName
-- | Take the given locks of the converger, blocking until each is free.
--
-- Duplicates in the list are ignored, and the locks are taken in ascending
-- 'Ord' order of their keys.  When '__locksObserverEnabled' is set, every
-- acquisition is reported through 'reportIfPossible', naming
-- @locks_user_f_name@ as the taker.
--
-- Calls 'error' if a requested key has no mutex in 'pccLocks'.  All keys are
-- resolved /before/ the first lock is taken, so such a failure can no longer
-- leave already-taken locks held.
lockPCC :: Ord k => LockUserName -> [PCC_Lock] -> PriorityChansConverger k e -> IO ()
lockPCC locks_user_f_name locks pcc = do
    -- Running 'resolve' for every key first makes a missing key fail here,
    -- while nothing is locked yet.
    resolved <- mapM resolve (nub (sort locks))
    mapM_ acquire resolved
  where
    resolve l =
        case M.lookup l (pccLocks pcc) of
            Nothing   -> error ("An error occurred in 'lockPCC': no lock for key '" ++ show l ++ "' exists.")
            Just l_mv -> return (l, l_mv)
    acquire (l, l_mv) = do
        takeMVar l_mv
        when __locksObserverEnabled $
            reportIfPossible pcc ("Lock '" ++ show l ++ "' taken by function '" ++ locks_user_f_name ++ "'.")
-- | Release the given locks of the converger.
--
-- Duplicates are ignored and the locks are released in descending 'Ord'
-- order of their keys (the reverse of the order used by 'lockPCC').
-- Releasing uses 'tryPutMVar', so a lock that is already free is left alone.
-- When '__locksObserverEnabled' is set, the outcome is reported through
-- 'reportIfPossible'.
--
-- Calls 'error' if a requested key has no mutex in 'pccLocks'.
unlockPCC :: Ord k => LockUserName -> [PCC_Lock] -> PriorityChansConverger k e -> IO ()
unlockPCC locks_user_f_name locks pcc = mapM_ release descending_unique_locks
  where
    descending_unique_locks = nub (reverse (sort locks))

    release l =
        case M.lookup l (pccLocks pcc) of
            Nothing   -> error ("An error occurred in 'unlockPCC': no lock for key '" ++ show l ++ "' exists.")
            Just l_mv -> do
                was_held <- tryPutMVar l_mv ()
                when __locksObserverEnabled (reportIfPossible pcc (trace_msg l was_held))

    trace_msg l was_held
        | was_held  = "Lock '" ++ show l ++ "' released by function '" ++ locks_user_f_name ++ "'."
        | otherwise = "Function '" ++ locks_user_f_name ++ "' wanted to release the lock '" ++ show l ++ "', but it appered to be already released."
-- | Run an action while holding the given locks.
--
-- The locks are taken with asynchronous exceptions blocked; the action itself
-- runs unblocked, and the locks are released through 'finally' whether the
-- action returns or throws.
--
-- NOTE(review): 'block' / 'unblock' were superseded by @mask@ in later
-- versions of @base@ -- confirm the targeted GHC before migrating.
withLockedDo :: Ord k => LockUserName -> [PCC_Lock] -> PriorityChansConverger k e -> IO a -> IO a
withLockedDo locks_user_f_name locks pcc act =
    block (acquire >> finally (unblock act) release)
  where
    acquire = lockPCC locks_user_f_name locks pcc
    release = unlockPCC locks_user_f_name locks pcc
-- | IO wrapper that runs 'isOfStructPCC_STM' atomically on the converger's
-- STM structure.  No lock is taken.
isOfStructPCC :: Ord k => Map k (StartPriority, ChanCapacity) -> PriorityChansConverger k e -> IO (PCC_ContainsGivenSpecification, NothingElseIsInStruct)
isOfStructPCC mp pcc = maybeObserveExceptions pcc f_name (atomically check)
  where
    f_name = "isOfStructPCC"
    check  = isOfStructPCC_STM mp (pccSTM pcc)
-- | Run 'mutatePCC_STM' atomically under 'PCC_Lock'.
--
-- The returned converger carries the STM structure produced by
-- 'mutatePCC_STM' and shares the lock map of the converger passed in.
mutatePCC :: Ord k => Map k (StartPriority, ChanCapacity) -> PriorityChansConverger k e -> IO (PriorityChansConverger k e)
mutatePCC chan_priors_map pcc =
    withLockedDo f_name [PCC_Lock] pcc (rewrap `liftM` mutated_stm)
  where
    f_name          = "mutatePCC"
    mutated_stm     = maybeObserveExceptions pcc f_name (atomically (mutatePCC_STM chan_priors_map (pccSTM pcc)))
    rewrap new_stm  = pcc { pccSTM = new_stm }
-- | Run 'writeInPCC_STM' atomically for the given key and element.
--
-- When @block_dowe@ is 'False' the call runs under 'PCC_Lock'; when it is
-- 'True' no lock is taken (presumably so that a writer that blocks does not
-- hold the lock -- confirm against 'writeInPCC_STM').
writeInPCC :: Ord k => PermitToBlock -> k -> e -> PriorityChansConverger k e -> IO (Maybe FailureReasonWPCC)
writeInPCC block_dowe k e pcc =
    locking (maybeObserveExceptions pcc f_name (atomically write_tx))
  where
    f_name   = "writeInPCC"
    write_tx = writeInPCC_STM block_dowe k e (pccSTM pcc)
    locking  = if block_dowe
                   then id
                   else withLockedDo f_name [PCC_Lock] pcc
-- | Run 'interruptableWriteInPCC_STM' atomically, with the interruption
-- condition supplied as an STM action.  No lock is taken.
stmInterruptableWriteInPCC :: Ord k => STM InterruptShouldWe -> k -> e -> PriorityChansConverger k e -> IO (Either () (Maybe FailureReasonWPCC))
stmInterruptableWriteInPCC stm_interrupter k e pcc =
    maybeObserveExceptions pcc f_name (atomically write_tx)
  where
    f_name   = "stmInterruptableWriteInPCC"
    write_tx = interruptableWriteInPCC_STM stm_interrupter k e (pccSTM pcc)
-- | Write into the converger, interruptible through a 'Chan'.
--
-- Runs under 'PCC_IOInterruptableWrite'.  A helper thread reads messages from
-- @interrupter_chan@; the first message satisfying @interrupter_cond@ sets a
-- 'TVar' that is handed to 'stmInterruptableWriteInPCC' as its interruption
-- condition.  Messages that do not satisfy the condition are consumed and
-- discarded.
--
-- The helper thread is killed as soon as the write returns or throws.
-- Previously it was only asked to stop through a flag that it checked
-- /before/ blocking in 'readChan'.  After a normal return it therefore stayed
-- alive and swallowed the next message on the channel, and after an exception
-- it was never stopped at all.
interruptableWriteInPCC :: Ord k => (Chan a, a -> InterruptShouldWe) -> k -> e -> PriorityChansConverger k e -> IO (Either () (Maybe FailureReasonWPCC))
interruptableWriteInPCC (interrupter_chan, interrupter_cond) k e pcc =
    withLockedDo f_name [PCC_IOInterruptableWrite] pcc $ maybeObserveExceptions pcc f_name $ do
        interrupter_tv <- atomically $ newTVar False
        watcher <- forkIO $ watch interrupter_tv
        finally
            (stmInterruptableWriteInPCC (readTVar interrupter_tv) k e pcc)
            (killThread watcher)
  where
    f_name = "interruptableWriteInPCC"
    -- Wait for the first channel message that requests an interrupt, then
    -- raise the flag and finish.
    watch interrupter_tv = do
        _interrupt <- interrupter_cond `liftM` readChan interrupter_chan
        case _interrupt of
            False -> watch interrupter_tv
            True  -> atomically $ writeTVar interrupter_tv True
-- | Run 'readFromPCC_STM' atomically.
--
-- When @block_dowe@ is 'False' the call runs under 'PCC_Lock'; when it is
-- 'True' no lock is taken (presumably so that a reader that blocks does not
-- hold the lock -- confirm against 'readFromPCC_STM').
readFromPCC :: Ord k => PermitToBlock -> PriorityChansConverger k e -> IO (Maybe (k, e))
readFromPCC block_dowe pcc =
    locking (maybeObserveExceptions pcc f_name (atomically read_tx))
  where
    f_name  = "readFromPCC"
    read_tx = readFromPCC_STM block_dowe (pccSTM pcc)
    locking = if block_dowe
                  then id
                  else withLockedDo f_name [PCC_Lock] pcc
-- | Run 'interruptableReadFromPCC_STM' atomically, with the interruption
-- condition supplied as an STM action.  No lock is taken.
stmInterruptableReadFromPCC :: Ord k => STM InterruptShouldWe -> PriorityChansConverger k e -> IO (Either () (k, e))
stmInterruptableReadFromPCC stm_interrupter pcc =
    maybeObserveExceptions pcc f_name (atomically read_tx)
  where
    f_name  = "stmInterruptableReadFromPCC"
    read_tx = interruptableReadFromPCC_STM stm_interrupter (pccSTM pcc)
-- | Read from the converger, interruptible through a 'Chan'.
--
-- Runs under 'PCC_IOInterruptableRead'.  A helper thread reads messages from
-- @interrupter_chan@; the first message satisfying @interrupter_cond@ sets a
-- 'TVar' that is handed to 'stmInterruptableReadFromPCC' as its interruption
-- condition.  Messages that do not satisfy the condition are consumed and
-- discarded.
--
-- The helper thread is killed as soon as the read returns or throws.
-- Previously it was only asked to stop through a flag that it checked
-- /before/ blocking in 'readChan'.  After a normal return it therefore stayed
-- alive and swallowed the next message on the channel, and after an exception
-- it was never stopped at all.
interruptableReadFromPCC :: Ord k => (Chan a, a -> InterruptShouldWe) -> PriorityChansConverger k e -> IO (Either () (k, e))
interruptableReadFromPCC (interrupter_chan, interrupter_cond) pcc =
    withLockedDo f_name [PCC_IOInterruptableRead] pcc $ maybeObserveExceptions pcc f_name $ do
        interrupter_tv <- atomically $ newTVar False
        watcher <- forkIO $ watch interrupter_tv
        finally
            (stmInterruptableReadFromPCC (readTVar interrupter_tv) pcc)
            (killThread watcher)
  where
    f_name = "interruptableReadFromPCC"
    -- Wait for the first channel message that requests an interrupt, then
    -- raise the flag and finish.
    watch interrupter_tv = do
        _interrupt <- interrupter_cond `liftM` readChan interrupter_chan
        case _interrupt of
            False -> watch interrupter_tv
            True  -> atomically $ writeTVar interrupter_tv True
-- | Run 'flushPCC2List_STM' atomically under 'PCC_Lock' and return the
-- resulting list of key/element pairs.
flushPCC2List :: Ord k => PriorityChansConverger k e -> IO [(k, e)]
flushPCC2List pcc =
    withLockedDo f_name [PCC_Lock] pcc
        (maybeObserveExceptions pcc f_name (atomically flush_tx))
  where
    f_name   = "flushPCC2List"
    flush_tx = flushPCC2List_STM (pccSTM pcc)
-- | Run @flushPCC2List_STM'@ atomically under 'PCC_Lock' and return the
-- resulting list of key/element pairs.
flushPCC2List' :: Ord k => PriorityChansConverger k e -> IO [(k, e)]
flushPCC2List' pcc =
    withLockedDo f_name [PCC_Lock] pcc
        (maybeObserveExceptions pcc f_name (atomically flush_tx))
  where
    f_name   = "flushPCC2List\'"
    flush_tx = flushPCC2List_STM' (pccSTM pcc)
-- | Run @fastFlushPCC2List_STM'@ atomically under 'PCC_Lock' and return the
-- resulting list of key/element pairs.
fastFlushPCC2List' :: Ord k => PriorityChansConverger k e -> IO [(k, e)]
fastFlushPCC2List' pcc =
    withLockedDo f_name [PCC_Lock] pcc
        (maybeObserveExceptions pcc f_name (atomically flush_tx))
  where
    f_name   = "fastFlushPCC2List\'"
    flush_tx = fastFlushPCC2List_STM' (pccSTM pcc)
-- | Run 'isEmptyPCC_STM' atomically on the converger's STM structure.
-- No lock is taken.
isEmptyPCC :: Ord k => PriorityChansConverger k e -> IO Bool
isEmptyPCC pcc = maybeObserveExceptions pcc f_name (atomically empty_tx)
  where
    f_name   = "isEmptyPCC"
    empty_tx = isEmptyPCC_STM (pccSTM pcc)
-- | Run 'freeSpaceInPCCInput_STM' atomically for the given key.
-- No lock is taken.
freeSpaceInPCCInput :: Ord k => PriorityChansConverger k e -> k -> IO (Maybe ChanContentAmountMeasure)
freeSpaceInPCCInput pcc k = maybeObserveExceptions pcc f_name (atomically space_tx)
  where
    f_name   = "freeSpaceInPCCInput"
    space_tx = freeSpaceInPCCInput_STM (pccSTM pcc) k
-- | Run @filterOutPCCElements_STM'@ atomically under 'PCC_Lock' with the
-- given predicate and return the resulting list of key/element pairs.
filterOutPCCElements' :: Ord k => PriorityChansConverger k e -> (e -> TakeElementOutShouldWe) -> IO [(k, e)]
filterOutPCCElements' pcc p =
    withLockedDo f_name [PCC_Lock] pcc
        (maybeObserveExceptions pcc f_name (atomically filter_tx))
  where
    f_name    = "filterOutPCCElements\'"
    filter_tx = filterOutPCCElements_STM' (pccSTM pcc) p
-- | Run 'showPCC_STM' atomically to obtain a textual rendering of the
-- converger.  No lock is taken.
showPCC :: (Ord k, Show k, Show e) => PriorityChansConverger k e -> IO String
showPCC pcc = maybeObserveExceptions pcc f_name (atomically show_tx)
  where
    f_name  = "showPCC"
    show_tx = showPCC_STM (pccSTM pcc)