{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wall #-}
{-# OPTIONS_GHC -fno-warn-type-defaults #-}

-- | queues
-- Follows [pipes-concurrency](https://hackage.haskell.org/package/pipes-concurrency)
module Box.Queue
  ( Queue (..),
    queueC,
    queueE,
    waitCancel,
    ends,
    withQE,
    withQC,
    toBox,
    toBoxM,
    liftB,
    concurrentlyLeft,
    concurrentlyRight,
    fromAction,
    fuseActions,
  )
where

import Box.Box
import Box.Committer
import Box.Cont
import Box.Emitter
import Control.Applicative
import Control.Concurrent.Classy.Async as C
import Control.Concurrent.Classy.STM as C
import Control.Monad.Catch as C
import Control.Monad.Conc.Class as C
import Control.Monad.Morph
import Prelude

-- | 'Queue' specifies how messages are queued
data Queue a
  = Unbounded
  | Bounded Int
  | Single
  | Latest a
  | Newest Int
  | New

-- | create a queue, returning the ends
ends :: MonadSTM stm => Queue a -> stm (a -> stm (), stm a)
ends :: Queue a -> stm (a -> stm (), stm a)
ends Queue a
qu =
  case Queue a
qu of
    Bounded Int
n -> do
      TBQueue stm a
q <- Natural -> stm (TBQueue stm a)
forall (stm :: * -> *) a.
MonadSTM stm =>
Natural -> stm (TBQueue stm a)
newTBQueue (Int -> Natural
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
n)
      (a -> stm (), stm a) -> stm (a -> stm (), stm a)
forall (m :: * -> *) a. Monad m => a -> m a
return (TBQueue stm a -> a -> stm ()
forall (stm :: * -> *) a.
MonadSTM stm =>
TBQueue stm a -> a -> stm ()
writeTBQueue TBQueue stm a
q, TBQueue stm a -> stm a
forall (stm :: * -> *) a. MonadSTM stm => TBQueue stm a -> stm a
readTBQueue TBQueue stm a
q)
    Queue a
Unbounded -> do
      TQueue stm a
q <- stm (TQueue stm a)
forall (stm :: * -> *) a. MonadSTM stm => stm (TQueue stm a)
newTQueue
      (a -> stm (), stm a) -> stm (a -> stm (), stm a)
forall (m :: * -> *) a. Monad m => a -> m a
return (TQueue stm a -> a -> stm ()
forall (stm :: * -> *) a.
MonadSTM stm =>
TQueue stm a -> a -> stm ()
writeTQueue TQueue stm a
q, TQueue stm a -> stm a
forall (stm :: * -> *) a. MonadSTM stm => TQueue stm a -> stm a
readTQueue TQueue stm a
q)
    Queue a
Single -> do
      TMVar stm a
m <- stm (TMVar stm a)
forall (stm :: * -> *) a. MonadSTM stm => stm (TMVar stm a)
newEmptyTMVar
      (a -> stm (), stm a) -> stm (a -> stm (), stm a)
forall (m :: * -> *) a. Monad m => a -> m a
return (TMVar stm a -> a -> stm ()
forall (stm :: * -> *) a.
MonadSTM stm =>
TMVar stm a -> a -> stm ()
putTMVar TMVar stm a
m, TMVar stm a -> stm a
forall (stm :: * -> *) a. MonadSTM stm => TMVar stm a -> stm a
takeTMVar TMVar stm a
m)
    Latest a
a -> do
      TVar stm a
t <- a -> stm (TVar stm a)
forall (stm :: * -> *) a. MonadSTM stm => a -> stm (TVar stm a)
newTVar a
a
      (a -> stm (), stm a) -> stm (a -> stm (), stm a)
forall (m :: * -> *) a. Monad m => a -> m a
return (TVar stm a -> a -> stm ()
forall (stm :: * -> *) a. MonadSTM stm => TVar stm a -> a -> stm ()
writeTVar TVar stm a
t, TVar stm a -> stm a
forall (stm :: * -> *) a. MonadSTM stm => TVar stm a -> stm a
readTVar TVar stm a
t)
    Queue a
New -> do
      TMVar stm a
m <- stm (TMVar stm a)
forall (stm :: * -> *) a. MonadSTM stm => stm (TMVar stm a)
newEmptyTMVar
      (a -> stm (), stm a) -> stm (a -> stm (), stm a)
forall (m :: * -> *) a. Monad m => a -> m a
return (\a
x -> TMVar stm a -> stm (Maybe a)
forall (stm :: * -> *) a.
MonadSTM stm =>
TMVar stm a -> stm (Maybe a)
tryTakeTMVar TMVar stm a
m stm (Maybe a) -> stm () -> stm ()
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> TMVar stm a -> a -> stm ()
forall (stm :: * -> *) a.
MonadSTM stm =>
TMVar stm a -> a -> stm ()
putTMVar TMVar stm a
m a
x, TMVar stm a -> stm a
forall (stm :: * -> *) a. MonadSTM stm => TMVar stm a -> stm a
takeTMVar TMVar stm a
m)
    Newest Int
n -> do
      TBQueue stm a
q <- Natural -> stm (TBQueue stm a)
forall (stm :: * -> *) a.
MonadSTM stm =>
Natural -> stm (TBQueue stm a)
newTBQueue (Int -> Natural
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
n)
      let write :: a -> stm ()
write a
x = TBQueue stm a -> a -> stm ()
forall (stm :: * -> *) a.
MonadSTM stm =>
TBQueue stm a -> a -> stm ()
writeTBQueue TBQueue stm a
q a
x stm () -> stm () -> stm ()
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> (TBQueue stm a -> stm (Maybe a)
forall (stm :: * -> *) a.
MonadSTM stm =>
TBQueue stm a -> stm (Maybe a)
tryReadTBQueue TBQueue stm a
q stm (Maybe a) -> stm () -> stm ()
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> a -> stm ()
write a
x)
      (a -> stm (), stm a) -> stm (a -> stm (), stm a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> stm ()
write, TBQueue stm a -> stm a
forall (stm :: * -> *) a. MonadSTM stm => TBQueue stm a -> stm a
readTBQueue TBQueue stm a
q)

-- | write to a queue, checking the seal
writeCheck :: (MonadSTM stm) => TVar stm Bool -> (a -> stm ()) -> a -> stm Bool
writeCheck :: TVar stm Bool -> (a -> stm ()) -> a -> stm Bool
writeCheck TVar stm Bool
sealed a -> stm ()
i a
a = do
  Bool
b <- TVar stm Bool -> stm Bool
forall (stm :: * -> *) a. MonadSTM stm => TVar stm a -> stm a
readTVar TVar stm Bool
sealed
  if Bool
b
    then Bool -> stm Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False
    else do
      a -> stm ()
i a
a
      Bool -> stm Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True

-- | read from a queue, and retry if not sealed
readCheck :: MonadSTM stm => TVar stm Bool -> stm a -> stm (Maybe a)
readCheck :: TVar stm Bool -> stm a -> stm (Maybe a)
readCheck TVar stm Bool
sealed stm a
o =
  (a -> Maybe a
forall a. a -> Maybe a
Just (a -> Maybe a) -> stm a -> stm (Maybe a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> stm a
o)
    stm (Maybe a) -> stm (Maybe a) -> stm (Maybe a)
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> ( do
            Bool
b <- TVar stm Bool -> stm Bool
forall (stm :: * -> *) a. MonadSTM stm => TVar stm a -> stm a
readTVar TVar stm Bool
sealed
            Bool -> stm ()
forall (stm :: * -> *). MonadSTM stm => Bool -> stm ()
C.check Bool
b
            Maybe a -> stm (Maybe a)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe a
forall a. Maybe a
Nothing
        )

-- | turn a queue into a box (and a seal)
toBox ::
  (MonadSTM stm) =>
  Queue a ->
  stm (Box stm a a, stm ())
toBox :: Queue a -> stm (Box stm a a, stm ())
toBox Queue a
q = do
  (a -> stm ()
i, stm a
o) <- Queue a -> stm (a -> stm (), stm a)
forall (stm :: * -> *) a.
MonadSTM stm =>
Queue a -> stm (a -> stm (), stm a)
ends Queue a
q
  TVar stm Bool
sealed <- String -> Bool -> stm (TVar stm Bool)
forall (stm :: * -> *) a.
MonadSTM stm =>
String -> a -> stm (TVar stm a)
newTVarN String
"sealed" Bool
False
  let seal :: stm ()
seal = TVar stm Bool -> Bool -> stm ()
forall (stm :: * -> *) a. MonadSTM stm => TVar stm a -> a -> stm ()
writeTVar TVar stm Bool
sealed Bool
True
  (Box stm a a, stm ()) -> stm (Box stm a a, stm ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure
    ( Committer stm a -> Emitter stm a -> Box stm a a
forall (m :: * -> *) c e. Committer m c -> Emitter m e -> Box m c e
Box
        ((a -> stm Bool) -> Committer stm a
forall (m :: * -> *) a. (a -> m Bool) -> Committer m a
Committer (TVar stm Bool -> (a -> stm ()) -> a -> stm Bool
forall (stm :: * -> *) a.
MonadSTM stm =>
TVar stm Bool -> (a -> stm ()) -> a -> stm Bool
writeCheck TVar stm Bool
sealed a -> stm ()
i))
        (stm (Maybe a) -> Emitter stm a
forall (m :: * -> *) a. m (Maybe a) -> Emitter m a
Emitter (TVar stm Bool -> stm a -> stm (Maybe a)
forall (stm :: * -> *) a.
MonadSTM stm =>
TVar stm Bool -> stm a -> stm (Maybe a)
readCheck TVar stm Bool
sealed stm a
o)),
      stm ()
seal
    )

-- | turn a queue into a box (and a seal), and lift from stm to the underlying monad.
toBoxM ::
  (MonadConc m) =>
  Queue a ->
  m (Box m a a, m ())
toBoxM :: Queue a -> m (Box m a a, m ())
toBoxM Queue a
q = do
  (Box (STM m) a a
b, STM m ()
s) <- STM m (Box (STM m) a a, STM m ()) -> m (Box (STM m) a a, STM m ())
forall (m :: * -> *) a. MonadConc m => STM m a -> m a
atomically (STM m (Box (STM m) a a, STM m ())
 -> m (Box (STM m) a a, STM m ()))
-> STM m (Box (STM m) a a, STM m ())
-> m (Box (STM m) a a, STM m ())
forall a b. (a -> b) -> a -> b
$ Queue a -> STM m (Box (STM m) a a, STM m ())
forall (stm :: * -> *) a.
MonadSTM stm =>
Queue a -> stm (Box stm a a, stm ())
toBox Queue a
q
  (Box m a a, m ()) -> m (Box m a a, m ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Box (STM m) a a -> Box m a a
forall (m :: * -> *) a b.
MonadConc m =>
Box (STM m) a b -> Box m a b
liftB Box (STM m) a a
b, STM m () -> m ()
forall (m :: * -> *) a. MonadConc m => STM m a -> m a
atomically STM m ()
s)

-- | wait for the first action, and then cancel the second
waitCancel :: (MonadConc m) => m b -> m a -> m b
waitCancel :: m b -> m a -> m b
waitCancel m b
a m a
b =
  m b -> (Async m b -> m b) -> m b
forall (m :: * -> *) a b.
MonadConc m =>
m a -> (Async m a -> m b) -> m b
C.withAsync m b
a ((Async m b -> m b) -> m b) -> (Async m b -> m b) -> m b
forall a b. (a -> b) -> a -> b
$ \Async m b
a' ->
    m a -> (Async m a -> m b) -> m b
forall (m :: * -> *) a b.
MonadConc m =>
m a -> (Async m a -> m b) -> m b
C.withAsync m a
b ((Async m a -> m b) -> m b) -> (Async m a -> m b) -> m b
forall a b. (a -> b) -> a -> b
$ \Async m a
b' -> do
      b
a'' <- Async m b -> m b
forall (m :: * -> *) a. MonadConc m => Async m a -> m a
C.wait Async m b
a'
      Async m a -> m ()
forall (m :: * -> *) a. MonadConc m => Async m a -> m ()
C.cancel Async m a
b'
      b -> m b
forall (f :: * -> *) a. Applicative f => a -> f a
pure b
a''

-- | run two actions concurrently, but wait and return on the left result.
concurrentlyLeft :: MonadConc m => m a -> m b -> m a
concurrentlyLeft :: m a -> m b -> m a
concurrentlyLeft m a
left m b
right =
  m a -> (Async m a -> m a) -> m a
forall (m :: * -> *) a b.
MonadConc m =>
m a -> (Async m a -> m b) -> m b
C.withAsync m a
left ((Async m a -> m a) -> m a) -> (Async m a -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Async m a
a ->
    m b -> (Async m b -> m a) -> m a
forall (m :: * -> *) a b.
MonadConc m =>
m a -> (Async m a -> m b) -> m b
C.withAsync m b
right ((Async m b -> m a) -> m a) -> (Async m b -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Async m b
_ ->
      Async m a -> m a
forall (m :: * -> *) a. MonadConc m => Async m a -> m a
C.wait Async m a
a

-- | run two actions concurrently, but wait and return on the right result.
concurrentlyRight :: MonadConc m => m a -> m b -> m b
concurrentlyRight :: m a -> m b -> m b
concurrentlyRight m a
left m b
right =
  m a -> (Async m a -> m b) -> m b
forall (m :: * -> *) a b.
MonadConc m =>
m a -> (Async m a -> m b) -> m b
C.withAsync m a
left ((Async m a -> m b) -> m b) -> (Async m a -> m b) -> m b
forall a b. (a -> b) -> a -> b
$ \Async m a
_ ->
    m b -> (Async m b -> m b) -> m b
forall (m :: * -> *) a b.
MonadConc m =>
m a -> (Async m a -> m b) -> m b
C.withAsync m b
right ((Async m b -> m b) -> m b) -> (Async m b -> m b) -> m b
forall a b. (a -> b) -> a -> b
$ \Async m b
b ->
      Async m b -> m b
forall (m :: * -> *) a. MonadConc m => Async m a -> m a
C.wait Async m b
b

-- | connect a committer and emitter action via spawning a queue, and wait for both to complete.
withQC ::
  (MonadConc m) =>
  Queue a ->
  (Queue a -> m (Box m a a, m ())) ->
  (Committer m a -> m l) ->
  (Emitter m a -> m r) ->
  m l
withQC :: Queue a
-> (Queue a -> m (Box m a a, m ()))
-> (Committer m a -> m l)
-> (Emitter m a -> m r)
-> m l
withQC Queue a
q Queue a -> m (Box m a a, m ())
spawner Committer m a -> m l
cio Emitter m a -> m r
eio =
  m (Box m a a, m ())
-> ((Box m a a, m ()) -> m ()) -> ((Box m a a, m ()) -> m l) -> m l
forall (m :: * -> *) a c b.
MonadMask m =>
m a -> (a -> m c) -> (a -> m b) -> m b
C.bracket
    (Queue a -> m (Box m a a, m ())
spawner Queue a
q)
    (Box m a a, m ()) -> m ()
forall a b. (a, b) -> b
snd
    ( \(Box m a a
box, m ()
seal) ->
        m l -> m r -> m l
forall (m :: * -> *) a b. MonadConc m => m a -> m b -> m a
concurrentlyLeft
          (Committer m a -> m l
cio (Box m a a -> Committer m a
forall (m :: * -> *) c e. Box m c e -> Committer m c
committer Box m a a
box) m l -> m () -> m l
forall (m :: * -> *) a b. MonadMask m => m a -> m b -> m a
`C.finally` m ()
seal)
          (Emitter m a -> m r
eio (Box m a a -> Emitter m a
forall (m :: * -> *) c e. Box m c e -> Emitter m e
emitter Box m a a
box) m r -> m () -> m r
forall (m :: * -> *) a b. MonadMask m => m a -> m b -> m a
`C.finally` m ()
seal)
    )

-- | connect a committer and emitter action via spawning a queue, and wait for both to complete.
withQE ::
  (MonadConc m) =>
  Queue a ->
  (Queue a -> m (Box m a a, m ())) ->
  (Committer m a -> m l) ->
  (Emitter m a -> m r) ->
  m r
withQE :: Queue a
-> (Queue a -> m (Box m a a, m ()))
-> (Committer m a -> m l)
-> (Emitter m a -> m r)
-> m r
withQE Queue a
q Queue a -> m (Box m a a, m ())
spawner Committer m a -> m l
cio Emitter m a -> m r
eio =
  m (Box m a a, m ())
-> ((Box m a a, m ()) -> m ()) -> ((Box m a a, m ()) -> m r) -> m r
forall (m :: * -> *) a c b.
MonadMask m =>
m a -> (a -> m c) -> (a -> m b) -> m b
C.bracket
    (Queue a -> m (Box m a a, m ())
spawner Queue a
q)
    (Box m a a, m ()) -> m ()
forall a b. (a, b) -> b
snd
    ( \(Box m a a
box, m ()
seal) ->
        m l -> m r -> m r
forall (m :: * -> *) a b. MonadConc m => m a -> m b -> m b
concurrentlyRight
          (Committer m a -> m l
cio (Box m a a -> Committer m a
forall (m :: * -> *) c e. Box m c e -> Committer m c
committer Box m a a
box) m l -> m () -> m l
forall (m :: * -> *) a b. MonadMask m => m a -> m b -> m a
`C.finally` m ()
seal)
          (Emitter m a -> m r
eio (Box m a a -> Emitter m a
forall (m :: * -> *) c e. Box m c e -> Emitter m e
emitter Box m a a
box) m r -> m () -> m r
forall (m :: * -> *) a b. MonadMask m => m a -> m b -> m a
`C.finally` m ()
seal)
    )

-- | create an unbounded queue, returning the emitter result
queueC ::
  (MonadConc m) =>
  (Committer m a -> m l) ->
  (Emitter m a -> m r) ->
  m l
queueC :: (Committer m a -> m l) -> (Emitter m a -> m r) -> m l
queueC Committer m a -> m l
cm Emitter m a -> m r
em = Queue a
-> (Queue a -> m (Box m a a, m ()))
-> (Committer m a -> m l)
-> (Emitter m a -> m r)
-> m l
forall (m :: * -> *) a l r.
MonadConc m =>
Queue a
-> (Queue a -> m (Box m a a, m ()))
-> (Committer m a -> m l)
-> (Emitter m a -> m r)
-> m l
withQC Queue a
forall a. Queue a
Unbounded Queue a -> m (Box m a a, m ())
forall (m :: * -> *) a.
MonadConc m =>
Queue a -> m (Box m a a, m ())
toBoxM Committer m a -> m l
cm Emitter m a -> m r
em

-- | create an unbounded queue, returning the emitter result
queueE ::
  (MonadConc m) =>
  (Committer m a -> m l) ->
  (Emitter m a -> m r) ->
  m r
queueE :: (Committer m a -> m l) -> (Emitter m a -> m r) -> m r
queueE Committer m a -> m l
cm Emitter m a -> m r
em = Queue a
-> (Queue a -> m (Box m a a, m ()))
-> (Committer m a -> m l)
-> (Emitter m a -> m r)
-> m r
forall (m :: * -> *) a l r.
MonadConc m =>
Queue a
-> (Queue a -> m (Box m a a, m ()))
-> (Committer m a -> m l)
-> (Emitter m a -> m r)
-> m r
withQE Queue a
forall a. Queue a
Unbounded Queue a -> m (Box m a a, m ())
forall (m :: * -> *) a.
MonadConc m =>
Queue a -> m (Box m a a, m ())
toBoxM Committer m a -> m l
cm Emitter m a -> m r
em

-- | lift a box from STM
liftB :: (MonadConc m) => Box (STM m) a b -> Box m a b
liftB :: Box (STM m) a b -> Box m a b
liftB (Box Committer (STM m) a
c Emitter (STM m) b
e) = Committer m a -> Emitter m b -> Box m a b
forall (m :: * -> *) c e. Committer m c -> Emitter m e -> Box m c e
Box ((forall a. STM m a -> m a) -> Committer (STM m) a -> Committer m a
forall k (t :: (* -> *) -> k -> *) (m :: * -> *) (n :: * -> *)
       (b :: k).
(MFunctor t, Monad m) =>
(forall a. m a -> n a) -> t m b -> t n b
hoist forall a. STM m a -> m a
forall (m :: * -> *) a. MonadConc m => STM m a -> m a
atomically Committer (STM m) a
c) ((forall a. STM m a -> m a) -> Emitter (STM m) b -> Emitter m b
forall k (t :: (* -> *) -> k -> *) (m :: * -> *) (n :: * -> *)
       (b :: k).
(MFunctor t, Monad m) =>
(forall a. m a -> n a) -> t m b -> t n b
hoist forall a. STM m a -> m a
forall (m :: * -> *) a. MonadConc m => STM m a -> m a
atomically Emitter (STM m) b
e)

-- | turn a box action into a box continuation
fromAction :: (MonadConc m) => (Box m a b -> m r) -> Cont m (Box m b a)
fromAction :: (Box m a b -> m r) -> Cont m (Box m b a)
fromAction Box m a b -> m r
baction = (forall r. (Box m b a -> m r) -> m r) -> Cont m (Box m b a)
forall (m :: * -> *) a. (forall r. (a -> m r) -> m r) -> Cont m a
Cont ((forall r. (Box m b a -> m r) -> m r) -> Cont m (Box m b a))
-> (forall r. (Box m b a -> m r) -> m r) -> Cont m (Box m b a)
forall a b. (a -> b) -> a -> b
$ (Box m a b -> m r) -> (Box m b a -> m r) -> m r
forall (m :: * -> *) a b r r'.
MonadConc m =>
(Box m a b -> m r) -> (Box m b a -> m r') -> m r'
fuseActions Box m a b -> m r
baction

-- | connect up two box actions via two queues
fuseActions :: (MonadConc m) => (Box m a b -> m r) -> (Box m b a -> m r') -> m r'
fuseActions :: (Box m a b -> m r) -> (Box m b a -> m r') -> m r'
fuseActions Box m a b -> m r
abm Box m b a -> m r'
bam = do
  (Box Committer m a
ca Emitter m a
ea, m ()
_) <- Queue a -> m (Box m a a, m ())
forall (m :: * -> *) a.
MonadConc m =>
Queue a -> m (Box m a a, m ())
toBoxM Queue a
forall a. Queue a
Unbounded
  (Box Committer m b
cb Emitter m b
eb, m ()
_) <- Queue b -> m (Box m b b, m ())
forall (m :: * -> *) a.
MonadConc m =>
Queue a -> m (Box m a a, m ())
toBoxM Queue b
forall a. Queue a
Unbounded
  m r -> m r' -> m r'
forall (m :: * -> *) a b. MonadConc m => m a -> m b -> m b
concurrentlyRight (Box m a b -> m r
abm (Committer m a -> Emitter m b -> Box m a b
forall (m :: * -> *) c e. Committer m c -> Emitter m e -> Box m c e
Box Committer m a
ca Emitter m b
eb)) (Box m b a -> m r'
bam (Committer m b -> Emitter m a -> Box m b a
forall (m :: * -> *) c e. Committer m c -> Emitter m e -> Box m c e
Box Committer m b
cb Emitter m a
ea))