{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# OPTIONS_GHC -Wall #-}
{-# OPTIONS_GHC -fno-warn-type-defaults #-}
{-# LANGUAGE OverloadedStrings #-}

-- | queues
-- Follows [pipes-concurrency](https://hackage.haskell.org/package/pipes-concurrency)
--
module Box.Queue
  ( Queue(..)
  , queue
  , queueC
  , queueE
  , queueCM
  , queueEM
  , queueCLog
  , queueELog
  , waitCancel
  , toBoxLog
  , ends
  ) where

import Box.Box
import Box.Committer
import Box.Emitter
-- import GHC.Conc
import Protolude hiding (STM, check, wait, cancel, atomically, withAsync, concurrently)
import Control.Concurrent.Classy.STM as C
import Control.Monad.Conc.Class as C
import Control.Concurrent.Classy.Async as C
import Control.Monad.Catch as C

-- | 'Queue' specifies how messages are queued
data Queue a
  = Unbounded
  -- ^ backed by a 'TQueue'
  | Bounded Int
  -- ^ backed by a 'TBQueue' of the given capacity
  | Single
  -- ^ backed by an initially empty 'TMVar'
  | Latest a
  -- ^ backed by a 'TVar' initialised with the given value; reads return the
  -- current value without removing it
  | Newest Int
  -- ^ backed by a 'TBQueue' of the given capacity; when the queue is full a
  -- write discards queued elements from the front until it fits
  | New
  -- ^ backed by an initially empty 'TMVar'; a write replaces any value that
  -- has not been taken yet

-- | create a queue, returning the ends
--
-- The first component writes a value to the queue, the second reads one.
ends :: MonadSTM stm => Queue a -> stm (a -> stm (), stm a)
ends qu =
  case qu of
    Bounded n -> do
      q <- newTBQueue n
      return (writeTBQueue q, readTBQueue q)
    Unbounded -> do
      q <- newTQueue
      return (writeTQueue q, readTQueue q)
    Single -> do
      m <- newEmptyTMVar
      return (putTMVar m, takeTMVar m)
    Latest a -> do
      t <- newTVar a
      return (writeTVar t, readTVar t)
    New -> do
      m <- newEmptyTMVar
      -- empty the slot first so that the put never has to wait
      return (\x -> tryTakeTMVar m *> putTMVar m x, takeTMVar m)
    Newest n -> do
      q <- newTBQueue n
      -- if the write would block, drop one element from the front and try again
      -- NOTE(review): with a capacity of 0 this looks like it can never
      -- succeed and would loop inside the transaction -- confirm callers
      -- always pass n >= 1
      let write x = writeTBQueue q x <|> (tryReadTBQueue q *> write x)
      return (write, readTBQueue q)

-- | write to a queue, checking the seal
--
-- Returns 'False' without writing if the queue is sealed, 'True' after a
-- successful write.
writeCheck :: (MonadSTM stm) => TVar stm Bool -> (a -> stm ()) -> a -> stm Bool
writeCheck sealed i a = do
  b <- readTVar sealed
  if b
    then pure False
    else do
      i a
      pure True

-- | read from a queue, and retry if not sealed
--
-- Returns 'Just' a value when the read succeeds. If the read would block,
-- 'Nothing' is returned once the queue is sealed; otherwise the transaction
-- retries.
readCheck :: MonadSTM stm => TVar stm Bool -> stm a -> stm (Maybe a)
readCheck sealed o =
  (Just <$> o) <|>
  (do
     b <- readTVar sealed
     C.check b
     pure Nothing)

-- | turn a queue into a box (and a seal)
toBox :: (MonadSTM stm) => Queue a -> stm (Box stm a a, stm ())
toBox q = do
  (i, o) <- ends q
  sealed <- newTVarN "sealed" False
  let seal = writeTVar sealed True
  pure
    ( Box (Committer (writeCheck sealed i)) (Emitter (readCheck sealed o))
    , seal
    )

-- | write to a queue, checking the seal, and log what happened
--
-- The seal is read up front so that the attempt can be logged before a write
-- that might block. The write itself goes through 'writeCheck', so the seal is
-- checked again in the same transaction as the write and the result reflects
-- whether the value was really written.
writeCheckLog ::
     (Show a, MonadIO m, MonadConc m)
  => TVar (STM m) Bool
  -> (a -> (STM m) ())
  -> a
  -> m Bool
writeCheckLog sealed i a = do
  b <- atomically $ readTVar sealed
  if b
    then do
      putStrLn ("writeCheckLog sealed" :: Text)
      pure False
    else do
      putStrLn $ "writeCheckLog writing: " <> (show a :: Text)
      written <- atomically $ writeCheck sealed i a
      -- the queue was sealed between the first look and the write
      unless written $ putStrLn ("writeCheckLog sealed" :: Text)
      pure written

-- | read from a queue, and retry if not sealed, logging what was read
--
-- The queue is read exactly once, through 'readCheck', and the value that is
-- logged is the value that is returned. 'Nothing' means the queue was sealed
-- and the read would otherwise have blocked.
readCheckLog ::
     (Show a, MonadIO m, MonadConc m)
  => TVar (STM m) Bool
  -> (STM m) a
  -> m (Maybe a)
readCheckLog sealed o = do
  r <- atomically $ readCheck sealed o
  case r of
    Just x -> putStrLn $ "readCheckLog reading" <> (show x :: Text)
    Nothing -> putStrLn ("readCheckLog sealed" :: Text)
  pure r

-- | turn a queue into a box (and a seal), with every operation run as its own
-- transaction in the base monad
toBoxM :: (MonadConc m) => Queue a -> m (Box m a a, m ())
toBoxM q = do
  (i, o) <- atomically $ ends q
  sealed <- atomically $ newTVarN "sealed" False
  let seal = atomically $ writeTVar sealed True
  pure
    ( Box
        (Committer (atomically . writeCheck sealed i))
        (Emitter (atomically $ readCheck sealed o))
    , seal
    )

-- | turn a queue into a box (and a seal) whose committer and emitter log their
-- activity
toBoxLog :: (Show a, MonadIO m, MonadConc m) => Queue a -> m (Box m a a, m ())
toBoxLog q = do
  (i, o) <- atomically $ ends q
  sealed <- atomically $ newTVarN "sealed" False
  initSeal <- atomically $ readTVar sealed
  putStrLn $ "toBoxLog: seal: " <> (show initSeal :: Text)
  let seal = atomically $ writeTVar sealed True
  pure
    ( Box (Committer (writeCheckLog sealed i)) (Emitter (readCheckLog sealed o))
    , seal
    )

-- | wait for the first action, and then cancel the second
waitCancel :: (MonadConc m) => m b -> m a -> m b
waitCancel a b =
  withAsync a $ \a' ->
    withAsync b $ \b' -> do
      a'' <- wait a'
      cancel b'
      pure a''

-- | connect a committer and emitter action via spawning a queue, and wait for both to complete.
--
-- The queue is sealed as soon as either side finishes, and again when the
-- bracket exits.
withQ ::
     (MonadConc m)
  => Queue a
  -> (Queue a -> (STM m) (Box (STM m) a a, (STM m) ()))
  -> (Committer (STM m) a -> m l)
  -> (Emitter (STM m) a -> m r)
  -> m (l, r)
withQ q spawner cio eio =
  C.bracket
    (atomically $ spawner q)
    (\(_, seal) -> atomically seal)
    (\(box, seal) ->
       concurrently
         (cio (committer box) `C.finally` atomically seal)
         (eio (emitter box) `C.finally` atomically seal))

-- | connect a committer and emitter action via spawning a queue, and wait for both to complete.
--
-- As 'withQ', but the box and the seal live in the base monad.
withQM ::
     (MonadConc m)
  => Queue a
  -> (Queue a -> m (Box m a a, m ()))
  -> (Committer m a -> m l)
  -> (Emitter m a -> m r)
  -> m (l, r)
withQM q spawner cio eio =
  C.bracket
    (spawner q)
    (\(_, seal) -> seal)
    (\(box, seal) ->
       concurrently
         (cio (committer box) `C.finally` seal)
         (eio (emitter box) `C.finally` seal))

-- | connect a committer and emitter action via spawning a queue, and wait for both to complete.
-- Unlike 'withQM', each side prints a message just before it seals the queue.
withQMLog ::
     (MonadConc m, MonadIO m)
  => Queue a
  -> (Queue a -> m (Box m a a, m ()))
  -> (Committer m a -> m l)
  -> (Emitter m a -> m r)
  -> m (l, r)
withQMLog q spawner cio eio =
  C.bracket
    (spawner q)
    (\(_, seal) -> seal)
    (\(box, seal) ->
       concurrently
         (cio (committer box) `C.finally`
          (putStrLn ("committer sealed" :: Text) >> seal))
         (eio (emitter box) `C.finally`
          (putStrLn ("emitter sealed" :: Text) >> seal)))

-- | create an unbounded queue, returning both the committer and the emitter
-- result
queue ::
     (MonadConc m)
  => (Committer (STM m) a -> m l)
  -> (Emitter (STM m) a -> m r)
  -> m (l, r)
queue = withQ Unbounded toBox

-- | create an unbounded queue, returning the emitter result
queueE ::
     (MonadConc m)
  => (Committer (STM m) a -> m l)
  -> (Emitter (STM m) a -> m r)
  -> m r
queueE cm em = snd <$> withQ Unbounded toBox cm em

-- | create an unbounded queue, returning the committer result
queueC ::
     (MonadConc m)
  => (Committer (STM m) a -> m l)
  -> (Emitter (STM m) a -> m r)
  -> m l
queueC cm em = fst <$> withQ Unbounded toBox cm em

-- | create an unbounded queue whose ends run in the base monad, returning the
-- committer result
queueCM ::
     (MonadConc m)
  => (Committer m a -> m l)
  -> (Emitter m a -> m r)
  -> m l
queueCM cm em = fst <$> withQM Unbounded toBoxM cm em

-- | create an unbounded queue whose ends run in the base monad, returning the
-- emitter result
queueEM ::
     (MonadConc m)
  => (Committer m a -> m l)
  -> (Emitter m a -> m r)
  -> m r
queueEM cm em = snd <$> withQM Unbounded toBoxM cm em

-- | create an unbounded, logging queue, returning the emitter result
--
-- A message is printed when each side starts, on every read and write, and
-- when each side seals the queue.
queueELog ::
     (Show a, MonadConc m, MonadIO m)
  => (Committer m a -> m l)
  -> (Emitter m a -> m r)
  -> m r
queueELog cm em =
  snd <$>
  withQMLog
    Unbounded
    toBoxLog
    (\c -> putStrLn ("cm acted" :: Text) >> cm c)
    (\e -> putStrLn ("em acted" :: Text) >> em e)

-- | create an unbounded, logging queue, returning the committer result
--
-- Logs the same events as 'queueELog'; only the returned result differs.
queueCLog ::
     (Show a, MonadConc m, MonadIO m)
  => (Committer m a -> m l)
  -> (Emitter m a -> m r)
  -> m l
queueCLog cm em =
  fst <$>
  withQMLog
    Unbounded
    toBoxLog
    (\c -> putStrLn ("cm acted" :: Text) >> cm c)
    (\e -> putStrLn ("em acted" :: Text) >> em e)