module Control.Pipeline (Pipeline, await, yield, finish, impact, (=*=), pipeline) where import "base" Control.Monad (Monad ((>>=))) import "base" Control.Applicative (Applicative (pure)) import "base" Data.Function (($), (.), flip) import "transformers" Control.Monad.Trans.Cont (ContT (..)) import "transformers" Control.Monad.Trans.Class (lift) newtype Producer i t r = Producer { produce :: Consumer i t r -> t r } newtype Consumer o t r = Consumer { consume :: o -> Producer o t r -> t r } newtype Pipe i o r t a = Pipe { pipe :: Producer i t r -> Consumer o t r -> t r } type Pipeline i o t a r = ContT r (Pipe i o r t) a pause :: (() -> Pipe i o r t a) -> Producer i t r -> Producer o t r pause next ik = Producer $ \ok -> (pipe $ next ()) ik ok suspend :: (i -> Pipe i o r t a) -> Consumer o t r -> Consumer i t r suspend next ok = Consumer $ \v ik -> pipe (next v) ik ok -- | Take incoming value from pipeline await :: Pipeline i o t i r await = ContT $ \k -> Pipe $ \ik ok -> produce ik (suspend k ok) -- | Give a value to the future consuming yield :: o -> Pipeline i o t () r yield v = ContT $ \k -> Pipe $ \ik ok -> consume ok v (pause k ik) -- | Pipeline that does nothing finish :: Monad t => Pipeline i o t () () finish = ContT $ \_ -> Pipe $ \_ _ -> pure () -- | Do some effectful computation within pipeline impact :: Monad t => t a -> Pipeline i o t a () impact e = ContT $ \next -> Pipe $ \ik ok -> e >>= \x -> pipe (next x) ik ok -- | Compose two pipelines into one (=*=) :: forall i e a o t . Monad t => Pipeline i e t () () -> Pipeline e o t () () -> Pipeline i o t a () p =*= q = ContT $ \k -> Pipe $ \ik ok -> pipe (runContT q end) (pause (\() -> runContT p end) ik) ok where end :: b -> Pipe c d () t () end _ = Pipe $ \ _ _ -> pure () -- | Run pipeline and get result pipeline :: Monad t => Pipeline i o t r r -> t r pipeline p = pipe (runContT p (\r -> Pipe $ \_ _ -> pure r)) i o where i :: Producer i t r i = Producer $ \o -> produce i o o :: Consumer o t r o = Consumer $ \v i -> consume o v i