-- |Description: Queue interpreters for 'TBQueue'
module Polysemy.Conc.Queue.TB where

import Control.Concurrent.STM (
  TBQueue,
  isFullTBQueue,
  newTBQueueIO,
  peekTBQueue,
  readTBQueue,
  tryPeekTBQueue,
  tryReadTBQueue,
  writeTBQueue,
  )

import qualified Polysemy.Conc.Data.Queue as Queue
import Polysemy.Conc.Data.Queue (Queue)
import qualified Polysemy.Conc.Data.QueueResult as QueueResult
import Polysemy.Conc.Data.Race (Race)
import Polysemy.Conc.Queue.Result (naResult)
import Polysemy.Conc.Queue.Timeout (withTimeout)

-- |Interpret 'Queue' with a 'TBQueue'.
--
-- This variant expects an allocated queue as an argument.
--
-- The plain and @Try@ operations run the corresponding 'TBQueue' transaction with 'atomically', while the timeout
-- variants hand the same transaction to 'withTimeout'.
--
-- This interpreter does not track a closed state: 'Queue.Closed' always returns 'False' and 'Queue.Close' is a no-op.
interpretQueueTBWith ::
  forall d r .
  Members [Race, Embed IO] r =>
  TBQueue d ->
  InterpreterFor (Queue d) r
interpretQueueTBWith queue =
  interpret \case
    Queue.Read ->
      atomically (QueueResult.Success <$> readTBQueue queue)
    Queue.TryRead ->
      -- 'naResult' converts the 'Maybe' into a 'QueueResult' (presumably 'Nothing' becomes 'NotAvailable').
      atomically (naResult <$> tryReadTBQueue queue)
    Queue.ReadTimeout timeout ->
      -- 'withTimeout' expects an @STM (Maybe d)@, so the read result is wrapped in 'Just'.
      withTimeout timeout (Just <$> readTBQueue queue)
    Queue.Peek ->
      atomically (QueueResult.Success <$> peekTBQueue queue)
    Queue.TryPeek ->
      atomically (naResult <$> tryPeekTBQueue queue)
    Queue.Write d ->
      atomically (writeTBQueue queue d)
    Queue.TryWrite d ->
      -- The fullness check and the write share one transaction, so the queue cannot fill up between them.
      atomically do
        ifM
          (isFullTBQueue queue)
          (pure QueueResult.NotAvailable)
          (QueueResult.Success <$> writeTBQueue queue d)
    Queue.WriteTimeout timeout d ->
      withTimeout timeout (Just <$> writeTBQueue queue d)
    Queue.Closed ->
      pure False
    Queue.Close ->
      pass
{-# INLINE interpretQueueTBWith #-}

-- |Interpret 'Queue' with a 'TBQueue'.
--
-- Allocates a fresh 'TBQueue' with the given buffer size via 'newTBQueueIO' and then delegates to
-- 'interpretQueueTBWith'.
interpretQueueTB ::
  forall d r .
  Members [Race, Embed IO] r =>
  -- |Buffer size
  Natural ->
  InterpreterFor (Queue d) r
interpretQueueTB maxQueued sem = do
  -- The type application pins the element type to the @d@ bound by the signature's @forall@.
  queue <- embed (newTBQueueIO @d maxQueued)
  interpretQueueTBWith queue sem
{-# INLINE interpretQueueTB #-}