-- | Thread-safe queues for unidirectional message passing
-- between threads.
--
-- This message box has an upper limit, that means that
-- sometimes delivery either fails or is blocked until
-- the receiving thread has consumed more messages.
--
-- Use this module if the producer(s) outperform the consumer,
-- but you want the extra safety that the queue blocks the
-- 'Input' after a certain message limit is reached.
--
-- If you are sure that the producers fire at a slower rate
-- than the rate at which the consumer consumes messages, use this
-- module.
module UnliftIO.MessageBox.Limited
  ( MessageLimit (..),
    messageLimitToInt,
    BlockingBoxLimit (..),
    BlockingBox (),
    BlockingInput (),
    NonBlockingBoxLimit (..),
    NonBlockingBox (),
    NonBlockingInput (..),
    WaitingBoxLimit (..),
    WaitingBox (..),
    WaitingInput (..),
  )
where

import qualified Control.Concurrent.Chan.Unagi.Bounded as Unagi
import Control.Monad (unless)
import Data.Functor (($>))
import Data.Maybe (fromMaybe)
import UnliftIO.MessageBox.Util.Future (Future (..))
import qualified UnliftIO.MessageBox.Class as Class
import UnliftIO
  ( MonadIO (liftIO),
    MonadUnliftIO,
    timeout,
  )
import UnliftIO.Concurrent (threadDelay)

-- | Message Limit
--
-- The message limit must be a reasonably small positive integer
-- that is also a power of two. This stems from the fact that
-- Unagi is used under the hood.
--
-- The limit is a lower bound: a message box created with a given
-- limit can buffer at least that many messages.
--
-- The constructors are declared in ascending order, so the derived
-- 'Ord', 'Enum' and 'Bounded' instances follow the numeric order of
-- the limits ('minBound' is 'MessageLimit_1', 'maxBound' is
-- 'MessageLimit_4096').
data MessageLimit
  = MessageLimit_1
  | MessageLimit_2
  | MessageLimit_4
  | MessageLimit_8
  | MessageLimit_16
  | MessageLimit_32
  | MessageLimit_64
  | MessageLimit_128
  | MessageLimit_256
  | MessageLimit_512
  | MessageLimit_1024
  | MessageLimit_2048
  | MessageLimit_4096
  deriving stock
    (Eq, Ord, Show, Bounded, Enum)

