module Control.Concurrent.PriorityChansConverger.PriorityChansConvergerSTM where
import Control.Concurrent.ConcurrentUISupport
import Control.Concurrent.PriorityChansConverger.Commons
import Control.Concurrent.STM
import Control.Concurrent.STM.TChan.TChanB
import Control.Monad
import Data.Foldable
import qualified Data.Map as M
import Data.Map (Map)
import Data.Maybe
import Data.MyHelpers
-- | Advance a channel's priority pair by one step.
--
-- The current priority is decremented by one; once it has reached @1@
-- (or less) it wraps around to the start priority. The start priority
-- itself is never changed.
decPriority :: (CurrentPriority, StartPriority) -> (CurrentPriority, StartPriority)
decPriority (cur, start)
    | cur <= 1  = (start, start)      -- wrap around to the start priority
    | otherwise = (cur - 1, start)    -- ordinary one-step decrement
-- | The converger: a fixed set of bounded priority channels plus the
-- bookkeeping that tracks which of them currently hold elements.
--
-- No @Ord k@ datatype context is declared here (datatype contexts are a
-- deprecated feature); the functions operating on this type carry the
-- @Ord k@ constraint themselves.
data PriorityChansConvergerSTM k e = PriorityChansConvergerSTM {
        -- | All channels, indexed by their key.
        pccStruct              :: Map k (PriorityChan k e)
        -- | Channels that currently hold no elements, grouped by their
        -- (current, start) priority pair.
      , pccEmpties             :: TVar (Map (CurrentPriority, StartPriority) [PriorityChan k e])
        -- | Channels that currently hold at least one element, grouped by
        -- their (current, start) priority pair. Readers take from the
        -- greatest key of this map.
      , pccNonEmpties          :: TVar (Map (CurrentPriority, StartPriority) [PriorityChan k e])
        -- | Optional UI support handle; only stored, never consulted in
        -- this module.
      , pccConcurrentUISupport :: Maybe ConcurrentUISupport
      }
-- | A single bounded input channel of the converger together with its
-- priority state.
data PriorityChan k e
    = PriorityChan
        { pcID         :: k
          -- ^ Key under which the channel is registered in the converger.
        , pcChanB      :: TChanB e
          -- ^ The underlying bounded channel carrying the elements.
        , pcPriorityTV :: TVar (CurrentPriority, StartPriority)
          -- ^ Mutable (current, start) priority pair of this channel.
        }
-- | Load of the channel's underlying bounded channel, as reported by
-- 'chanLoad'.
pcLoad :: PriorityChan k e -> STM ChanLoad
pcLoad pc = chanLoad (pcChanB pc)
-- | Capacity of the channel's underlying bounded channel, as reported by
-- 'chanCapacity'.
pcCapacity :: PriorityChan k e -> STM ChanCapacity
pcCapacity pc = chanCapacity (pcChanB pc)
-- | Whether the channel's underlying bounded channel is empty, as
-- reported by 'isEmptyTChanB'.
pcIsEmpty :: PriorityChan k e -> STM Bool
pcIsEmpty pc = isEmptyTChanB (pcChanB pc)
-- | Read the channel's (current, start) priority pair.
pcPriority :: PriorityChan k e -> STM (CurrentPriority, StartPriority)
pcPriority pc = readTVar (pcPriorityTV pc)
-- | Read only the start-priority component of the channel's priority pair.
pcStartPriority :: PriorityChan k e -> STM StartPriority
pcStartPriority = liftM snd . pcPriority
-- | Create a fresh priority channel.
--
-- The bounded channel is created with the given capacity, and the
-- priority pair is initialised with the start priority in both the
-- current and the start slot.
newPriorityChan :: k -> StartPriority -> ChanCapacity -> STM (PriorityChan k e)
newPriorityChan _id _pcStartPrior _pcCapac =
    -- liftM2 runs its actions left to right: channel first, then the TVar.
    liftM2
        (\ chan prior_tv -> PriorityChan { pcID = _id, pcChanB = chan, pcPriorityTV = prior_tv })
        (newTChanB _pcCapac)
        (newTVar (_pcStartPrior, _pcStartPrior))
