{-# LANGUAGE CPP  #-}
{-# LANGUAGE DeriveDataTypeable  #-}
{-# LANGUAGE MagicHash, UnboxedTuples #-}
-- | Clone of Control.Concurrent.STM.TQueue with support for mkWeakTQueue
--
-- Not all functionality from the original module is available: unGetTQueue,
-- peekTQueue and tryPeekTQueue are missing. In order to implement these we'd
-- need to be able to touch# the write end of the queue inside unGetTQueue, but
-- that means we need a version of touch# that works within the STM monad.
module Control.Distributed.Process.Internal.WeakTQueue (
  -- * Original functionality
  TQueue,
  newTQueue,
  newTQueueIO,
  readTQueue,
  tryReadTQueue,
  writeTQueue,
  isEmptyTQueue,
  -- * New functionality
  mkWeakTQueue
  ) where

import Prelude hiding (read)
import GHC.Conc
import Data.Typeable (Typeable)
import GHC.IO (IO(IO), unIO)
import GHC.Exts (mkWeak#)
import GHC.Weak (Weak(Weak))

--------------------------------------------------------------------------------
-- Original functionality                                                     --
--------------------------------------------------------------------------------

-- | 'TQueue' is an abstract type representing an unbounded FIFO channel.
--
-- Internally the queue is a pair of list-holding 'TVar's. Both fields are
-- strict and unpacked into the constructor.
data TQueue a
  = TQueue
      -- Read end: elements are taken from the head of this list.
      {-# UNPACK #-} !(TVar [a])
      -- Write end: new elements are consed onto this list, so it holds the
      -- most recently written element first.
      {-# UNPACK #-} !(TVar [a])
  deriving Typeable

-- | Two queues are equal exactly when their read-end 'TVar's are the same
-- variable. The write ends are not compared.
instance Eq (TQueue a) where
  TQueue a _ == TQueue b _ = a == b

-- | Builds and returns a new, empty 'TQueue'.
--
-- Both the read end and the write end start out as empty lists.
newTQueue :: STM (TQueue a)
newTQueue = do
  read  <- newTVar []
  write <- newTVar []
  return (TQueue read write)

-- | @IO@ version of 'newTQueue'. This is useful for creating top-level
-- 'TQueue's using 'System.IO.Unsafe.unsafePerformIO', because using
-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
-- possible.
newTQueueIO :: IO (TQueue a)
newTQueueIO = do
  read  <- newTVarIO []
  write <- newTVarIO []
  return (TQueue read write)

-- | Write a value to a 'TQueue'.
--
-- The value is consed onto the front of the write-end list; the read end is
-- not touched.
writeTQueue :: TQueue a -> a -> STM ()
writeTQueue (TQueue _read write) a = do
  listend <- readTVar write
  writeTVar write (a : listend)

-- | Read the next value from the 'TQueue'.
--
-- Takes the head of the read-end list when it is non-empty. Otherwise the
-- write-end list is reversed and becomes the new read end. If both ends are
-- empty the transaction calls 'retry'.
readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
  xs <- readTVar read
  case xs of
    (x:xs') -> do
      writeTVar read xs'
      return x
    [] -> do
      ys <- readTVar write
      case ys of
        -- Nothing buffered on either end: block until a writer commits.
        [] -> retry
        -- The write end holds the newest element first, so reversing it
        -- yields the elements in the order they were written.
        _  -> case reverse ys of
                -- ys was matched as non-empty above, so its reverse cannot
                -- be empty; this branch only keeps the match total.
                [] -> error "readTQueue"
                (z:zs) -> do
                  writeTVar write []
                  writeTVar read zs
                  return z

-- | A version of 'readTQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
--
-- Implemented by running 'readTQueue' and falling back, via 'orElse', to
-- @return Nothing@ when it retries.
tryReadTQueue :: TQueue a -> STM (Maybe a)
tryReadTQueue c = fmap Just (readTQueue c) `orElse` return Nothing

-- | Returns 'True' if the supplied 'TQueue' is empty.
--
-- The queue is empty only when both the read-end and the write-end lists are
-- empty. The write end is only inspected when the read end is empty.
isEmptyTQueue :: TQueue a -> STM Bool
isEmptyTQueue (TQueue read write) = do
  xs <- readTVar read
  case xs of
    (_:_) -> return False
    []    -> fmap null (readTVar write)

--------------------------------------------------------------------------------
-- New functionality                                                          --
--------------------------------------------------------------------------------

-- | Create a weak pointer to a 'TQueue', with the given 'IO' action attached
-- as finalizer.
--
-- The weak pointer is keyed on the primitive @TVar#@ of the queue's write
-- end, with the queue itself as the value.
--
-- NOTE(review): keying on the primitive @TVar#@ rather than the 'TQueue' box
-- is presumably for the same reason as @mkWeakIORef@ / @mkWeakMVar@ in base
-- (the boxed wrapper may be optimised away) -- confirm.
--
-- From base 4.9 on, the finalizer is handed to @mkWeak#@ as an unwrapped
-- state-passing function, hence the 'unIO'; on older versions the 'IO' action
-- is passed as is.
mkWeakTQueue :: TQueue a -> IO () -> IO (Weak (TQueue a))
mkWeakTQueue q@(TQueue _read (TVar write#)) f = IO $ \s ->
#if MIN_VERSION_base(4,9,0)
  case mkWeak# write# q (unIO f) s of (# s', w #) -> (# s', Weak w #)
#else
  case mkWeak# write# q f s of (# s', w #) -> (# s', Weak w #)
#endif