{-
Copyright (C) 2009-2010 Andrejs Sisojevs <andrejs.sisojevs@nextmail.ru>

All rights reserved.

For license and copyright information, see the file COPYRIGHT
-}

--------------------------------------------------------------------------
--------------------------------------------------------------------------

-- | This module wraps STM PCC with IO.
-- Some locks are added to balance heavyweight transactions, to reduce
-- \"wasted effort\".
module Control.Concurrent.PriorityChansConverger.PriorityChansConverger where

import           Control.Concurrent
import           Control.Concurrent.MVar
import           Control.Concurrent.PriorityChansConverger.Commons
import           Control.Concurrent.PriorityChansConverger.PriorityChansConvergerSTM
import           Control.Concurrent.ConcurrentUISupport
import           Control.Concurrent.STM
import           Control.Concurrent.STM.TChan.ExtsCommons
import           Control.Exception
import           Control.Monad
import           Data.IORef
import           Data.List
import qualified Data.Map as M
import           Data.Map (Map)
import           Data.MyHelpers
import           System.IO

-- | Compile-time switch for lock tracing: when @True@, 'lockPCC' and
-- 'unlockPCC' report every lock acquisition and release through
-- 'reportIfPossible'.
__locksObserverEnabled :: Bool
__locksObserverEnabled = False
----------------------------------------

-- * PriorityChansConverger ADT and its administration

-- | Keys of the locks stored in 'pccLocks'.
--
-- * 'PCC_Lock' is taken by 'mutatePCC', by the flush and filter operations
--   and by the non-blocking variants of 'writeInPCC' and 'readFromPCC'.
--
-- * 'PCC_IOInterruptableRead' is taken by 'interruptableReadFromPCC'.
--
-- * 'PCC_IOInterruptableWrite' is taken by 'interruptableWriteInPCC'.
--
-- The derived 'Ord' instance fixes the order in which 'lockPCC' acquires
-- several locks (ascending) and 'unlockPCC' releases them (descending).
data PCC_Lock = PCC_Lock | PCC_IOInterruptableRead | PCC_IOInterruptableWrite deriving (Eq, Ord, Show)

-- | Wrapper around @'PriorityChansConvergerSTM'@. Added locks.
--
-- The type carries no @Ord k@ datatype context: such a context never
-- supplies the constraint to users of the type (the functions of this module
-- state @Ord k@ in their own signatures), and the feature is deprecated
-- in GHC.
--
-- NOTE(review): this comment used to state that a @'Show'@ instance is
-- defined using @'unsafePerformIO'@; no such instance is declared in this
-- module - confirm whether it lives elsewhere.
data PriorityChansConverger k e = PriorityChansConverger {
        -- | The wrapped STM-level converger.
        pccSTM   :: PriorityChansConvergerSTM k e
        -- | One @'MVar' ()@ per lock key: 'lockPCC' takes it,
        -- 'unlockPCC' puts the unit back.
      , pccLocks :: Map PCC_Lock (MVar ())
}

-- | Write the message through the @'ConcurrentUISupport'@ attached to the
-- wrapped STM converger, if there is one; otherwise do nothing.
reportIfPossible :: Ord k => PriorityChansConverger k e -> String -> IO ()
reportIfPossible pcc msg =
        maybe (return ()) (\ ui -> cuisWrite ui msg) (pccConcurrentUISupport (pccSTM pcc))

-- | Run the action; when the wrapped STM converger has a
-- @'ConcurrentUISupport'@ attached, run it under 'reportExceptionIfAny_2'
-- (given that support and the function name), otherwise run it as is.
maybeObserveExceptions :: Ord k => PriorityChansConverger k e -> FunctionName -> IO a -> IO a
maybeObserveExceptions pcc f_name io_a =
        maybe io_a observed (pccConcurrentUISupport (pccSTM pcc))
     where observed ui = reportExceptionIfAny_2 ui f_name io_a

-- | Constructor without @'ConcurrentUISupport'@: delegates to
-- @'newPriorityChansConverger_wCUIS'@, passing @Nothing@ as the support.
--
-- @newPriorityChansConverger = 'newPriorityChansConverger_wCUIS' Nothing@
newPriorityChansConverger :: Ord k => Map k (StartPriority, ChanCapacity) -> IO (PriorityChansConverger k e)
newPriorityChansConverger chan_priors_map =
        newPriorityChansConverger_wCUIS Nothing chan_priors_map