-- | Build a converger from a specification map (key -> start priority and
-- capacity), attaching the given optional 'ConcurrentUISupport'.
--
-- Every channel starts out registered in the empties map under the key
-- @(start, start)@; the non-empties map starts out empty.
newPriorityChansConvergerSTM_wCUIS :: Ord k => Maybe ConcurrentUISupport -> Map k (StartPriority, ChanCapacity) -> STM (PriorityChansConvergerSTM k e)
newPriorityChansConvergerSTM_wCUIS mb_cuis chan_priors_map = do
    (chans_by_key, empties_by_prior) <-
        foldrM addChan (M.empty, M.empty) (M.toList chan_priors_map)
    empties_tv    <- newTVar empties_by_prior
    nonempties_tv <- newTVar M.empty
    return PriorityChansConvergerSTM {
          pccStruct              = chans_by_key
        , pccEmpties             = empties_tv
        , pccNonEmpties          = nonempties_tv
        , pccConcurrentUISupport = mb_cuis
        }
  where
    -- Create one channel and register it in both accumulators. The new
    -- channel is appended at the end of the list stored under its key.
    addChan (k, (p, c)) (by_key, by_prior) = do
        pc <- newPriorityChan k p c
        return ( M.insert k pc by_key
               , M.insertWith (flip (++)) (p, p) [pc] by_prior
               )
-- | Build a converger from a specification map, without any
-- 'ConcurrentUISupport' attached.
newPriorityChansConvergerSTM :: Ord k => Map k (StartPriority, ChanCapacity) -> STM (PriorityChansConvergerSTM k e)
newPriorityChansConvergerSTM chan_priors_map =
    newPriorityChansConvergerSTM_wCUIS Nothing chan_priors_map
