module Control.Pipe.Conduit (
sourcePipe,
conduitPipe,
conduitPipe_,
sinkPipe,
sinkPipe_
) where
import Control.Monad (void)
import Control.Monad.Trans
import Control.Monad.Trans.Resource
import Control.Pipe
import Control.Pipe.Combinators
import Control.Pipe.Exception
import Data.Conduit
conduitPipe :: Resource m => Conduit a m b -> Pipe a b (ResourceT m) (Maybe a)
conduitPipe (Conduit push close) = do
x <- tryAwait
case x of
Nothing -> lift close >>= mapM_ yield >> return Nothing
Just input -> do
result <- lift $ push input
case result of
Producing c' output -> mapM_ yield output >> conduitPipe c'
Finished input' output -> mapM_ yield output >> return input'
conduitPipe_ :: Resource m => Conduit a m b -> Pipe a b (ResourceT m) ()
conduitPipe_ = void . conduitPipe
sourcePipe :: Resource m => Source m a -> Pipe x a (ResourceT m) ()
sourcePipe (Source pull _) = do
result <- lift pull
case result of
Open s x -> yield x >> sourcePipe s
Closed -> return ()
sinkPipe :: Resource m => Sink a m b -> Pipe a x (ResourceT m) (Maybe a, b)
sinkPipe (SinkNoData out) = return (Nothing, out)
sinkPipe (SinkLift m) = lift m >>= sinkPipe
sinkPipe (SinkData p c) = go p c
where
go push close = do
mx <- tryAwait
case mx of
Nothing -> do
out <- lift close
return (Nothing, out)
Just x -> do
result <- lift $ push x
case result of
Processing push' close' -> go push' close'
Done input output -> return (input, output)
sinkPipe_ :: Resource m => Sink a m b -> Pipe a x (ResourceT m) ()
sinkPipe_ = void . sinkPipe