{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeFamilies #-}

{-# OPTIONS_GHC -Wall #-}
module Chan.KickChan (
  -- * Kick Channels
  KickChan
-- ** Creation
, newKickChan
, kcSize
-- ** Writing
, putKickChan
, invalidateKickChan
-- ** Reading
, KCReader
, newReader
, readNext
, currentLag
-- ** type constraint helpers
, KickChanS
, KickChanV
, KickChanU
, kcUnboxed
, kcStorable
, kcDefault

, KCReaderS
, KCReaderV
, KCReaderU
) where

import Control.Concurrent.MVar
import Control.Concurrent (yield)
import Control.Exception

import Data.Bits
import Data.IORef
import Data.IntMap (IntMap)
import qualified Data.IntMap as IM
import Data.Foldable as Fold
import Data.Maybe (maybeToList)

import Data.Vector.Generic.Mutable (MVector)
import qualified Data.Vector.Generic.Mutable as M
import qualified Data.Vector.Mutable as V
import qualified Data.Vector.Storable.Mutable as S
import qualified Data.Vector.Unboxed.Mutable as U
import Control.Monad.Primitive

#if MIN_VERSION_base(4,6,0)
-- nothing to do here...
#else
atomicModifyIORef' :: IORef a -> (a -> (a,b)) -> IO b
atomicModifyIORef' ref f = do
    b <- atomicModifyIORef ref
            (\x -> let (a, b) = f x
                    in (a, a `seq` b))
    b `seq` return b
#endif

-- internal structure to hold the channel head
data Position a = Position
    { nSeq     :: {-# UNPACK #-} !Int
    , waiting  :: IntMap [MVar (Maybe a)]
    }

{- invariants
 - nSeq is the next position to be written (buffer head, initialized to 0)
 - nSeq changes monotonically
 - the minimum key of the 'waiting' map is >= nSeq.
 -
 - Use a two-stage commit strategy.  Writers first claim a sequence number.
 - This causes nSeq to be incremented (so subsequent writers get a new number)
 - and inserts the claimed sequence number into the 'waiting' map with an
 - empty list.  Writers then write to the vector and commit the write, which
 - returns a list of readers waiting for that sequence to be written.  The
 - writer then writes the value to every waiting reader.
 -
 - Reading uses a similar two-stage process.  First the reader checks if the
 - value has been committed.  If not it blocks, otherwise it reads from the
 - buffer and checks if the read position was valid (hasn't been
 - over-written).
-}

emptyPosition :: Position a
emptyPosition = Position 0 IM.empty

-- increment a Position, returning the current sequence number or Nothing
-- if the buffer is full
incrPosition :: Int -> Position a -> (Position a,Maybe Int)
incrPosition sz oldP@(Position curSeq pMap) = case IM.minViewWithKey pMap of
    Just ((lowKey,_),_)
        | lowKey+sz <= curSeq -> (oldP,Nothing)
    _ -> (newP,Just curSeq)
  where
    !newSeq = curSeq+1
    newP = Position
           { nSeq = newSeq
           , waiting=IM.insertWith (++) curSeq [] pMap
           }

-- increment a position by a given value, which must be enough to wrap the
-- buffer.  Returns a list of all current waiting readers.
invalidatePosition :: Int -> Position a -> (Position a,[MVar (Maybe a)])
invalidatePosition wrapAmount (Position oldP waiting) = (newP,pList)
  where
    newP = Position (oldP+wrapAmount) IM.empty
    pList = Prelude.concat $ IM.elems waiting

-- | commit a value that's been written to the vector.
commit :: Int -> Position a -> (Position a,[MVar (Maybe a)])
commit seqNum (Position nSeq pMap) = (newP,pList)
  where
    pList = Prelude.concat $ maybeToList pending
    (pending,pMap') = IM.updateLookupWithKey (\_ _ -> Nothing) seqNum pMap
    newP = Position nSeq pMap'

data CheckResult = Ok | Invalid

-- check if a value is possibly ready to be read from.  If so, return True,
-- else add ourselves to the waiting map on that value.
--
-- if readyOrWait returns True, the value has definitely already been commited
-- (the sequence number has been assigned to a writer but isn't in the pending
-- map).  It may have been over-written however.
readyOrWait :: MVar (Maybe a) -> Int -> Position a -> (Position a, Bool)
readyOrWait await readP p@(Position nextP pMap) =
  case IM.updateLookupWithKey (\_ xs -> Just $ await:xs) readP pMap of
    (Just _waitList,pMap') -> (p { waiting = pMap' }, False)
    (Nothing,_)
      | readP >= nextP ->
          (p{waiting=IM.insert readP [await] pMap} ,False)
      | otherwise -> (p,True)

-- we know the value has been committed, we just need to check that it's still
-- valid.
checkWithPosition :: Int -> Int -> Position a -> CheckResult
checkWithPosition sz readP (Position nextP _pMap) =
  case nextP-readP of
      dif -- result should be ok.
          | dif > 0 && dif <= sz -> Ok
          -- requests that are too old or too far in the future
          | otherwise -> Invalid

-- | A Channel that drops elements from the end when a 'KCReader' lags too far
-- behind the writer.
data KickChan v a = KickChan
    { kcSz  :: {-# UNPACK #-} !Int
    , kcPos :: (IORef (Position a))
    , kcV   :: (v a)
    }

{- invariants
 - kcSz is a power of 2 - 1
 -}

-- | Create a new 'KickChan' of the requested size.  The actual size will be
-- rounded up to the next highest power of 2.
-- The stored size will have one subtracted, because that's the value we use
-- for masking, which is the most common operation.
newKickChan :: (MVector v' a, v ~ v' RealWorld) => Int -> IO (KickChan v a)
newKickChan sz = do
    kcPos <- newIORef emptyPosition
    kcV <- M.new (kcSz+1)
    return KickChan {..}
  where
    kcSz = 2^(ceiling (logBase 2 (fromIntegral $ sz) :: Double) :: Int) - 1
{-# INLINABLE newKickChan #-}

-- | Get the size of a 'KickChan'.
kcSize :: KickChan v a -> Int
kcSize KickChan {kcSz} = kcSz+1

-- | Put a value into a 'KickChan'.
--
-- if there are multiple writers, putKickChan may block if one writer has
-- wrapped around and a write is pending to the underlying storage location.
--
-- putKickChan will never block on readers, instead 'KCReader's will be
-- invalidated if they lag too far behind.
putKickChan :: (MVector v' a, v ~ v' RealWorld) => KickChan v a -> a -> IO ()
putKickChan  KickChan {..} x = mask_ $ do
    -- none of these actions should raise exceptions of their own.
    -- For async exceptions, everything should be uninterruptible,
    -- except 'claim', which calls yield.  But if an async exception
    -- arises at that point it's ok because we haven't actually
    -- claimed a sequence number yet, so there's nothing to clean up.
    curSeq <- claim
    M.unsafeWrite kcV (curSeq .&. kcSz) x
    waiting <- atomicModifyIORef' kcPos $ commit curSeq
    Fold.mapM_ (\v -> putMVar v (Just x)) waiting
    -- these shouldn't be interrupted as the MVars are definitely empty.
  where
    claim = do
        curSeq'm <- atomicModifyIORef' kcPos (incrPosition (kcSz+1))
        maybe (yield >> claim) return curSeq'm
{-# INLINE putKickChan #-}

-- | Invalidate all current readers on a channel.
invalidateKickChan :: KickChan v a -> IO ()
invalidateKickChan KickChan {..} = mask_ $ do
    waiting <- atomicModifyIORef' kcPos (invalidatePosition $ 2+kcSz)
    Fold.mapM_ (flip putMVar Nothing) waiting
    -- see comments for putKickChan WRT exceptions.

-- | get a value from a 'KickChan', or 'Nothing' if no longer available.
-- 
-- if there are no new values, getKickChan will block until a new value is
-- written, or the channel is invalidated.
getKickChan :: (MVector v' a, v ~ v' RealWorld) => KickChan v a -> Int -> IO (Maybe a)
getKickChan KickChan {..} readP = do
    await <- newEmptyMVar
    -- we don't need any special exception handling.  The worst that can
    -- happen is we leave an empty MVar in the Position, but in that case
    -- the pending write will clear it out.
    proceed <- atomicModifyIORef' kcPos $ readyOrWait await readP
    if proceed -- value is definitely committed.
      then do
        x <- M.unsafeRead kcV (readP .&. kcSz)
        checkedPos <- readIORef kcPos
        -- add 1 to kcSize because we store 1-size
        case checkWithPosition (kcSz+1) readP checkedPos of
            Ok    -> return $ Just x
            Invalid -> return Nothing
      else takeMVar await

-- | A reader for a 'KickChan'
data KCReader v a = KCReader
    { kcrChan :: {-# UNPACK #-} !(KickChan v a)
    , kcrPos  :: IORef Int
    }

{- invariants
 - kcrPos is the position of the most recently read element (initialized to -1)
 -}

-- | create a new reader for a 'KickChan'.  The reader will be initialized to
-- the head of the KickChan, so that an immediate call to 'readNext' will
-- block (provided no new values have been put into the chan in the meantime).
newReader :: KickChan v a -> IO (KCReader v a)
newReader kcrChan@KickChan{..} = do
    (Position writeP _pMap) <- readIORef kcPos
    kcrPos <- newIORef (writeP-1)
    return KCReader {..}
{-# INLINABLE newReader #-}

-- | get the next value from a 'KCReader'.  This function will block if the next
-- value is not yet available.
--
-- if Nothing is returned, the reader has lagged the writer and values have
-- been dropped.
readNext :: (MVector v' a, v ~ v' RealWorld) => KCReader v a -> IO (Maybe a)
readNext (KCReader {..}) = do
    readP <- atomicModifyIORef' kcrPos (\lastP -> let p = lastP+1 in (p,p))
    getKickChan kcrChan readP
{-# INLINE readNext #-}

-- | The lag between a 'KCReader' and its writer.  Mostly useful for
-- determining if a call to 'readNext' will block.
currentLag :: KCReader v a -> IO Int
currentLag KCReader {..} = do
    lastRead <- readIORef kcrPos
    Position nextWrite pMap <- readIORef $ kcPos kcrChan
    return $! nextWrite - lastRead - IM.size pMap - 1


type KickChanU a = KickChan (U.MVector RealWorld) a
type KickChanS a = KickChan (S.MVector RealWorld) a
type KickChanV a = KickChan (V.MVector RealWorld) a

type KCReaderU a = KCReader (U.MVector RealWorld) a
type KCReaderS a = KCReader (S.MVector RealWorld) a
type KCReaderV a = KCReader (V.MVector RealWorld) a

-- | Constrain a KickChan to work with an 'Unboxed' data storage
kcUnboxed :: KickChan (U.MVector RealWorld) a -> KickChan (U.MVector RealWorld) a
kcUnboxed = id

-- | Constrain a KickChan to work with a standard boxed vector data storage
kcDefault :: KickChan (V.MVector RealWorld) a -> KickChan (V.MVector RealWorld) a
kcDefault = id

-- | Constrain a KickChan to work with a 'Storable' data storage
kcStorable :: KickChan (S.MVector RealWorld) a -> KickChan (S.MVector RealWorld) a
kcStorable = id