{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE Trustworthy #-}
module BroadcastChan.Internal where

#if !MIN_VERSION_base(4,8,0)
import Control.Applicative ((<*))
#endif
import Control.Concurrent.MVar
import Control.Exception (mask_)
import Control.Monad ((>=>))
import Control.Monad.IO.Unlift (MonadIO(..))
import System.IO.Unsafe (unsafeInterleaveIO)

-- | Used with DataKinds as phantom type indicating whether a 'BroadcastChan'
-- value is a read or write end.
data Direction = In  -- ^ Indicates a write 'BroadcastChan'
               | Out -- ^ Indicates a read 'BroadcastChan'

-- | Alias for the 'In' type from the 'Direction' kind, allows users to write
-- the @'BroadcastChan' 'In' a@ type without enabling @DataKinds@.
type In = 'In

-- | Alias for the 'Out' type from the 'Direction' kind, allows users to write
-- the @'BroadcastChan' 'Out' a@ type without enabling @DataKinds@.
type Out = 'Out

-- | The abstract type representing the read or write end of a 'BroadcastChan'.
newtype BroadcastChan (dir :: Direction) a = BChan (MVar (Stream a))
    deriving (Eq)

type Stream a = MVar (ChItem a)

data ChItem a = ChItem a {-# UNPACK #-} !(Stream a) | Closed

-- | Creates a new 'BroadcastChan' write end.
newBroadcastChan :: MonadIO m => m (BroadcastChan In a)
newBroadcastChan = liftIO $ do
   hole  <- newEmptyMVar
   writeVar <- newMVar hole
   return (BChan writeVar)

-- | Close a 'BroadcastChan', disallowing further writes. Returns 'True' if the
-- 'BroadcastChan' was closed. Returns 'False' if the 'BroadcastChan' was
-- __already__ closed.
closeBChan :: MonadIO m => BroadcastChan In a -> m Bool
closeBChan (BChan writeVar) = liftIO . mask_ $ do
    old_hole <- takeMVar writeVar
    -- old_hole is always empty unless the channel was already closed
    tryPutMVar old_hole Closed <* putMVar writeVar old_hole

-- | Check whether a 'BroadcastChan' is closed. 'True' meaning that future
-- read/write operations on the channel will always fail.
--
--  ['BroadcastChan' 'In':]:
--
--      @True@ indicates the channel is closed and writes will always fail.
--
--      __Beware of TOC-TOU races__: It is possible for a 'BroadcastChan' to be
--      closed by another thread. If multiple threads use the same channel
--      a 'closeBChan' from another thread can result in the channel being
--      closed right after 'isClosedBChan' returns.
--
--  ['BroadcastChan' 'Out':]:
--
--      @True@ indicates the channel is both closed and empty, meaning reads
--      will always fail.
isClosedBChan :: MonadIO m => BroadcastChan dir a -> m Bool
#if MIN_VERSION_base(4,7,0)
isClosedBChan (BChan mvar) = liftIO $ do
    old_hole <- readMVar mvar
    val <- tryReadMVar old_hole
#else
isClosedBChan (BChan mvar) = liftIO . mask_ $ do
    old_hole <- takeMVar mvar
    val <- tryTakeMVar old_hole
    case val of
        Just x -> putMVar old_hole x
        Nothing -> return ()
    putMVar mvar old_hole
#endif
    case val of
        Just Closed -> return True
        _ -> return False

-- | Write a value to write end of a 'BroadcastChan'. Any messages written
-- while there are no live read ends are dropped on the floor and can be
-- immediately garbage collected, thus avoiding space leaks.
--
-- The return value indicates whether the write succeeded, i.e., 'True' if the
-- message was written, 'False' is the channel is closed.
-- See @BroadcastChan.Throw.@'BroadcastChan.Throw.writeBChan' for an
-- exception throwing variant.
writeBChan :: MonadIO m => BroadcastChan In a -> a -> m Bool
writeBChan (BChan writeVar) val = liftIO $ do
  new_hole <- newEmptyMVar
  mask_ $ do
    old_hole <- takeMVar writeVar
    -- old_hole is only full if the channel was previously closed
    empty <- tryPutMVar old_hole (ChItem val new_hole)
    if empty
       then putMVar writeVar new_hole
       else putMVar writeVar old_hole
    return empty
{-# INLINE writeBChan #-}

-- | Read the next value from the read end of a 'BroadcastChan'. Returns
-- 'Nothing' if the 'BroadcastChan' is closed and empty.
-- See @BroadcastChan.Throw.@'BroadcastChan.Throw.readBChan' for an exception
-- throwing variant.
readBChan :: MonadIO m => BroadcastChan Out a -> m (Maybe a)
readBChan (BChan readVar) = liftIO $ do
  modifyMVarMasked readVar $ \read_end -> do -- Note [modifyMVarMasked]
    -- Use readMVar here, not takeMVar,
    -- else newBChanListener doesn't work
    result <- readMVar read_end
    case result of
        ChItem val new_read_end -> return (new_read_end, Just val)
        Closed -> return (read_end, Nothing)
{-# INLINE readBChan #-}

-- Note [modifyMVarMasked]
-- This prevents a theoretical deadlock if an asynchronous exception
-- happens during the readMVar while the MVar is empty.  In that case
-- the read_end MVar will be left empty, and subsequent readers will
-- deadlock.  Using modifyMVarMasked prevents this.  The deadlock can
-- be reproduced, but only by expanding readMVar and inserting an
-- artificial yield between its takeMVar and putMVar operations.

-- | Create a new read end for a 'BroadcastChan'.
--
--  ['BroadcastChan' 'In':]:
--
--      Will receive all messages written to the channel __after__ this read
--      end is created.
--
--  ['BroadcastChan' 'Out':]:
--
--      Will receive all currently unread messages and all future messages.
newBChanListener :: MonadIO m => BroadcastChan dir a -> m (BroadcastChan Out a)
newBChanListener (BChan mvar) = liftIO $ do
   hole       <- readMVar mvar
   newReadVar <- newMVar hole
   return (BChan newReadVar)

-- | Return a lazy list representing the messages written to the channel.
--
-- Uses 'unsafeInterleaveIO' to defer the IO operations.
--
--  ['BroadcastChan' 'In':]:
--
--      The list contains every message written to the channel after this 'IO'
--      action completes.
--
--  ['BroadcastChan' 'Out':]:
--
--      The list contains every currently unread message and all future
--      messages. It's safe to keep using the original channel in any thread.
--
--      Unlike 'Control.Concurrent.getChanContents' from "Control.Concurrent",
--      the list resulting from this function is __not__ affected by reads on
--      the input channel. Every message that is unread or written after the
--      'IO' action completes __will__ end up in the result list.
getBChanContents :: BroadcastChan dir a -> IO [a]
getBChanContents = newBChanListener >=> go
  where
    go ch = unsafeInterleaveIO $ do
        result <- readBChan ch
        case result of
            Nothing -> return []
            Just x -> do
                xs <- go ch
                return (x:xs)

-- | Strict fold of the 'BroadcastChan'​'s messages. Can be used with
-- "Control.Foldl" from Tekmo's foldl package:
--
-- @"Control.Foldl".'Control.Foldl.purely' 'foldBChan' :: ('MonadIO' m, 'MonadIO' n) => 'Control.Foldl.Fold' a b -> 'BroadcastChan' d a -> n (m b)@
--
-- The result of this function is a nested monadic value to give more
-- fine-grained control/separation between the start of listening for messages
-- and the start of processing. The inner action folds the actual messages and
-- completes when the channel is closed and exhausted. The outer action
-- controls from when on messages are received. Specifically:
--
--  ['BroadcastChan' 'In':]:
--
--      Will process all messages sent after the outer action completes.
--
--  ['BroadcastChan' 'Out':]:
--
--      Will process all messages that are unread when the outer action
--      completes, as well as all future messages.
--
--      After the outer action completes the fold is unaffected by other
--      (concurrent) reads performed on the original channel. So it's safe to
--      reuse the channel.
foldBChan
    :: (MonadIO m, MonadIO n)
    => (x -> a -> x)
    -> x
    -> (x -> b)
    -> BroadcastChan d a
    -> n (m b)
foldBChan step begin done chan = do
    listen <- newBChanListener chan
    return $ go listen begin
  where
    go listen x = do
        x' <- readBChan listen
        case x' of
            Just x'' -> go listen $! step x x''
            Nothing -> return $! done x
{-# INLINABLE foldBChan #-}

-- | Strict, monadic fold of the 'BroadcastChan'​'s messages. Can be used with
-- "Control.Foldl" from Tekmo's foldl package:
--
-- @"Control.Foldl".'Control.Foldl.impurely' 'foldBChanM' :: ('MonadIO' m, 'MonadIO' n) => 'Control.Foldl.FoldM' m a b -> 'BroadcastChan' d a -> n (m b)@
--
-- Has the same behaviour and guarantees as 'foldBChan'.
foldBChanM
    :: (MonadIO m, MonadIO n)
    => (x -> a -> m x)
    -> m x
    -> (x -> m b)
    -> BroadcastChan d a
    -> n (m b)
foldBChanM step begin done chan = do
    listen <- newBChanListener chan
    return $ do
        x0 <- begin
        go listen x0
  where
    go listen x = do
        x' <- readBChan listen
        case x' of
            Just x'' -> step x x'' >>= go listen
            Nothing -> done x
{-# INLINABLE foldBChanM #-}