{- Copyright (C) 2009-2010 Andrejs Sisojevs All rights reserved. For license and copyright information, see the file COPYRIGHT -} -------------------------------------------------------------------------- -------------------------------------------------------------------------- {-# LANGUAGE MultiParamTypeClasses #-} -- | Abbreviation for the @PriorityChansConverger@ is PCC. -- -- Based on @STM.TChan@s, extended with capacity control -- (see @"Control.Concurrent.STM.TChan.TChanB"@). -- -- When user reads from the PCC, the choice is made - from which channel -- to read. System selects a nonempty channel, whose /(CurrentPriority, -- StartPriority)/ tuple is max. The side effect of the channel selection -- is it's /CurrentPriority/ decrease by one, if it's value becomes -- less than one, then the /CurrentPriority/ is set to /StartPriority/. module Control.Concurrent.PriorityChansConverger.PriorityChansConvergerSTM where import Control.Concurrent.ConcurrentUISupport import Control.Concurrent.PriorityChansConverger.Commons import Control.Concurrent.STM import Control.Concurrent.STM.TChan.TChanB import Control.Monad import Data.Foldable import qualified Data.Map as M import Data.Map (Map) import Data.Maybe import Data.MyHelpers ---------------------------------------- -- * PriorityChansConverger ADT and it's administration -- | Decrease @'CurrentPriority'@ by one, if it's value becomes -- less than one, then the @'CurrentPriority'@ is set to @'StartPriority'@. decPriority :: (CurrentPriority, StartPriority) -> (CurrentPriority, StartPriority) decPriority (cur, start) = ( case cur <= 1 of True -> start False -> cur - 1 , start ) -- Core ADT of this package. It is thought to be wrapped in a IO ADT, -- though, in order to optimize performance -- (which is done in the -- @Control.Concurrent.PriorityChansConverger.PriorityChansConverger@ -- module). data Ord k => PriorityChansConvergerSTM k e = PriorityChansConvergerSTM { -- | All channels. Convenient to use for input. -- Map keys == @'pcID'@ of elements. pccStruct :: Map k (PriorityChan k e) -- | Empty channels. Map keys == @'pcPriorityTV'@ of elements. , pccEmpties :: TVar (Map (CurrentPriority, StartPriority) [PriorityChan k e]) -- | Nonempty channels. Map keys == @'pcPriorityTV'@ of elements. -- Convenient for taking output. , pccNonEmpties :: TVar (Map (CurrentPriority, StartPriority) [PriorityChan k e]) -- | For testing and debugging purposes. , pccConcurrentUISupport :: Maybe ConcurrentUISupport } -- | PriorityChan = ID + @'TChanB'@ + @'TVar' ('CurrentPriority', 'StartPriority')@ data PriorityChan k e = PriorityChan { pcID :: k , pcChanB :: TChanB e , pcPriorityTV :: TVar (CurrentPriority, StartPriority) } -- | @'pcLoad' = 'chanLoad' . 'pcChanB'@ pcLoad :: PriorityChan k e -> STM ChanLoad pcLoad = chanLoad . pcChanB -- | @'pcCapacity' = 'chanCapacity' . 'pcChanB'@ pcCapacity :: PriorityChan k e -> STM ChanCapacity pcCapacity = chanCapacity . pcChanB -- | @'pcIsEmpty' = 'isEmptyTChanB' . 'pcChanB'@ pcIsEmpty :: PriorityChan k e -> STM Bool pcIsEmpty = isEmptyTChanB . pcChanB -- | @'pcPriority' = 'readTVar' . 'pcPriorityTV'@ pcPriority :: PriorityChan k e -> STM (CurrentPriority, StartPriority) pcPriority = readTVar . pcPriorityTV -- | @'pcStartPriority' pc = snd `liftM` 'pcPriority' pc@ pcStartPriority :: PriorityChan k e -> STM StartPriority pcStartPriority pc = snd `liftM` pcPriority pc -- | Constructor. newPriorityChan :: k -> StartPriority -> ChanCapacity -> STM (PriorityChan k e) newPriorityChan _id _pcStartPrior _pcCapac = do _pcChanB <- newTChanB _pcCapac _pcPriorityTV <- newTVar (_pcStartPrior, _pcStartPrior) return PriorityChan { pcID = _id , pcChanB = _pcChanB , pcPriorityTV = _pcPriorityTV } -- | PCC constructor with an option to enable utility @'ConcurrentUISupport'@ -- (which is useful for debugging). newPriorityChansConvergerSTM_wCUIS :: Ord k => Maybe ConcurrentUISupport -> Map k (StartPriority, ChanCapacity) -> STM (PriorityChansConvergerSTM k e) newPriorityChansConvergerSTM_wCUIS mb_cuis chan_priors_map = do (_pccStruct, _pccEmpties) <- foldrM (\ (k, (p, c)) (struct, state) -> do pc <- newPriorityChan k p c let new_struct = M.insert k pc struct let new_state = M.unionWith (++) state (M.singleton (p, p) [pc]) return (new_struct, new_state) ) (M.empty, M.empty) (M.toList chan_priors_map) _pccEmptiesTV <- newTVar _pccEmpties _pccNonemptiesTV <- newTVar M.empty return PriorityChansConvergerSTM { pccStruct = _pccStruct , pccEmpties = _pccEmptiesTV , pccNonEmpties = _pccNonemptiesTV , pccConcurrentUISupport = mb_cuis } -- | PCC constructor. @'ConcurrentUISupport'@ is off. -- -- @newPriorityChansConvergerSTM = 'newPriorityChansConvergerSTM_wCUIS' Nothing@ newPriorityChansConvergerSTM :: Ord k => Map k (StartPriority, ChanCapacity) -> STM (PriorityChansConvergerSTM k e) newPriorityChansConvergerSTM = newPriorityChansConvergerSTM_wCUIS Nothing --------------------------------------------------------------------------------------------------------------------------- -- * PriorityChansConvergerSTM mutation --------------------------------------------------------------------------------------------------------------------------- -- | Checks: -- -- * If PCC contains all channels, constructable by given specification. -- -- * If PCC contains nothing else, than mentioned in the given specification. isOfStructPCC_STM :: Ord k => Map k (StartPriority, ChanCapacity) -> PriorityChansConvergerSTM k e -> STM (PCC_ContainsGivenSpecification, NothingElseIsInStruct) isOfStructPCC_STM mp pcc = do (sogood_sofar, left_overs) <- foldrM (\ (k, (p, c)) (sogood_sofar, mp) -> case takeFromMap k mp of (Nothing, _) -> return (False, mp) (Just pc, rest_mp) -> do cap <- pcCapacity pc start <- pcStartPriority pc return (sogood_sofar && cap == c && start == p, rest_mp) ) (True, pccStruct pcc) (M.toList mp) return (sogood_sofar, M.null left_overs) -- | Updates PCC with given specification. New channels may be added -- or existing alerted. -- But no operation for removing channels. mutatePCC_STM :: Ord k => Map k (StartPriority, ChanCapacity) -> PriorityChansConvergerSTM k e -> STM (PriorityChansConvergerSTM k e) mutatePCC_STM chan_priors_map pcc = do new_struct_map <- getNewStruct return pcc { pccStruct = new_struct_map } where getNewStruct = foldrM (\ (k, (new_start, cap)) struct_map -> do (new_struct_map, pc, mb_old_prior) <- case M.lookup k struct_map of Nothing -> do pc <- newPriorityChan k new_start cap return $ (M.insert k pc struct_map, pc, Nothing) Just pc -> do (_cur, _start) <- readTVar (pcPriorityTV pc) let new_prior = (_cur, new_start) writeTVar (pcPriorityTV pc) (_cur, new_start) changeChanCapacity cap (pcChanB pc) return (struct_map, pc, Just (_cur, _start)) emp <- pcIsEmpty pc case mb_old_prior of Nothing -> modifyTVar_ (pccEmpties pcc) (M.unionWith (++) (M.singleton (new_start, new_start) [pc])) Just old_key@(cur, old_start) -> modifyTVar_ (case emp of True -> pccEmpties pcc False -> pccNonEmpties pcc ) (\ mp0 -> let mp1 = M.update (\ pc_list -> let new_pc_list = filter (\ pc -> pcID pc /= k) pc_list in case null new_pc_list of True -> Nothing False -> Just new_pc_list ) old_key mp0 mp2 = M.unionWith (++) (M.singleton (cur, new_start) [pc]) mp1 in mp2 ) return new_struct_map ) (pccStruct pcc) (M.toList chan_priors_map) --------------------------------------------------------------------------------------------------------------------------- -- * PriorityChansConvergerSTM I/O --------------------------------------------------------------------------------------------------------------------------- -- | If blocking is on, then blocks, when trying to write to a full channel -- (where's no free space). writeInPCC_STM :: Ord k => PermitToBlock -> k -> e -> PriorityChansConvergerSTM k e -> STM (Maybe FailureReasonWPCC) writeInPCC_STM block_dowe k e pcc = case M.lookup k (pccStruct pcc) of Nothing -> return $ Just BadKey_FRWPCC Just pc -> do emp <- pcIsEmpty pc wrote <- elasticWriteTChanB block_dowe (pcChanB pc) e case wrote of False -> return $ Just ChanFull_FRWPCC True -> case emp of False -> return Nothing True -> do prior_tuple <- pcPriority pc modifyTVar_ (pccEmpties pcc) (M.update (\ pc_list -> case filter (\ pc -> pcID pc /= k) pc_list of [] -> Nothing new_pc_list -> Just new_pc_list ) prior_tuple ) modifyTVar_ (pccNonEmpties pcc) (M.unionWith (++) (M.singleton prior_tuple [pc])) return Nothing -- | Wrapper around @('writeInPCC_STM' True)@ - blocking write. -- Uses 'interruptableSTM'. If returns Left, then transaction ended with -- interrupting condition. interruptableWriteInPCC_STM :: Ord k => STM InterruptShouldWe -> k -> e -> PriorityChansConvergerSTM k e -> STM (Either () (Maybe FailureReasonWPCC)) interruptableWriteInPCC_STM stm_interrupter k e pcc = do mb_r <- interruptableSTM stm_interrupter (writeInPCC_STM True k e pcc) return $ case mb_r of Nothing -> Left () Just smtng -> Right smtng -- | If @'PermitToBlock'@ is @True@, then never returns @Nothing@. readFromPCC_STM :: Ord k => PermitToBlock -> PriorityChansConvergerSTM k e -> STM (Maybe (k, e)) readFromPCC_STM block_dowe pcc = do mp <- readTVar $ pccNonEmpties pcc case M.maxViewWithKey mp of Nothing -> case block_dowe of True -> retry False -> return Nothing Just ((prior, pc : rest_pc_list), rest_mp) -> do -- this assumes that on place of "pc : _" can't be [] mb_e <- fromJust `liftM` tryReadTChanB (pcChanB pc) -- this assumes that if given PriorityChan is in pccNonEmpties map, then it may NOT be empty... orelse, we get an exception here emp <- pcIsEmpty pc let new_prior = decPriority prior case emp of True -> modifyTVar_ (pccEmpties pcc) (M.unionWith (++) (M.singleton new_prior [pc])) False -> return () let new_mp = case emp of True -> case rest_pc_list of [] -> rest_mp _ -> M.insert prior rest_pc_list rest_mp False -> let map_1 = case rest_pc_list of [] -> rest_mp _ -> M.insert prior rest_pc_list rest_mp in M.unionWith (++) (M.singleton new_prior [pc]) map_1 modifyTVar_ (pccNonEmpties pcc) (const new_mp) modifyTVar_ (pcPriorityTV pc) (const new_prior) return $ Just (pcID pc, mb_e) Just ((_, []),_) -> error "An error occurred in 'readFromPCC_STM': empty list in the entry of pccNonEmpties!" where f_name = "readFromPCC_STM" -- | Wrapper around @('readFromPCC_STM' True)@ - blocking read. -- Uses 'interruptableSTM'. If returns Left, then transaction ended with -- interrupting condition. interruptableReadFromPCC_STM :: Ord k => STM InterruptShouldWe -> PriorityChansConvergerSTM k e -> STM (Either () (k, e)) interruptableReadFromPCC_STM stm_interrupter pcc = do mb_r <- interruptableSTM stm_interrupter (readFromPCC_STM True pcc) return $ case mb_r of Nothing -> Left () Just (Just smtng) -> Right smtng Just Nothing -> error "An error occurred in 'interruptableReadFromPCC_STM': blocking read returned Nothing!" -- | Composition of @'readFromPCC_STM'@s. Lazy (doublecheck that). flushPCC2List_STM :: Ord k => PriorityChansConvergerSTM k e -> STM [(k, e)] flushPCC2List_STM pcc = reverse `liftM` whileJustM_1 (:) [] (readFromPCC_STM False pcc) -- | Strict. Should be a bit faster than @'flushPCC2List_STM'@. flushPCC2List_STM' :: Ord k => PriorityChansConvergerSTM k e -> STM [(k, e)] flushPCC2List_STM' pcc = do ne_pc_mp <- readTVar $ pccNonEmpties pcc (e_list, empties) <- readFromNonempties ne_pc_mp modifyTVar_ (pccEmpties pcc) (M.unionWith (++) empties) return e_list where readFromNonempties :: Map (CurrentPriority, StartPriority) [PriorityChan k e] -> STM ([(k, e)], Map (CurrentPriority, StartPriority) [PriorityChan k e]) readFromNonempties mp = case M.maxViewWithKey mp of Nothing -> return ([], M.empty) Just ((prior, pc_list), rest_mp) -> do let new_prior = decPriority prior (e_list, nonempties, empties) <- foldrM (\ pc (elems_accum, nonempties_accum, empties_accum) -> do mb_e <- tryReadTChanB (pcChanB pc) return $ case mb_e of Nothing -> ( elems_accum, nonempties_accum, pc : empties_accum) Just e -> ((pcID pc, e) : elems_accum, pc : nonempties_accum, empties_accum) ) ([], [], []) pc_list let continue_with_map = M.unionWith (++) rest_mp (M.singleton new_prior nonempties) let stay_with_map = M.singleton prior empties (lower_prior_e_list, rest_of_empties) <- readFromNonempties continue_with_map return (e_list ++ lower_prior_e_list, M.unionWith (++) stay_with_map rest_of_empties) -- | Strict. This is a fast flush, it doesn't deal with priorities. fastFlushPCC2List_STM' :: Ord k => PriorityChansConvergerSTM k e -> STM [(k, e)] fastFlushPCC2List_STM' pcc = do ne_pc_mp <- readTVar $ pccNonEmpties pcc modifyTVar_ (pccEmpties pcc) (M.unionWith (++) ne_pc_mp) let pc_lists_list = M.elems ne_pc_mp elems_list <- foldlM (foldlM foldlM_r) [] pc_lists_list return elems_list where foldlM_r :: [(k, e)] -> PriorityChan k e -> STM [(k, e)] foldlM_r accum pc = do let k = pcID pc el <- getTChanBContents (pcChanB pc) return (accum ++ map (\ e -> (k, e)) el) isEmptyPCC_STM :: Ord k => PriorityChansConvergerSTM k e -> STM Bool isEmptyPCC_STM pcc = M.null `liftM` (readTVar $ pccNonEmpties pcc) -- | Free space in referenced channel = Capacity - Load. freeSpaceInPCCInput_STM :: Ord k => PriorityChansConvergerSTM k e -> k -> STM (Maybe ChanContentAmountMeasure) freeSpaceInPCCInput_STM pcc k = do case M.lookup k $ pccStruct pcc of Nothing -> return Nothing Just pc -> do cap <- pcCapacity pc lo <- pcLoad pc return $ Just (cap - lo) -- | Strict. -- -- Take everything, applu filter, return what's left in the PCC, -- return what's taken out. filterOutPCCElements_STM' :: Ord k => PriorityChansConvergerSTM k e -> (e -> TakeElementOutShouldWe) -> STM [(k, e)] filterOutPCCElements_STM' pcc p = foldlM _filterOutPCElements [] (M.elems $ pccStruct pcc) where -- _filterOutPCElements :: [(k, e)] -> PriorityChan k e -> STM [(k, e)] _filterOutPCElements takeout_accum pc = do takeout_add <- filterOutTChanBElements p (pcChanB pc) return $ join [zip (repeat $ pcID pc) takeout_add, takeout_accum] --------------------------------------------------------------------------------------------------------------------------- -- * PriorityChansConvergerSTM representation --------------------------------------------------------------------------------------------------------------------------- -- | Used wrapped in @(unsafePerformIO . atomically)@ in the definition -- of show instance. showPCC_STM :: (Ord k, Show k, Show e) => PriorityChansConvergerSTM k e -> STM String showPCC_STM pcc = do let f = (\ pc_map_by_prior -> map (\ (p, pc_list) -> (p, map pcID pc_list)) (M.toList pc_map_by_prior) ) emps <- f `liftM` (readTVar $ pccEmpties pcc) nemps <- f `liftM` (readTVar $ pccNonEmpties pcc) let s = "PriorityChansConvergerSTM {" ++ "\n | Empties = " ++ (show $ emps) ++ "\n | NonEmpties = " ++ (show $ nemps) ++ "\n | Chans count = " ++ (show $ M.size $ pccStruct pcc) ++ " chans" ++ "\n |+ Channels : " s_w_c <- foldrM (\ pc s_accum -> showPC_STM pc >>= \ s_add -> return (s_accum ++ s_add) ) s (M.elems $ pccStruct pcc) return (s_w_c ++ "\n}") -- | Used wrapped in @(unsafePerformIO . atomically)@ in the definition -- of show instance. showPC_STM :: (Show e, Show k) => PriorityChan k e -> STM String showPC_STM pc = do (cur, start) <- pcPriority pc cap <- pcCapacity pc load <- pcLoad pc let s = "\n ||+ PriorityChan {" ++ "\n ||| ID = " ++ (show $ pcID pc) ++ "\n ||| CurrentPriority = " ++ (show cur) ++ "\n ||| StartPriority = " ++ (show start) ++ "\n ||| Capacity = " ++ (show cap) ++ "\n ||| Load = " ++ (show load) ++ "\n |||+ Content : " elems <- viewChanBContent $ pcChanB pc let ch_content_str = (Prelude.concat $ mapWithIndex (\ i e -> "\n |||| -- " ++ show i ++ ") " ++ show e) elems) ++ "\n ||- }" return (s ++ ch_content_str) -- instance (Show k, Show e) => Show (PriorityChan k e) where -- show pc = unsafePerformIO $ printExceptionIfAny "show:PriorityChan" $ atomically $ showPC_STM pc -- instance (Ord k, Show k, Show e) => Show (PriorityChansConvergerSTM k e) where -- show pcc = unsafePerformIO $ printExceptionIfAny "show:PriorityChansConvergerSTM" $ atomically $ showPCC_STM pcc -------------------------------- -- * Helpers -- | For @interruptableSTM interrupter subj@, when @interrupter@ returns @True@, -- the transaction returns @Nothing@. Else the result of @subj@ is waited -- and returned wrapped into @Just@. interruptableSTM :: STM InterruptShouldWe -> STM a -> STM (Maybe a) interruptableSTM interrupter subj = do ir <- interrupter case ir of True -> return Nothing False -> Just `liftM` subj