{-# LANGUAGE CPP #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE Trustworthy #-}
module BroadcastChan.Internal where
#if !MIN_VERSION_base(4,8,0)
import Control.Applicative ((<*))
#endif
import Control.Concurrent.MVar
import Control.Exception (mask_)
import Control.Monad.IO.Class (MonadIO(..))
import System.IO.Unsafe (unsafeInterleaveIO)
#if !MIN_VERSION_base(4,6,0)
import Control.Exception (evaluate, onException)
#endif
data Direction = In
| Out
type In = 'In
type Out = 'Out
newtype BroadcastChan (d :: Direction) a = BChan (MVar (Stream a))
deriving (Eq)
type Stream a = MVar (ChItem a)
data ChItem a = ChItem a {-# UNPACK #-} !(Stream a) | Closed
newBroadcastChan :: MonadIO m => m (BroadcastChan In a)
newBroadcastChan = liftIO $ do
hole <- newEmptyMVar
writeVar <- newMVar hole
return (BChan writeVar)
closeBChan :: MonadIO m => BroadcastChan In a -> m Bool
closeBChan (BChan writeVar) = liftIO . mask_ $ do
old_hole <- takeMVar writeVar
tryPutMVar old_hole Closed <* putMVar writeVar old_hole
isClosedBChan :: MonadIO m => BroadcastChan In a -> m Bool
#if MIN_VERSION_base(4,7,0)
isClosedBChan (BChan writeVar) = liftIO $ do
old_hole <- readMVar writeVar
val <- tryReadMVar old_hole
#else
isClosedBChan (BChan writeVar) = liftIO . mask_ $ do
old_hole <- takeMVar writeVar
val <- tryTakeMVar old_hole
case val of
Just x -> putMVar old_hole x
Nothing -> return ()
putMVar writeVar old_hole
#endif
case val of
Just Closed -> return True
_ -> return False
writeBChan :: MonadIO m => BroadcastChan In a -> a -> m Bool
writeBChan (BChan writeVar) val = liftIO $ do
new_hole <- newEmptyMVar
mask_ $ do
old_hole <- takeMVar writeVar
empty <- tryPutMVar old_hole (ChItem val new_hole)
if empty
then putMVar writeVar new_hole
else putMVar writeVar old_hole
return empty
{-# INLINE writeBChan #-}
readBChan :: MonadIO m => BroadcastChan Out a -> m (Maybe a)
readBChan (BChan readVar) = liftIO $ do
modifyMVarMasked readVar $ \read_end -> do
result <- readMVar read_end
case result of
ChItem val new_read_end -> return (new_read_end, Just val)
Closed -> return (read_end, Nothing)
{-# INLINE readBChan #-}
newBChanListener :: MonadIO m => BroadcastChan In a -> m (BroadcastChan Out a)
newBChanListener (BChan writeVar) = liftIO $ do
hole <- readMVar writeVar
newReadVar <- newMVar hole
return (BChan newReadVar)
foldBChan
:: MonadIO m
=> (x -> a -> x)
-> x
-> (x -> b)
-> BroadcastChan In a
-> m (m b)
foldBChan step begin done chan = do
listen <- newBChanListener chan
return $ go listen begin
where
go listen x = do
x' <- readBChan listen
case x' of
Just x'' -> go listen $! step x x''
Nothing -> return $! done x
{-# INLINABLE foldBChan #-}
foldBChanM
:: MonadIO m
=> (x -> a -> m x)
-> m x
-> (x -> m b)
-> BroadcastChan In a
-> m (m b)
foldBChanM step begin done chan = do
listen <- newBChanListener chan
x0 <- begin
return $ go listen x0
where
go listen x = do
x' <- readBChan listen
case x' of
Just x'' -> step x x'' >>= go listen
Nothing -> done x
{-# INLINABLE foldBChanM #-}
getBChanContents :: BroadcastChan In a -> IO [a]
getBChanContents chan = newBChanListener chan >>= go
where
go ch = unsafeInterleaveIO $ do
result <- readBChan ch
case result of
Nothing -> return []
Just x -> do
xs <- go ch
return (x:xs)
#if !MIN_VERSION_base(4,6,0)
{-# INLINE modifyMVarMasked #-}
modifyMVarMasked :: MVar a -> (a -> IO (a,b)) -> IO b
modifyMVarMasked m io =
mask_ $ do
a <- takeMVar m
(a',b) <- (io a >>= evaluate) `onException` putMVar m a
putMVar m a'
return b
#endif