{-# LANGUAGE LambdaCase, RecordWildCards, BangPatterns, CPP #-}
{-| This package contains a concurrency primitive which can be used to limit an
    expensive consumer from running unnecessarily. Crucially the consumer must
    be able to tolerate missing some updates.

    'NextRef' provides non-blocking writes, blocking reads, and non-blocking 
    reads.

    The blocking read interface ('takeNextRef') will not necessarily present 
    all values. 

    Additionally the 'NextRef' can be 'closed'. This is useful to graceful
    shutdown the consumer when the producer closes the 'NextRef'
    
-}
module Control.Concurrent.NextRef 
  ( NextRef 
  , newNextRef
  , takeNextRef
  , readLast
  , writeNextRef
  , modifyNextRef 
  , close
  , open
  , status
  , Status (..)
  ) where
import Control.Concurrent.STM
import Data.IORef
#if !MIN_VERSION_base(4,8,0)
import Control.Applicative
#endif

-- | Status is used to prevent future reads. When the status is 'Closed'
--   'takeNextRef' will always return 'Nothing'. When the status is 
--   open it will return Just. This is based off of the design of 'TMQueue'
--   from the 'stm-chans' package.
data Status = Open | Closed
  deriving (Show, Eq, Ord, Read, Enum, Bounded)

-- | A concurrency primitive for a slow consumer that can tolerate
--   missing some updates.
data NextRef a = NextRef  
  { nrAccum     :: IORef a
  , nrNextValue :: TMVar a
  , nrStatus    :: TVar  Status 
  }

-- | Create a 'NextVar'
newNextRef :: a -> IO (NextRef a)
newNextRef x = NextRef <$> newIORef x <*> newTMVarIO x <*> newTVarIO Open

-- | Block until the next value is available. If the 'NextVar' is 
--   closed it returns 'Nothing' immediantly. 
takeNextRef :: NextRef a -> IO (Maybe a)
takeNextRef NextRef {..} = atomically $ readTVar nrStatus >>= \case
  Closed -> return Nothing
  Open   -> Just <$> takeTMVar nrNextValue

update :: NextRef a -> a -> IO ()
update NextRef {..} !new = atomically $ readTVar nrStatus >>= \case
  Closed -> return ()
  Open   -> do
    tryTakeTMVar nrNextValue
    putTMVar     nrNextValue new

-- | Read the most recent value. Non-blocking
readLast :: NextRef a -> IO a
readLast NextRef {..} = readIORef nrAccum

tupleResult :: (a, b) -> (a, (a, b))
tupleResult (x, y) = (x, (x, y))

-- | Write a new value. Never blocks.
writeNextRef :: NextRef a -> a -> IO ()
writeNextRef nv@(NextRef {..}) newValue = do 
  writeIORef nrAccum newValue
  update nv newValue

-- | Apply a function to current value to produce the next value and return 
--   a result. 
modifyNextRef :: NextRef a -> (a -> (a, b)) -> IO b
modifyNextRef nv@(NextRef {..}) f = do
  (!newValue, !result) <- atomicModifyIORef' nrAccum $ tupleResult . f
  update nv newValue
  return result

-- | Modify the status of the 'NextRef' to 'Closed'. All future reads
--   using 'takeNextRef' will result a 'Nothing'. 'readLast' is unaffected.
close :: NextRef a -> IO ()
close NextRef {..} = atomically $ writeTVar nrStatus Closed

-- | Modify the status of the 'NextRef' to 'Closed'. All future reads
--   using 'takeNextRef' will return a 'Just'. 'readLast' is unaffected.
open :: NextRef a -> IO ()
open NextRef {..} = atomically $ writeTVar nrStatus Open

-- | Get the current status of the 'NextRef'
status :: NextRef a -> IO Status
status = atomically . readTVar . nrStatus