{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeFamilies #-} {-# OPTIONS_GHC -Wall #-} {-# OPTIONS_GHC -fno-warn-type-defaults #-} -- | queues -- Follows [pipes-concurrency](https://hackage.haskell.org/package/pipes-concurrency) module Box.Queue ( Queue (..), queue, queueC, queueE, queueCM, queueEM, waitCancel, ends, withQ, withQE, withQC, toBox, concurrentlyLeft, concurrentlyRight, ) where import Box.Box import Box.Committer import Box.Emitter import Control.Applicative import Control.Concurrent.Classy.Async as C import Control.Concurrent.Classy.STM as C import Control.Monad.Catch as C import Control.Monad.Conc.Class as C -- | 'Queue' specifies how messages are queued data Queue a = Unbounded | Bounded Int | Single | Latest a | Newest Int | New -- | create a queue, returning the ends ends :: MonadSTM stm => Queue a -> stm (a -> stm (), stm a) ends qu = case qu of Bounded n -> do q <- newTBQueue n return (writeTBQueue q, readTBQueue q) Unbounded -> do q <- newTQueue return (writeTQueue q, readTQueue q) Single -> do m <- newEmptyTMVar return (putTMVar m, takeTMVar m) Latest a -> do t <- newTVar a return (writeTVar t, readTVar t) New -> do m <- newEmptyTMVar return (\x -> tryTakeTMVar m *> putTMVar m x, takeTMVar m) Newest n -> do q <- newTBQueue n let write x = writeTBQueue q x <|> (tryReadTBQueue q *> write x) return (write, readTBQueue q) -- | write to a queue, checking the seal writeCheck :: (MonadSTM stm) => TVar stm Bool -> (a -> stm ()) -> a -> stm Bool writeCheck sealed i a = do b <- readTVar sealed if b then pure False else do i a pure True -- | read from a queue, and retry if not sealed readCheck :: MonadSTM stm => TVar stm Bool -> stm a -> stm (Maybe a) readCheck sealed o = (Just <$> o) <|> ( do b <- readTVar sealed C.check b pure Nothing ) -- | turn a queue into a box (and a seal) toBox :: (MonadSTM stm) => Queue a -> stm (Box stm a a, stm ()) toBox q = do (i, o) <- ends q sealed <- newTVarN "sealed" False let seal = writeTVar sealed True pure ( Box (Committer (writeCheck sealed i)) (Emitter (readCheck sealed o)), seal ) toBoxM :: (MonadConc m) => Queue a -> m (Box m a a, m ()) toBoxM q = do (i, o) <- atomically $ ends q sealed <- atomically $ newTVarN "sealed" False let seal = atomically $ writeTVar sealed True pure ( Box (Committer (atomically . writeCheck sealed i)) (Emitter (atomically $ readCheck sealed o)), seal ) -- | wait for the first action, and then cancel the second waitCancel :: (MonadConc m) => m b -> m a -> m b waitCancel a b = withAsync a $ \a' -> withAsync b $ \b' -> do a'' <- wait a' cancel b' pure a'' -- | run two actions concurrently, but wait and return on the left result. concurrentlyLeft :: MonadConc m => m a -> m b -> m a concurrentlyLeft left right = withAsync left $ \a -> withAsync right $ \_ -> wait a -- | run two actions concurrently, but wait and return on the right result. concurrentlyRight :: MonadConc m => m a -> m b -> m b concurrentlyRight left right = withAsync left $ \_ -> withAsync right $ \b -> wait b -- | connect a committer and emitter action via spawning a queue, and wait for both to complete. withQ :: (MonadConc m) => Queue a -> (Queue a -> (STM m) (Box (STM m) a a, (STM m) ())) -> (Committer (STM m) a -> m l) -> (Emitter (STM m) a -> m r) -> m (l, r) withQ q spawner cio eio = C.bracket (atomically $ spawner q) (\(_, seal) -> atomically seal) ( \(box, seal) -> concurrently (cio (committer box) `C.finally` atomically seal) (eio (emitter box) `C.finally` atomically seal) ) -- | connect a committer and emitter action via spawning a queue, and wait for committer to complete. withQC :: (MonadConc m) => Queue a -> (Queue a -> (STM m) (Box (STM m) a a, (STM m) ())) -> (Committer (STM m) a -> m l) -> (Emitter (STM m) a -> m r) -> m l withQC q spawner cio eio = C.bracket (atomically $ spawner q) (\(_, seal) -> atomically seal) ( \(box, seal) -> concurrentlyLeft (cio (committer box) `C.finally` atomically seal) (eio (emitter box) `C.finally` atomically seal) ) -- | connect a committer and emitter action via spawning a queue, and wait for emitter to complete. withQE :: (MonadConc m) => Queue a -> (Queue a -> (STM m) (Box (STM m) a a, (STM m) ())) -> (Committer (STM m) a -> m l) -> (Emitter (STM m) a -> m r) -> m r withQE q spawner cio eio = C.bracket (atomically $ spawner q) (\(_, seal) -> atomically seal) ( \(box, seal) -> concurrentlyRight (cio (committer box) `C.finally` atomically seal) (eio (emitter box) `C.finally` atomically seal) ) -- | connect a committer and emitter action via spawning a queue, and wait for both to complete. withQM :: (MonadConc m) => Queue a -> (Queue a -> m (Box m a a, m ())) -> (Committer m a -> m l) -> (Emitter m a -> m r) -> m (l, r) withQM q spawner cio eio = C.bracket (spawner q) snd ( \(box, seal) -> concurrently (cio (committer box) `C.finally` seal) (eio (emitter box) `C.finally` seal) ) -- | create an unbounded queue queue :: (MonadConc m) => (Committer (STM m) a -> m l) -> (Emitter (STM m) a -> m r) -> m (l, r) queue = withQ Unbounded toBox -- | create an unbounded queue, returning the emitter result queueE :: (MonadConc m) => (Committer (STM m) a -> m l) -> (Emitter (STM m) a -> m r) -> m r queueE cm em = snd <$> withQ Unbounded toBox cm em -- | create an unbounded queue, returning the committer result queueC :: (MonadConc m) => (Committer (STM m) a -> m l) -> (Emitter (STM m) a -> m r) -> m l queueC cm em = fst <$> withQ Unbounded toBox cm em -- | create an unbounded queue, returning the emitter result queueCM :: (MonadConc m) => (Committer m a -> m l) -> (Emitter m a -> m r) -> m l queueCM cm em = fst <$> withQM Unbounded toBoxM cm em -- | create an unbounded queue, returning the emitter result queueEM :: (MonadConc m) => (Committer m a -> m l) -> (Emitter m a -> m r) -> m r queueEM cm em = snd <$> withQM Unbounded toBoxM cm em -- | -- -- The one-in-the-chamber problem -- -- This is the referential transparency refactoring I did to solve the one-in-the-chamber problem. An etc process wasn't closing down when it should, until the committer fired once more: -- -- -- etc () (Transducer $ \s -> s & S.takeWhile (/="q")) (Box <$> cStdout 2 <*> eStdin 2) -- -- On entering a 'q' in stdin, this code piece requires another input from stdin before it shuts down. -- > etc () (Transducer $ \s -> s & S.takeWhile (/="q")) (Box <$> cStdout 2 <*> eStdin 2) -- etc substitution -- > with (Box <$> cStdout 2 <*> eStdin 2) $ \(Box c e) -> (e & toStream & transduce (Transducer $ \s -> s & S.takeWhile (/="q")) & fromStream) c & flip execStateT () -- no state & transduction unwrapping -- > with (Box <$> cStdout 2 <*> eStdin 2) $ \(Box c e) -> (e & toStream & S.takeWhile (/="q") & fromStream) c -- subbing the IO's -- > with (Box <$> (eStdout 2 & commitPlug) <*> (cStdin 2 & emitPlug)) $ \(Box c e) -> (e & toStream & transduce (Transducer $ \s -> s & S.takeWhile (/="q")) & fromStream) c -- unplugging -- > with (Box <$> (Cont $ \cio -> queueC cio (eStdout 2)) <*> (Cont $ \eio -> queueE (cStdin 2) eio)) $ \(Box c e) -> (e & toStream & S.takeWhile (/="q") & fromStream) c -- fmapping the Box -- > Cont (\r_ -> (Cont $ \cio -> queueC cio (eStdout 2)) `with` \x -> r_ (Box x)) -- twisting the with simplifying the Cont -- > Cont (\r_ -> queueC (r_ . Box) (eStdout 2)) -- spaceship time! -- > Cont (\r_ -> (Cont (\e -> queueC (e . Box) (eStdout 2))) `with` \f -> (Cont $ \eio -> queueE (cStdin 2) eio) `with` \x -> r_ (f x)) -- flipping the withs -- > Cont (\r_ -> with (Cont (\e -> queueC (e . Box) (eStdout 2))) (\f -> with (Cont $ \eio -> queueE (cStdin 2) eio) (r_ . f))) -- swallowing the withs -- > with Cont (\r_ -> queueC ((\f -> queueE (cStdin 2) (r_ . f)) . Box) (eStdout 2)) -- subbing back in mainline -- > with (Cont (\r_ -> queueC ((\f -> queueE (cStdin 2) (r_ . f)) . Box) (eStdout 2))) (\(Box c e) -> (e & toStream & S.takeWhile (/="q") & fromStream) c) -- > queueC ((\f -> queueE (cStdin 2) ((\(Box c e) -> (e & toStream & S.takeWhile (/="q") & fromStream) c) . f)) . Box) (eStdout 2) -- subbing queues -- > fmap fst (withQ Unbounded toBox ((\f -> queueE (cStdin 2) ((\(Box c e) -> (e & toStream & S.takeWhile (/="q") & fromStream) c) . f)) . Box) (eStdout 2)) -- subbing stdin and stdout -- > fmap fst (withQ Unbounded toBox ((\f -> fmap snd (withQ Unbounded toBox (\c -> cStdin_ c *> cStdin_ c) ((\(Box c e) -> (e & toStream & S.takeWhile (/="q") & fromStream) c) . f))) . Box) (\e -> eStdout_ e *> eStdout_ e)) -- removes the second eStdout_ (still requires another stdin input before it closes up) -- fmap fst (withQ Unbounded toBox ((\f -> fmap snd (withQ Unbounded toBox (\c -> cStdin_ c *> cStdin_ c) ((\(Box c e) -> (e & toStream & S.takeWhile (/="q") & fromStream) c) . f))) . Box) eStdout_) -- remove surperfluous fsts and snds -- > withQ Unbounded toBox ((\f -> (withQ Unbounded toBox (\c -> cStdin_ c *> cStdin_ c) ((\(Box c e) -> (e & toStream & S.takeWhile (/="q") & fromStream) c) . f))) . Box) eStdout_ -- IO (((),()), ()) -- an intuitive unwrapping of the f -- > withQ Unbounded toBox (\c -> (withQ Unbounded toBox (\c' -> cStdin_ c' *> cStdin_ c') ((\e -> (e & toStream & S.takeWhile (/="q") & fromStream) c)))) eStdout_ {- And here was the problem is much easier to see. The withQ's were waiting on both sides of the queue. I replaced `snd <$> withQ` with `withQE` -} -- subbing withQE fixes! -- withQ Unbounded toBox (\c -> (withQE Unbounded toBox (\c' -> cStdin_ c' *> cStdin_ c') ((\e -> (fromStream . S.takeWhile (/="q") . toStream $ e) c)))) eStdout_