-- | Core definitions for pipes: the 'Pipe' type and its synonyms, the
-- primitive actions ('await', 'yield', 'masked'), composition ('>+>' and
-- '<+<'), exception handling ('throwP', 'catchP') and the functions that run
-- a complete pipeline ('runPipe', 'runPurePipe', 'runPurePipe_').
module Control.Pipe.Common (
-- * Types
Pipe(..),
Producer,
Consumer,
Pipeline,
Void,
-- * Primitives
await,
yield,
masked,
-- * Basic pipes
pipe,
idP,
discard,
-- * Composition
(>+>),
(<+<),
-- * Running pipelines
runPipe,
runPurePipe,
runPurePipe_,
-- * Low-level types and exception handling
BrokenPipe,
MaskState(..),
throwP,
catchP,
liftP,
) where
import Control.Applicative
import Control.Category
import Control.Exception (SomeException, Exception)
import qualified Control.Exception.Lifted as E
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Trans.Class
import Control.Monad.Trans.Control
import Data.Maybe
import Data.Typeable
import Data.Void
import Prelude hiding (id, (.), catch)
-- | Exception handed to the exception continuation of an awaiting pipe when
-- the pipe upstream of it has already returned (see '>+>').
data BrokenPipe = BrokenPipe
  deriving (Typeable, Show)

instance Exception BrokenPipe
-- | Tag attached to every base-monad action stored in an 'M' node.
--
-- 'runPipe' executes the whole pipeline under @mask@ and applies the
-- @restore@ function only to actions tagged 'Unmasked'; 'runPurePipe'
-- ignores the tag.
data MaskState
  = -- | Run the action without restoring the outer masking state.
    Masked
  | -- | Run the action under @restore@.
    Unmasked
-- | Clean-up actions in the base monad, listed in the order in which the
-- run functions execute them.
type Finalizer m = [m ()]

-- | Append one action to the end of a finalizer list, so that it runs after
-- every action that is already registered.
addFinalizer :: m () -> Finalizer m -> Finalizer m
addFinalizer action finalizers = foldr (:) [action] finalizers
-- | A pipe that awaits values of type @a@, yields values of type @b@, runs
-- actions in the base monad @m@ and eventually returns an @r@.
data Pipe a b m r
  = -- | Finished with a result; the finalizers are still pending and are
    -- executed by the run functions before the result is returned.
    Pure r (Finalizer m)
  | -- | Failed with an exception; the finalizers are still pending.
    Throw SomeException (Finalizer m)
  | -- | Waiting for input. The first continuation receives the next input
    -- value, the second one receives an exception instead (for example
    -- when the upstream pipe has terminated or thrown, see '>+>').
    Await
      (a -> Pipe a b m r)
      (SomeException -> Pipe a b m r)
  | -- | An action in the base monad that produces the rest of the pipe,
    -- together with its 'MaskState' and the continuation to use when
    -- running the action raises an exception.
    M
      MaskState
      (m (Pipe a b m r))
      (SomeException -> Pipe a b m r)
  | -- | An output value, the rest of the pipe, and the finalizers that
    -- '>+>' attaches to the downstream pipe that consumes the value.
    Yield b (Pipe a b m r) (Finalizer m)
-- | A pipe whose input type is @()@.
type Producer b m = Pipe () b m
-- | A pipe whose output type is 'Void', so it has no value to yield.
type Consumer a m = Pipe a Void m
-- | A pipe with input @()@ and output 'Void'. This is the type accepted by
-- 'runPipe' and 'runPurePipe'.
type Pipeline m = Pipe () Void m
-- | 'fmap' is derived from the 'Monad' instance below.
instance Monad m => Functor (Pipe a b m) where
  fmap = liftM

-- | 'pure' is the canonical definition of a finished pipe with no pending
-- finalizers; '<*>' is derived from the 'Monad' instance below.
instance Monad m => Applicative (Pipe a b m) where
  pure r = Pure r []
  (<*>) = ap

-- | Sequencing of pipes.
--
-- 'return' is defined as 'pure' (and not the other way round) so that the
-- instances are canonical and do not trigger
-- @-Wnoncanonical-monad-instances@.
instance Monad m => Monad (Pipe a b m) where
  return = pure

  -- A finished pipe hands its result to the continuation. When the
  -- continuation terminates immediately, the two finalizer lists are simply
  -- concatenated. Otherwise the pending finalizers are executed first, each
  -- one as a masked base-monad action, before the continuation proceeds.
  Pure r w >>= f = case f r of
      Pure r' w' -> Pure r' (w ++ w')
      Throw e w' -> Throw e (w ++ w')
      p' -> foldr run p' w
    where
      run m p = M Masked (m >> return p) throwP
  -- A failed pipe ignores the continuation.
  Throw e w >>= _ = Throw e w
  -- For the remaining constructors the continuation is pushed into every
  -- branch, including the exception continuations.
  Await k h >>= f = Await (k >=> f) (h >=> f)
  M s m h >>= f = M s (liftM (>>= f) m) (h >=> f)
  Yield x p w >>= f = Yield x (p >>= f) w
