{-
Copyright (C) 2009-2010 Andrejs Sisojevs <andrejs.sisojevs@nextmail.ru>

All rights reserved.

For license and copyright information, see the file COPYRIGHT
-}

--------------------------------------------------------------------------
--------------------------------------------------------------------------

{-# LANGUAGE MultiParamTypeClasses #-}

-- | Abbreviation for the @PriorityChansConverger@ is PCC.
--
-- Based on @STM.TChan@s, extended with capacity control
-- (see @"Control.Concurrent.STM.TChan.TChanB"@).
--
-- When the user reads from the PCC, a choice is made of which channel
-- to read from. The system selects a nonempty channel whose /(CurrentPriority,
-- StartPriority)/ tuple is maximal. As a side effect of the selection, the
-- channel's /CurrentPriority/ is decreased by one; if its value would become
-- less than one, the /CurrentPriority/ is reset to /StartPriority/.
module Control.Concurrent.PriorityChansConverger.PriorityChansConvergerSTM where

import           Control.Concurrent.ConcurrentUISupport
import           Control.Concurrent.PriorityChansConverger.Commons
import           Control.Concurrent.STM
import           Control.Concurrent.STM.TChan.TChanB
import           Control.Monad
import           Data.Foldable
import qualified Data.Map as M
import           Data.Map (Map)
import           Data.Maybe
import           Data.MyHelpers

----------------------------------------
-- * PriorityChansConverger ADT and its administration

-- | Advances a priority pair by one step: a @'CurrentPriority'@ greater
-- than one is decremented by one, any other value wraps around to the
-- @'StartPriority'@. The @'StartPriority'@ component is returned unchanged.
decPriority :: (CurrentPriority, StartPriority) -> (CurrentPriority, StartPriority)
decPriority (cur, start) = (next_cur, start)
    where
        -- Computed lazily, only when the first component is demanded.
        next_cur
            | cur <= 1  = start
            | otherwise = cur - 1

-- | Core ADT of this package. It is thought to be wrapped in an IO ADT,
-- though, in order to optimize performance
-- (which is done in the
-- @Control.Concurrent.PriorityChansConverger.PriorityChansConverger@
-- module).
--
-- The declaration carries no @Ord k@ datatype context: datatype contexts
-- are not part of Haskell 2010 and are deprecated in GHC. Functions of this
-- module that need an ordering on keys state @Ord k@ in their own
-- signatures.
data PriorityChansConvergerSTM k e = PriorityChansConvergerSTM {
        -- | All channels. Convenient to use for input.
        -- Map keys == @'pcID'@ of elements.
        pccStruct     :: Map k (PriorityChan k e)
        -- | Empty channels. Map keys == content of @'pcPriorityTV'@ of elements.
      , pccEmpties    :: TVar (Map (CurrentPriority, StartPriority) [PriorityChan k e])
        -- | Nonempty channels. Map keys == content of @'pcPriorityTV'@ of elements.
        -- Convenient for taking output.
      , pccNonEmpties :: TVar (Map (CurrentPriority, StartPriority) [PriorityChan k e])
        -- | For testing and debugging purposes.
      , pccConcurrentUISupport :: Maybe ConcurrentUISupport
}

-- | One prioritized channel of the converger:
-- ID + @'TChanB'@ + @'TVar' ('CurrentPriority', 'StartPriority')@.
data PriorityChan k e = PriorityChan
        { pcID         :: k
          -- ^ Identifier of the channel.
        , pcChanB      :: TChanB e
          -- ^ The underlying channel holding the elements.
        , pcPriorityTV :: TVar (CurrentPriority, StartPriority)
          -- ^ Mutable priority pair of the channel.
        }

-- | Load of the channel, as reported by @'chanLoad'@ for the underlying
-- @'TChanB'@ (@'pcChanB'@).
pcLoad :: PriorityChan k e -> STM ChanLoad
pcLoad pc = chanLoad (pcChanB pc)

-- | Capacity of the channel, as reported by @'chanCapacity'@ for the
-- underlying @'TChanB'@ (@'pcChanB'@).
pcCapacity :: PriorityChan k e -> STM ChanCapacity
pcCapacity pc = chanCapacity (pcChanB pc)

-- | Whether the channel is empty, as reported by @'isEmptyTChanB'@ for the
-- underlying @'TChanB'@ (@'pcChanB'@).
pcIsEmpty :: PriorityChan k e -> STM Bool
pcIsEmpty pc = isEmptyTChanB (pcChanB pc)

-- | Reads the current content of the channel's priority variable
-- (@'pcPriorityTV'@).
pcPriority :: PriorityChan k e -> STM (CurrentPriority, StartPriority)
pcPriority pc = readTVar (pcPriorityTV pc)

-- | The @'StartPriority'@ component of the channel's priority pair
-- (second component of the result of @'pcPriority'@).
pcStartPriority :: PriorityChan k e -> STM StartPriority
pcStartPriority pc = pcPriority pc >>= \ prior -> return (snd prior)

-- | Constructor. Creates a new @'TChanB'@ with the given capacity and a new
-- priority variable in which both the current and the start priority are
-- set to the given @'StartPriority'@.
newPriorityChan :: k -> StartPriority -> ChanCapacity -> STM (PriorityChan k e)
newPriorityChan _id _pcStartPrior _pcCapac = do
        chan_b   <- newTChanB _pcCapac
        prior_tv <- newTVar (_pcStartPrior, _pcStartPrior)
        return PriorityChan { pcID = _id, pcChanB = chan_b, pcPriorityTV = prior_tv }

-- | PCC constructor with an option to enable utility @'ConcurrentUISupport'@
-- (which is useful for debugging).
--
-- For every entry of the specification map a channel is created with
-- @'newPriorityChan'@; all of them are registered as empty under the key
-- @(start, start)@. The map of nonempty channels starts out empty.
newPriorityChansConvergerSTM_wCUIS :: Ord k => Maybe ConcurrentUISupport -> Map k (StartPriority, ChanCapacity) -> STM (PriorityChansConvergerSTM k e)
newPriorityChansConvergerSTM_wCUIS mb_cuis chan_priors_map = do
        (struct, empties) <- foldrM addChan (M.empty, M.empty) (M.toList chan_priors_map)
        empties_tv    <- newTVar empties
        nonempties_tv <- newTVar M.empty
        return PriorityChansConvergerSTM {
                        pccStruct              = struct
                      , pccEmpties             = empties_tv
                      , pccNonEmpties          = nonempties_tv
                      , pccConcurrentUISupport = mb_cuis
               }
    where
        -- Folding step: creates the channel for one specification entry and
        -- registers it in both accumulated maps. A channel is appended to the
        -- end of the list already stored under its priority key.
        addChan (k, (p, c)) (struct, empties) = do
                pc <- newPriorityChan k p c
                return ( M.insert k pc struct
                       , M.insertWith (flip (++)) (p, p) [pc] empties
                       )

-- | PCC constructor with @'ConcurrentUISupport'@ switched off: delegates to
-- @'newPriorityChansConvergerSTM_wCUIS'@, passing @Nothing@ as the support
-- handle.
newPriorityChansConvergerSTM :: Ord k => Map k (StartPriority, ChanCapacity) -> STM (PriorityChansConvergerSTM k e)
newPriorityChansConvergerSTM chan_priors_map =
        newPriorityChansConvergerSTM_wCUIS Nothing chan_priors_map

---------------------------------------------------------------------------------------------------------------------------
-- * PriorityChansConvergerSTM mutation
---------------------------------------------------------------------------------------------------------------------------

-- | Compares the structure of a PCC with a specification. Checks:
--
-- * If PCC contains all channels, constructable by given specification
--   (same key, same capacity, same start priority) - first component
--   of the result.
--
-- * If PCC contains nothing else, than mentioned in the given
--   specification - second component of the result.
isOfStructPCC_STM :: Ord k => Map k (StartPriority, ChanCapacity) -> PriorityChansConvergerSTM k e -> STM (PCC_ContainsGivenSpecification, NothingElseIsInStruct)
isOfStructPCC_STM mp pcc = do
          (spec_is_met, unmentioned) <- foldrM checkSpecEntry (True, pccStruct pcc) (M.toList mp)
          return (spec_is_met, M.null unmentioned)
     where
          -- Folding step over the specification entries. The second component
          -- of the accumulator holds the channels of the PCC that no
          -- specification entry processed so far has claimed.
          checkSpecEntry (k, (p, c)) (ok_so_far, remaining) =
                  case takeFromMap k remaining of
                      -- The PCC has no channel under this key.
                      (Nothing, _) -> return (False, remaining)
                      (Just pc, remaining') -> do
                              cap   <- pcCapacity pc
                              start <- pcStartPriority pc
                              return (ok_so_far && cap == c && start == p, remaining')

-- | Updates PCC with given specification. New channels may be added
-- or existing ones altered.
-- But there is no operation for removing channels.
--
-- For a key that is not yet in the PCC a new channel is created and
-- registered among the empty channels. For an existing key the start
-- priority and the capacity are changed, the current priority is kept,
-- and the channel is moved to its new priority key inside the map
-- (empties or nonempties) selected by its current emptiness.
mutatePCC_STM :: Ord k => Map k (StartPriority, ChanCapacity) -> PriorityChansConvergerSTM k e -> STM (PriorityChansConvergerSTM k e)
mutatePCC_STM chan_priors_map pcc = do
           updated_struct <- foldrM applySpec (pccStruct pcc) (M.toList chan_priors_map)
           return pcc { pccStruct = updated_struct }
        where
           -- Folding step: applies one specification entry and returns the
           -- (possibly extended) structure map.
           applySpec (k, (new_start, cap)) struct_map = do
                   (struct_map', chan, mb_old_key) <-
                           maybe createChan retuneChan (M.lookup k struct_map)
                   chan_is_empty <- pcIsEmpty chan
                   case mb_old_key of
                       Nothing ->
                           modifyTVar_
                                   (pccEmpties pcc)
                                   (M.unionWith (++) (M.singleton (new_start, new_start) [chan]))
                       Just old_key ->
                           modifyTVar_
                                   (if chan_is_empty then pccEmpties pcc else pccNonEmpties pcc)
                                   (rebucket old_key chan)
                   return struct_map'
               where
                   -- The key is new: create the channel, add it to the structure.
                   createChan = do
                           chan <- newPriorityChan k new_start cap
                           return (M.insert k chan struct_map, chan, Nothing)
                   -- The key exists: replace start priority and capacity,
                   -- report the priority pair the channel had before.
                   retuneChan chan = do
                           (cur, old_start) <- readTVar (pcPriorityTV chan)
                           writeTVar (pcPriorityTV chan) (cur, new_start)
                           changeChanCapacity cap (pcChanB chan)
                           return (struct_map, chan, Just (cur, old_start))
                   -- Remove the channel from the list under its old key and
                   -- put it in front of the list under the new key.
                   rebucket old_key@(cur, _) chan buckets =
                           M.unionWith (++)
                                   (M.singleton (cur, new_start) [chan])
                                   (M.update dropChan old_key buckets)
                   -- Delete the channel with ID k from a list; a list that
                   -- becomes empty makes the whole map entry disappear.
                   dropChan chans =
                           case filter (\ c -> pcID c /= k) chans of
                               []        -> Nothing
                               remaining -> Just remaining

---------------------------------------------------------------------------------------------------------------------------
-- * PriorityChansConvergerSTM I/O
---------------------------------------------------------------------------------------------------------------------------

-- | Writes an element into the channel with the given key.
--
-- The @'PermitToBlock'@ flag is handed to @'elasticWriteTChanB'@: if
-- blocking is on, then the write blocks, when trying to write to a full
-- channel (where's no free space).
--
-- Returns @Nothing@ on success, @Just 'BadKey_FRWPCC'@ if the PCC has no
-- channel with such key, @Just 'ChanFull_FRWPCC'@ if the element wasn't
-- written. A channel that was empty before a successful write is moved
-- from the empties map to the nonempties map.
writeInPCC_STM :: Ord k => PermitToBlock -> k -> e -> PriorityChansConvergerSTM k e -> STM (Maybe FailureReasonWPCC)
writeInPCC_STM block_dowe k e pcc =
        maybe (return $ Just BadKey_FRWPCC) writeTo (M.lookup k (pccStruct pcc))
    where
        writeTo chan = do
                -- Emptiness must be sampled before the write.
                was_empty <- pcIsEmpty chan
                wrote <- elasticWriteTChanB block_dowe (pcChanB chan) e
                if not wrote
                    then return $ Just ChanFull_FRWPCC
                    else do
                        when was_empty (promote chan)
                        return Nothing
        -- Move the channel from the empties to the nonempties, keeping
        -- its priority key.
        promote chan = do
                prior_tuple <- pcPriority chan
                modifyTVar_
                        (pccEmpties pcc)
                        (M.update dropChan prior_tuple)
                modifyTVar_
                        (pccNonEmpties pcc)
                        (M.unionWith (++) (M.singleton prior_tuple [chan]))
        -- Delete the channel with ID k from a list; a list that becomes
        -- empty makes the whole map entry disappear.
        dropChan chans =
                case filter (\ c -> pcID c /= k) chans of
                    []        -> Nothing
                    remaining -> Just remaining

-- | Wrapper around @('writeInPCC_STM' True)@ - blocking write.
-- Uses 'interruptableSTM'. If returns @Left ()@, then transaction ended with
-- interrupting condition; otherwise the result of the write is returned
-- wrapped into @Right@.
interruptableWriteInPCC_STM :: Ord k => STM InterruptShouldWe -> k -> e -> PriorityChansConvergerSTM k e -> STM (Either () (Maybe FailureReasonWPCC))
interruptableWriteInPCC_STM stm_interrupter k e pcc =
        maybe (Left ()) Right `liftM` interruptableSTM stm_interrupter (writeInPCC_STM True k e pcc)

-- | Reads one element from the PCC, together with the ID of the channel
-- it was taken from.
--
-- The element is taken from the first channel listed under the maximal
-- /(CurrentPriority, StartPriority)/ key of the nonempties map. The
-- priority of that channel is then advanced with @'decPriority'@, and the
-- channel is re-registered under its new key - among the empties, if the
-- read has emptied it, among the nonempties otherwise.
--
-- If there are no nonempty channels: with @'PermitToBlock'@ @True@ the
-- transaction retries (so @Nothing@ is never returned), with @False@
-- @Nothing@ is returned.
--
-- Calls 'error' inside the transaction (so nothing gets committed), if the
-- nonempties map is found inconsistent: an entry with an empty channel
-- list, or a registered channel that yields no element.
readFromPCC_STM :: Ord k => PermitToBlock -> PriorityChansConvergerSTM k e -> STM (Maybe (k, e))
readFromPCC_STM block_dowe pcc = do
        mp <- readTVar $ pccNonEmpties pcc
        case M.maxViewWithKey mp of
            Nothing
                | block_dowe -> retry
                | otherwise  -> return Nothing
            Just ((_, []), _) -> error "An error occurred in 'readFromPCC_STM': empty list in the entry of pccNonEmpties!"
            Just ((prior, pc : rest_pc_list), rest_mp) -> do
                mb_e <- tryReadTChanB (pcChanB pc)
                -- Fail right here instead of handing a deferred 'fromJust'
                -- failure over to the caller after the bookkeeping below
                -- has already been committed.
                e <- case mb_e of
                         Just el -> return el
                         Nothing -> error "An error occurred in 'readFromPCC_STM': a channel registered in pccNonEmpties is empty!"
                emp <- pcIsEmpty pc
                let new_prior = decPriority prior
                    -- Nonempties map with the served channel removed from
                    -- under its old key.
                    mp_without_pc
                        | null rest_pc_list = rest_mp
                        | otherwise         = M.insert prior rest_pc_list rest_mp
                if emp
                    then do
                        modifyTVar_
                                (pccEmpties pcc)
                                (M.unionWith (++) (M.singleton new_prior [pc]))
                        writeTVar (pccNonEmpties pcc) mp_without_pc
                    else
                        writeTVar
                                (pccNonEmpties pcc)
                                (M.unionWith (++) (M.singleton new_prior [pc]) mp_without_pc)
                writeTVar (pcPriorityTV pc) new_prior
                return $ Just (pcID pc, e)

-- | Wrapper around @('readFromPCC_STM' True)@ - blocking read.
-- Uses 'interruptableSTM'. If returns @Left ()@, then transaction ended with
-- interrupting condition; otherwise the element that was read is returned
-- wrapped into @Right@.
interruptableReadFromPCC_STM :: Ord k => STM InterruptShouldWe -> PriorityChansConvergerSTM k e -> STM (Either () (k, e))
interruptableReadFromPCC_STM stm_interrupter pcc =
        interpret `liftM` interruptableSTM stm_interrupter (readFromPCC_STM True pcc)
    where
        interpret Nothing             = Left ()
        interpret (Just (Just smtng)) = Right smtng
        -- The blocking read is not expected to ever return Nothing.
        interpret (Just Nothing)      = error "An error occurred in 'interruptableReadFromPCC_STM': blocking read returned Nothing!"

-- | Composition of non-blocking @'readFromPCC_STM'@s: the reads are
-- accumulated with @'whileJustM_1' (:) []@ and the accumulated list is
-- reversed. Lazy (doublecheck that).
flushPCC2List_STM :: Ord k => PriorityChansConvergerSTM k e -> STM [(k, e)]
flushPCC2List_STM pcc = do
        collected <- whileJustM_1 (:) [] (readFromPCC_STM False pcc)
        return (reverse collected)

-- | Strict. Should be a bit faster than @'flushPCC2List_STM'@.
--
-- Takes all elements out of all channels registered as nonempty. The
-- channels are processed in rounds: in every round one element is read
-- from each channel listed under the maximal priority key, after which
-- these channels continue under the key advanced with @'decPriority'@.
-- A channel that yields no element is registered among the empties under
-- the key it has reached, and that key is stored in its @'pcPriorityTV'@.
--
-- After the flush the nonempties map is empty.
flushPCC2List_STM' :: Ord k => PriorityChansConvergerSTM k e -> STM [(k, e)]
flushPCC2List_STM' pcc = do
        ne_pc_mp <- readTVar $ pccNonEmpties pcc
        (e_list, empties) <- readFromNonempties ne_pc_mp
        -- Every channel taken from the nonempties map has ended up in
        -- 'empties', so nothing is left to be registered as nonempty.
        writeTVar (pccNonEmpties pcc) M.empty
        modifyTVar_ (pccEmpties pcc) (M.unionWith (++) empties)
        return e_list
     where
        -- Drains the channels of the given map. Returns the elements read
        -- and the map of the drained channels by the priority key each of
        -- them has reached.
        readFromNonempties :: Map (CurrentPriority, StartPriority) [PriorityChan k e] -> STM ([(k, e)], Map (CurrentPriority, StartPriority) [PriorityChan k e])
        readFromNonempties mp =
                case M.maxViewWithKey mp of
                    Nothing -> return ([], M.empty)
                    Just ((prior, pc_list), rest_mp) -> do
                        let new_prior = decPriority prior
                        (e_list, nonempties, empties) <- foldrM
                                   (\ pc (elems_accum, nonempties_accum, empties_accum) -> do
                                                mb_e <- tryReadTChanB (pcChanB pc)
                                                case mb_e of
                                                    Nothing -> do
                                                        -- Keep the channel's own priority in sync
                                                        -- with the key it gets in the empties map.
                                                        writeTVar (pcPriorityTV pc) prior
                                                        return (elems_accum, nonempties_accum, pc : empties_accum)
                                                    Just e ->
                                                        return ((pcID pc, e) : elems_accum, pc : nonempties_accum, empties_accum)
                                   )
                                   ([], [], [])
                                   pc_list
                        let continue_with_map = addBucket new_prior nonempties rest_mp
                        let stay_with_map     = addBucket prior empties M.empty
                        (lower_prior_e_list, rest_of_empties) <- readFromNonempties continue_with_map
                        return (e_list ++ lower_prior_e_list, M.unionWith (++) stay_with_map rest_of_empties)
        -- Appends channels to the list under the given key. An empty list
        -- of channels must not create a map entry: entries with empty lists
        -- would keep the recursion of 'readFromNonempties' going forever.
        addBucket key pcs mp
                | null pcs  = mp
                | otherwise = M.unionWith (++) mp (M.singleton key pcs)

-- | Strict. This is a fast flush, it doesn't deal with priorities.
--
-- All channels registered as nonempty are moved to the empties map under
-- their unchanged priority keys, the nonempties map is left empty, and the
-- contents of every such channel are taken with @'getTChanBContents'@
-- (presumably this removes the elements from the channel - see
-- @"Control.Concurrent.STM.TChan.TChanB"@). Channels are visited in
-- ascending order of their priority keys, channels under the same key in
-- the order of their list.
fastFlushPCC2List_STM' :: Ord k => PriorityChansConvergerSTM k e -> STM [(k, e)]
fastFlushPCC2List_STM' pcc = do
        ne_pc_mp <- readTVar $ pccNonEmpties pcc
        modifyTVar_ (pccEmpties pcc) (M.unionWith (++) ne_pc_mp)
        -- The channels now live in the empties map; they must not stay
        -- registered as nonempty as well.
        writeTVar (pccNonEmpties pcc) M.empty
        elems_lists <- mapM drainChan (Prelude.concat $ M.elems ne_pc_mp)
        return (Prelude.concat elems_lists)
     where
        -- Contents of one channel, each element tagged with the channel's ID.
        drainChan :: PriorityChan k e -> STM [(k, e)]
        drainChan pc = do
                let k = pcID pc
                el <- getTChanBContents (pcChanB pc)
                return (map (\ e -> (k, e)) el)

-- | @True@, if the map of nonempty channels (@'pccNonEmpties'@) is empty.
isEmptyPCC_STM :: Ord k => PriorityChansConvergerSTM k e -> STM Bool
isEmptyPCC_STM pcc = do
        nonempties <- readTVar (pccNonEmpties pcc)
        return (M.null nonempties)

-- | Free space in referenced channel = Capacity - Load.
-- Returns @Nothing@, if the PCC has no channel with the given key.
freeSpaceInPCCInput_STM :: Ord k => PriorityChansConvergerSTM k e -> k -> STM (Maybe ChanContentAmountMeasure)
freeSpaceInPCCInput_STM pcc k =
        maybe (return Nothing) freeSpaceOf (M.lookup k $ pccStruct pcc)
    where
        freeSpaceOf pc = do
                cap <- pcCapacity pc
                lo  <- pcLoad pc
                return $ Just (cap - lo)

-- | Strict.
--
-- Applies @'filterOutTChanBElements'@ with the given predicate to every
-- channel of the PCC and returns what's taken out, each element paired
-- with the ID of its channel. The elements taken from a channel are put
-- in front of the elements taken from the channels processed before it.
filterOutPCCElements_STM' :: Ord k => PriorityChansConvergerSTM k e -> (e -> TakeElementOutShouldWe) -> STM [(k, e)]
filterOutPCCElements_STM' pcc p = foldlM takeFromChan [] (M.elems $ pccStruct pcc)
     where
        -- Folding step; takeFromChan :: [(k, e)] -> PriorityChan k e -> STM [(k, e)]
        takeFromChan taken_so_far pc = do
                taken_here <- filterOutTChanBElements p (pcChanB pc)
                return (map ((,) (pcID pc)) taken_here ++ taken_so_far)


---------------------------------------------------------------------------------------------------------------------------
-- * PriorityChansConvergerSTM representation
---------------------------------------------------------------------------------------------------------------------------

-- | Textual representation of the PCC: the channel IDs by priority key in
-- the empties and the nonempties maps, the count of channels, and the
-- representation of every channel (made by @'showPC_STM'@; channels are
-- listed starting from the last element of the structure map).
--
-- Used wrapped in @(unsafePerformIO . atomically)@ in the definition
-- of show instance.
showPCC_STM :: (Ord k, Show k, Show e) => PriorityChansConvergerSTM k e -> STM String
showPCC_STM pcc = do
        emps  <- idsByPrior `liftM` readTVar (pccEmpties    pcc)
        nemps <- idsByPrior `liftM` readTVar (pccNonEmpties pcc)
        let header = "PriorityChansConvergerSTM {"
                  ++ "\n  |  Empties     = " ++ show emps
                  ++ "\n  |  NonEmpties  = " ++ show nemps
                  ++ "\n  |  Chans count = " ++ show (M.size struct) ++ " chans"
                  ++ "\n  |+    Channels : "
        chan_strs <- mapM showPC_STM (reverse $ M.elems struct)
        return (header ++ Prelude.concat chan_strs ++ "\n}")
     where
        struct = pccStruct pcc
        -- Replaces the channels of a priority map by their IDs.
        idsByPrior pc_map_by_prior =
                [ (p, map pcID pc_list) | (p, pc_list) <- M.toList pc_map_by_prior ]

-- | Textual representation of a channel: ID, priorities, capacity, load
-- and the elements, as returned by @'viewChanBContent'@, each shown
-- together with the index given to it by @'mapWithIndex'@.
--
-- Used wrapped in @(unsafePerformIO . atomically)@ in the definition
-- of show instance.
showPC_STM :: (Show e, Show k) => PriorityChan k e -> STM String
showPC_STM pc = do
        (cur, start) <- pcPriority pc
        cap <- pcCapacity pc
        load <- pcLoad pc
        elems <- viewChanBContent $ pcChanB pc
        let header = Prelude.concat
                [ "\n  ||+ PriorityChan {"
                , "\n  |||    ID = ", show (pcID pc)
                , "\n  |||    CurrentPriority = ", show cur
                , "\n  |||    StartPriority   = ", show start
                , "\n  |||    Capacity = ", show cap
                , "\n  |||    Load     = ", show load
                , "\n  |||+   Content  : "
                ]
            content = Prelude.concat $ mapWithIndex (\ i e -> "\n  ||||     -- " ++ show i ++ ") " ++ show e) elems
        return (header ++ content ++ "\n  ||-  }")

-- instance (Show k, Show e) => Show (PriorityChan k e) where
--     show pc  = unsafePerformIO $ printExceptionIfAny "show:PriorityChan" $ atomically $ showPC_STM pc
-- instance (Ord k, Show k, Show e) => Show (PriorityChansConvergerSTM k e) where
--     show pcc = unsafePerformIO $ printExceptionIfAny "show:PriorityChansConvergerSTM" $ atomically $ showPCC_STM pcc

--------------------------------

-- * Helpers

-- | @interruptableSTM interrupter subj@ runs @interrupter@ first. If it
-- returns @True@, the transaction returns @Nothing@ and @subj@ is not run.
-- Else @subj@ is run and its result is returned wrapped into @Just@.
interruptableSTM :: STM InterruptShouldWe -> STM a -> STM (Maybe a)
interruptableSTM interrupter subj = do
        interrupt_requested <- interrupter
        if interrupt_requested
            then return Nothing
            else Just `liftM` subj