module Control.Distributed.Process.Internal.CQueue
( CQueue
, BlockSpec(..)
, newCQueue
, enqueue
, dequeue
, mkWeakCQueue
) where
import Prelude hiding (length, reverse)
import Control.Concurrent.STM
  ( atomically
  , TChan
  , newTChan
  , writeTChan
  , readTChan
  , tryReadTChan
  )
import Control.Applicative ((<$>), (<*>))
import Control.Exception (evaluate, mask, onException)
import System.Timeout (timeout)
import Control.Distributed.Process.Internal.StrictMVar
  ( StrictMVar(StrictMVar)
  , newMVar
  , takeMVar
  , putMVar
  )
import Control.Distributed.Process.Internal.StrictList
  ( StrictList(..)
  , reverse
  , reverse'
  )
import GHC.MVar (MVar(MVar))
import GHC.IO (IO(IO))
import GHC.Prim (mkWeak#)
import GHC.Weak (Weak(Weak))
-- | A queue supporting selective (matching) removal of elements.
--
-- It is made of two parts:
--
-- * a list of elements that 'dequeue' has already pulled off the channel
--   but that were not accepted by any matcher at the time, and
--
-- * the channel that 'enqueue' writes new elements to.
data CQueue a =
  CQueue
    (StrictMVar (StrictList a))  -- Arrived: read from the channel, not yet matched
    (TChan a)                    -- Incoming: written by 'enqueue'
-- | Create an empty queue: an empty list of arrived elements and a fresh
-- incoming channel.
newCQueue :: IO (CQueue a)
newCQueue = CQueue <$> freshArrived <*> freshIncoming
  where
    -- No element has been inspected yet.
    freshArrived  = newMVar Nil
    -- Channel that 'enqueue' will write to.
    freshIncoming = atomically newTChan
-- | Append an element to the queue by writing it to the incoming channel.
--
-- The element is forced to weak head normal form (bang pattern) before it
-- is written.
enqueue :: CQueue a -> a -> IO ()
enqueue (CQueue _ inbox) !msg = atomically (writeTChan inbox msg)
-- | How 'dequeue' behaves when no element matches right away.
data BlockSpec
  = NonBlocking  -- ^ Never wait; only inspect what is already available.
  | Blocking     -- ^ Wait until a matching element shows up.
  | Timeout Int  -- ^ Wait, but give up after the given delay. The value is
                 --   handed to 'System.Timeout.timeout' (microseconds).
-- | Remove and return the first element accepted by one of the matchers.
--
-- Elements that an earlier call read from the incoming channel but did not
-- match (the \"arrived\" list) are examined first, oldest first. Only if none
-- of them matches are new elements taken from the incoming channel, as
-- permitted by the 'BlockSpec'. Every element that is inspected and rejected
-- is kept, in order, for later calls.
--
-- For each element the matchers are tried in list order and the first one
-- returning 'Just' wins.
--
-- Returns 'Nothing' when nothing matched and the 'BlockSpec' did not allow
-- (further) waiting: always possible for 'NonBlocking', on expiry for
-- 'Timeout', never for 'Blocking'.
--
-- @'Timeout' 0@ behaves like 'NonBlocking'.
--
-- If a matcher throws while an element is being inspected, the exception is
-- propagated, but the element stays in the queue and the queue remains
-- usable.
dequeue :: forall a b.
           CQueue a          -- ^ Queue to take from
        -> BlockSpec         -- ^ Whether and how long to wait
        -> [a -> Maybe b]    -- ^ Matchers, tried in order
        -> IO (Maybe b)
dequeue (CQueue arrived incoming) blockSpec matches = go
  where
    go :: IO (Maybe b)
    go = mask $ \restore -> do
      -- Holding the (now empty) MVar gives this call exclusive access to the
      -- arrived list; every exit path below must put a list back.
      arr <- takeMVar arrived
      -- Scanning the arrived list runs unmasked; on any exception the list
      -- is put back exactly as it was found.
      (arr', mb) <- onException (restore (checkArrived Nil arr))
                                (putMVar arrived arr)
      case (mb, blockSpec) of
        (Just b, _) -> do
          putMVar arrived arr'
          return (Just b)
        (Nothing, NonBlocking) ->
          checkNonBlocking arr'
        (Nothing, Blocking) ->
          Just <$> checkBlocking arr'
        (Nothing, Timeout n)
          -- 'timeout 0' returns Nothing without running its action, which
          -- would leave 'arrived' empty forever and discard the list we
          -- hold. A zero timeout therefore polls without waiting.
          | n == 0    -> checkNonBlocking arr'
          | otherwise -> timeout n $ checkBlocking arr'

    -- Scan the arrived list. On a match, return the list without the
    -- matched element, in its original order. Without a match, return the
    -- whole list in REVERSED order: it becomes the accumulator of
    -- 'checkBlocking' / 'checkNonBlocking', which reverse it again before
    -- storing it.
    checkArrived :: StrictList a -> StrictList a -> IO (StrictList a, Maybe b)
    checkArrived acc Nil = return (acc, Nothing)
    checkArrived acc (Cons x xs) =
      case check x of
        Just y  -> return (reverse' acc xs, Just y)
        Nothing -> checkArrived (Cons x acc) xs

    -- Read from the incoming channel until an element matches. The
    -- accumulator holds the rejected elements, newest first.
    checkBlocking :: StrictList a -> IO b
    checkBlocking acc = do
      -- The blocking read can be interrupted (e.g. by the timeout); in that
      -- case store what has been accumulated so far.
      x <- onException (atomically $ readTChan incoming)
                       (putMVar arrived $ reverse acc)
      r <- tryMatch acc x
      case r of
        Nothing -> checkBlocking (Cons x acc)
        Just y  -> putMVar arrived (reverse acc) >> return y

    -- Like 'checkBlocking', but stop as soon as the channel is empty.
    checkNonBlocking :: StrictList a -> IO (Maybe b)
    checkNonBlocking acc = do
      mx <- atomically $ tryReadTChan incoming
      case mx of
        Nothing -> putMVar arrived (reverse acc) >> return Nothing
        Just x  -> do
          r <- tryMatch acc x
          case r of
            Nothing -> checkNonBlocking (Cons x acc)
            Just y  -> putMVar arrived (reverse acc) >> return (Just y)

    -- Run the matchers on an element freshly read from the channel. Should
    -- a matcher throw, the element is stored together with the accumulated
    -- ones before the exception propagates, so nothing is lost and the MVar
    -- is not left empty.
    tryMatch :: StrictList a -> a -> IO (Maybe b)
    tryMatch acc x = onException (evaluate (check x))
                                 (putMVar arrived $ reverse (Cons x acc))

    check :: a -> Maybe b
    check = checkMatches matches
-- | Apply the matchers to a value in list order and return the result of the
-- first one that yields 'Just'; 'Nothing' if none does (or the list is
-- empty). Matchers after the first hit are not applied.
checkMatches :: [a -> Maybe b] -> a -> Maybe b
checkMatches matchers msg = foldr firstHit Nothing matchers
  where
    -- Keep this matcher's result if it is a hit, otherwise defer to the
    -- remaining matchers.
    firstHit matcher rest = maybe rest Just (matcher msg)
-- | Create a weak pointer to a 'CQueue', with the given action attached as
-- finalizer.
--
-- The weak pointer's key is the primitive @MVar#@ underneath the queue's
-- first ('StrictMVar') field, not the 'CQueue' wrapper; its value is the
-- queue itself.
--
-- NOTE(review): presumably the key is the primitive for the same reason as
-- in base's @mkWeakMVar@ (boxed wrappers are not reliable weak-pointer
-- keys) -- confirm. The expected type of the finalizer argument of
-- 'mkWeak#' differs between GHC versions; verify against the GHC in use.
mkWeakCQueue :: CQueue a -> IO () -> IO (Weak (CQueue a))
mkWeakCQueue m@(CQueue (StrictMVar (MVar m#)) _) f = IO $ \s ->
  -- Arguments to mkWeak#: key, value, finalizer, state token.
  case mkWeak# m# m f s of (# s1, w #) -> (# s1, Weak w #)