module Data.Conduit
(
module Data.Conduit.Types.Source
, BufferedSource
, bufferSource
, unbufferSource
, bsourceClose
, IsSource
, module Data.Conduit.Types.Sink
, module Data.Conduit.Types.Conduit
,
($$)
, ($=)
, (=$)
, (=$=)
, module Data.Conduit.Util.Source
, module Data.Conduit.Util.Sink
, module Data.Conduit.Util.Conduit
, Flush (..)
, ResourceT
, Resource (..)
, ResourceIO
, ResourceUnsafeIO
, runResourceT
, ResourceThrow (..)
) where
import Control.Applicative ((<$>))
import Control.Monad (liftM)
import Control.Monad.Trans.Resource
import Data.Conduit.Types.Source
import Data.Conduit.Util.Source
import Data.Conduit.Types.Sink
import Data.Conduit.Util.Sink
import Data.Conduit.Types.Conduit
import Data.Conduit.Util.Conduit
infixr 0 $$
($$) :: (IsSource src, Resource m) => src m a -> Sink a m b -> ResourceT m b
($$) = connect
class IsSource src where
connect :: Resource m => src m a -> Sink a m b -> ResourceT m b
fuseLeft :: Resource m => src m a -> Conduit a m b -> Source m b
instance IsSource Source where
connect = normalConnect
fuseLeft = normalFuseLeft
instance IsSource BufferedSource where
connect = bufferedConnect
fuseLeft = bufferedFuseLeft
normalConnect :: Resource m => Source m a -> Sink a m b -> ResourceT m b
normalConnect _ (SinkNoData output) = return output
normalConnect src0 (SinkLift msink) = msink >>= normalConnect src0
normalConnect src0 (SinkData push0 close0) =
connect' src0 push0 close0
where
connect' src push close = do
res <- sourcePull src
case res of
Closed -> do
res' <- close
return res'
Open src' a -> do
mres <- push a
case mres of
Done _leftover res' -> do
sourceClose src'
return res'
Processing push' close' -> connect' src' push' close'
data FuseLeftState src conduit output =
FLClosed [output]
| FLOpen src conduit [output]
infixl 1 $=
($=) :: (IsSource src, Resource m)
=> src m a
-> Conduit a m b
-> Source m b
($=) = fuseLeft
normalFuseLeft :: Resource m => Source m a -> Conduit a m b -> Source m b
normalFuseLeft src0 conduit0 = Source
{ sourcePull = pull $ FLOpen src0 conduit0 []
, sourceClose = return ()
}
where
mkSrc state = Source (pull state) (close state)
pull state' =
case state' of
FLClosed [] -> return Closed
FLClosed (x:xs) -> return $ Open
(mkSrc (FLClosed xs))
x
FLOpen src conduit (x:xs) -> return $ Open
(mkSrc (FLOpen src conduit xs))
x
FLOpen src conduit [] -> do
mres <- sourcePull src
case mres of
Closed -> do
res <- conduitClose conduit
case res of
[] -> return Closed
x:xs -> return $ Open
(mkSrc (FLClosed xs))
x
Open src'' input -> do
res' <- conduitPush conduit input
case res' of
Producing conduit' [] ->
pull $ FLOpen src'' conduit' []
Producing conduit' (x:xs) -> return $ Open
(mkSrc (FLOpen src'' conduit' xs))
x
Finished _leftover output -> do
sourceClose src''
case output of
[] -> return Closed
x:xs -> return $ Open
(mkSrc (FLClosed xs))
x
close state = do
case state of
FLClosed _ -> return ()
FLOpen src' (Conduit _ closeC) _ -> do
_ignored <- closeC
sourceClose src'
infixr 0 =$
(=$) :: Resource m => Conduit a m b -> Sink b m c -> Sink a m c
_ =$ SinkNoData res = SinkNoData res
conduit =$ SinkLift msink = SinkLift (liftM (conduit =$) msink)
conduitOrig =$ SinkData pushI0 closeI0 = SinkData
{ sinkPush = push pushI0 closeI0 conduitOrig
, sinkClose = close pushI0 closeI0 conduitOrig
}
where
push pushI closeI conduit0 cinput = do
res <- conduitPush conduit0 cinput
case res of
Producing conduit' sinput -> do
let loop p c [] = return (Processing (push p c conduit') (close p c conduit'))
loop p _ (i:is) = do
mres <- p i
case mres of
Processing p' c' -> loop p' c' is
Done _sleftover res' -> do
_ <- conduitClose conduit'
return $ Done Nothing res'
loop pushI closeI sinput
Finished cleftover sinput -> do
let loop _ c [] = c
loop p _ (i:is) = do
mres <- p i
case mres of
Processing p' c' -> loop p' c' is
Done _sleftover res' -> return res'
res' <- loop pushI closeI sinput
return $ Done cleftover res'
close pushI closeI conduit = do
sinput <- conduitClose conduit
let loop _ c [] = c
loop p _ (i:is) = do
mres <- p i
case mres of
Processing p' c' -> loop p' c' is
Done _sleftover res' -> return res'
loop pushI closeI sinput
infixr 0 =$=
(=$=) :: Resource m => Conduit a m b -> Conduit b m c -> Conduit a m c
outerOrig =$= innerOrig = Conduit
(pushF outerOrig innerOrig)
(closeF outerOrig innerOrig)
where
pushF outer0 inner0 inputO = do
res <- conduitPush outer0 inputO
case res of
Producing outer inputI -> do
let loop inner [] front = return $ Producing
(Conduit (pushF outer inner) (closeF outer inner))
(front [])
loop inner (i:is) front = do
resI <- conduitPush inner i
case resI of
Producing conduit c -> loop
conduit
is
(front . (c ++))
Finished _leftover c -> do
_ <- conduitClose outer
return $ Finished Nothing $ front c
loop inner0 inputI id
Finished leftoverO inputI -> do
c <- conduitPushClose inner0 inputI
return $ Finished leftoverO c
closeF outer inner = do
b <- conduitClose outer
c <- conduitPushClose inner b
return c
conduitPushClose :: Monad m => Conduit a m b -> [a] -> ResourceT m [b]
conduitPushClose c [] = conduitClose c
conduitPushClose c (input:rest) = do
res <- conduitPush c input
case res of
Finished _ b -> return b
Producing conduit b -> do
b' <- conduitPushClose conduit rest
return $ b ++ b'
data BufferedSource m a = BufferedSource (Ref (Base m) (BSState m a))
data BSState m a =
ClosedEmpty
| OpenEmpty (Source m a)
| ClosedFull a
| OpenFull (Source m a) a
bufferSource :: Resource m => Source m a -> ResourceT m (BufferedSource m a)
bufferSource src = BufferedSource <$> newRef (OpenEmpty src)
unbufferSource :: Resource m
=> BufferedSource m a
-> Source m a
unbufferSource (BufferedSource bs) = Source
{ sourcePull = msrc >>= sourcePull
, sourceClose = msrc >>= sourceClose
}
where
msrc = do
buf <- readRef bs
case buf of
OpenEmpty src -> return src
OpenFull src a -> return Source
{ sourcePull = return $ Open src a
, sourceClose = sourceClose src
}
ClosedEmpty -> return Source
{ sourcePull = return Closed
, sourceClose = return ()
}
ClosedFull a -> return Source
{ sourcePull = return $ Open
(Source (return Closed) (return ()))
a
, sourceClose = return ()
}
bufferedConnect :: Resource m => BufferedSource m a -> Sink a m b -> ResourceT m b
bufferedConnect _ (SinkNoData output) = return output
bufferedConnect bsrc (SinkLift msink) = msink >>= bufferedConnect bsrc
bufferedConnect (BufferedSource bs) (SinkData push0 close0) = do
bsState <- readRef bs
case bsState of
ClosedEmpty -> close0
OpenEmpty src -> connect' src push0 close0
ClosedFull a -> do
res <- push0 a
case res of
Done mleftover res' -> do
writeRef bs $ maybe ClosedEmpty ClosedFull mleftover
return res'
Processing _ close' -> do
writeRef bs ClosedEmpty
close'
OpenFull src a -> push0 a >>= onRes src
where
connect' src push close = do
res <- sourcePull src
case res of
Closed -> do
writeRef bs ClosedEmpty
res' <- close
return res'
Open src' a -> push a >>= onRes src'
onRes src (Done mleftover res) = do
writeRef bs $ maybe (OpenEmpty src) (OpenFull src) mleftover
return res
onRes src (Processing push close) = connect' src push close
bufferedFuseLeft
:: Resource m
=> BufferedSource m a
-> Conduit a m b
-> Source m b
bufferedFuseLeft bsrc conduit0 = Source
{ sourcePull = pullF $ FLOpen () conduit0 []
, sourceClose = return ()
}
where
mkSrc state = Source
(pullF state)
(closeF state)
pullF state' =
case state' of
FLClosed [] -> return Closed
FLClosed (x:xs) -> return $ Open
(mkSrc (FLClosed xs))
x
FLOpen () conduit (x:xs) -> return $ Open
(mkSrc (FLOpen () conduit xs))
x
FLOpen () conduit [] -> do
mres <- bsourcePull bsrc
case mres of
Nothing -> do
res <- conduitClose conduit
case res of
[] -> return Closed
x:xs -> return $ Open
(mkSrc (FLClosed xs))
x
Just input -> do
res' <- conduitPush conduit input
case res' of
Producing conduit' [] ->
pullF (FLOpen () conduit' [])
Producing conduit' (x:xs) -> return $ Open
(mkSrc (FLOpen () conduit' xs))
x
Finished leftover output -> do
bsourceUnpull bsrc leftover
case output of
[] -> return Closed
x:xs -> return $ Open
(mkSrc (FLClosed xs))
x
closeF state = do
case state of
FLClosed _ -> return ()
FLOpen () (Conduit _ close) _ -> do
_ignored <- close
return ()
bsourcePull :: Resource m => BufferedSource m a -> ResourceT m (Maybe a)
bsourcePull (BufferedSource bs) = do
buf <- readRef bs
case buf of
OpenEmpty src -> do
res <- sourcePull src
case res of
Open src' a -> do
writeRef bs $ OpenEmpty src'
return $ Just a
Closed -> writeRef bs ClosedEmpty >> return Nothing
ClosedEmpty -> return Nothing
OpenFull src a -> do
writeRef bs (OpenEmpty src)
return $ Just a
ClosedFull a -> do
writeRef bs ClosedEmpty
return $ Just a
bsourceUnpull :: Resource m => BufferedSource m a -> Maybe a -> ResourceT m ()
bsourceUnpull _ Nothing = return ()
bsourceUnpull (BufferedSource ref) (Just a) = do
buf <- readRef ref
case buf of
OpenEmpty src -> writeRef ref (OpenFull src a)
ClosedEmpty -> writeRef ref (ClosedFull a)
_ -> error $ "Invariant violated: bsourceUnpull called on full data"
bsourceClose :: Resource m => BufferedSource m a -> ResourceT m ()
bsourceClose (BufferedSource ref) = do
buf <- readRef ref
case buf of
OpenEmpty src -> sourceClose src
OpenFull src _ -> sourceClose src
ClosedEmpty -> return ()
ClosedFull _ -> return ()
data Flush a = Chunk a | Flush
deriving (Show, Eq, Ord)
instance Functor Flush where
fmap _ Flush = Flush
fmap f (Chunk a) = Chunk (f a)