{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wall #-}
{-# OPTIONS_GHC -fno-warn-type-defaults #-}

-- | STM Queues, based originally on [pipes-concurrency](https://hackage.haskell.org/package/pipes-concurrency)
module Box.Queue
  ( Queue (..),
    queueL,
    queueR,
    queue,
    fromAction,
    emitQ,
    commitQ,
    fromActionWith,
    toBoxM,
    toBoxSTM,
  )
where

import Box.Box
import Box.Codensity
import Box.Committer
import Box.Emitter
import Box.Functor
import Control.Applicative
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad.Catch as C
import Prelude

-- $setup
-- >>> :set -XOverloadedStrings
-- >>> import Box
-- >>> import Prelude

-- | 'Queue' specifies how messages are queued between a committer and an emitter.
--
-- Each constructor selects the STM structure that holds the messages.
data Queue a
  = Unbounded -- ^ messages are held in a 'TQueue'
  | Bounded Int -- ^ messages are held in a 'TBQueue' created with the given size
  | Single -- ^ a single slot: an initially empty 'TMVar', written with 'putTMVar' and read with 'takeTMVar'
  | Latest a -- ^ a 'TVar' that starts with the supplied value; writes overwrite it and reads use 'readTVar'
  | Newest Int -- ^ a 'TBQueue' created with the given size; when a write cannot proceed, an element is first removed from the queue to make room
  | New -- ^ a 'TMVar'; a write first removes any value already present and then stores the new one

-- | Create the two ends of a queue: a write action and a read action.
--
-- The STM structure behind the ends is chosen by the 'Queue' argument:
--
-- * 'Bounded': a 'TBQueue' of the given size.
-- * 'Unbounded': a 'TQueue'.
-- * 'Single': an initially empty 'TMVar'.
-- * 'Latest': a 'TVar' seeded with the supplied value.
-- * 'New': a 'TMVar' whose pending value, if any, is replaced on write.
-- * 'Newest': a 'TBQueue' of the given size, where a write that cannot
--   proceed removes an element to make room.
--
-- No seal is created here; 'toBoxSTM' layers sealing on top of these ends.
ends :: Queue a -> STM (a -> STM (), STM a)
ends qu =
  case qu of
    Bounded n -> do
      q <- newTBQueue (fromIntegral n)
      pure (writeTBQueue q, readTBQueue q)
    Unbounded -> do
      q <- newTQueue
      pure (writeTQueue q, readTQueue q)
    Single -> do
      m <- newEmptyTMVar
      pure (putTMVar m, takeTMVar m)
    Latest a -> do
      t <- newTVar a
      pure (writeTVar t, readTVar t)
    New -> do
      m <- newEmptyTMVar
      -- Discard whatever is pending (if anything) before storing the new value.
      pure (\x -> tryTakeTMVar m *> putTMVar m x, takeTMVar m)
    Newest n -> do
      q <- newTBQueue (fromIntegral n)
      let write x = writeTBQueue q x <|> makeRoomThenWrite x
          -- Only reached when the plain write could not proceed. If an element
          -- can be removed, try the write again. If there is nothing to remove
          -- (a zero-sized queue) the value is dropped: retrying the write could
          -- never succeed and would recurse forever inside the transaction.
          makeRoomThenWrite x =
            tryReadTBQueue q >>= maybe (pure ()) (const (write x))
      pure (write, readTBQueue q)

-- | Write a value through the supplied write action unless the seal is set.
--
-- Returns 'False' (without writing) when the seal flag is 'True', and 'True'
-- once the write action has been run otherwise.
writeCheck :: TVar Bool -> (a -> STM ()) -> a -> STM Bool
writeCheck sealed i a = do
  isSealed <- readTVar sealed
  case isSealed of
    True -> pure False
    False -> True <$ i a

-- | Read through the supplied read action, falling back on the seal.
--
-- If the read action yields a value it is returned in 'Just'. If it cannot
-- proceed, the seal flag is consulted: when set the result is 'Nothing',
-- otherwise the whole transaction retries.
readCheck :: TVar Bool -> STM a -> STM (Maybe a)
readCheck sealed o = fmap Just o <|> finishedWhenSealed
  where
    finishedWhenSealed = Nothing <$ (readTVar sealed >>= check)

-- | Build an 'STM' box, together with its seal action, from a 'Queue' specification.
--
-- The committer writes via 'writeCheck' and the emitter reads via 'readCheck',
-- both guarded by a shared seal flag that starts unset. The returned action
-- sets that flag.
toBoxSTM ::
  Queue a ->
  STM (Box STM a a, STM ())
toBoxSTM q = do
  (writeEnd, readEnd) <- ends q
  sealed <- newTVar False
  let c = Committer (writeCheck sealed writeEnd)
      e = Emitter (readCheck sealed readEnd)
  pure (Box c e, writeTVar sealed True)

-- | Build a box and its seal action from a 'Queue' specification, lifted from 'STM' into 'IO'.
--
-- The box is created in a single transaction; the box (via 'liftB') and the
-- seal are then each wrapped in 'atomically'.
toBoxM ::
  Queue a ->
  IO (Box IO a a, IO ())
toBoxM q =
  atomically (toBoxSTM q) >>= \(stmBox, stmSeal) ->
    pure (liftB stmBox, atomically stmSeal)

-- | Run two actions concurrently, waiting on and returning only the left result.
concurrentlyLeft :: IO a -> IO b -> IO a
concurrentlyLeft left right = withAsync left alongsideRight
  where
    -- The right action is started but only the left one is waited for.
    alongsideRight leftAsync = withAsync right (const (wait leftAsync))

-- | Run two actions concurrently, waiting on and returning only the right result.
concurrentlyRight :: IO a -> IO b -> IO b
concurrentlyRight left right =
  -- The left action is started but only the right one is waited for.
  withAsync left (const (withAsync right wait))

-- | Connect a committer action and an emitter action through a freshly spawned
-- queue, returning the result of the committer action.
--
-- The spawner is applied to the 'Queue' to obtain a box and its seal. Both
-- actions are started concurrently, but only the committer action is waited
-- for. Each action runs the seal when it finishes, and the seal is run again
-- as the release step of the 'bracket'.
withQL ::
  Queue a ->
  (Queue a -> IO (Box IO a a, IO ())) ->
  (Committer IO a -> IO l) ->
  (Emitter IO a -> IO r) ->
  IO l
withQL q spawner cio eio = bracket (spawner q) snd connect
  where
    connect (box, seal) =
      concurrentlyLeft
        (cio (committer box) `finally` seal)
        (eio (emitter box) `finally` seal)

-- | Connect a committer action and an emitter action through a freshly spawned
-- queue, returning the result of the emitter action.
--
-- The spawner is applied to the 'Queue' to obtain a box and its seal. Both
-- actions are started concurrently, but only the emitter action is waited
-- for. Each action runs the seal when it finishes, and the seal is run again
-- as the release step of the 'bracket'.
withQR ::
  Queue a ->
  (Queue a -> IO (Box IO a a, IO ())) ->
  (Committer IO a -> IO l) ->
  (Emitter IO a -> IO r) ->
  IO r
withQR q spawner cio eio = bracket (spawner q) snd connect
  where
    connect (box, seal) =
      let commitSide = cio (committer box) `finally` seal
          emitSide = eio (emitter box) `finally` seal
       in concurrentlyRight commitSide emitSide

-- | Connect a committer action and an emitter action through a freshly spawned
-- queue, waiting for both and returning both results.
--
-- The spawner is applied to the 'Queue' to obtain a box and its seal. Each
-- action runs the seal when it finishes, and the seal is run again as the
-- release step of the 'bracket'.
withQ ::
  Queue a ->
  (Queue a -> IO (Box IO a a, IO ())) ->
  (Committer IO a -> IO l) ->
  (Emitter IO a -> IO r) ->
  IO (l, r)
withQ q spawner cio eio = bracket (spawner q) snd connect
  where
    connect (box, seal) = concurrently commitSide emitSide
      where
        commitSide = cio (committer box) `finally` seal
        emitSide = eio (emitter box) `finally` seal

-- | Create a queue of the specified kind and run a committer action and an
-- emitter action over it, returning the result from the Committer action.
--
-- >>> queueL New (\c -> glue c <$|> qList [1..3]) toListM
queueL ::
  Queue a ->
  (Committer IO a -> IO l) ->
  (Emitter IO a -> IO r) ->
  IO l
queueL q = withQL q toBoxM

-- | Create a queue of the specified kind and run a committer action and an
-- emitter action over it, returning the result from the Emitter action.
--
-- >>> queueR New (\c -> glue c <$|> qList [1..3]) toListM
-- [3]
queueR ::
  Queue a ->
  (Committer IO a -> IO l) ->
  (Emitter IO a -> IO r) ->
  IO r
queueR q = withQR q toBoxM

-- | Create a queue of the specified kind and run a committer action and an
-- emitter action over it, returning both results.
--
-- >>> queue Unbounded (\c -> glue c <$|> qList [1..3]) toListM
-- ((),[1,2,3])
queue ::
  Queue a ->
  (Committer IO a -> IO l) ->
  (Emitter IO a -> IO r) ->
  IO (l, r)
queue q = withQ q toBoxM

-- | Lift a box from 'STM' to 'IO' by running each side with 'atomically'.
liftB :: Box STM a b -> Box IO a b
liftB box =
  Box
    (foist atomically (committer box))
    (foist atomically (emitter box))

-- | Turn a box action into a box continuation.
--
-- The action is connected to the continuation's consumer with 'fuseActions'.
fromAction :: (Box IO a b -> IO r) -> CoBox IO b a
fromAction baction = Codensity (fuseActions baction)

-- | Turn a box action into a box continuation, using the supplied queue
-- specifications for the two connecting queues.
--
-- The action is connected to the continuation's consumer with 'fuseActionsWith'.
fromActionWith :: Queue a -> Queue b -> (Box IO a b -> IO r) -> CoBox IO b a
fromActionWith qa qb baction = Codensity (fuseActionsWith qa qb baction)

-- | Connect up two box actions via two 'Unbounded' queues.
--
-- This is 'fuseActionsWith' with both queue specifications fixed to 'Unbounded'.
fuseActions :: (Box IO a b -> IO r) -> (Box IO b a -> IO r') -> IO r'
fuseActions = fuseActionsWith Unbounded Unbounded

-- | Connect up two box actions via two queues.
--
-- One queue is created per specification. The first action is given the
-- committer of the first queue and the emitter of the second; the second
-- action is given the committer of the second queue and the emitter of the
-- first. Both run concurrently and the second action's result is returned.
-- The seals of both queues are discarded.
fuseActionsWith :: Queue a -> Queue b -> (Box IO a b -> IO r) -> (Box IO b a -> IO r') -> IO r'
fuseActionsWith qa qb abm bam = do
  (boxA, _) <- toBoxM qa
  (boxB, _) <- toBoxM qb
  concurrentlyRight
    (abm (Box (committer boxA) (emitter boxB)))
    (bam (Box (committer boxB) (emitter boxA)))

-- | Hook a committer action to a queue, creating an emitter continuation.
--
-- The continuation's consumer is run as the emitter side of 'queueR'.
emitQ :: Queue a -> (Committer IO a -> IO r) -> CoEmitter IO a
emitQ q cio = Codensity (queueR q cio)

-- | Hook an emitter action to a queue, creating a committer continuation.
--
-- The continuation's consumer is run as the committer side of 'queueL'.
commitQ :: Queue a -> (Emitter IO a -> IO r) -> CoCommitter IO a
commitQ q eio = Codensity (\cio -> queueL q cio eio)