-- | A pipe that fails immediately with the given exception and has no
-- pending finalizers.
throwP :: Monad m => SomeException -> Pipe a b m r
throwP = flip Throw []
-- | Run a pipe with an exception handler installed.
--
-- * A finished pipe is returned unchanged.
--
-- * When the pipe has failed, the handler is applied to the exception. If
--   the handler terminates immediately, the finalizers of the failed pipe
--   are placed in front of the handler's own finalizers; otherwise they are
--   executed as masked actions before the handler continues.
--
-- * For 'Await' and 'M' the handler is installed in every continuation.
--
-- * For 'Yield' the handler is installed in the rest of the pipe, and one
--   extra finalizer is appended: it executes the leading base-monad actions
--   of the handler applied to 'BrokenPipe', stopping at the first step that
--   is not an 'M' node.
catchP
  :: Monad m
  => Pipe a b m r
  -> (SomeException -> Pipe a b m r)
  -> Pipe a b m r
catchP body handler = case body of
    Pure r w -> Pure r w
    Throw e w -> case handler e of
      Pure r w' -> Pure r (w ++ w')
      Throw e' w' -> Throw e' (w ++ w')
      recovery -> mapM_ masked w >> recovery
    Await k h -> Await (guarded . k) (guarded . h)
    M s m h -> M s (liftM guarded m) (guarded . h)
    Yield x rest w ->
      Yield x (guarded rest) (addFinalizer (drain (handler bp)) w)
  where
    -- Re-install the same handler around a later stage of the pipe.
    guarded p = catchP p handler

    -- Execute consecutive base-monad steps and stop at anything else.
    drain (M _ m _) = m >>= drain
    drain _ = return ()
-- | Wait for the next input value and return it. An exception delivered
-- instead of a value is rethrown with 'throwP'.
await :: Monad m => Pipe a b m a
await = Await pure throwP
-- | Emit a single output value and then return @()@. No finalizers are
-- attached to the yielded value.
yield :: Monad m => b -> Pipe a b m ()
yield value = Yield value (pure ()) []
-- | Embed an action of the base monad into a pipe, tagged with the given
-- 'MaskState'. The result of the action becomes the result of the pipe; an
-- exception raised while running it is rethrown with 'throwP'.
liftP :: Monad m => MaskState -> m r -> Pipe a b m r
liftP maskState action =
  M maskState (action >>= \r -> return (Pure r [])) throwP
-- | 'lift' embeds a base-monad action tagged as 'Unmasked'.
instance MonadTrans (Pipe a b) where
  lift action = liftP Unmasked action

-- | 'liftIO' lifts the action into the base monad and then into the pipe.
instance MonadIO m => MonadIO (Pipe a b m) where
  liftIO io = lift (liftIO io)
-- | Embed a base-monad action tagged as 'Masked', i.e. one that 'runPipe'
-- executes without applying @restore@.
masked :: Monad m => m r -> Pipe a b m r
masked action = liftP Masked action
-- | Turn a pure function into a pipe that repeatedly awaits a value,
-- applies the function and yields the result. It never returns by itself.
pipe :: Monad m => (a -> b) -> Pipe a b m r
pipe f = forever $ do
  x <- await
  yield (f x)
-- | The identity pipe: every value received is yielded unchanged.
idP :: Monad m => Pipe a a m r
idP = forever (await >>= yield)
-- | Await input forever, ignoring every value received and never yielding.
discard :: Monad m => Pipe a b m r
discard = await >> discard
-- | Put the given finalizers in front of the finalizer list of every
-- 'Pure', 'Throw' and 'Yield' node that is reached through base-monad steps
-- and 'Yield' continuations. An 'Await' node is returned unchanged, so
-- nothing is propagated past the point where the pipe asks for input.
protect :: Monad m => Finalizer m -> Pipe a b m r -> Pipe a b m r
protect outer p = case p of
  Pure r inner -> Pure r (outer ++ inner)
  Throw e inner -> Throw e (outer ++ inner)
  awaiting@(Await _ _) -> awaiting
  M s m h -> M s (liftM (protect outer) m) (protect outer . h)
  Yield x rest inner -> Yield x (protect outer rest) (outer ++ inner)
-- | Replace a failure caused by 'BrokenPipe' with a normal return of the
-- given value, keeping the finalizers of the failed pipe. The conversion is
-- applied through base-monad steps and 'Yield' continuations; 'Pure',
-- 'Await' and failures with any other exception are left as they are.
handleBP :: Monad m => r -> Pipe a b m r -> Pipe a b m r
handleBP r p = case p of
  Throw e w | isBrokenPipe e -> Pure r w
  M s m h -> M s (liftM (handleBP r) m) (handleBP r . h)
  Yield x rest w -> Yield x (handleBP r rest) w
  other -> other
