module Data.Conduit.Internal
(
Pipe (..)
, ConduitM (..)
, Source
, Producer
, Sink
, Consumer
, Conduit
, ResumableSource (..)
, await
, awaitE
, awaitForever
, yield
, yieldOr
, leftover
, bracketP
, addCleanup
, idP
, pipe
, pipeL
, connectResume
, runPipe
, injectLeftovers
, (>+>)
, (<+<)
, sourceToPipe
, sinkToPipe
, conduitToPipe
, toProducer
, toConsumer
, transPipe
, mapOutput
, mapOutputMaybe
, mapInput
, sourceList
, withUpstream
, unwrapResumable
) where
import Control.Applicative (Applicative (..))
import Control.Monad ((>=>), liftM, ap, when)
import Control.Monad.Trans.Class (MonadTrans (lift))
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Base (MonadBase (liftBase))
import Data.Void (Void, absurd)
import Data.Monoid (Monoid (mappend, mempty))
import Control.Monad.Trans.Resource
import qualified GHC.Exts
import qualified Data.IORef as I
data Pipe l i o u m r =
HaveOutput (Pipe l i o u m r) (m ()) o
| NeedInput (i -> Pipe l i o u m r) (u -> Pipe l i o u m r)
| Done r
| PipeM (m (Pipe l i o u m r))
| Leftover (Pipe l i o u m r) l
instance Monad m => Functor (Pipe l i o u m) where
fmap = liftM
instance Monad m => Applicative (Pipe l i o u m) where
pure = return
(<*>) = ap
instance Monad m => Monad (Pipe l i o u m) where
return = Done
Done x >>= fp = fp x
HaveOutput p c o >>= fp = HaveOutput (p >>= fp) c o
NeedInput p c >>= fp = NeedInput (p >=> fp) (c >=> fp)
PipeM mp >>= fp = PipeM ((>>= fp) `liftM` mp)
Leftover p i >>= fp = Leftover (p >>= fp) i
instance MonadBase base m => MonadBase base (Pipe l i o u m) where
liftBase = lift . liftBase
instance MonadTrans (Pipe l i o u) where
lift mr = PipeM (Done `liftM` mr)
instance MonadIO m => MonadIO (Pipe l i o u m) where
liftIO = lift . liftIO
instance MonadThrow m => MonadThrow (Pipe l i o u m) where
monadThrow = lift . monadThrow
instance MonadActive m => MonadActive (Pipe l i o u m) where
monadActive = lift monadActive
instance Monad m => Monoid (Pipe l i o u m ()) where
mempty = return ()
mappend = (>>)
instance MonadResource m => MonadResource (Pipe l i o u m) where
liftResourceT = lift . liftResourceT
newtype ConduitM i o m r = ConduitM { unConduitM :: Pipe i i o () m r }
deriving (Functor, Applicative, Monad, MonadIO, MonadTrans, MonadThrow, MonadActive, MonadResource)
instance MonadBase base m => MonadBase base (ConduitM i o m) where
liftBase = lift . liftBase
instance Monad m => Monoid (ConduitM i o m ()) where
mempty = return ()
mappend = (>>)
type Source m o = ConduitM () o m ()
type Producer m o = forall i. ConduitM i o m ()
type Sink i m r = ConduitM i Void m r
type Consumer i m r = forall o. ConduitM i o m r
type Conduit i m o = ConduitM i o m ()
data ResumableSource m o = ResumableSource (Source m o) (m ())
await :: Pipe l i o u m (Maybe i)
await = NeedInput (Done . Just) (\_ -> Done Nothing)
awaitE :: Pipe l i o u m (Either u i)
awaitE = NeedInput (Done . Right) (Done . Left)
awaitForever :: Monad m => (i -> Pipe l i o r m r') -> Pipe l i o r m r
awaitForever inner =
self
where
self = awaitE >>= either return (\i -> inner i >> self)
yield :: Monad m
=> o
-> Pipe l i o u m ()
yield = HaveOutput (Done ()) (return ())
yieldOr :: Monad m
=> o
-> m ()
-> Pipe l i o u m ()
yieldOr o f = HaveOutput (Done ()) f o
leftover :: l -> Pipe l i o u m ()
leftover = Leftover (Done ())
bracketP :: MonadResource m
=> IO a
-> (a -> IO ())
-> (a -> Pipe l i o u m r)
-> Pipe l i o u m r
bracketP alloc free inside =
PipeM start
where
start = do
(key, seed) <- allocate alloc free
return $ addCleanup (const $ release key) (inside seed)
addCleanup :: Monad m
=> (Bool -> m ())
-> Pipe l i o u m r
-> Pipe l i o u m r
addCleanup cleanup (Done r) = PipeM (cleanup True >> return (Done r))
addCleanup cleanup (HaveOutput src close x) = HaveOutput
(addCleanup cleanup src)
(cleanup False >> close)
x
addCleanup cleanup (PipeM msrc) = PipeM (liftM (addCleanup cleanup) msrc)
addCleanup cleanup (NeedInput p c) = NeedInput
(addCleanup cleanup . p)
(addCleanup cleanup . c)
addCleanup cleanup (Leftover p i) = Leftover (addCleanup cleanup p) i
idP :: Monad m => Pipe l a a r m r
idP = NeedInput (HaveOutput idP (return ())) Done
pipe :: Monad m => Pipe l a b r0 m r1 -> Pipe Void b c r1 m r2 -> Pipe l a c r0 m r2
pipe =
pipe' (return ())
where
pipe' final left right =
case right of
Done r2 -> PipeM (final >> return (Done r2))
HaveOutput p c o -> HaveOutput (pipe' final left p) (c >> final) o
PipeM mp -> PipeM (liftM (pipe' final left) mp)
Leftover _ i -> absurd i
NeedInput rp rc -> upstream rp rc
where
upstream rp rc =
case left of
Done r1 -> pipe (Done r1) (rc r1)
HaveOutput left' final' o -> pipe' final' left' (rp o)
PipeM mp -> PipeM (liftM (\left' -> pipe' final left' right) mp)
Leftover left' i -> Leftover (pipe' final left' right) i
NeedInput left' lc -> NeedInput
(\a -> pipe' final (left' a) right)
(\r0 -> pipe' final (lc r0) right)
pipeL :: Monad m => Pipe l a b r0 m r1 -> Pipe b b c r1 m r2 -> Pipe l a c r0 m r2
pipeL =
pipe' (return ())
where
pipe' :: Monad m => m () -> Pipe l a b r0 m r1 -> Pipe b b c r1 m r2 -> Pipe l a c r0 m r2
pipe' final left right =
case right of
Done r2 -> PipeM (final >> return (Done r2))
HaveOutput p c o -> HaveOutput (pipe' final left p) (c >> final) o
PipeM mp -> PipeM (liftM (pipe' final left) mp)
Leftover right' i -> pipe' final (HaveOutput left final i) right'
NeedInput rp rc ->
case left of
Done r1 -> pipe' (return ()) (Done r1) (rc r1)
HaveOutput left' final' o -> pipe' final' left' (rp o)
PipeM mp -> PipeM (liftM (\left' -> pipe' final left' right) mp)
NeedInput left' lc -> NeedInput
(\a -> pipe' final (left' a) right)
(\r0 -> pipe' final (lc r0) right)
Leftover left' i -> Leftover (pipe' final left' right) i
connectResume :: Monad m
=> ResumableSource m o
-> Sink o m r
-> m (ResumableSource m o, r)
connectResume (ResumableSource (ConduitM left0) leftFinal0) =
go leftFinal0 left0 . unConduitM
where
go leftFinal left right =
case right of
Done r2 -> return (ResumableSource (ConduitM left) leftFinal, r2)
PipeM mp -> mp >>= go leftFinal left
HaveOutput _ _ o -> absurd o
Leftover p i -> go leftFinal (HaveOutput left leftFinal i) p
NeedInput rp rc ->
case left of
Leftover p () -> go leftFinal p right
HaveOutput left' leftFinal' o -> go leftFinal' left' (rp o)
NeedInput _ lc -> go leftFinal (lc ()) right
Done () -> go (return ()) (Done ()) (rc ())
PipeM mp -> mp >>= \left' -> go leftFinal left' right
runPipe :: Monad m => Pipe Void () Void () m r -> m r
runPipe (HaveOutput _ _ o) = absurd o
runPipe (NeedInput _ c) = runPipe (c ())
runPipe (Done r) = return r
runPipe (PipeM mp) = mp >>= runPipe
runPipe (Leftover _ i) = absurd i
injectLeftovers :: Monad m => Pipe i i o u m r -> Pipe l i o u m r
injectLeftovers =
go []
where
go _ (Done r) = Done r
go ls (HaveOutput p c o) = HaveOutput (go ls p) c o
go ls (PipeM mp) = PipeM (liftM (go ls) mp)
go ls (Leftover p l) = go (l:ls) p
go (l:ls) (NeedInput p _) = go ls $ p l
go [] (NeedInput p c) = NeedInput (go [] . p) (go [] . c)
transPipe :: Monad m => (forall a. m a -> n a) -> Pipe l i o u m r -> Pipe l i o u n r
transPipe f (HaveOutput p c o) = HaveOutput (transPipe f p) (f c) o
transPipe f (NeedInput p c) = NeedInput (transPipe f . p) (transPipe f . c)
transPipe _ (Done r) = Done r
transPipe f (PipeM mp) =
PipeM (f $ liftM (transPipe f) $ collapse mp)
where
collapse mpipe = do
pipe' <- mpipe
case pipe' of
PipeM mpipe' -> collapse mpipe'
_ -> return pipe'
transPipe f (Leftover p i) = Leftover (transPipe f p) i
mapOutput :: Monad m => (o1 -> o2) -> Pipe l i o1 u m r -> Pipe l i o2 u m r
mapOutput f (HaveOutput p c o) = HaveOutput (mapOutput f p) c (f o)
mapOutput f (NeedInput p c) = NeedInput (mapOutput f . p) (mapOutput f . c)
mapOutput _ (Done r) = Done r
mapOutput f (PipeM mp) = PipeM (liftM (mapOutput f) mp)
mapOutput f (Leftover p i) = Leftover (mapOutput f p) i
mapOutputMaybe :: Monad m => (o1 -> Maybe o2) -> Pipe l i o1 u m r -> Pipe l i o2 u m r
mapOutputMaybe f (HaveOutput p c o) = maybe id (\o' p' -> HaveOutput p' c o') (f o) (mapOutputMaybe f p)
mapOutputMaybe f (NeedInput p c) = NeedInput (mapOutputMaybe f . p) (mapOutputMaybe f . c)
mapOutputMaybe _ (Done r) = Done r
mapOutputMaybe f (PipeM mp) = PipeM (liftM (mapOutputMaybe f) mp)
mapOutputMaybe f (Leftover p i) = Leftover (mapOutputMaybe f p) i
mapInput :: Monad m
=> (i1 -> i2)
-> (l2 -> Maybe l1)
-> Pipe l2 i2 o u m r
-> Pipe l1 i1 o u m r
mapInput f f' (HaveOutput p c o) = HaveOutput (mapInput f f' p) c o
mapInput f f' (NeedInput p c) = NeedInput (mapInput f f' . p . f) (mapInput f f' . c)
mapInput _ _ (Done r) = Done r
mapInput f f' (PipeM mp) = PipeM (liftM (mapInput f f') mp)
mapInput f f' (Leftover p i) = maybe id (flip Leftover) (f' i) $ mapInput f f' p
sourceList :: Monad m => [a] -> Pipe l i a u m ()
sourceList =
go
where
go [] = Done ()
go (o:os) = HaveOutput (go os) (return ()) o
build :: Monad m => (forall b. (o -> b -> b) -> b -> b) -> Pipe l i o u m ()
build g = g (\o p -> HaveOutput p (return ()) o) (return ())
sourceToPipe :: Monad m => Source m o -> Pipe l i o u m ()
sourceToPipe =
go . unConduitM
where
go (Done ()) = Done ()
go (PipeM mp) = PipeM (liftM go mp)
go (NeedInput _ c) = go $ c ()
go (HaveOutput p c o) = HaveOutput (go p) c o
go (Leftover p ()) = go p
sinkToPipe :: Monad m => Sink i m r -> Pipe l i o u m r
sinkToPipe =
go . injectLeftovers . unConduitM
where
go (Done r) = Done r
go (PipeM mp) = PipeM (liftM go mp)
go (NeedInput p c) = NeedInput (go . p) (const $ go $ c ())
go (HaveOutput _ _ o) = absurd o
go (Leftover _ l) = absurd l
conduitToPipe :: Monad m => Conduit i m o -> Pipe l i o u m ()
conduitToPipe =
go . injectLeftovers . unConduitM
where
go (Done ()) = Done ()
go (PipeM mp) = PipeM (liftM go mp)
go (NeedInput p c) = NeedInput (go . p) (const $ go $ c ())
go (HaveOutput p c o) = HaveOutput (go p) c o
go (Leftover _ l) = absurd l
withUpstream :: Monad m
=> Pipe l i o u m r
-> Pipe l i o u m (u, r)
withUpstream down =
down >>= go
where
go r =
loop
where
loop = awaitE >>= either (\u -> return (u, r)) (\_ -> loop)
unwrapResumable :: MonadIO m => ResumableSource m o -> m (Source m o, m ())
unwrapResumable (ResumableSource src final) = do
ref <- liftIO $ I.newIORef True
let final' = do
x <- liftIO $ I.readIORef ref
when x final
return (liftIO (I.writeIORef ref False) >> src, final')
infixr 9 <+<
infixl 9 >+>
(>+>) :: Monad m => Pipe l a b r0 m r1 -> Pipe Void b c r1 m r2 -> Pipe l a c r0 m r2
(>+>) = pipe
(<+<) :: Monad m => Pipe Void b c r1 m r2 -> Pipe l a b r0 m r1 -> Pipe l a c r0 m r2
(<+<) = flip pipe
toProducer :: Monad m => Source m a -> Producer m a
toProducer =
ConduitM . go . unConduitM
where
go (HaveOutput p c o) = HaveOutput (go p) c o
go (NeedInput _ c) = go (c ())
go (Done r) = Done r
go (PipeM mp) = PipeM (liftM go mp)
go (Leftover p ()) = go p
toConsumer :: Monad m => Sink a m b -> Consumer a m b
toConsumer =
ConduitM . go . unConduitM
where
go (HaveOutput _ _ o) = absurd o
go (NeedInput p c) = NeedInput (go . p) (go . c)
go (Done r) = Done r
go (PipeM mp) = PipeM (liftM go mp)
go (Leftover p l) = Leftover (go p) l