-- | Convert a 'MessageLimit' to the
-- 'Int' representation, i.e. the number encoded in the
-- constructor name: 'MessageLimit_16' becomes @16@.
{-# INLINE messageLimitToInt #-}
messageLimitToInt :: MessageLimit -> Int
messageLimitToInt =
  \case
    MessageLimit_1 -> 1
    MessageLimit_2 -> 2
    MessageLimit_4 -> 4
    MessageLimit_8 -> 8
    MessageLimit_16 -> 16
    MessageLimit_32 -> 32
    MessageLimit_64 -> 64
    MessageLimit_128 -> 128
    MessageLimit_256 -> 256
    MessageLimit_512 -> 512
    MessageLimit_1024 -> 1024
    MessageLimit_2048 -> 2048
    MessageLimit_4096 -> 4096

-- * 'Class.IsMessageBoxArg' instances

-- ** Blocking

-- | Contains the (vague) limit of messages that a 'BlockingBox'
-- can buffer, i.e. that 'deliver' can put into a 'BlockingInput'
-- of a 'BlockingBox'.
--
-- Equality and ordering are those of the wrapped 'MessageLimit'.
newtype BlockingBoxLimit = BlockingBoxLimit MessageLimit
  deriving stock (Eq, Ord)

-- | Renders the limit as the word @Blocking@ directly followed by the
-- numeric limit, e.g. @Blocking16@. The surrounding precedence is
-- ignored.
instance Show BlockingBoxLimit where
  showsPrec _ (BlockingBoxLimit !l) =
    showString "Blocking" . showsPrec 9 (messageLimitToInt l)

-- | A message queue out of which messages can be 'receive'd.
--
-- This is the counterpart of 'BlockingInput'. Can be used for reading
-- messages.
--
-- Messages can be received by 'receive' or 'tryReceive'.
--
-- Both ends of the underlying bounded Unagi channel are stored: the
-- read end is used for receiving, the write end is handed out by
-- 'newInput'.
data BlockingBox a
  = MkBlockingBox
      !(Unagi.InChan a) -- write end, wrapped into a 'BlockingInput' by 'newInput'
      !(Unagi.OutChan a) -- read end, used by 'receive', 'tryReceive' and 'receiveAfter'

-- | A message queue into which messages can be enqueued by,
--   e.g. 'tryToDeliver'.
--   Messages can be received from a 'BlockingBox'.
--
--   The 'BlockingInput' is the counterpart of a 'BlockingBox'; it
--   wraps the write end of the underlying Unagi channel.
newtype BlockingInput a = MkBlockingInput (Unagi.InChan a)

-- | A 'BlockingBoxLimit' creates 'BlockingBox'es; the configured
-- message limit is always known and reported as a 'Just'.
instance Class.IsMessageBoxArg BlockingBoxLimit where
  type MessageBox BlockingBoxLimit = BlockingBox
  {-# INLINE newMessageBox #-}
  newMessageBox (BlockingBoxLimit !limit) = create limit
  getConfiguredMessageLimit (BlockingBoxLimit !limit) =
    Just (messageLimitToInt limit)

-- | A blocking instance that invokes 'receive'.
--
-- Unqualified 'receive', 'tryReceive' and 'newInput' on the right-hand
-- sides are the module-local functions defined further down, not the
-- class methods (the class is imported qualified as @Class@).
instance Class.IsMessageBox BlockingBox where
  type Input BlockingBox = BlockingInput

  -- Blocks until a message arrives, so the result is always a 'Just'.
  {-# INLINE receive #-}
  receive !i = Just <$> receive i
  {-# INLINE tryReceive #-}
  tryReceive !i = tryReceive i
  {-# INLINE newInput #-}
  newInput !i = newInput i
  -- Take the next element if it is already available; otherwise wait
  -- for it, but give up after @rto@ (the value is passed unchanged to
  -- 'timeout') and return 'Nothing'.
  --
  -- NOTE(review): when the timeout fires, the element obtained from
  -- 'Unagi.tryReadChan' is abandoned. Presumably a message that is
  -- later written to that position is never received - confirm
  -- against the unagi-chan documentation of 'Unagi.Element'.
  receiveAfter (MkBlockingBox _ !s) !rto =
    do
      (!promise, !blocker) <- liftIO (Unagi.tryReadChan s)
      liftIO (Unagi.tryRead promise)
        >>= maybe
          (timeout rto (liftIO blocker))
          (return . Just)

-- | A blocking instance that invokes 'deliver'.
--
-- The module-local 'deliver' returns unit once the message has been
-- written, so this method always answers 'True'.
instance Class.IsInput BlockingInput where
  {-# INLINE deliver #-}
  deliver !o !a = deliver o a $> True

--  ** A wrapper around 'BlockingBox' for Non-Blocking Input (NBI)

-- | A 'BlockingBoxLimit' wrapper for non-blocking 'Class.IsMessageBoxArg' instances.
--
-- Equality and ordering are those of the wrapped 'MessageLimit'.
newtype NonBlockingBoxLimit = NonBlockingBoxLimit MessageLimit
  deriving stock (Eq, Ord)

-- | Renders the limit as the word @NonBlocking@ directly followed by
-- the numeric limit, e.g. @NonBlocking16@. The surrounding precedence
-- is ignored.
instance Show NonBlockingBoxLimit where
  showsPrec _ (NonBlockingBoxLimit !l) =
    showString "NonBlocking" . showsPrec 9 (messageLimitToInt l)

-- | Creates a 'BlockingBox' with the same 'MessageLimit' and wraps it
-- in a 'NonBlockingBox'. The configured limit is always reported as a
-- 'Just'.
instance Class.IsMessageBoxArg NonBlockingBoxLimit where
  type MessageBox NonBlockingBoxLimit = NonBlockingBox
  {-# INLINE newMessageBox #-}
  newMessageBox (NonBlockingBoxLimit !l) =
    NonBlockingBox <$> Class.newMessageBox (BlockingBoxLimit l)
  getConfiguredMessageLimit (NonBlockingBoxLimit !limit) =
    Just (messageLimitToInt limit)

-- | A 'BlockingBox' wrapper for non-blocking 'Class.IsMessageBox' instances.
--
-- The difference to the 'BlockingBox' instance is that 'Class.deliver'
-- on the associated 'NonBlockingInput' does not wait for free space
-- when the message box limit is surpassed. Receiving is delegated
-- to the wrapped 'BlockingBox'.
newtype NonBlockingBox a = NonBlockingBox (BlockingBox a)

-- | Every receiving operation is delegated to the wrapped
-- 'BlockingBox'; only the kind of input handed out by 'newInput'
-- differs ('NonBlockingInput' instead of 'BlockingInput').
instance Class.IsMessageBox NonBlockingBox where
  type Input NonBlockingBox = NonBlockingInput
  -- Blocks until a message arrives, so the result is always a 'Just'.
  {-# INLINE receive #-}
  receive (NonBlockingBox !i) = Just <$> receive i
  {-# INLINE tryReceive #-}
  tryReceive (NonBlockingBox !i) = tryReceive i
  {-# INLINE receiveAfter #-}
  receiveAfter (NonBlockingBox !b) !rto =
    Class.receiveAfter b rto
  {-# INLINE newInput #-}
  newInput (NonBlockingBox !i) = NonBlockingInput <$> newInput i

-- | A wrapper around 'BlockingInput' with a non-blocking 'Class.IsInput' instance.
--
-- 'deliver' will enqueue the message or return 'False' without
-- waiting for free space, if the message box already contains more
-- messages than its limit allows. (The 'Class.IsInput' instance
-- pauses briefly via 'threadDelay' after such a failed attempt.)
newtype NonBlockingInput a = NonBlockingInput (BlockingInput a)

-- | Delivery makes a single 'tryToDeliver' attempt and reports its
-- outcome; it never waits for free space in the queue.
instance Class.IsInput NonBlockingInput where
  {-# INLINE deliver #-}
  deliver (NonBlockingInput !o) !a = do
    !res <- tryToDeliver o a
    -- After a failed attempt, pause for the duration passed to
    -- 'threadDelay' before reporting the failure.
    -- NOTE(review): presumably this throttles producers that retry in
    -- a tight loop - confirm the intent.
    unless res (threadDelay 10)
    return res

--  ** 'BlockingBox' Wrapper with Timeout

-- | A 'Class.IsMessageBoxArg' instance wrapping the 'BlockingBox'
--  with independently configurable timeouts for 'receive' and 'deliver'.
--
-- The fields are, in order:
--
--  1. the optional timeout applied by 'Class.receive' on a
--     'WaitingBox'; with 'Nothing', receiving falls back to the
--     plain blocking behaviour of 'BlockingBox',
--  2. the timeout handed to every 'WaitingInput' created from the
--     box, used when delivering into a full box,
--  3. the message limit of the underlying 'BlockingBox'.
--
-- Both timeouts are passed unchanged to 'timeout'.
data WaitingBoxLimit
  = WaitingBoxLimit
      !(Maybe Int)
      !Int
      !MessageLimit
  deriving stock (Eq, Ord)

-- | Renders the configuration as underscore-separated parts after the
-- prefix @Waiting_@: the receive timeout (only when present), the
-- deliver timeout and the numeric message limit. For example
-- @WaitingBoxLimit (Just 5) 10 MessageLimit_16@ is shown as
-- @Waiting_5_10_16@ and @WaitingBoxLimit Nothing 10 MessageLimit_16@
-- as @Waiting_10_16@. The surrounding precedence is ignored.
instance Show WaitingBoxLimit where
  showsPrec _ (WaitingBoxLimit !t0 !t1 !l) =
    showString "Waiting_"
      . ( case t0 of
            Nothing -> id
            Just !t -> showsPrec 9 t . showChar '_'
        )
      . showsPrec 9 t1
      . showChar '_'
      . showsPrec 9 (messageLimitToInt l)

-- | Creates a 'BlockingBox' with the configured 'MessageLimit' and
-- pairs it with the complete 'WaitingBoxLimit', so that the timeouts
-- are available to the 'WaitingBox' operations. The configured limit
-- is always reported as a 'Just'.
instance Class.IsMessageBoxArg WaitingBoxLimit where
  type MessageBox WaitingBoxLimit = WaitingBox
  {-# INLINE newMessageBox #-}
  newMessageBox l@(WaitingBoxLimit _ _ !c) =
    WaitingBox l <$> Class.newMessageBox (BlockingBoxLimit c)
  getConfiguredMessageLimit (WaitingBoxLimit _ _ !limit) =
    Just (messageLimitToInt limit)

-- | A 'BlockingBox' and a 'WaitingBoxLimit' for
-- the 'Class.IsMessageBox' instance.
--
-- The 'WaitingBoxLimit' supplies the receive timeout used by
-- 'Class.receive' and the deliver timeout passed on to every
-- 'WaitingInput' created by 'Class.newInput'.
data WaitingBox a
  = WaitingBox WaitingBoxLimit (BlockingBox a)

-- | Receiving honours the optional receive timeout of the
-- 'WaitingBoxLimit'; everything else is delegated to the wrapped
-- 'BlockingBox'.
instance Class.IsMessageBox WaitingBox where
  type Input WaitingBox = WaitingInput
  -- With a configured receive timeout: take the next element if it is
  -- already available, otherwise wait for it for at most @rto@ (passed
  -- unchanged to 'timeout') and return 'Nothing' when that expires.
  --
  -- NOTE(review): when the timeout fires, the element obtained from
  -- 'Unagi.tryReadChan' is abandoned. Presumably a message that is
  -- later written to that position is never received - confirm
  -- against the unagi-chan documentation of 'Unagi.Element'.
  {-# INLINE receive #-}
  receive (WaitingBox (WaitingBoxLimit (Just !rto) _ _) (MkBlockingBox _ !s)) =
    liftIO $ do
      (!promise, !blocker) <- Unagi.tryReadChan s
      Unagi.tryRead promise
        >>= maybe
          (timeout rto blocker)
          (return . Just)
  -- Without a receive timeout: plain blocking receive.
  receive (WaitingBox !_ !m) =
    Class.receive m
  {-# INLINE receiveAfter #-}
  receiveAfter (WaitingBox _ !b) !rto =
    Class.receiveAfter b rto
  {-# INLINE tryReceive #-}
  tryReceive (WaitingBox _ !m) = tryReceive m
  -- Every input carries the deliver timeout of the box configuration.
  {-# INLINE newInput #-}
  newInput (WaitingBox (WaitingBoxLimit _ !dto _) !m) =
    WaitingInput dto <$> newInput m

-- | An input for a 'BlockingBox' that will block
-- for not much more than the given timeout when
-- the message box is full.
--
-- The timeout is passed unchanged to 'timeout' by
-- 'tryToDeliverAndWait'.
data WaitingInput a
  = WaitingInput
      !Int -- deliver timeout, taken from the 'WaitingBoxLimit' of the box
      !(BlockingInput a) -- the input of the underlying 'BlockingBox'

-- | Delivery is done by 'tryToDeliverAndWait' with the timeout stored
-- in the 'WaitingInput'; the result is 'False' when the message could
-- not be enqueued in time.
instance Class.IsInput WaitingInput where
  {-# INLINE deliver #-}
  deliver (WaitingInput !t !o) !a = tryToDeliverAndWait t o a

-- Internal Functions

-- | Create a new 'BlockingBox' backed by a bounded Unagi channel whose
-- size is the numeric value of the given 'MessageLimit'.
{-# INLINE create #-}
create :: MonadUnliftIO m => MessageLimit -> m (BlockingBox a)
create !limit = do
  (!inChan, !outChan) <- liftIO (Unagi.newChan (messageLimitToInt limit))
  return $! MkBlockingBox inChan outChan

-- | Read the next message from the read end of the 'BlockingBox' via
-- 'Unagi.readChan', waiting until one is available.
{-# INLINE receive #-}
receive :: MonadUnliftIO m => BlockingBox a -> m a
receive (MkBlockingBox _ !s) =
  liftIO (Unagi.readChan s)

-- | Return a 'Future' for the next value that will be received.
--
-- The 'Future' wraps 'Unagi.tryRead' on the element obtained from
-- 'Unagi.tryReadChan'; the blocking action that 'Unagi.tryReadChan'
-- also returns is not used.
{-# INLINE tryReceive #-}
tryReceive :: MonadUnliftIO m => BlockingBox a -> m (Future a)
tryReceive (MkBlockingBox _ !s) = liftIO $ do
  (!promise, _) <- Unagi.tryReadChan s
  return (Future (Unagi.tryRead promise))

-- | Wrap the write end of the 'BlockingBox' in a 'BlockingInput'.
-- No new channel is created; every input obtained from the same box
-- refers to the same write end.
{-# INLINE newInput #-}
newInput :: MonadUnliftIO m => BlockingBox a -> m (BlockingInput a)
newInput (MkBlockingBox !s _) = return $! MkBlockingInput s

-- | Put a message into the 'BlockingInput' via 'Unagi.writeChan',
-- which blocks while the bounded channel has no room.
{-# INLINE deliver #-}
deliver :: MonadUnliftIO m => BlockingInput a -> a -> m ()
deliver (MkBlockingInput !s) !a =
  liftIO $ Unagi.writeChan s a

-- | Try to put a message into the 'BlockingInput'
-- of a 'MessageBox', such that the process
-- reading the 'MessageBox' receives the message.
--
-- If the 'MessageBox' is full return False.
--
-- This is a single 'Unagi.tryWriteChan' call and does not wait for
-- free space.
{-# INLINE tryToDeliver #-}
tryToDeliver :: MonadUnliftIO m => BlockingInput a -> a -> m Bool
tryToDeliver (MkBlockingInput !s) !a =
  liftIO $ Unagi.tryWriteChan s a

-- | Send a message by putting it into the 'BlockingInput'
-- of a 'MessageBox', such that the process
-- reading the 'MessageBox' receives the message.
--
-- Return False if the message could not be enqueued within the
-- given timeout, e.g. because the 'MessageBox' is full.
--
-- This assumes that the queue is likely empty, and
-- tries 'tryToDeliver' first before wasting any
-- precious cpu cycles entering 'timeout'.
--
-- The first argument is the timeout, passed unchanged to 'timeout'.
--
-- NOTE(review): whether a blocking 'deliver' that is interrupted by
-- the timeout may nevertheless have enqueued the message depends on
-- the semantics of Unagi's @writeChan@ - confirm before relying on
-- 'False' meaning "not delivered".
tryToDeliverAndWait ::
  MonadUnliftIO m =>
  Int ->
  BlockingInput a ->
  a ->
  m Bool
tryToDeliverAndWait !t !o !a =
  -- Benchmarks have shown great improvements
  -- when calling tryToDeliver once before doing
  -- deliver in a System.Timeout.timeout;
  --
  -- We even tried calling 'tryToDeliver' more than once,
  -- but that did not lead to convincing improvements.
  --
  -- Benchmarks have also shown that sending pessimistically
  -- (i.e. avoiding `tryToDeliver`) does not improve performance,
  -- even when the message queue is congested.
  --
  -- See benchmark results:
  -- `benchmark-results/optimistic-vs-pessimistic.html`
  tryToDeliver o a >>= \case
    True -> return True
    False ->
      fromMaybe False <$> timeout t (deliver o a $> True)