-- | Compare a converger against a specification map.
--
-- The first component of the result tells whether every key of the
-- specification is present in the converger with matching capacity and
-- start priority. The second component tells whether the converger has no
-- channels left over once the specified keys have been taken out.
isOfStructPCC_STM :: Ord k => Map k (StartPriority, ChanCapacity) -> PriorityChansConvergerSTM k e -> STM (PCC_ContainsGivenSpecification, NothingElseIsInStruct)
isOfStructPCC_STM mp pcc = do
    (all_match, unclaimed) <- foldrM checkEntry (True, pccStruct pcc) (M.toList mp)
    return (all_match, M.null unclaimed)
  where
    -- Take the specified key out of the remaining channels and compare it.
    checkEntry (k, (p, c)) (ok_so_far, remaining) =
        case takeFromMap k remaining of
            (Just pc, remaining') -> do
                cap   <- pcCapacity pc
                start <- pcStartPriority pc
                return (ok_so_far && cap == c && start == p, remaining')
            -- A missing key fails the comparison; the remaining map is kept as is.
            (Nothing, _) -> return (False, remaining)
-- | Apply a specification map to an existing converger.
--
-- For every key of the specification:
--
-- * if the converger has no such channel, a new one is created and
--   registered as empty under @(new_start, new_start)@;
--
-- * otherwise the channel keeps its current priority, gets the new start
--   priority and capacity, and is re-filed in whichever bookkeeping map
--   (empties or non-empties) matches its emptiness after the capacity change.
--
-- Channels of the converger that are not mentioned in the specification
-- are left untouched. The returned converger shares its TVars with the
-- input one and differs only in 'pccStruct'.
mutatePCC_STM :: Ord k => Map k (StartPriority, ChanCapacity) -> PriorityChansConvergerSTM k e -> STM (PriorityChansConvergerSTM k e)
mutatePCC_STM chan_priors_map pcc = do
    updated_struct <- foldrM applySpec (pccStruct pcc) (M.toList chan_priors_map)
    return pcc { pccStruct = updated_struct }
  where
    applySpec (k, (new_start, cap)) struct_map = do
        (struct_map', pc, mb_old_key) <-
            case M.lookup k struct_map of
                Just known_pc -> do
                    old_key@(cur, _) <- readTVar (pcPriorityTV known_pc)
                    writeTVar (pcPriorityTV known_pc) (cur, new_start)
                    changeChanCapacity cap (pcChanB known_pc)
                    return (struct_map, known_pc, Just old_key)
                Nothing -> do
                    fresh_pc <- newPriorityChan k new_start cap
                    return (M.insert k fresh_pc struct_map, fresh_pc, Nothing)
        -- Emptiness is sampled after the capacity change.
        is_empty <- pcIsEmpty pc
        let -- File the channel at the front of the list under the given key.
            fileUnder key = M.unionWith (++) (M.singleton key [pc])
            -- Drop the channel with key k; remove the entry if nothing is left.
            withoutK pc_list =
                case filter ((/= k) . pcID) pc_list of
                    []   -> Nothing
                    kept -> Just kept
        case mb_old_key of
            Nothing ->
                modifyTVar_ (pccEmpties pcc) (fileUnder (new_start, new_start))
            Just old_key@(cur, _) ->
                modifyTVar_
                    (if is_empty then pccEmpties pcc else pccNonEmpties pcc)
                    (fileUnder (cur, new_start) . M.update withoutK old_key)
        return struct_map'
-- | Write an element into the channel registered under the given key.
--
-- Returns @Just BadKey_FRWPCC@ if there is no such channel,
-- @Just ChanFull_FRWPCC@ if 'elasticWriteTChanB' reports that nothing was
-- written, and @Nothing@ on success. When the channel was empty before a
-- successful write, it is moved from the empties map to the non-empties
-- map under its current priority pair.
writeInPCC_STM :: Ord k => PermitToBlock -> k -> e -> PriorityChansConvergerSTM k e -> STM (Maybe FailureReasonWPCC)
writeInPCC_STM block_dowe k e pcc =
    maybe (return $ Just BadKey_FRWPCC) writeTo (M.lookup k (pccStruct pcc))
  where
    writeTo pc = do
        -- Emptiness has to be sampled before the write.
        was_empty <- pcIsEmpty pc
        wrote     <- elasticWriteTChanB block_dowe (pcChanB pc) e
        if not wrote
            then return $ Just ChanFull_FRWPCC
            else do
                when was_empty (markNonEmpty pc)
                return Nothing

    -- Move the channel from the empties map to the non-empties map.
    markNonEmpty pc = do
        prior_tuple <- pcPriority pc
        modifyTVar_ (pccEmpties pcc) (M.update withoutK prior_tuple)
        modifyTVar_ (pccNonEmpties pcc) (M.unionWith (++) (M.singleton prior_tuple [pc]))

    -- Drop the channel with key k; remove the entry if nothing is left.
    withoutK pc_list =
        case filter ((/= k) . pcID) pc_list of
            []   -> Nothing
            kept -> Just kept
-- | Blocking write that can be pre-empted by an interrupter transaction.
--
-- Yields @Left ()@ when the interrupter fired, otherwise @Right@ with the
-- result of @writeInPCC_STM True@.
interruptableWriteInPCC_STM :: Ord k => STM InterruptShouldWe -> k -> e -> PriorityChansConvergerSTM k e -> STM (Either () (Maybe FailureReasonWPCC))
interruptableWriteInPCC_STM stm_interrupter k e pcc =
    maybe (Left ()) Right
        `liftM` interruptableSTM stm_interrupter (writeInPCC_STM True k e pcc)
-- | Read one element from the non-empty channel with the greatest
-- (current, start) priority key.
--
-- When no channel is registered as non-empty, the transaction retries if
-- blocking is permitted and returns @Nothing@ otherwise.
--
-- After a successful read the channel's priority is advanced with
-- 'decPriority' and the channel is re-filed under the new priority: in the
-- empties map if the read drained it, in the non-empties map otherwise.
--
-- If a channel registered as non-empty turns out to hold nothing (this can
-- happen after @filterOutPCCElements_STM'@, which removes elements without
-- touching the bookkeeping maps), it is moved to the empties map under its
-- unchanged priority and the read is attempted again, instead of crashing
-- on the missing element.
readFromPCC_STM :: Ord k => PermitToBlock -> PriorityChansConvergerSTM k e -> STM (Maybe (k, e))
readFromPCC_STM block_dowe pcc = do
    mp <- readTVar $ pccNonEmpties pcc
    case M.maxViewWithKey mp of
        Nothing ->
            case block_dowe of
                True  -> retry
                False -> return Nothing
        Just ((_, []), _) -> error "An error occurred in 'readFromPCC_STM': empty list in the entry of pccNonEmpties!"
        Just ((prior, pc : rest_pc_list), rest_mp) -> do
            let -- The non-empties map with just this channel taken out.
                mp_wo_pc =
                    case rest_pc_list of
                        [] -> rest_mp
                        _  -> M.insert prior rest_pc_list rest_mp
            mb_e <- tryReadTChanB (pcChanB pc)
            case mb_e of
                Nothing -> do
                    -- Stale bookkeeping: the channel is actually empty.
                    -- Re-file it as empty (priority unchanged) and look
                    -- for the next candidate.
                    writeTVar (pccNonEmpties pcc) mp_wo_pc
                    modifyTVar_
                        (pccEmpties pcc)
                        (M.unionWith (++) (M.singleton prior [pc]))
                    readFromPCC_STM block_dowe pcc
                Just e -> do
                    emp <- pcIsEmpty pc
                    let new_prior = decPriority prior
                        fileUnderNewPrior = M.unionWith (++) (M.singleton new_prior [pc])
                    case emp of
                        True -> do
                            modifyTVar_ (pccEmpties pcc) fileUnderNewPrior
                            writeTVar (pccNonEmpties pcc) mp_wo_pc
                        False ->
                            writeTVar (pccNonEmpties pcc) (fileUnderNewPrior mp_wo_pc)
                    writeTVar (pcPriorityTV pc) new_prior
                    return $ Just (pcID pc, e)
-- | Blocking read that can be pre-empted by an interrupter transaction.
--
-- Yields @Left ()@ when the interrupter fired, otherwise @Right@ with the
-- key and element that were read. A blocking read is not expected to come
-- back with @Nothing@; that case is reported via 'error'.
interruptableReadFromPCC_STM :: Ord k => STM InterruptShouldWe -> PriorityChansConvergerSTM k e -> STM (Either () (k, e))
interruptableReadFromPCC_STM stm_interrupter pcc =
    classify `liftM` interruptableSTM stm_interrupter (readFromPCC_STM True pcc)
  where
    classify Nothing             = Left ()
    classify (Just (Just smtng)) = Right smtng
    classify (Just Nothing)      = error "An error occurred in 'interruptableReadFromPCC_STM': blocking read returned Nothing!"
-- | Drain the converger by repeatedly performing non-blocking reads with
-- 'readFromPCC_STM' (driven by 'whileJustM_1'), and return the reversed
-- accumulation of the elements that were read.
flushPCC2List_STM :: Ord k => PriorityChansConvergerSTM k e -> STM [(k, e)]
flushPCC2List_STM pcc = do
    collected <- whileJustM_1 (:) [] (readFromPCC_STM False pcc)
    return (reverse collected)
-- | Drain the converger level by level: all channels filed under the
-- greatest priority key are read once each, the ones that yielded an
-- element are re-filed under the decremented priority, and the procedure
-- repeats until every channel has failed a read.
--
-- On return, the non-empties map is cleared, and every channel that was in
-- it is filed in the empties map under the priority at which its read came
-- back empty; the channel's own priority TVar is set to that same pair so
-- that later writes find the channel under the right key.
--
-- Entries with an empty channel list are never put into either map; this
-- is also what makes the level-by-level recursion terminate.
flushPCC2List_STM' :: Ord k => PriorityChansConvergerSTM k e -> STM [(k, e)]
flushPCC2List_STM' pcc = do
    ne_pc_mp <- readTVar $ pccNonEmpties pcc
    (e_list, empties) <- readFromNonempties ne_pc_mp
    -- Every formerly non-empty channel ends up in 'empties'.
    writeTVar (pccNonEmpties pcc) M.empty
    modifyTVar_ (pccEmpties pcc) (M.unionWith (++) empties)
    return e_list
  where
    -- A one-entry map, or no entry at all when there are no channels to file.
    singletonUnlessNull key pcs
        | null pcs  = M.empty
        | otherwise = M.singleton key pcs

    readFromNonempties :: Map (CurrentPriority, StartPriority) [PriorityChan k e] -> STM ([(k, e)], Map (CurrentPriority, StartPriority) [PriorityChan k e])
    readFromNonempties mp =
        case M.maxViewWithKey mp of
            Nothing -> return ([], M.empty)
            Just ((prior, pc_list), rest_mp) -> do
                let new_prior = decPriority prior
                (e_list, nonempties, empties) <- foldrM readOnce ([], [], []) pc_list
                -- Channels whose read failed stay at this priority; keep
                -- their priority TVar in agreement with the map key.
                Prelude.mapM_ (\ pc -> writeTVar (pcPriorityTV pc) prior) empties
                let continue_with_map = M.unionWith (++) rest_mp (singletonUnlessNull new_prior nonempties)
                let stay_with_map = singletonUnlessNull prior empties
                (lower_prior_e_list, rest_of_empties) <- readFromNonempties continue_with_map
                return (e_list ++ lower_prior_e_list, M.unionWith (++) stay_with_map rest_of_empties)

    -- Try one read from a channel and sort it into "yielded" or "was empty".
    readOnce pc (elems_accum, nonempties_accum, empties_accum) = do
        mb_e <- tryReadTChanB (pcChanB pc)
        return $ case mb_e of
            Nothing -> (                  elems_accum,      nonempties_accum, pc : empties_accum)
            Just e  -> ((pcID pc, e) : elems_accum, pc : nonempties_accum,      empties_accum)
-- | Flush the converger channel by channel instead of element by element.
--
-- Every channel registered as non-empty is re-filed as empty (under its
-- unchanged priority key), the non-empties map is cleared, and the whole
-- content of each such channel is fetched with 'getTChanBContents'.
-- Elements are returned grouped by channel, channels being visited in
-- ascending order of their priority key.
--
-- NOTE(review): this relies on 'getTChanBContents' leaving the channel
-- empty — confirm against its definition.
fastFlushPCC2List_STM' :: Ord k => PriorityChansConvergerSTM k e -> STM [(k, e)]
fastFlushPCC2List_STM' pcc = do
    ne_pc_mp <- readTVar $ pccNonEmpties pcc
    -- The flushed channels must not remain listed as non-empty, otherwise
    -- they would be registered in both bookkeeping maps at once.
    writeTVar (pccNonEmpties pcc) M.empty
    modifyTVar_ (pccEmpties pcc) (M.unionWith (++) ne_pc_mp)
    foldlM (foldlM foldlM_r) [] (M.elems ne_pc_mp)
  where
    -- Append the full content of one channel, tagged with the channel's key.
    foldlM_r :: [(k, e)] -> PriorityChan k e -> STM [(k, e)]
    foldlM_r accum pc = do
        let k = pcID pc
        el <- getTChanBContents (pcChanB pc)
        return (accum ++ map (\ e -> (k, e)) el)
-- | Whether the converger has no channel registered as non-empty.
isEmptyPCC_STM :: Ord k => PriorityChansConvergerSTM k e -> STM Bool
isEmptyPCC_STM pcc = do
    nonempties <- readTVar (pccNonEmpties pcc)
    return (M.null nonempties)
-- | Free space of the channel registered under the given key, computed as
-- its capacity minus its load. Returns @Nothing@ if there is no such channel.
freeSpaceInPCCInput_STM :: Ord k => PriorityChansConvergerSTM k e -> k -> STM (Maybe ChanContentAmountMeasure)
freeSpaceInPCCInput_STM pcc k =
    case M.lookup k $ pccStruct pcc of
        Nothing -> return Nothing
        Just pc -> do
            cap <- pcCapacity pc
            lo  <- pcLoad pc
            return $ Just (cap - lo)
-- | Run 'filterOutTChanBElements' with the given predicate on every
-- channel of the converger and collect what it returns, each element
-- paired with the key of the channel it came from. Results of channels
-- visited later are placed in front of those visited earlier.
--
-- Note that this function does not touch 'pccEmpties' or 'pccNonEmpties'.
-- NOTE(review): if filtering can leave a channel empty, that channel stays
-- registered as non-empty — confirm whether callers are expected to
-- compensate for that.
filterOutPCCElements_STM' :: Ord k => PriorityChansConvergerSTM k e -> (e -> TakeElementOutShouldWe) -> STM [(k, e)]
filterOutPCCElements_STM' pcc p = foldlM filterOneChan [] (M.elems $ pccStruct pcc)
  where
    filterOneChan taken_so_far pc = do
        taken_here <- filterOutTChanBElements p (pcChanB pc)
        return (zip (repeat $ pcID pc) taken_here ++ taken_so_far)
-- | Render a multi-line, human-readable dump of the converger: both
-- bookkeeping maps (as priority key -> channel keys), the number of
-- channels, and then each channel as rendered by 'showPC_STM'. Channels
-- are listed in descending order of their key.
showPCC_STM :: (Ord k, Show k, Show e) => PriorityChansConvergerSTM k e -> STM String
showPCC_STM pcc = do
    emps  <- summarize `liftM` readTVar (pccEmpties pcc)
    nemps <- summarize `liftM` readTVar (pccNonEmpties pcc)
    let chans  = M.elems (pccStruct pcc)
        header = Prelude.concat
            [ "PriorityChansConvergerSTM {"
            , "\n | Empties = ", show emps
            , "\n | NonEmpties = ", show nemps
            , "\n | Chans count = ", show (M.size (pccStruct pcc)), " chans"
            , "\n |+ Channels : "
            ]
    -- Last channel first, both in rendering order and in the output.
    chan_strs <- mapM showPC_STM (reverse chans)
    return (header ++ Prelude.concat chan_strs ++ "\n}")
  where
    -- Reduce a bookkeeping map to (priority key, channel keys) pairs.
    summarize by_prior = [ (p, map pcID pc_list) | (p, pc_list) <- M.toList by_prior ]
-- | Render a multi-line, human-readable dump of a single channel: its key,
-- priority pair, capacity, load, and the indexed list of elements as
-- returned by 'viewChanBContent'.
showPC_STM :: (Show e, Show k) => PriorityChan k e -> STM String
showPC_STM pc = do
    (cur, start) <- pcPriority pc
    cap   <- pcCapacity pc
    load  <- pcLoad pc
    elems <- viewChanBContent $ pcChanB pc
    let header = Prelude.concat
            [ "\n ||+ PriorityChan {"
            , "\n ||| ID = ", show (pcID pc)
            , "\n ||| CurrentPriority = ", show cur
            , "\n ||| StartPriority = ", show start
            , "\n ||| Capacity = ", show cap
            , "\n ||| Load = ", show load
            , "\n |||+ Content : "
            ]
        showElem i e = "\n |||| -- " ++ show i ++ ") " ++ show e
    return $ Prelude.concat (header : mapWithIndex showElem elems ++ ["\n ||- }"])
-- | Run a transaction unless the interrupter says otherwise.
--
-- The interrupter is evaluated first; if it yields @True@ the subject is
-- not run and @Nothing@ is returned, otherwise the subject's result is
-- returned wrapped in @Just@.
interruptableSTM :: STM InterruptShouldWe -> STM a -> STM (Maybe a)
interruptableSTM interrupter subj = do
    interrupted <- interrupter
    if interrupted
        then return Nothing
        else liftM Just subj