{-# LANGUAGE DataKinds #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE Trustworthy #-}
module BroadcastChan.Internal where

import Control.Concurrent.MVar
import Control.Exception (mask_)
import Control.Monad ((>=>))
import Control.Monad.IO.Unlift (MonadIO(..))
import System.IO.Unsafe (unsafeInterleaveIO)

-- | Used with DataKinds as phantom type indicating whether a 'BroadcastChan'
-- value is a read or write end.
data Direction = In  -- ^ Indicates a write 'BroadcastChan'
               | Out -- ^ Indicates a read 'BroadcastChan'

-- | Alias for the v'In' type from the 'Direction' kind, allows users to write
-- the @'BroadcastChan' v'In' a@ type without enabling @DataKinds@.
type In = 'In

-- | Alias for the v'Out' type from the 'Direction' kind, allows users to write
-- the @'BroadcastChan' v'Out' a@ type without enabling @DataKinds@.
type Out = 'Out

-- | The abstract type representing the read or write end of a 'BroadcastChan'.
newtype BroadcastChan (dir :: Direction) a = BChan (MVar (Stream a))
    deriving (BroadcastChan dir a -> BroadcastChan dir a -> Bool
(BroadcastChan dir a -> BroadcastChan dir a -> Bool)
-> (BroadcastChan dir a -> BroadcastChan dir a -> Bool)
-> Eq (BroadcastChan dir a)
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
forall (dir :: Direction) a.
BroadcastChan dir a -> BroadcastChan dir a -> Bool
/= :: BroadcastChan dir a -> BroadcastChan dir a -> Bool
$c/= :: forall (dir :: Direction) a.
BroadcastChan dir a -> BroadcastChan dir a -> Bool
== :: BroadcastChan dir a -> BroadcastChan dir a -> Bool
$c== :: forall (dir :: Direction) a.
BroadcastChan dir a -> BroadcastChan dir a -> Bool
Eq)

type Stream a = MVar (ChItem a)

data ChItem a = ChItem a {-# UNPACK #-} !(Stream a) | Closed

-- | Creates a new 'BroadcastChan' write end.
newBroadcastChan :: MonadIO m => m (BroadcastChan In a)
newBroadcastChan :: m (BroadcastChan In a)
newBroadcastChan = IO (BroadcastChan In a) -> m (BroadcastChan In a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (BroadcastChan In a) -> m (BroadcastChan In a))
-> IO (BroadcastChan In a) -> m (BroadcastChan In a)
forall a b. (a -> b) -> a -> b
$ do
   MVar (ChItem a)
hole  <- IO (MVar (ChItem a))
forall a. IO (MVar a)
newEmptyMVar
   MVar (MVar (ChItem a))
writeVar <- MVar (ChItem a) -> IO (MVar (MVar (ChItem a)))
forall a. a -> IO (MVar a)
newMVar MVar (ChItem a)
hole
   BroadcastChan In a -> IO (BroadcastChan In a)
forall (m :: * -> *) a. Monad m => a -> m a
return (MVar (MVar (ChItem a)) -> BroadcastChan In a
forall (dir :: Direction) a. MVar (Stream a) -> BroadcastChan dir a
BChan MVar (MVar (ChItem a))
writeVar)

-- | Close a 'BroadcastChan', disallowing further writes. Returns 'True' if the
-- 'BroadcastChan' was closed. Returns 'False' if the 'BroadcastChan' was
-- __already__ closed.
closeBChan :: MonadIO m => BroadcastChan In a -> m Bool
closeBChan :: BroadcastChan In a -> m Bool
closeBChan (BChan MVar (Stream a)
writeVar) = IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> (IO Bool -> IO Bool) -> IO Bool -> m Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO Bool -> IO Bool
forall a. IO a -> IO a
mask_ (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ do
    Stream a
old_hole <- MVar (Stream a) -> IO (Stream a)
forall a. MVar a -> IO a
takeMVar MVar (Stream a)
writeVar
    -- old_hole is always empty unless the channel was already closed
    Stream a -> ChItem a -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar Stream a
old_hole ChItem a
forall a. ChItem a
Closed IO Bool -> IO () -> IO Bool
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f a
<* MVar (Stream a) -> Stream a -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (Stream a)
writeVar Stream a
old_hole

-- | Check whether a 'BroadcastChan' is closed. 'True' meaning that future
-- read/write operations on the channel will always fail.
--
--  ['BroadcastChan' v'In':]:
--
--      @True@ indicates the channel is closed and writes will always fail.
--
--      __Beware of TOC-TOU races__: It is possible for a 'BroadcastChan' to be
--      closed by another thread. If multiple threads use the same channel
--      a 'closeBChan' from another thread can result in the channel being
--      closed right after 'isClosedBChan' returns.
--
--  ['BroadcastChan' v'Out':]:
--
--      @True@ indicates the channel is both closed and empty, meaning reads
--      will always fail.
isClosedBChan :: MonadIO m => BroadcastChan dir a -> m Bool
isClosedBChan :: BroadcastChan dir a -> m Bool
isClosedBChan (BChan MVar (Stream a)
mvar) = IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ do
    Stream a
old_hole <- MVar (Stream a) -> IO (Stream a)
forall a. MVar a -> IO a
readMVar MVar (Stream a)
mvar
    Maybe (ChItem a)
val <- Stream a -> IO (Maybe (ChItem a))
forall a. MVar a -> IO (Maybe a)
tryReadMVar Stream a
old_hole
    case Maybe (ChItem a)
val of
        Just ChItem a
Closed -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
        Maybe (ChItem a)
_ -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False

-- | Write a value to write end of a 'BroadcastChan'. Any messages written
-- while there are no live read ends are dropped on the floor and can be
-- immediately garbage collected, thus avoiding space leaks.
--
-- The return value indicates whether the write succeeded, i.e., 'True' if the
-- message was written, 'False' is the channel is closed.
-- See @BroadcastChan.Throw.@'BroadcastChan.Throw.writeBChan' for an
-- exception throwing variant.
writeBChan :: MonadIO m => BroadcastChan In a -> a -> m Bool
writeBChan :: BroadcastChan In a -> a -> m Bool
writeBChan (BChan MVar (Stream a)
writeVar) a
val = IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ do
  Stream a
new_hole <- IO (Stream a)
forall a. IO (MVar a)
newEmptyMVar
  IO Bool -> IO Bool
forall a. IO a -> IO a
mask_ (IO Bool -> IO Bool) -> IO Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ do
    Stream a
old_hole <- MVar (Stream a) -> IO (Stream a)
forall a. MVar a -> IO a
takeMVar MVar (Stream a)
writeVar
    -- old_hole is only full if the channel was previously closed
    Bool
empty <- Stream a -> ChItem a -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar Stream a
old_hole (a -> Stream a -> ChItem a
forall a. a -> Stream a -> ChItem a
ChItem a
val Stream a
new_hole)
    if Bool
empty
       then MVar (Stream a) -> Stream a -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (Stream a)
writeVar Stream a
new_hole
       else MVar (Stream a) -> Stream a -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar (Stream a)
writeVar Stream a
old_hole
    Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
empty
{-# INLINE writeBChan #-}

-- | Read the next value from the read end of a 'BroadcastChan'. Returns
-- 'Nothing' if the 'BroadcastChan' is closed and empty.
-- See @BroadcastChan.Throw.@'BroadcastChan.Throw.readBChan' for an exception
-- throwing variant.
readBChan :: MonadIO m => BroadcastChan Out a -> m (Maybe a)
readBChan :: BroadcastChan Out a -> m (Maybe a)
readBChan (BChan MVar (Stream a)
readVar) = IO (Maybe a) -> m (Maybe a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe a) -> m (Maybe a)) -> IO (Maybe a) -> m (Maybe a)
forall a b. (a -> b) -> a -> b
$ do
  MVar (Stream a)
-> (Stream a -> IO (Stream a, Maybe a)) -> IO (Maybe a)
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVarMasked MVar (Stream a)
readVar ((Stream a -> IO (Stream a, Maybe a)) -> IO (Maybe a))
-> (Stream a -> IO (Stream a, Maybe a)) -> IO (Maybe a)
forall a b. (a -> b) -> a -> b
$ \Stream a
read_end -> do -- Note [modifyMVarMasked]
    -- Use readMVar here, not takeMVar,
    -- else newBChanListener doesn't work
    ChItem a
result <- Stream a -> IO (ChItem a)
forall a. MVar a -> IO a
readMVar Stream a
read_end
    case ChItem a
result of
        ChItem a
val Stream a
new_read_end -> (Stream a, Maybe a) -> IO (Stream a, Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream a
new_read_end, a -> Maybe a
forall a. a -> Maybe a
Just a
val)
        ChItem a
Closed -> (Stream a, Maybe a) -> IO (Stream a, Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream a
read_end, Maybe a
forall a. Maybe a
Nothing)
{-# INLINE readBChan #-}

-- Note [modifyMVarMasked]
-- This prevents a theoretical deadlock if an asynchronous exception
-- happens during the readMVar while the MVar is empty.  In that case
-- the read_end MVar will be left empty, and subsequent readers will
-- deadlock.  Using modifyMVarMasked prevents this.  The deadlock can
-- be reproduced, but only by expanding readMVar and inserting an
-- artificial yield between its takeMVar and putMVar operations.

-- | Create a new read end for a 'BroadcastChan'.
--
--  ['BroadcastChan' v'In':]:
--
--      Will receive all messages written to the channel __after__ this read
--      end is created.
--
--  ['BroadcastChan' v'Out':]:
--
--      Will receive all currently unread messages and all future messages.
newBChanListener :: MonadIO m => BroadcastChan dir a -> m (BroadcastChan Out a)
newBChanListener :: BroadcastChan dir a -> m (BroadcastChan Out a)
newBChanListener (BChan MVar (Stream a)
mvar) = IO (BroadcastChan Out a) -> m (BroadcastChan Out a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (BroadcastChan Out a) -> m (BroadcastChan Out a))
-> IO (BroadcastChan Out a) -> m (BroadcastChan Out a)
forall a b. (a -> b) -> a -> b
$ do
   Stream a
hole       <- MVar (Stream a) -> IO (Stream a)
forall a. MVar a -> IO a
readMVar MVar (Stream a)
mvar
   MVar (Stream a)
newReadVar <- Stream a -> IO (MVar (Stream a))
forall a. a -> IO (MVar a)
newMVar Stream a
hole
   BroadcastChan Out a -> IO (BroadcastChan Out a)
forall (m :: * -> *) a. Monad m => a -> m a
return (MVar (Stream a) -> BroadcastChan Out a
forall (dir :: Direction) a. MVar (Stream a) -> BroadcastChan dir a
BChan MVar (Stream a)
newReadVar)

-- | Return a lazy list representing the messages written to the channel.
--
-- Uses 'unsafeInterleaveIO' to defer the IO operations.
--
--  ['BroadcastChan' v'In':]:
--
--      The list contains every message written to the channel after this 'IO'
--      action completes.
--
--  ['BroadcastChan' v'Out':]:
--
--      The list contains every currently unread message and all future
--      messages. It's safe to keep using the original channel in any thread.
--
--      Unlike 'Control.Concurrent.getChanContents' from "Control.Concurrent",
--      the list resulting from this function is __not__ affected by reads on
--      the input channel. Every message that is unread or written after the
--      'IO' action completes __will__ end up in the result list.
getBChanContents :: BroadcastChan dir a -> IO [a]
getBChanContents :: BroadcastChan dir a -> IO [a]
getBChanContents = BroadcastChan dir a -> IO (BroadcastChan Out a)
forall (m :: * -> *) (dir :: Direction) a.
MonadIO m =>
BroadcastChan dir a -> m (BroadcastChan Out a)
newBChanListener (BroadcastChan dir a -> IO (BroadcastChan Out a))
-> (BroadcastChan Out a -> IO [a]) -> BroadcastChan dir a -> IO [a]
forall (m :: * -> *) a b c.
Monad m =>
(a -> m b) -> (b -> m c) -> a -> m c
>=> BroadcastChan Out a -> IO [a]
forall a. BroadcastChan Out a -> IO [a]
go
  where
    go :: BroadcastChan Out a -> IO [a]
go BroadcastChan Out a
ch = IO [a] -> IO [a]
forall a. IO a -> IO a
unsafeInterleaveIO (IO [a] -> IO [a]) -> IO [a] -> IO [a]
forall a b. (a -> b) -> a -> b
$ do
        Maybe a
result <- BroadcastChan Out a -> IO (Maybe a)
forall (m :: * -> *) a.
MonadIO m =>
BroadcastChan Out a -> m (Maybe a)
readBChan BroadcastChan Out a
ch
        case Maybe a
result of
            Maybe a
Nothing -> [a] -> IO [a]
forall (m :: * -> *) a. Monad m => a -> m a
return []
            Just a
x -> do
                [a]
xs <- BroadcastChan Out a -> IO [a]
go BroadcastChan Out a
ch
                [a] -> IO [a]
forall (m :: * -> *) a. Monad m => a -> m a
return (a
xa -> [a] -> [a]
forall a. a -> [a] -> [a]
:[a]
xs)

-- | Strict fold of the 'BroadcastChan'​'s messages. Can be used with
-- "Control.Foldl" from Tekmo's foldl package:
--
-- @"Control.Foldl".'Control.Foldl.purely' 'foldBChan' :: ('MonadIO' m, 'MonadIO' n) => 'Control.Foldl.Fold' a b -> 'BroadcastChan' d a -> n (m b)@
--
-- The result of this function is a nested monadic value to give more
-- fine-grained control/separation between the start of listening for messages
-- and the start of processing. The inner action folds the actual messages and
-- completes when the channel is closed and exhausted. The outer action
-- controls from when on messages are received. Specifically:
--
--  ['BroadcastChan' v'In':]:
--
--      Will process all messages sent after the outer action completes.
--
--  ['BroadcastChan' v'Out':]:
--
--      Will process all messages that are unread when the outer action
--      completes, as well as all future messages.
--
--      After the outer action completes the fold is unaffected by other
--      (concurrent) reads performed on the original channel. So it's safe to
--      reuse the channel.
foldBChan
    :: (MonadIO m, MonadIO n)
    => (x -> a -> x)
    -> x
    -> (x -> b)
    -> BroadcastChan d a
    -> n (m b)
foldBChan :: (x -> a -> x) -> x -> (x -> b) -> BroadcastChan d a -> n (m b)
foldBChan x -> a -> x
step x
begin x -> b
done BroadcastChan d a
chan = do
    BroadcastChan Out a
listen <- BroadcastChan d a -> n (BroadcastChan Out a)
forall (m :: * -> *) (dir :: Direction) a.
MonadIO m =>
BroadcastChan dir a -> m (BroadcastChan Out a)
newBChanListener BroadcastChan d a
chan
    m b -> n (m b)
forall (m :: * -> *) a. Monad m => a -> m a
return (m b -> n (m b)) -> m b -> n (m b)
forall a b. (a -> b) -> a -> b
$ BroadcastChan Out a -> x -> m b
forall (m :: * -> *). MonadIO m => BroadcastChan Out a -> x -> m b
go BroadcastChan Out a
listen x
begin
  where
    go :: BroadcastChan Out a -> x -> m b
go BroadcastChan Out a
listen x
x = do
        Maybe a
x' <- BroadcastChan Out a -> m (Maybe a)
forall (m :: * -> *) a.
MonadIO m =>
BroadcastChan Out a -> m (Maybe a)
readBChan BroadcastChan Out a
listen
        case Maybe a
x' of
            Just a
x'' -> BroadcastChan Out a -> x -> m b
go BroadcastChan Out a
listen (x -> m b) -> x -> m b
forall a b. (a -> b) -> a -> b
$! x -> a -> x
step x
x a
x''
            Maybe a
Nothing -> b -> m b
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> m b) -> b -> m b
forall a b. (a -> b) -> a -> b
$! x -> b
done x
x
{-# INLINABLE foldBChan #-}

-- | Strict, monadic fold of the 'BroadcastChan'​'s messages. Can be used with
-- "Control.Foldl" from Tekmo's foldl package:
--
-- @"Control.Foldl".'Control.Foldl.impurely' 'foldBChanM' :: ('MonadIO' m, 'MonadIO' n) => 'Control.Foldl.FoldM' m a b -> 'BroadcastChan' d a -> n (m b)@
--
-- Has the same behaviour and guarantees as 'foldBChan'.
foldBChanM
    :: (MonadIO m, MonadIO n)
    => (x -> a -> m x)
    -> m x
    -> (x -> m b)
    -> BroadcastChan d a
    -> n (m b)
foldBChanM :: (x -> a -> m x)
-> m x -> (x -> m b) -> BroadcastChan d a -> n (m b)
foldBChanM x -> a -> m x
step m x
begin x -> m b
done BroadcastChan d a
chan = do
    BroadcastChan Out a
listen <- BroadcastChan d a -> n (BroadcastChan Out a)
forall (m :: * -> *) (dir :: Direction) a.
MonadIO m =>
BroadcastChan dir a -> m (BroadcastChan Out a)
newBChanListener BroadcastChan d a
chan
    m b -> n (m b)
forall (m :: * -> *) a. Monad m => a -> m a
return (m b -> n (m b)) -> m b -> n (m b)
forall a b. (a -> b) -> a -> b
$ do
        x
x0 <- m x
begin
        BroadcastChan Out a -> x -> m b
go BroadcastChan Out a
listen x
x0
  where
    go :: BroadcastChan Out a -> x -> m b
go BroadcastChan Out a
listen x
x = do
        Maybe a
x' <- BroadcastChan Out a -> m (Maybe a)
forall (m :: * -> *) a.
MonadIO m =>
BroadcastChan Out a -> m (Maybe a)
readBChan BroadcastChan Out a
listen
        case Maybe a
x' of
            Just a
x'' -> x -> a -> m x
step x
x a
x'' m x -> (x -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= BroadcastChan Out a -> x -> m b
go BroadcastChan Out a
listen
            Maybe a
Nothing -> x -> m b
done x
x
{-# INLINABLE foldBChanM #-}