-- | Wrapper around @'newPriorityChansConvergerSTM_wCUIS'@ constructor.
-- With an option to enable utility @'ConcurrentUISupport'@
-- (which is useful for debugging).
--
-- A released lock (an @'MVar'@ holding @()@) is created for /every/
-- @'PCC_Lock'@ key. 'lockPCC' calls 'error' for a key that is missing from
-- 'pccLocks', and 'interruptableReadFromPCC' / 'interruptableWriteInPCC'
-- lock on 'PCC_IOInterruptableRead' / 'PCC_IOInterruptableWrite', so all
-- three keys have to be present for those functions to work.
newPriorityChansConverger_wCUIS :: Ord k => Maybe ConcurrentUISupport -> Map k (StartPriority, ChanCapacity) -> IO (PriorityChansConverger k e)
newPriorityChansConverger_wCUIS mb_cuis chan_priors_map = do
        pcc <- let constr = atomically $ newPriorityChansConvergerSTM_wCUIS mb_cuis chan_priors_map
                in case mb_cuis of
                       Nothing   -> constr
                       Just cuis -> reportExceptionIfAny_2 cuis f_name constr
        -- One independent lock per key; keep this list in sync with the
        -- constructors of 'PCC_Lock'.
        locks <- mapM newLock [PCC_Lock, PCC_IOInterruptableRead, PCC_IOInterruptableWrite]
        return PriorityChansConverger {
                 pccSTM   = pcc
               , pccLocks = M.fromList locks
               }
     where f_name = "newPriorityChansConverger_wCUIS"
           -- Pair a lock key with a fresh, initially released lock.
           newLock lock_key = do
                   lock_mv <- newMVar ()
                   return (lock_key, lock_mv)

-- | Name of a function, as passed to 'maybeObserveExceptions' for
-- exception reports.
type FunctionName = String
-- | Name of the function that takes or releases a lock; it appears in the
-- lock-tracing messages of 'lockPCC' and 'unlockPCC'.
type LockUserName = FunctionName
-- | Acquire the given locks in ascending key order, each key at most once
-- (duplicates in the list are ignored). Asking for a key that is absent
-- from 'pccLocks' is an 'error'. When lock tracing is enabled, every
-- acquisition is reported together with the name of the locking function.
--
-- WARNING !!! Asynchronous exceptions may cause loss of locks!
lockPCC   :: Ord k => LockUserName -> [PCC_Lock] -> PriorityChansConverger k e -> IO ()
lockPCC locks_user_f_name locks pcc = forM_ (nub (sort locks)) acquire
     where
        acquire l = case M.lookup l (pccLocks pcc) of
            Just l_mv -> do
                takeMVar l_mv
                when __locksObserverEnabled $ reportIfPossible pcc $
                    "Lock '" ++ show l ++ "' taken by function '" ++ locks_user_f_name ++ "'."
            Nothing   ->
                error ("An error occurred in 'lockPCC': no lock for key '" ++ show l ++ "' exists.")

-- | Release the given locks in descending key order, each key at most once
-- (duplicates in the list are ignored). Releasing never blocks: a lock that
-- is already released is left as it is. Asking for a key that is absent
-- from 'pccLocks' is an 'error'. When lock tracing is enabled, the outcome
-- of every release is reported.
--
-- WARNING !!! Asynchronous exceptions may cause loss of locks!
unlockPCC :: Ord k => LockUserName -> [PCC_Lock] -> PriorityChansConverger k e -> IO ()
unlockPCC locks_user_f_name locks pcc = forM_ (reverse (nub (sort locks))) release
     where
        release l = case M.lookup l (pccLocks pcc) of
            Just l_mv -> do
                was_held <- tryPutMVar l_mv ()
                when __locksObserverEnabled $ reportIfPossible pcc $
                    if was_held
                       then "Lock '" ++ show l ++ "' released by function '" ++ locks_user_f_name ++ "'."
                       else "Function '" ++ locks_user_f_name ++ "' wanted to release the lock '" ++ show l ++ "', but it appered to be already released."
            Nothing   ->
                error ("An error occurred in 'unlockPCC': no lock for key '" ++ show l ++ "' exists.")

