{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE BangPatterns  #-}
{-# LANGUAGE MagicHash, UnboxedTuples, PatternGuards, ScopedTypeVariables, RankNTypes #-}
-- | Concurrent queue for single reader, single writer
module Control.Distributed.Process.Internal.CQueue
  ( CQueue
  , BlockSpec(..)
  , MatchOn(..)
  , newCQueue
  , enqueue
  , enqueueSTM
  , dequeue
  , mkWeakCQueue
  , queueSize
  ) where

import Prelude hiding (length, reverse)
import Control.Concurrent.STM
  ( atomically
  , STM
  , TChan
  , TVar
  , modifyTVar'
  , tryReadTChan
  , newTChan
  , newTVarIO
  , writeTChan
  , readTChan
  , readTVarIO
  , orElse
  , retry
  )
import Control.Exception (mask_, onException)
import System.Timeout (timeout)
import Control.Distributed.Process.Internal.StrictMVar
  ( StrictMVar(StrictMVar)
  , newMVar
  , takeMVar
  , putMVar
  )
import Control.Distributed.Process.Internal.StrictList
  ( StrictList(..)
  , append
  )
import Data.Maybe (fromJust)
import GHC.MVar (MVar(MVar))
import GHC.IO (IO(IO), unIO)
import GHC.Exts (mkWeak#)
import GHC.Weak (Weak(Weak))

-- We use a TCHan rather than a Chan so that we have a non-blocking read
data CQueue a = CQueue (StrictMVar (StrictList a)) -- Arrived
                       (TChan a)                   -- Incoming
                       (TVar Int)                 -- Queue size

newCQueue :: IO (CQueue a)
newCQueue :: forall a. IO (CQueue a)
newCQueue = StrictMVar (StrictList a) -> TChan a -> TVar Int -> CQueue a
forall a.
StrictMVar (StrictList a) -> TChan a -> TVar Int -> CQueue a
CQueue (StrictMVar (StrictList a) -> TChan a -> TVar Int -> CQueue a)
-> IO (StrictMVar (StrictList a))
-> IO (TChan a -> TVar Int -> CQueue a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StrictList a -> IO (StrictMVar (StrictList a))
forall a. a -> IO (StrictMVar a)
newMVar StrictList a
forall a. StrictList a
Nil IO (TChan a -> TVar Int -> CQueue a)
-> IO (TChan a) -> IO (TVar Int -> CQueue a)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> STM (TChan a) -> IO (TChan a)
forall a. STM a -> IO a
atomically STM (TChan a)
forall a. STM (TChan a)
newTChan IO (TVar Int -> CQueue a) -> IO (TVar Int) -> IO (CQueue a)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Int -> IO (TVar Int)
forall a. a -> IO (TVar a)
newTVarIO Int
0

-- | Enqueue an element
--
-- Enqueue is strict.
enqueue :: CQueue a -> a -> IO ()
enqueue :: forall a. CQueue a -> a -> IO ()
enqueue CQueue a
c !a
a = STM () -> IO ()
forall a. STM a -> IO a
atomically (CQueue a -> a -> STM ()
forall a. CQueue a -> a -> STM ()
enqueueSTM CQueue a
c a
a)

-- | Variant of enqueue for use in the STM monad.
enqueueSTM :: CQueue a -> a -> STM ()
enqueueSTM :: forall a. CQueue a -> a -> STM ()
enqueueSTM (CQueue StrictMVar (StrictList a)
_arrived TChan a
incoming TVar Int
size) !a
a = do
   TChan a -> a -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan TChan a
incoming a
a
   TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
size Int -> Int
forall a. Enum a => a -> a
succ

data BlockSpec =
    NonBlocking
  | Blocking
  | Timeout Int

-- Match operations
--
-- They can be either a message match or a channel match.
data MatchOn m a
 = MatchMsg  (m -> Maybe a)
 | MatchChan (STM a)
 deriving ((forall a b. (a -> b) -> MatchOn m a -> MatchOn m b)
-> (forall a b. a -> MatchOn m b -> MatchOn m a)
-> Functor (MatchOn m)
forall a b. a -> MatchOn m b -> MatchOn m a
forall a b. (a -> b) -> MatchOn m a -> MatchOn m b
forall m a b. a -> MatchOn m b -> MatchOn m a
forall m a b. (a -> b) -> MatchOn m a -> MatchOn m b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
$cfmap :: forall m a b. (a -> b) -> MatchOn m a -> MatchOn m b
fmap :: forall a b. (a -> b) -> MatchOn m a -> MatchOn m b
$c<$ :: forall m a b. a -> MatchOn m b -> MatchOn m a
<$ :: forall a b. a -> MatchOn m b -> MatchOn m a
Functor)

-- Lists of chunks of matches
--
-- Two consecutive chunks never have the same kind of matches. i.e. if one chunk
-- contains message matches then the next one must contain channel matches and
-- viceversa.
type MatchChunks m a = [Either [m -> Maybe a] [STM a]]

-- Splits a list of matches into chunks.
--
-- > concatMap (either (map MatchMsg) (map MatchChan)) . chunkMatches == id
--
chunkMatches :: [MatchOn m a] -> MatchChunks m a
chunkMatches :: forall m a. [MatchOn m a] -> MatchChunks m a
chunkMatches [] = []
chunkMatches (MatchMsg m -> Maybe a
m : [MatchOn m a]
ms) = [m -> Maybe a] -> Either [m -> Maybe a] [STM a]
forall a b. a -> Either a b
Left (m -> Maybe a
m (m -> Maybe a) -> [m -> Maybe a] -> [m -> Maybe a]
forall a. a -> [a] -> [a]
: [m -> Maybe a]
chk) Either [m -> Maybe a] [STM a]
-> [Either [m -> Maybe a] [STM a]]
-> [Either [m -> Maybe a] [STM a]]
forall a. a -> [a] -> [a]
: [MatchOn m a] -> [Either [m -> Maybe a] [STM a]]
forall m a. [MatchOn m a] -> MatchChunks m a
chunkMatches [MatchOn m a]
rest
   where ([m -> Maybe a]
chk, [MatchOn m a]
rest) = [MatchOn m a] -> ([m -> Maybe a], [MatchOn m a])
forall m a. [MatchOn m a] -> ([m -> Maybe a], [MatchOn m a])
spanMatchMsg [MatchOn m a]
ms
chunkMatches (MatchChan STM a
r : [MatchOn m a]
ms) = [STM a] -> Either [m -> Maybe a] [STM a]
forall a b. b -> Either a b
Right (STM a
r STM a -> [STM a] -> [STM a]
forall a. a -> [a] -> [a]
: [STM a]
chk) Either [m -> Maybe a] [STM a]
-> [Either [m -> Maybe a] [STM a]]
-> [Either [m -> Maybe a] [STM a]]
forall a. a -> [a] -> [a]
: [MatchOn m a] -> [Either [m -> Maybe a] [STM a]]
forall m a. [MatchOn m a] -> MatchChunks m a
chunkMatches [MatchOn m a]
rest
   where ([STM a]
chk, [MatchOn m a]
rest) = [MatchOn m a] -> ([STM a], [MatchOn m a])
forall m a. [MatchOn m a] -> ([STM a], [MatchOn m a])
spanMatchChan [MatchOn m a]
ms

-- | @spanMatchMsg = first (map (\(MatchMsg x) -> x)) . span isMatchMsg@
spanMatchMsg :: [MatchOn m a] -> ([m -> Maybe a], [MatchOn m a])
spanMatchMsg :: forall m a. [MatchOn m a] -> ([m -> Maybe a], [MatchOn m a])
spanMatchMsg [] = ([],[])
spanMatchMsg (MatchOn m a
m : [MatchOn m a]
ms)
    | MatchMsg m -> Maybe a
msg <- MatchOn m a
m = (m -> Maybe a
msg(m -> Maybe a) -> [m -> Maybe a] -> [m -> Maybe a]
forall a. a -> [a] -> [a]
:[m -> Maybe a]
msgs, [MatchOn m a]
rest)
    | Bool
otherwise         = ([], MatchOn m a
mMatchOn m a -> [MatchOn m a] -> [MatchOn m a]
forall a. a -> [a] -> [a]
:[MatchOn m a]
ms)
    where !([m -> Maybe a]
msgs,[MatchOn m a]
rest) = [MatchOn m a] -> ([m -> Maybe a], [MatchOn m a])
forall m a. [MatchOn m a] -> ([m -> Maybe a], [MatchOn m a])
spanMatchMsg [MatchOn m a]
ms

-- | @spanMatchMsg = first (map (\(MatchChan x) -> x)) . span isMatchChan@
spanMatchChan :: [MatchOn m a] -> ([STM a], [MatchOn m a])
spanMatchChan :: forall m a. [MatchOn m a] -> ([STM a], [MatchOn m a])
spanMatchChan [] = ([],[])
spanMatchChan (MatchOn m a
m : [MatchOn m a]
ms)
    | MatchChan STM a
stm <- MatchOn m a
m = (STM a
stmSTM a -> [STM a] -> [STM a]
forall a. a -> [a] -> [a]
:[STM a]
stms, [MatchOn m a]
rest)
    | Bool
otherwise          = ([], MatchOn m a
mMatchOn m a -> [MatchOn m a] -> [MatchOn m a]
forall a. a -> [a] -> [a]
:[MatchOn m a]
ms)
    where !([STM a]
stms,[MatchOn m a]
rest)  = [MatchOn m a] -> ([STM a], [MatchOn m a])
forall m a. [MatchOn m a] -> ([STM a], [MatchOn m a])
spanMatchChan [MatchOn m a]
ms

-- | Dequeue an element
--
-- The timeout (if any) is applied only to waiting for incoming messages, not
-- to checking messages that have already arrived
dequeue :: forall m a.
           CQueue m          -- ^ Queue
        -> BlockSpec         -- ^ Blocking behaviour
        -> [MatchOn m a]     -- ^ List of matches
        -> IO (Maybe a)      -- ^ 'Nothing' only on timeout
dequeue :: forall m a. CQueue m -> BlockSpec -> [MatchOn m a] -> IO (Maybe a)
dequeue (CQueue StrictMVar (StrictList m)
arrived TChan m
incoming TVar Int
size) BlockSpec
blockSpec [MatchOn m a]
matchons = IO (Maybe a) -> IO (Maybe a)
forall a. IO a -> IO a
mask_ (IO (Maybe a) -> IO (Maybe a)) -> IO (Maybe a) -> IO (Maybe a)
forall a b. (a -> b) -> a -> b
$ IO (Maybe (Either a a)) -> IO (Maybe a)
decrementJust (IO (Maybe (Either a a)) -> IO (Maybe a))
-> IO (Maybe (Either a a)) -> IO (Maybe a)
forall a b. (a -> b) -> a -> b
$
  case BlockSpec
blockSpec of
    Timeout Int
n -> Int -> IO (Either a a) -> IO (Maybe (Either a a))
forall a. Int -> IO a -> IO (Maybe a)
timeout Int
n (IO (Either a a) -> IO (Maybe (Either a a)))
-> IO (Either a a) -> IO (Maybe (Either a a))
forall a b. (a -> b) -> a -> b
$ (Maybe (Either a a) -> Either a a)
-> IO (Maybe (Either a a)) -> IO (Either a a)
forall a b. (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Maybe (Either a a) -> Either a a
forall a. HasCallStack => Maybe a -> a
fromJust IO (Maybe (Either a a))
run
    BlockSpec
_other    ->
       case MatchChunks m a
chunks of
         [Right [STM a]
ports] -> -- channels only, this is easy:
           case BlockSpec
blockSpec of
             BlockSpec
NonBlocking -> STM (Maybe (Either a a)) -> IO (Maybe (Either a a))
forall a. STM a -> IO a
atomically (STM (Maybe (Either a a)) -> IO (Maybe (Either a a)))
-> STM (Maybe (Either a a)) -> IO (Maybe (Either a a))
forall a b. (a -> b) -> a -> b
$ [STM a] -> STM (Maybe (Either a a)) -> STM (Maybe (Either a a))
waitChans [STM a]
ports (Maybe (Either a a) -> STM (Maybe (Either a a))
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (Either a a)
forall a. Maybe a
Nothing)
             BlockSpec
_           -> STM (Maybe (Either a a)) -> IO (Maybe (Either a a))
forall a. STM a -> IO a
atomically (STM (Maybe (Either a a)) -> IO (Maybe (Either a a)))
-> STM (Maybe (Either a a)) -> IO (Maybe (Either a a))
forall a b. (a -> b) -> a -> b
$ [STM a] -> STM (Maybe (Either a a)) -> STM (Maybe (Either a a))
waitChans [STM a]
ports STM (Maybe (Either a a))
forall a. STM a
retry
                              -- no onException needed
         MatchChunks m a
_other -> IO (Maybe (Either a a))
run
  where
    -- Decrement counter is smth is returned from the queue,
    -- this is safe to use as method is called under a mask
    -- and there is no 'unmasked' operation inside
    decrementJust :: IO (Maybe (Either a a)) -> IO (Maybe a)
    decrementJust :: IO (Maybe (Either a a)) -> IO (Maybe a)
decrementJust IO (Maybe (Either a a))
f =
       (Either a a -> IO a) -> Maybe (Either a a) -> IO (Maybe a)
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
forall (f :: * -> *) a b.
Applicative f =>
(a -> f b) -> Maybe a -> f (Maybe b)
traverse ((a -> IO a) -> (a -> IO a) -> Either a a -> IO a
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either a -> IO a
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (\a
x -> IO ()
decrement IO () -> IO a -> IO a
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> a -> IO a
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return a
x)) (Maybe (Either a a) -> IO (Maybe a))
-> IO (Maybe (Either a a)) -> IO (Maybe a)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< IO (Maybe (Either a a))
f
    decrement :: IO ()
decrement = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
size Int -> Int
forall a. Enum a => a -> a
pred

    chunks :: MatchChunks m a
chunks = [MatchOn m a] -> MatchChunks m a
forall m a. [MatchOn m a] -> MatchChunks m a
chunkMatches [MatchOn m a]
matchons

    run :: IO (Maybe (Either a a))
run = do
           StrictList m
arr <- StrictMVar (StrictList m) -> IO (StrictList m)
forall a. StrictMVar a -> IO a
takeMVar StrictMVar (StrictList m)
arrived
           let grabNew :: StrictList m -> IO (StrictList m)
grabNew StrictList m
xs = do
                 Maybe m
r <- STM (Maybe m) -> IO (Maybe m)
forall a. STM a -> IO a
atomically (STM (Maybe m) -> IO (Maybe m)) -> STM (Maybe m) -> IO (Maybe m)
forall a b. (a -> b) -> a -> b
$ TChan m -> STM (Maybe m)
forall a. TChan a -> STM (Maybe a)
tryReadTChan TChan m
incoming
                 case Maybe m
r of
                   Maybe m
Nothing -> StrictList m -> IO (StrictList m)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return StrictList m
xs
                   Just m
x  -> StrictList m -> IO (StrictList m)
grabNew (StrictList m -> m -> StrictList m
forall a. StrictList a -> a -> StrictList a
Snoc StrictList m
xs m
x)
           StrictList m
arr' <- StrictList m -> IO (StrictList m)
grabNew StrictList m
arr
           MatchChunks m a -> StrictList m -> IO (Maybe (Either a a))
goCheck MatchChunks m a
chunks StrictList m
arr'

    -- Yields the value of the first succesful STM transaction as
    -- @Just (Left v)@. If all transactions fail, yields the value of the second
    -- argument.
    waitChans :: [STM a] -> STM (Maybe (Either a a)) -> STM (Maybe (Either a a))
    waitChans :: [STM a] -> STM (Maybe (Either a a)) -> STM (Maybe (Either a a))
waitChans [STM a]
ports STM (Maybe (Either a a))
on_block =
        (STM (Maybe (Either a a))
 -> STM (Maybe (Either a a)) -> STM (Maybe (Either a a)))
-> STM (Maybe (Either a a))
-> [STM (Maybe (Either a a))]
-> STM (Maybe (Either a a))
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr STM (Maybe (Either a a))
-> STM (Maybe (Either a a)) -> STM (Maybe (Either a a))
forall a. STM a -> STM a -> STM a
orElse STM (Maybe (Either a a))
on_block ((STM a -> STM (Maybe (Either a a)))
-> [STM a] -> [STM (Maybe (Either a a))]
forall a b. (a -> b) -> [a] -> [b]
map ((a -> Maybe (Either a a)) -> STM a -> STM (Maybe (Either a a))
forall a b. (a -> b) -> STM a -> STM b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (Either a a -> Maybe (Either a a)
forall a. a -> Maybe a
Just (Either a a -> Maybe (Either a a))
-> (a -> Either a a) -> a -> Maybe (Either a a)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Either a a
forall a b. a -> Either a b
Left)) [STM a]
ports)

    --
    -- First check the MatchChunks against the messages already in the
    -- mailbox.  For channel matches, we do a non-blocking check at
    -- this point.
    --
    -- Yields @Just (Left a)@ when a channel is matched, @Just (Right a)@
    -- when a message is matched and @Nothing@ when there are no messages and we
    -- aren't blocking.
    --
    goCheck :: MatchChunks m a
            -> StrictList m  -- messages to check, in this order
            -> IO (Maybe (Either a a))

    goCheck :: MatchChunks m a -> StrictList m -> IO (Maybe (Either a a))
goCheck [] StrictList m
old = StrictList m -> IO (Maybe (Either a a))
goWait StrictList m
old

    goCheck (Right [STM a]
ports : MatchChunks m a
rest) StrictList m
old = do
      Maybe (Either a a)
r <- STM (Maybe (Either a a)) -> IO (Maybe (Either a a))
forall a. STM a -> IO a
atomically (STM (Maybe (Either a a)) -> IO (Maybe (Either a a)))
-> STM (Maybe (Either a a)) -> IO (Maybe (Either a a))
forall a b. (a -> b) -> a -> b
$ [STM a] -> STM (Maybe (Either a a)) -> STM (Maybe (Either a a))
waitChans [STM a]
ports (Maybe (Either a a) -> STM (Maybe (Either a a))
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (Either a a)
forall a. Maybe a
Nothing) -- does not block
      case Maybe (Either a a)
r of
        Just Either a a
_  -> StrictList m -> Maybe (Either a a) -> IO (Maybe (Either a a))
returnOld StrictList m
old Maybe (Either a a)
r
        Maybe (Either a a)
Nothing -> MatchChunks m a -> StrictList m -> IO (Maybe (Either a a))
goCheck MatchChunks m a
rest StrictList m
old

    goCheck (Left [m -> Maybe a]
matches : MatchChunks m a
rest) StrictList m
old = do
           -- checkArrived might in principle take arbitrary time, so
           -- we ought to call restore and use an exception handler.  However,
           -- the check is usually fast (just a comparison), and the overhead
           -- of passing around restore and setting up exception handlers is
           -- high.  So just don't use expensive matchIfs!
      case [m -> Maybe a] -> StrictList m -> (StrictList m, Maybe a)
checkArrived [m -> Maybe a]
matches StrictList m
old of
        (StrictList m
old', Just a
r)  -> StrictList m -> Maybe (Either a a) -> IO (Maybe (Either a a))
returnOld StrictList m
old' (Either a a -> Maybe (Either a a)
forall a. a -> Maybe a
Just (a -> Either a a
forall a b. b -> Either a b
Right a
r))
        (StrictList m
old', Maybe a
Nothing) -> MatchChunks m a -> StrictList m -> IO (Maybe (Either a a))
goCheck MatchChunks m a
rest StrictList m
old'
          -- use the result list, which is now left-biased

    --
    -- Construct an STM transaction that looks at the relevant channels
    -- in the correct order.
    --
    mkSTM :: MatchChunks m a -> STM (Either m a)
    mkSTM :: MatchChunks m a -> STM (Either m a)
mkSTM [] = STM (Either m a)
forall a. STM a
retry
    mkSTM (Left [m -> Maybe a]
_ : MatchChunks m a
rest)
      = (m -> Either m a) -> STM m -> STM (Either m a)
forall a b. (a -> b) -> STM a -> STM b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap m -> Either m a
forall a b. a -> Either a b
Left (TChan m -> STM m
forall a. TChan a -> STM a
readTChan TChan m
incoming) STM (Either m a) -> STM (Either m a) -> STM (Either m a)
forall a. STM a -> STM a -> STM a
`orElse` MatchChunks m a -> STM (Either m a)
mkSTM MatchChunks m a
rest
    mkSTM (Right [STM a]
ports : MatchChunks m a
rest)
      = (STM (Either m a) -> STM (Either m a) -> STM (Either m a))
-> STM (Either m a) -> [STM (Either m a)] -> STM (Either m a)
forall a b. (a -> b -> b) -> b -> [a] -> b
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr STM (Either m a) -> STM (Either m a) -> STM (Either m a)
forall a. STM a -> STM a -> STM a
orElse (MatchChunks m a -> STM (Either m a)
mkSTM MatchChunks m a
rest) ((STM a -> STM (Either m a)) -> [STM a] -> [STM (Either m a)]
forall a b. (a -> b) -> [a] -> [b]
map ((a -> Either m a) -> STM a -> STM (Either m a)
forall a b. (a -> b) -> STM a -> STM b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap a -> Either m a
forall a b. b -> Either a b
Right) [STM a]
ports)

    waitIncoming :: IO (Maybe (Either m a))
    waitIncoming :: IO (Maybe (Either m a))
waitIncoming = case BlockSpec
blockSpec of
      BlockSpec
NonBlocking -> STM (Maybe (Either m a)) -> IO (Maybe (Either m a))
forall a. STM a -> IO a
atomically (STM (Maybe (Either m a)) -> IO (Maybe (Either m a)))
-> STM (Maybe (Either m a)) -> IO (Maybe (Either m a))
forall a b. (a -> b) -> a -> b
$ (Either m a -> Maybe (Either m a))
-> STM (Either m a) -> STM (Maybe (Either m a))
forall a b. (a -> b) -> STM a -> STM b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Either m a -> Maybe (Either m a)
forall a. a -> Maybe a
Just STM (Either m a)
stm STM (Maybe (Either m a))
-> STM (Maybe (Either m a)) -> STM (Maybe (Either m a))
forall a. STM a -> STM a -> STM a
`orElse` Maybe (Either m a) -> STM (Maybe (Either m a))
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (Either m a)
forall a. Maybe a
Nothing
      BlockSpec
_           -> STM (Maybe (Either m a)) -> IO (Maybe (Either m a))
forall a. STM a -> IO a
atomically (STM (Maybe (Either m a)) -> IO (Maybe (Either m a)))
-> STM (Maybe (Either m a)) -> IO (Maybe (Either m a))
forall a b. (a -> b) -> a -> b
$ (Either m a -> Maybe (Either m a))
-> STM (Either m a) -> STM (Maybe (Either m a))
forall a b. (a -> b) -> STM a -> STM b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Either m a -> Maybe (Either m a)
forall a. a -> Maybe a
Just STM (Either m a)
stm
     where
      stm :: STM (Either m a)
stm = MatchChunks m a -> STM (Either m a)
mkSTM MatchChunks m a
chunks

    --
    -- The initial pass didn't find a message, so now we go into blocking
    -- mode.
    --
    -- Contents of 'arrived' from now on is (old ++ new), and
    -- messages that arrive are snocced onto new.
    --
    goWait :: StrictList m -> IO (Maybe (Either a a))
    goWait :: StrictList m -> IO (Maybe (Either a a))
goWait StrictList m
old = do
      Maybe (Either m a)
r <- IO (Maybe (Either m a))
waitIncoming IO (Maybe (Either m a)) -> IO () -> IO (Maybe (Either m a))
forall a b. IO a -> IO b -> IO a
`onException` StrictMVar (StrictList m) -> StrictList m -> IO ()
forall a. StrictMVar a -> a -> IO ()
putMVar StrictMVar (StrictList m)
arrived StrictList m
old
      case Maybe (Either m a)
r of
        --  Nothing => non-blocking and no message
        Maybe (Either m a)
Nothing -> StrictList m -> Maybe (Either a a) -> IO (Maybe (Either a a))
returnOld StrictList m
old Maybe (Either a a)
forall a. Maybe a
Nothing
        Just Either m a
e  -> case Either m a
e of
          --
          -- Left => message arrived in the process mailbox.  We now have to
          -- run through the MatchChunks checking each one, because we might
          -- have a situation where the first chunk fails to match and the
          -- second chunk is a channel match and there *is* a message in the
          -- channel.  In that case the channel wins.
          --
          Left m
m -> MatchChunks m a -> m -> StrictList m -> IO (Maybe (Either a a))
goCheck1 MatchChunks m a
chunks m
m StrictList m
old
          --
          -- Right => message arrived on a channel first
          --
          Right a
a -> StrictList m -> Maybe (Either a a) -> IO (Maybe (Either a a))
returnOld StrictList m
old (Either a a -> Maybe (Either a a)
forall a. a -> Maybe a
Just (a -> Either a a
forall a b. a -> Either a b
Left a
a))

    --
    -- A message arrived in the process inbox; check the MatchChunks for
    -- a valid match.
    --
    goCheck1 :: MatchChunks m a
             -> m               -- single message to check
             -> StrictList m    -- old messages we have already checked
             -> IO (Maybe (Either a a))

    goCheck1 :: MatchChunks m a -> m -> StrictList m -> IO (Maybe (Either a a))
goCheck1 [] m
m StrictList m
old = StrictList m -> IO (Maybe (Either a a))
goWait (StrictList m -> m -> StrictList m
forall a. StrictList a -> a -> StrictList a
Snoc StrictList m
old m
m)

    goCheck1 (Right [STM a]
ports : MatchChunks m a
rest) m
m StrictList m
old = do
      Maybe (Either a a)
r <- STM (Maybe (Either a a)) -> IO (Maybe (Either a a))
forall a. STM a -> IO a
atomically (STM (Maybe (Either a a)) -> IO (Maybe (Either a a)))
-> STM (Maybe (Either a a)) -> IO (Maybe (Either a a))
forall a b. (a -> b) -> a -> b
$ [STM a] -> STM (Maybe (Either a a)) -> STM (Maybe (Either a a))
waitChans [STM a]
ports (Maybe (Either a a) -> STM (Maybe (Either a a))
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (Either a a)
forall a. Maybe a
Nothing) -- does not block
      case Maybe (Either a a)
r of
        Maybe (Either a a)
Nothing -> MatchChunks m a -> m -> StrictList m -> IO (Maybe (Either a a))
goCheck1 MatchChunks m a
rest m
m StrictList m
old
        Just Either a a
_  -> StrictList m -> Maybe (Either a a) -> IO (Maybe (Either a a))
returnOld (StrictList m -> m -> StrictList m
forall a. StrictList a -> a -> StrictList a
Snoc StrictList m
old m
m) Maybe (Either a a)
r

    goCheck1 (Left [m -> Maybe a]
matches : MatchChunks m a
rest) m
m StrictList m
old = do
      case [m -> Maybe a] -> m -> Maybe a
checkMatches [m -> Maybe a]
matches m
m of
        Maybe a
Nothing -> MatchChunks m a -> m -> StrictList m -> IO (Maybe (Either a a))
goCheck1 MatchChunks m a
rest m
m StrictList m
old
        Just a
p  -> StrictList m -> Maybe (Either a a) -> IO (Maybe (Either a a))
returnOld StrictList m
old (Either a a -> Maybe (Either a a)
forall a. a -> Maybe a
Just (a -> Either a a
forall a b. b -> Either a b
Right a
p))

    -- a common pattern for putting back the arrived queue at the end
    returnOld :: StrictList m -> Maybe (Either a a) -> IO (Maybe (Either a a))
    returnOld :: StrictList m -> Maybe (Either a a) -> IO (Maybe (Either a a))
returnOld StrictList m
old Maybe (Either a a)
r = do StrictMVar (StrictList m) -> StrictList m -> IO ()
forall a. StrictMVar a -> a -> IO ()
putMVar StrictMVar (StrictList m)
arrived StrictList m
old; Maybe (Either a a) -> IO (Maybe (Either a a))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (Either a a)
r

    -- as a side-effect, this left-biases the list
    checkArrived :: [m -> Maybe a] -> StrictList m -> (StrictList m, Maybe a)
    checkArrived :: [m -> Maybe a] -> StrictList m -> (StrictList m, Maybe a)
checkArrived [m -> Maybe a]
matches StrictList m
list = StrictList m -> StrictList m -> (StrictList m, Maybe a)
go StrictList m
list StrictList m
forall a. StrictList a
Nil
      where
        -- @go xs ys@ searches for a message match in @append xs ys@
        go :: StrictList m -> StrictList m -> (StrictList m, Maybe a)
go StrictList m
Nil StrictList m
Nil           = (StrictList m
forall a. StrictList a
Nil, Maybe a
forall a. Maybe a
Nothing)
        go StrictList m
Nil StrictList m
r             = StrictList m -> StrictList m -> (StrictList m, Maybe a)
go StrictList m
r StrictList m
forall a. StrictList a
Nil
        go (Append StrictList m
xs StrictList m
ys) StrictList m
tl = StrictList m -> StrictList m -> (StrictList m, Maybe a)
go StrictList m
xs (StrictList m -> StrictList m -> StrictList m
forall a. StrictList a -> StrictList a -> StrictList a
append StrictList m
ys StrictList m
tl)
        go (Snoc StrictList m
xs m
x)    StrictList m
tl = StrictList m -> StrictList m -> (StrictList m, Maybe a)
go StrictList m
xs (m -> StrictList m -> StrictList m
forall a. a -> StrictList a -> StrictList a
Cons m
x StrictList m
tl)
        go (Cons m
x StrictList m
xs)    StrictList m
tl
          | Just a
y <- [m -> Maybe a] -> m -> Maybe a
checkMatches [m -> Maybe a]
matches m
x = (StrictList m -> StrictList m -> StrictList m
forall a. StrictList a -> StrictList a -> StrictList a
append StrictList m
xs StrictList m
tl, a -> Maybe a
forall a. a -> Maybe a
Just a
y)
          | Bool
otherwise = let !(StrictList m
rest,Maybe a
r) = StrictList m -> StrictList m -> (StrictList m, Maybe a)
go StrictList m
xs StrictList m
tl in (m -> StrictList m -> StrictList m
forall a. a -> StrictList a -> StrictList a
Cons m
x StrictList m
rest, Maybe a
r)

    checkMatches :: [m -> Maybe a] -> m -> Maybe a
    checkMatches :: [m -> Maybe a] -> m -> Maybe a
checkMatches []     m
_ = Maybe a
forall a. Maybe a
Nothing
    checkMatches (m -> Maybe a
m:[m -> Maybe a]
ms) m
a = case m -> Maybe a
m m
a of Maybe a
Nothing -> [m -> Maybe a] -> m -> Maybe a
checkMatches [m -> Maybe a]
ms m
a
                                        Just a
b  -> a -> Maybe a
forall a. a -> Maybe a
Just a
b

-- | Weak reference to a CQueue
mkWeakCQueue :: CQueue a -> IO () -> IO (Weak (CQueue a))
mkWeakCQueue :: forall a. CQueue a -> IO () -> IO (Weak (CQueue a))
mkWeakCQueue m :: CQueue a
m@(CQueue (StrictMVar (MVar MVar# RealWorld (StrictList a)
m#)) TChan a
_ TVar Int
_) IO ()
f = (State# RealWorld -> (# State# RealWorld, Weak (CQueue a) #))
-> IO (Weak (CQueue a))
forall a. (State# RealWorld -> (# State# RealWorld, a #)) -> IO a
IO ((State# RealWorld -> (# State# RealWorld, Weak (CQueue a) #))
 -> IO (Weak (CQueue a)))
-> (State# RealWorld -> (# State# RealWorld, Weak (CQueue a) #))
-> IO (Weak (CQueue a))
forall a b. (a -> b) -> a -> b
$ \State# RealWorld
s ->
#if MIN_VERSION_base(4,9,0)
  case MVar# RealWorld (StrictList a)
-> CQueue a
-> (State# RealWorld -> (# State# RealWorld, () #))
-> State# RealWorld
-> (# State# RealWorld, Weak# (CQueue a) #)
forall a b c.
a
-> b
-> (State# RealWorld -> (# State# RealWorld, c #))
-> State# RealWorld
-> (# State# RealWorld, Weak# b #)
mkWeak# MVar# RealWorld (StrictList a)
m# CQueue a
m (IO () -> State# RealWorld -> (# State# RealWorld, () #)
forall a. IO a -> State# RealWorld -> (# State# RealWorld, a #)
unIO IO ()
f) State# RealWorld
s of (# State# RealWorld
s1, Weak# (CQueue a)
w #) -> (# State# RealWorld
s1, Weak# (CQueue a) -> Weak (CQueue a)
forall v. Weak# v -> Weak v
Weak Weak# (CQueue a)
w #)
#else
  case mkWeak# m# m f s of (# s1, w #) -> (# s1, Weak w #)
#endif

queueSize :: CQueue a -> IO Int
queueSize :: forall a. CQueue a -> IO Int
queueSize (CQueue StrictMVar (StrictList a)
_ TChan a
_ TVar Int
size) = TVar Int -> IO Int
forall a. TVar a -> IO a
readTVarIO TVar Int
size