-- | 'BrokenPipe' wrapped as a 'SomeException', ready to be passed to the
-- exception continuation of a pipe.
bp :: SomeException
bp = E.toException BrokenPipe
-- | Whether the exception is a 'BrokenPipe'.
isBrokenPipe :: SomeException -> Bool
isBrokenPipe e = case (E.fromException e :: Maybe BrokenPipe) of
  Just _ -> True
  Nothing -> False
infixl 9 >+>

-- | Left-to-right composition: the output of the first pipe feeds the input
-- of the second one.
--
-- The downstream pipe is inspected first. As long as it yields, runs a
-- base-monad action, returns or throws, the upstream pipe is not examined
-- at all. Only when the downstream pipe awaits is the upstream pipe
-- advanced:
--
-- * base-monad actions and awaits of the upstream pipe are performed, and
--   the composition continues with the same downstream pipe;
--
-- * when upstream has returned, the exception continuation of the
--   downstream pipe is called with 'BrokenPipe', and a resulting
--   'BrokenPipe' failure is turned into a return of upstream's result;
--
-- * when upstream has thrown, the exception is passed to the exception
--   continuation of the downstream pipe;
--
-- * when upstream yields, the value is passed to the downstream pipe.
--
-- In the last three cases the finalizers carried by the upstream node are
-- attached to the downstream pipe with @protect@.
(>+>) :: Monad m => Pipe a b m r -> Pipe b c m r -> Pipe a c m r
p1 >+> p2 = case p2 of
  Yield x p2' w -> Yield x (p1 >+> p2') w
  M s m h2 -> M s (liftM (p1 >+>) m) (\e -> p1 >+> h2 e)
  Pure r w -> Pure r w
  Throw e w -> Throw e w
  Await k2 h2 -> case p1 of
    M s m h1 -> M s (liftM (>+> p2) m) (\e -> h1 e >+> p2)
    Await k1 h1 -> Await (\a -> k1 a >+> p2) (\e -> h1 e >+> p2)
    Pure r w -> p1 >+> handleBP r (protect w (h2 bp))
    Throw e w -> p1 >+> protect w (h2 e)
    Yield x p1' w -> p1' >+> protect w (k2 x)
infixr 9 <+<

-- | Right-to-left composition: '>+>' with its arguments swapped.
(<+<) :: Monad m => Pipe b c m r -> Pipe a b m r -> Pipe a c m r
(<+<) = flip (>+>)
-- | Run a complete pipeline in a monad based on 'IO'.
--
-- The whole pipeline is executed under @mask@; only base-monad actions
-- tagged 'Unmasked' are wrapped in the @restore@ function. An exception
-- raised by a base-monad action is passed to the exception continuation of
-- its 'M' node. An await is answered with @()@. When the pipeline
-- terminates, its pending finalizers are executed in order, and an
-- exception raised by a finalizer is caught and discarded. A pipeline that
-- ends in 'Throw' rethrows its exception after the finalizers have run.
runPipe :: MonadBaseControl IO m => Pipeline m r -> m r
runPipe p = E.mask $ \restore -> run restore p
  where
    -- Handler that discards any exception.
    ignore :: Monad n => SomeException -> n ()
    ignore _ = return ()

    -- Execute every finalizer, suppressing the exceptions they raise.
    cleanup actions = mapM_ (\action -> E.catch action ignore) actions

    run restore = loop
      where
        loop pl = case pl of
          Pure r w -> cleanup w >> return r
          Throw e w -> cleanup w >> E.throwIO e
          Await k _ -> loop (k ())
          Yield x _ _ -> absurd x
          M s m h -> attempt s m >>= either (loop . h) loop

        -- Run one base-monad action and capture the exception it raises.
        attempt s m = E.try $ case s of
          Masked -> m
          Unmasked -> restore m
-- | Run a complete pipeline in an arbitrary monad.
--
-- Base-monad actions are executed directly: the 'MaskState' tag and the
-- exception continuation of each 'M' node are ignored. An await is answered
-- with @()@. When the pipeline terminates, its pending finalizers are
-- executed in order and the outcome is reported as 'Right' for a result or
-- 'Left' for an exception thrown by the pipeline.
runPurePipe :: Monad m => Pipeline m r -> m (Either SomeException r)
runPurePipe pl = case pl of
    Pure r w -> finish (Right r) w
    Throw e w -> finish (Left e) w
    Await k _ -> runPurePipe (k ())
    Yield x _ _ -> absurd x
    M _ m _ -> m >>= runPurePipe
  where
    -- Execute the pending finalizers, then report the outcome.
    finish outcome w = sequence_ w >> return outcome
-- | Like 'runPurePipe', but an exception thrown by the pipeline is raised
-- with @throw@ instead of being returned as a 'Left' value.
runPurePipe_ :: Monad m => Pipeline m r -> m r
runPurePipe_ p = runPurePipe p >>= \outcome -> case outcome of
  Left e -> E.throw e
  Right r -> return r