-- | Run an IO action while holding the given locks. The locks are taken
-- with asynchronous exceptions blocked; the action itself runs unblocked,
-- and the locks are released afterwards whether it returns or throws.
--
-- WARNING !!! Asynchronous exceptions may cause loss of locks!
withLockedDo :: Ord k => LockUserName -> [PCC_Lock] -> PriorityChansConverger k e -> IO a -> IO a
withLockedDo locks_user_f_name locks pcc act =
        block (acquire >> (unblock act `finally` release))
     where acquire = lockPCC   locks_user_f_name locks pcc
           release = unlockPCC locks_user_f_name locks pcc

---------------------------------------------------------------------------------------------------------------------------
-- * PriorityChansConverger mutation
---------------------------------------------------------------------------------------------------------------------------

-- | IO version of @'isOfStructPCC_STM'@: runs it as a single transaction.
-- Used locks: none.
isOfStructPCC :: Ord k => Map k (StartPriority, ChanCapacity) -> PriorityChansConverger k e -> IO (PCC_ContainsGivenSpecification, NothingElseIsInStruct)
isOfStructPCC mp pcc =
        maybeObserveExceptions pcc "isOfStructPCC" (atomically check)
     where check = isOfStructPCC_STM mp (pccSTM pcc)

-- | IO version of @'mutatePCC_STM'@. The result is the given converger with
-- its STM part replaced by the mutated one; the locks are carried over.
-- Used locks: @['PCC_Lock']@
mutatePCC :: Ord k => Map k (StartPriority, ChanCapacity) -> PriorityChansConverger k e -> IO (PriorityChansConverger k e)
mutatePCC chan_priors_map pcc =
        withLockedDo this_fn [PCC_Lock] pcc (rewrap `liftM` mutated_stm)
     where this_fn        = "mutatePCC"
           mutated_stm    = maybeObserveExceptions pcc this_fn (atomically (mutatePCC_STM chan_priors_map (pccSTM pcc)))
           rewrap new_stm = pcc { pccSTM = new_stm }

---------------------------------------------------------------------------------------------------------------------------
-- * PriorityChansConverger I/O
---------------------------------------------------------------------------------------------------------------------------

-- | IO version of @'writeInPCC_STM'@. Used locks: @[]@ when blocking is
-- permitted, @['PCC_Lock']@ otherwise.
--
-- If blocking is enabled, may block, when there is no free space
-- in @'PriorityChan'@.
writeInPCC :: Ord k => PermitToBlock -> k -> e -> PriorityChansConverger k e -> IO (Maybe FailureReasonWPCC)
writeInPCC block_dowe k e pcc = guarded write_it
     where this_fn  = "writeInPCC"
           write_it = maybeObserveExceptions pcc this_fn (atomically (writeInPCC_STM block_dowe k e (pccSTM pcc)))
           -- The lock is taken only for the non-blocking variant.
           guarded
               | block_dowe = id
               | otherwise  = withLockedDo this_fn [PCC_Lock] pcc

-- | IO version of @'interruptableWriteInPCC_STM'@. Used locks: @[]@
--
-- Blocking write.
--
-- Do not put the control of the @'STM' 'InterruptShouldWe'@ under @'PCC_Lock'@.
-- Keep it at @False@ as long as you don't want to get @Left ()@ in return;
-- make it @True@ whenever you want to stop waiting.
stmInterruptableWriteInPCC :: Ord k => STM InterruptShouldWe -> k -> e -> PriorityChansConverger k e -> IO (Either () (Maybe FailureReasonWPCC))
stmInterruptableWriteInPCC stm_interrupter k e pcc =
        maybeObserveExceptions pcc "stmInterruptableWriteInPCC" (atomically write_or_give_up)
     where write_or_give_up = interruptableWriteInPCC_STM stm_interrupter k e (pccSTM pcc)

