{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wall #-}
{-# OPTIONS_GHC -fno-warn-type-defaults #-}
module Box.Queue
( Queue (..),
queueL,
queueR,
queue,
fromAction,
emitQ,
commitQ,
fromActionWith,
toBoxM,
toBoxSTM,
)
where
import Box.Box
import Box.Codensity
import Box.Committer
import Box.Emitter
import Box.Functor
import Control.Applicative
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad.Catch as C
import Prelude
-- | 'Queue' selects the buffering strategy that 'ends' builds between the
-- writing side and the reading side of a box.
data Queue a
  = -- | FIFO queue with no capacity limit (backed by a 'TQueue').
    Unbounded
  | -- | FIFO queue backed by a 'TBQueue' created with the given capacity.
    Bounded Int
  | -- | Single slot backed by a 'TMVar': a write fills it, a read empties it.
    Single
  | -- | A 'TVar' seeded with the supplied value: writes overwrite it and
    -- reads return the current value without removing it.
    Latest a
  | -- | 'TBQueue' created with the given capacity where a write that finds
    -- the queue full discards the oldest element and tries again.
    Newest Int
  | -- | Single 'TMVar' slot where a write replaces any value still waiting
    -- to be read.
    New
-- | Allocate the storage for a 'Queue' and return its two ends: a write
-- action and a read action, both in 'STM'.
--
-- The read action returned for 'Latest' never blocks, because it is a plain
-- 'readTVar'; every other read action blocks (retries) while nothing is
-- available.
ends :: Queue a -> STM (a -> STM (), STM a)
ends qu =
  case qu of
    Bounded n -> do
      q <- newTBQueue (fromIntegral n)
      pure (writeTBQueue q, readTBQueue q)
    Unbounded -> do
      q <- newTQueue
      pure (writeTQueue q, readTQueue q)
    Single -> do
      m <- newEmptyTMVar
      pure (putTMVar m, takeTMVar m)
    Latest a -> do
      t <- newTVar a
      pure (writeTVar t, readTVar t)
    New -> do
      m <- newEmptyTMVar
      -- Throw away any unread value so the put below cannot block.
      pure (\x -> tryTakeTMVar m *> putTMVar m x, takeTMVar m)
    Newest n -> do
      -- Clamp the capacity to at least one slot. With a capacity of zero the
      -- drop-oldest loop below can never make room and would recurse forever
      -- inside the transaction; a negative size would underflow the
      -- conversion to 'Natural'.
      q <- newTBQueue (fromIntegral (max 1 n))
      -- When the queue is full the plain write retries, so fall through to
      -- dropping the oldest element and attempting the write again.
      let write x = writeTBQueue q x <|> (tryReadTBQueue q *> write x)
      pure (write, readTBQueue q)
-- | Write to a queue unless it has been sealed.
--
-- @writeCheck sealed i a@ reads the @sealed@ flag; when it is set the value
-- is not written and the result is 'False', otherwise @i a@ is run and the
-- result is 'True'.
writeCheck :: TVar Bool -> (a -> STM ()) -> a -> STM Bool
writeCheck sealed i a = do
  b <- readTVar sealed
  if b
    then pure False
    else do
      i a
      pure True
-- | Read from a queue, reporting 'Nothing' once it is sealed and drained.
--
-- The read action @o@ is tried first, so values still available after
-- sealing are delivered as 'Just'. Only when @o@ would block is the @sealed@
-- flag consulted: if it is set the result is 'Nothing', otherwise the whole
-- transaction blocks (via 'check') until either a value arrives or the flag
-- changes.
readCheck :: TVar Bool -> STM a -> STM (Maybe a)
readCheck sealed o =
  (Just <$> o)
    <|> ( do
            b <- readTVar sealed
            check b
            pure Nothing
        )
-- | Build a 'Box' in 'STM' from a 'Queue', together with an action that
-- seals it.
--
-- The committer goes through 'writeCheck' and the emitter through
-- 'readCheck', both sharing one @sealed@ flag that starts as 'False'. The
-- returned action sets that flag to 'True'.
toBoxSTM ::
  Queue a ->
  STM (Box STM a a, STM ())
toBoxSTM q = do
  (i, o) <- ends q
  sealed <- newTVar False
  let seal = writeTVar sealed True
  pure
    ( Box
        (Committer (writeCheck sealed i))
        (Emitter (readCheck sealed o)),
      seal
    )
-- | Build a 'Box' in 'IO' from a 'Queue', together with an 'IO' action that
-- seals it.
--
-- The queue is allocated in a single transaction via 'toBoxSTM'; the
-- resulting box is then lifted to 'IO' with 'liftB' and the seal action is
-- wrapped in 'atomically'.
toBoxM ::
  Queue a ->
  IO (Box IO a a, IO ())
toBoxM q = do
  (b, s) <- atomically $ toBoxSTM q
  pure (liftB b, atomically s)
-- | Run both actions concurrently and return the result of the left one.
--
-- Only the left action is waited for. Both run inside 'withAsync' scopes, so
-- the right action does not outlive this call once the left one has
-- finished.
concurrentlyLeft :: IO a -> IO b -> IO a
concurrentlyLeft left right =
  withAsync left $ \a ->
    withAsync right $ \_ ->
      wait a
-- | Run both actions concurrently and return the result of the right one.
--
-- Only the right action is waited for. Both run inside 'withAsync' scopes,
-- so the left action does not outlive this call once the right one has
-- finished.
concurrentlyRight :: IO a -> IO b -> IO b
concurrentlyRight left right =
  withAsync left $ \_ ->
    withAsync right $ \b ->
      wait b
-- | Connect a committer action and an emitter action through a queue,
-- returning the committer action's result.
--
-- @spawner@ creates the box and its seal action. The two actions run via
-- 'concurrentlyLeft', so only the committer side is waited for. The seal is
-- run when either side finishes ('finally') and again as the 'bracket'
-- release step, so the queue is sealed even if the body is interrupted.
withQL ::
  Queue a ->
  (Queue a -> IO (Box IO a a, IO ())) ->
  (Committer IO a -> IO l) ->
  (Emitter IO a -> IO r) ->
  IO l
withQL q spawner cio eio =
  bracket
    (spawner q)
    snd -- the release step is the seal action itself
    ( \(box, seal) ->
        concurrentlyLeft
          (cio (committer box) `finally` seal)
          (eio (emitter box) `finally` seal)
    )
-- | Connect a committer action and an emitter action through a queue,
-- returning the emitter action's result.
--
-- @spawner@ creates the box and its seal action. The two actions run via
-- 'concurrentlyRight', so only the emitter side is waited for. The seal is
-- run when either side finishes ('finally') and again as the 'bracket'
-- release step, so the queue is sealed even if the body is interrupted.
withQR ::
  Queue a ->
  (Queue a -> IO (Box IO a a, IO ())) ->
  (Committer IO a -> IO l) ->
  (Emitter IO a -> IO r) ->
  IO r
withQR q spawner cio eio =
  bracket
    (spawner q)
    snd -- the release step is the seal action itself
    ( \(box, seal) ->
        concurrentlyRight
          (cio (committer box) `finally` seal)
          (eio (emitter box) `finally` seal)
    )
-- | Connect a committer action and an emitter action through a queue,
-- returning both results.
--
-- @spawner@ creates the box and its seal action. The two actions run via
-- 'concurrently', so both are waited for. The seal is run when either side
-- finishes ('finally') and again as the 'bracket' release step, so the queue
-- is sealed even if the body is interrupted.
withQ ::
  Queue a ->
  (Queue a -> IO (Box IO a a, IO ())) ->
  (Committer IO a -> IO l) ->
  (Emitter IO a -> IO r) ->
  IO (l, r)
withQ q spawner cio eio =
  bracket
    (spawner q)
    snd -- the release step is the seal action itself
    ( \(box, seal) ->
        concurrently
          (cio (committer box) `finally` seal)
          (eio (emitter box) `finally` seal)
    )
-- | Run a committer action and an emitter action concurrently over a queue
-- created by 'toBoxM', returning the committer action's result.
queueL ::
  Queue a ->
  (Committer IO a -> IO l) ->
  (Emitter IO a -> IO r) ->
  IO l
queueL q cm em = withQL q toBoxM cm em
-- | Run a committer action and an emitter action concurrently over a queue
-- created by 'toBoxM', returning the emitter action's result.
queueR ::
  Queue a ->
  (Committer IO a -> IO l) ->
  (Emitter IO a -> IO r) ->
  IO r
queueR q cm em = withQR q toBoxM cm em
-- | Run a committer action and an emitter action concurrently over a queue
-- created by 'toBoxM', returning both results.
queue ::
  Queue a ->
  (Committer IO a -> IO l) ->
  (Emitter IO a -> IO r) ->
  IO (l, r)
queue q cm em = withQ q toBoxM cm em
-- | Lift a box from 'STM' to 'IO' by applying 'atomically' to both its
-- committer and its emitter (via 'foist').
liftB :: Box STM a b -> Box IO a b
liftB (Box c e) = Box (foist atomically c) (foist atomically e)
-- | Turn a box action into a 'CoBox' by wrapping 'fuseActions' in
-- 'Codensity': the continuation receives the box that faces the action
-- from the other side.
fromAction :: (Box IO a b -> IO r) -> CoBox IO b a
fromAction baction = Codensity $ fuseActions baction
-- | Turn a box action into a 'CoBox' like 'fromAction', but with the two
-- connecting queues chosen by the caller and passed on to
-- 'fuseActionsWith'.
fromActionWith :: Queue a -> Queue b -> (Box IO a b -> IO r) -> CoBox IO b a
fromActionWith qa qb baction = Codensity $ fuseActionsWith qa qb baction
-- | Connect two box actions back-to-back through a pair of 'Unbounded'
-- queues and run them concurrently, returning the second action's result.
--
-- This is 'fuseActionsWith' with both queues fixed to 'Unbounded'.
fuseActions :: (Box IO a b -> IO r) -> (Box IO b a -> IO r') -> IO r'
fuseActions = fuseActionsWith Unbounded Unbounded
-- | Connect two box actions back-to-back through two queues and run them
-- concurrently, returning the second action's result.
--
-- One queue of kind @qa@ carries @a@ values and one of kind @qb@ carries @b@
-- values. The first action gets the committer of the @a@ queue and the
-- emitter of the @b@ queue; the second action gets the opposite pair. The
-- seal actions returned by 'toBoxM' are not used here.
fuseActionsWith :: Queue a -> Queue b -> (Box IO a b -> IO r) -> (Box IO b a -> IO r') -> IO r'
fuseActionsWith qa qb abm bam = do
  (Box ca ea, _) <- toBoxM qa
  (Box cb eb, _) <- toBoxM qb
  concurrentlyRight (abm (Box ca eb)) (bam (Box cb ea))
-- | Turn an action that feeds a 'Committer' into a 'CoEmitter'.
--
-- When the continuation is supplied, the feeding action and the
-- continuation are run over a queue of the given kind via 'queueR', so the
-- continuation's result is the one returned.
emitQ :: Queue a -> (Committer IO a -> IO r) -> CoEmitter IO a
emitQ q cio = Codensity $ \eio -> queueR q cio eio
-- | Turn an action that drains an 'Emitter' into a 'CoCommitter'.
--
-- When the continuation is supplied, the continuation and the draining
-- action are run over a queue of the given kind via 'queueL', so the
-- continuation's result is the one returned.
commitQ :: Queue a -> (Emitter IO a -> IO r) -> CoCommitter IO a
commitQ q eio = Codensity $ \cio -> queueL q cio eio