-- | Thread safe queues for message passing
-- between many concurrent processes.
--
-- This message box is __UNLIMITED__.
--
-- Good single producer/single consumer performance
--
-- If you are sure that the producer(s) send messages
-- at a lower rate than the rate at which the consumer
-- consumes messages, use this module.
--
-- Otherwise use the more conservative
-- "UnliftIO.MessageBox.Limited" module.
module UnliftIO.MessageBox.Unlimited
  ( BlockingUnlimited (..),
    UnlimitedBox (),
    UnlimitedBoxInput (),
  )
where

-- import qualified Control.Concurrent.Chan.Unagi.NoBlocking as Unagi
import qualified Control.Concurrent.Chan.Unagi as Unagi
import Data.Functor (($>))
import UnliftIO.MessageBox.Util.Future (Future (..))
import qualified UnliftIO.MessageBox.Class as Class
import UnliftIO
  ( MonadIO (liftIO),
    MonadUnliftIO,
  )

-- | The receiving side of an unlimited message queue.
--
-- This is the counterpart of 'UnlimitedBoxInput': messages written
-- through an input are taken out of the box with 'receive' or
-- 'tryReceive'.
--
-- The box keeps the write end of the channel as well as the read end;
-- 'newInput' wraps the write end into an 'UnlimitedBoxInput'.
data UnlimitedBox a = MkUnlimitedBox !(Unagi.InChan a) !(Unagi.OutChan a)

-- | The sending side of an unlimited message queue.
--
-- Messages are enqueued through an 'UnlimitedBoxInput', e.g. with
-- 'deliver', and are received from the 'UnlimitedBox' that the input
-- was created from with 'newInput'.
newtype UnlimitedBoxInput a
  = MkUnlimitedBoxInput (Unagi.InChan a)

-- | The (empty) configuration for creating
-- 'UnlimitedBox'es using the 'Class.IsMessageBoxArg' methods.
data BlockingUnlimited = BlockingUnlimited

-- | Renders as the fixed string @Unlimited@; the precedence argument
-- is ignored because the output never needs parentheses.
instance Show BlockingUnlimited where
  showsPrec _ _ = showString "Unlimited"

-- | 'BlockingUnlimited' configures 'UnlimitedBox'es: a new box is made
-- with 'create', and no message limit is reported.
instance Class.IsMessageBoxArg BlockingUnlimited where
  type MessageBox BlockingUnlimited = UnlimitedBox
  {-# INLINE newMessageBox #-}
  newMessageBox BlockingUnlimited = create
  -- This configuration carries no limit, hence 'Nothing'.
  getConfiguredMessageLimit _ = Nothing

-- | A blocking instance that invokes 'receive'.
--
-- Every method delegates to the function of the same name defined in
-- this module. The class-level @receive@ wraps the received message in
-- 'Just'; this instance never produces 'Nothing' itself.
instance Class.IsMessageBox UnlimitedBox where
  type Input UnlimitedBox = UnlimitedBoxInput
  {-# INLINE receive #-}
  receive !i = Just <$> receive i
  {-# INLINE tryReceive #-}
  tryReceive !i = tryReceive i
  {-# INLINE newInput #-}
  newInput !i = newInput i

-- | A blocking instance that invokes 'deliver'.
--
-- The class-level @deliver@ runs the module-level 'deliver' and then
-- always reports 'True'.
instance Class.IsInput UnlimitedBoxInput where
  {-# INLINE deliver #-}
  deliver !o !m = deliver o m $> True


-- | Create an 'UnlimitedBox'.
--
-- From an 'UnlimitedBox' a corresponding 'UnlimitedBoxInput' can
-- be made with 'newInput', that can be passed to some potential
-- communication partners.
{-# INLINE create #-}
create :: MonadUnliftIO m => m (UnlimitedBox a)
create = do
  -- Both channel ends are forced before the box is built.
  (!inChan, !outChan) <- liftIO Unagi.newChan
  return $! MkUnlimitedBox inChan outChan

-- | Wait for and receive a message from an 'UnlimitedBox'.
--
-- Reads from the read end of the box's channel via 'Unagi.readChan'.
{-# INLINE receive #-}
receive :: MonadUnliftIO m => UnlimitedBox a -> m a
receive (MkUnlimitedBox _ !s) =
  -- Left over from the NoBlocking Unagi variant:
  --liftIO (Unagi.readChan IO.yield s)
  liftIO (Unagi.readChan s)

-- | Try to receive a message from an 'UnlimitedBox' without waiting.
--
-- The result is a 'Future' wrapping @'Unagi.tryRead' promise@, an
-- @IO (Maybe a)@ action on the element obtained from
-- 'Unagi.tryReadChan'. The blocking read action that
-- 'Unagi.tryReadChan' also returns is discarded.
--
-- NOTE(review): according to the unagi-chan documentation each
-- 'Unagi.tryReadChan' call claims one unique element of the queue, so
-- a 'Future' that is dropped before it yields a value presumably loses
-- that message - confirm against the library docs.
{-# INLINE tryReceive #-}
tryReceive :: MonadUnliftIO m => UnlimitedBox a -> m (Future a)
tryReceive (MkUnlimitedBox _ !s) = liftIO $ do
  (!promise, _) <- Unagi.tryReadChan s
  return (Future (Unagi.tryRead promise))

-- | Create an 'UnlimitedBoxInput' to write the items
-- that the given 'UnlimitedBox' receives.
--
-- The input wraps the write end stored in the box; no new channel is
-- created.
{-# INLINE newInput #-}
newInput :: MonadUnliftIO m => UnlimitedBox a -> m (UnlimitedBoxInput a)
newInput (MkUnlimitedBox !s _) = return $! MkUnlimitedBoxInput s

-- | Put a message into the 'UnlimitedBoxInput'
-- of an 'UnlimitedBox', such that the process
-- reading the 'UnlimitedBox' receives the message.
--
-- The message is forced to weak head normal form (bang pattern)
-- before it is written to the channel.
{-# INLINE deliver #-}
deliver :: MonadUnliftIO m => UnlimitedBoxInput a -> a -> m ()
deliver (MkUnlimitedBoxInput !s) !a =
  liftIO $ Unagi.writeChan s a