-- | Wrapper around @'stmInterruptableWriteInPCC'@. Used locks: @['PCC_IOInterruptableWrite']@
--
-- Blocking write.
--
-- Creates a @'TVar' Bool@ to control the interruptability of write operation.
-- Spawns additional thread, which cyclicly reads given @Chan@ and checks,
-- if it signals terminating condition - if so, then using "interrupter" @TVar@
-- stops trying to write in PCC. The temporary thread is killed as soon as the
-- write operation ends (normally or with an exception), so it does not keep
-- consuming messages from the @Chan@ afterwards.
interruptableWriteInPCC :: Ord k => (Chan a, a -> InterruptShouldWe) -> k -> e -> PriorityChansConverger k e -> IO (Either () (Maybe FailureReasonWPCC))
interruptableWriteInPCC (interrupter_chan, interrupter_cond) k e pcc = withLockedDo f_name [PCC_IOInterruptableWrite] pcc $ maybeObserveExceptions pcc f_name $ do
        interrupter_tv <- atomically $ newTVar False
        -- Watcher loop: wait for a message satisfying the condition, then
        -- raise the interrupt flag and finish.
        let watch_chan = do
                _interrupt <- interrupter_cond `liftM` readChan interrupter_chan
                case _interrupt of
                    False -> watch_chan
                    True  -> atomically $ writeTVar interrupter_tv True
        watcher_tid <- forkIO watch_chan
        -- A stop flag polled by the watcher cannot end it while it is blocked
        -- in 'readChan' (it would swallow the next message first), hence the
        -- explicit 'killThread'; 'finally' covers the exceptional exit too.
        stmInterruptableWriteInPCC (readTVar interrupter_tv) k e pcc `finally` killThread watcher_tid
     where f_name = "interruptableWriteInPCC"

-- | IO version of @'readFromPCC_STM'@. Used locks: @[]@ when blocking is
-- permitted, @['PCC_Lock']@ otherwise.
readFromPCC :: Ord k => PermitToBlock -> PriorityChansConverger k e -> IO (Maybe (k, e))
readFromPCC block_dowe pcc = guarded read_it
     where this_fn = "readFromPCC"
           read_it = maybeObserveExceptions pcc this_fn (atomically (readFromPCC_STM block_dowe (pccSTM pcc)))
           -- The lock is taken only for the non-blocking variant.
           guarded
               | block_dowe = id
               | otherwise  = withLockedDo this_fn [PCC_Lock] pcc

-- | IO version of @'interruptableReadFromPCC_STM'@. Used locks: @[]@
--
-- Blocking read.
stmInterruptableReadFromPCC :: Ord k => STM InterruptShouldWe -> PriorityChansConverger k e -> IO (Either () (k, e))
stmInterruptableReadFromPCC stm_interrupter pcc =
        maybeObserveExceptions pcc "stmInterruptableReadFromPCC" (atomically read_or_give_up)
     where read_or_give_up = interruptableReadFromPCC_STM stm_interrupter (pccSTM pcc)

-- | Wrapper around @'stmInterruptableReadFromPCC'@. Used locks: @['PCC_IOInterruptableRead']@
--
-- Blocking read.
--
-- Creates a @'TVar' Bool@ to control the interruptability of read operation.
-- Spawns additional thread, which cyclicly reads given @Chan@ and checks,
-- if it signals terminating condition - if so, then using "interrupter" @TVar@
-- stops trying to read from PCC. The temporary thread is killed as soon as
-- the read operation ends (normally or with an exception), so it does not
-- keep consuming messages from the @Chan@ afterwards.
interruptableReadFromPCC :: Ord k => (Chan a, a -> InterruptShouldWe) -> PriorityChansConverger k e -> IO (Either () (k, e))
interruptableReadFromPCC (interrupter_chan, interrupter_cond) pcc = withLockedDo f_name [PCC_IOInterruptableRead] pcc $ maybeObserveExceptions pcc f_name $ do
        interrupter_tv <- atomically $ newTVar False
        -- Watcher loop: wait for a message satisfying the condition, then
        -- raise the interrupt flag and finish.
        let watch_chan = do
                _interrupt <- interrupter_cond `liftM` readChan interrupter_chan
                case _interrupt of
                    False -> watch_chan
                    True  -> atomically $ writeTVar interrupter_tv True
        watcher_tid <- forkIO watch_chan
        -- A stop flag polled by the watcher cannot end it while it is blocked
        -- in 'readChan' (it would swallow the next message first), hence the
        -- explicit 'killThread'; 'finally' covers the exceptional exit too.
        stmInterruptableReadFromPCC (readTVar interrupter_tv) pcc `finally` killThread watcher_tid
      where f_name = "interruptableReadFromPCC"

-- | IO version of @'flushPCC2List_STM'@. Used locks: @['PCC_Lock']@
--
-- Lazy (doublecheck that).
flushPCC2List :: Ord k => PriorityChansConverger k e -> IO [(k, e)]
flushPCC2List pcc =
        withLockedDo this_fn [PCC_Lock] pcc (maybeObserveExceptions pcc this_fn (atomically flush))
     where this_fn = "flushPCC2List"
           flush   = flushPCC2List_STM (pccSTM pcc)

-- | IO version of @flushPCC2List_STM&#39;@
-- (see "Control.Concurrent.PriorityChansConverger.PriorityChansConverger").
-- Used locks: @['PCC_Lock']@
--
-- Strict. Should be a bit faster than @'flushPCC2List'@.
flushPCC2List' :: Ord k => PriorityChansConverger k e -> IO [(k, e)]
flushPCC2List' pcc =
        withLockedDo this_fn [PCC_Lock] pcc (maybeObserveExceptions pcc this_fn (atomically flush))
     where this_fn = "flushPCC2List\'"
           flush   = flushPCC2List_STM' (pccSTM pcc)

-- | IO version of @fastFlushPCC2List_STM&#39;@
-- (see "Control.Concurrent.PriorityChansConverger.PriorityChansConverger").
-- Used locks: @['PCC_Lock']@
--
-- Strict. This is a fast flush, it doesn't deal with priorities.
fastFlushPCC2List' :: Ord k => PriorityChansConverger k e -> IO [(k, e)]
fastFlushPCC2List' pcc =
        withLockedDo this_fn [PCC_Lock] pcc (maybeObserveExceptions pcc this_fn (atomically flush))
     where this_fn = "fastFlushPCC2List\'"
           flush   = fastFlushPCC2List_STM' (pccSTM pcc)

-- | IO version of @'isEmptyPCC_STM'@: runs it as a single transaction.
-- Used locks: none
isEmptyPCC :: Ord k => PriorityChansConverger k e -> IO Bool
isEmptyPCC pcc =
        maybeObserveExceptions pcc "isEmptyPCC" (atomically (isEmptyPCC_STM (pccSTM pcc)))

-- | IO version of @'freeSpaceInPCCInput_STM'@: runs it as a single
-- transaction for the given key. Used locks: none
freeSpaceInPCCInput :: Ord k => PriorityChansConverger k e -> k -> IO (Maybe ChanContentAmountMeasure)
freeSpaceInPCCInput pcc k =
        maybeObserveExceptions pcc "freeSpaceInPCCInput" (atomically measure)
     where measure = freeSpaceInPCCInput_STM (pccSTM pcc) k

-- | IO version of @filterOutPCCElements_STM&#39;@
-- (see "Control.Concurrent.PriorityChansConverger.PriorityChansConverger").
-- Used locks: @['PCC_Lock']@
--
-- Strict.
--
-- Take everything, apply filter, return what's left in the PCC,
-- return what's taken out.
filterOutPCCElements' :: Ord k => PriorityChansConverger k e -> (e -> TakeElementOutShouldWe) -> IO [(k, e)]
filterOutPCCElements' pcc p =
        withLockedDo this_fn [PCC_Lock] pcc (maybeObserveExceptions pcc this_fn (atomically sift))
     where this_fn = "filterOutPCCElements\'"
           sift    = filterOutPCCElements_STM' (pccSTM pcc) p

---------------------------------------------------------------------------------------------------------------------------
-- * PriorityChansConverger representation
---------------------------------------------------------------------------------------------------------------------------

-- | IO version of @'showPCC_STM'@: renders the wrapped STM converger to a
-- @String@ in a single transaction. Used locks: none
showPCC :: (Ord k, Show k, Show e) => PriorityChansConverger k e -> IO String
showPCC pcc =
        maybeObserveExceptions pcc "showPCC" (atomically (showPCC_STM (pccSTM pcc)))