module Data.Conduit
(
module Data.Conduit.Types.Source
, BufferedSource
, bufferSource
, unbufferSource
, bsourceClose
, IsSource
, module Data.Conduit.Types.Sink
, module Data.Conduit.Types.Conduit
,
($$)
, ($=)
, (=$)
, (=$=)
, module Data.Conduit.Util.Source
, module Data.Conduit.Util.Sink
, module Data.Conduit.Util.Conduit
, ResourceT
, Resource (..)
, ResourceIO
, ResourceUnsafeIO
, runResourceT
, ResourceThrow (..)
) where
import Control.Monad.Trans.Resource
import Data.Conduit.Types.Source
import Data.Conduit.Util.Source
import Data.Conduit.Types.Sink
import Data.Conduit.Util.Sink
import Data.Conduit.Types.Conduit
import Data.Conduit.Util.Conduit
infixr 0 $$
($$) :: (IsSource src, Resource m) => src m a -> Sink a m b -> ResourceT m b
($$) = connect
class IsSource src where
connect :: Resource m => src m a -> Sink a m b -> ResourceT m b
fuseLeft :: Resource m => src m a -> Conduit a m b -> Source m b
instance IsSource Source where
connect = normalConnect
fuseLeft = normalFuseLeft
instance IsSource BufferedSource where
connect = bufferedConnect
fuseLeft = bufferedFuseLeft
normalConnect :: Resource m => Source m a -> Sink a m b -> ResourceT m b
normalConnect (Source msrc) (Sink msink) = do
sinkI <- msink
case sinkI of
SinkNoData output -> return output
SinkData push close -> do
src <- msrc
connect' src push close
where
connect' src push close =
loop
where
loop = do
res <- sourcePull src
case res of
Closed -> do
res' <- close
return res'
Open a -> do
mres <- push a
case mres of
Done _leftover res' -> do
sourceClose src
return res'
Processing -> loop
data FuseLeftState a = FLClosed [a] | FLOpen [a]
infixl 1 $=
($=) :: (IsSource src, Resource m)
=> src m a
-> Conduit a m b
-> Source m b
($=) = fuseLeft
normalFuseLeft :: Resource m => Source m a -> Conduit a m b -> Source m b
normalFuseLeft (Source msrc) (Conduit mc) = Source $ do
istate <- newRef $ FLOpen []
src <- msrc
c <- mc
return $ PreparedSource
(pull istate src c)
(close istate src c)
where
pull istate src c = do
state' <- readRef istate
case state' of
FLClosed [] -> return Closed
FLClosed (x:xs) -> do
writeRef istate $ FLClosed xs
return $ Open x
FLOpen (x:xs) -> do
writeRef istate $ FLOpen xs
return $ Open x
FLOpen [] -> do
mres <- sourcePull src
case mres of
Closed -> do
res <- conduitClose c
case res of
[] -> do
writeRef istate $ FLClosed []
return Closed
x:xs -> do
writeRef istate $ FLClosed xs
return $ Open x
Open input -> do
res' <- conduitPush c input
case res' of
Producing [] -> pull istate src c
Producing (x:xs) -> do
writeRef istate $ FLOpen xs
return $ Open x
Finished _leftover output -> do
sourceClose src
case output of
[] -> do
writeRef istate $ FLClosed []
return Closed
x:xs -> do
writeRef istate $ FLClosed xs
return $ Open x
close istate src c = do
state <- readRef istate
case state of
FLClosed _ -> return ()
FLOpen _ -> do
_ignored <- conduitClose c
sourceClose src
infixr 0 =$
(=$) :: Resource m => Conduit a m b -> Sink b m c -> Sink a m c
Conduit mc =$ Sink ms = Sink $ do
s <- ms
case s of
SinkData pushI closeI -> mc >>= go pushI closeI
SinkNoData mres -> return $ SinkNoData mres
where
go pushI closeI c = do
return SinkData
{ sinkPush = \cinput -> do
res <- conduitPush c cinput
case res of
Producing sinput -> do
let push [] = return Processing
push (i:is) = do
mres <- pushI i
case mres of
Processing -> push is
Done _sleftover res' -> do
_ <- conduitClose c
return $ Done Nothing res'
push sinput
Finished cleftover sinput -> do
let push [] = closeI
push (i:is) = do
mres <- pushI i
case mres of
Processing -> push is
Done _sleftover res' -> return res'
res' <- push sinput
return $ Done cleftover res'
, sinkClose = do
sinput <- conduitClose c
let push [] = closeI
push (i:is) = do
mres <- pushI i
case mres of
Processing -> push is
Done _sleftover res' -> return res'
push sinput
}
infixr 0 =$=
(=$=) :: Resource m => Conduit a m b -> Conduit b m c -> Conduit a m c
Conduit outerM =$= Conduit innerM = Conduit $ do
outer <- outerM
inner <- innerM
return PreparedConduit
{ conduitPush = \inputO -> do
res <- conduitPush outer inputO
case res of
Producing inputI -> do
let push [] front = return $ Producing $ front []
push (i:is) front = do
resI <- conduitPush inner i
case resI of
Producing c -> push is (front . (c ++))
Finished _leftover c -> do
_ <- conduitClose outer
return $ Finished Nothing $ front c
push inputI id
Finished leftoverO inputI -> do
c <- conduitPushClose inner inputI
return $ Finished leftoverO c
, conduitClose = do
b <- conduitClose outer
c <- conduitPushClose inner b
return c
}
conduitPushClose :: Monad m => PreparedConduit a m b -> [a] -> ResourceT m [b]
conduitPushClose c [] = conduitClose c
conduitPushClose c (input:rest) = do
res <- conduitPush c input
case res of
Finished _ b -> return b
Producing b -> do
b' <- conduitPushClose c rest
return $ b ++ b'
data BufferedSource m a = BufferedSource
{ bsSource :: PreparedSource m a
, bsBuffer :: Ref (Base m) (BSState a)
}
data BSState a = ClosedEmpty | OpenEmpty | ClosedFull a | OpenFull a
bufferSource :: Resource m => Source m a -> ResourceT m (BufferedSource m a)
bufferSource (Source msrc) = do
src <- msrc
buf <- newRef OpenEmpty
return $ BufferedSource src buf
unbufferSource :: Resource m
=> BufferedSource m a
-> Source m a
unbufferSource (BufferedSource src bufRef) = Source $ do
buf <- readRef bufRef
case buf of
OpenEmpty -> return src
OpenFull a -> do
isUsedRef <- newRef False
return PreparedSource
{ sourcePull = do
isUsed <- readRef isUsedRef
if isUsed
then sourcePull src
else do
writeRef isUsedRef True
return $ Open a
, sourceClose = sourceClose src
}
ClosedEmpty -> return PreparedSource
{ sourcePull = return Closed
, sourceClose = return ()
}
ClosedFull a -> do
isUsedRef <- newRef False
return PreparedSource
{ sourcePull = do
isUsed <- readRef isUsedRef
if isUsed
then return Closed
else do
writeRef isUsedRef True
return $ Open a
, sourceClose = sourceClose src
}
bufferedConnect :: Resource m => BufferedSource m a -> Sink a m b -> ResourceT m b
bufferedConnect bs (Sink msink) = do
sinkI <- msink
case sinkI of
SinkNoData output -> return output
SinkData push close -> do
bsState <- readRef $ bsBuffer bs
case bsState of
ClosedEmpty -> close
OpenEmpty -> connect' push close
ClosedFull a -> do
res <- push a
case res of
Done mleftover res' -> do
writeRef (bsBuffer bs) $ maybe ClosedEmpty ClosedFull mleftover
return res'
Processing -> do
writeRef (bsBuffer bs) ClosedEmpty
close
OpenFull a -> push a >>= onRes (connect' push close)
where
connect' push close =
loop
where
loop = do
res <- sourcePull $ bsSource bs
case res of
Closed -> do
writeRef (bsBuffer bs) ClosedEmpty
res' <- close
return res'
Open a -> push a >>= onRes loop
onRes _ (Done mleftover res) = do
writeRef (bsBuffer bs) (maybe OpenEmpty OpenFull mleftover)
return res
onRes loop Processing = loop
bufferedFuseLeft
:: Resource m
=> BufferedSource m a
-> Conduit a m b
-> Source m b
bufferedFuseLeft bsrc (Conduit mc) = Source $ do
istate <- newRef $ FLOpen []
c <- mc
return $ PreparedSource
(pull istate c)
(close istate c)
where
pull istate c = do
state' <- readRef istate
case state' of
FLClosed [] -> return Closed
FLClosed (x:xs) -> do
writeRef istate $ FLClosed xs
return $ Open x
FLOpen (x:xs) -> do
writeRef istate $ FLOpen xs
return $ Open x
FLOpen [] -> do
mres <- bsourcePull bsrc
case mres of
Closed -> do
res <- conduitClose c
case res of
[] -> do
writeRef istate $ FLClosed []
return Closed
x:xs -> do
writeRef istate $ FLClosed xs
return $ Open x
Open input -> do
res' <- conduitPush c input
case res' of
Producing [] -> pull istate c
Producing (x:xs) -> do
writeRef istate $ FLOpen xs
return $ Open x
Finished leftover output -> do
bsourceUnpull bsrc leftover
case output of
[] -> do
writeRef istate $ FLClosed []
return Closed
x:xs -> do
writeRef istate $ FLClosed xs
return $ Open x
close istate c = do
state <- readRef istate
case state of
FLClosed _ -> return ()
FLOpen _ -> do
_ignored <- conduitClose c
return ()
bsourcePull :: Resource m => BufferedSource m a -> ResourceT m (SourceResult a)
bsourcePull (BufferedSource src bufRef) = do
buf <- readRef bufRef
case buf of
OpenEmpty -> do
res <- sourcePull src
case res of
Open _ -> return res
Closed -> writeRef bufRef ClosedEmpty >> return Closed
ClosedEmpty -> return Closed
OpenFull a -> do
writeRef bufRef OpenEmpty
return $ Open a
ClosedFull a -> do
writeRef bufRef ClosedEmpty
return $ Open a
bsourceUnpull :: Resource m => BufferedSource m a -> Maybe a -> ResourceT m ()
bsourceUnpull _ Nothing = return ()
bsourceUnpull (BufferedSource _ bufRef) (Just a) = do
buf <- readRef bufRef
case buf of
OpenEmpty -> writeRef bufRef $ OpenFull a
ClosedEmpty -> writeRef bufRef $ ClosedFull a
_ -> error $ "Invariant violated: bsourceUnpull called on full data"
bsourceClose :: Resource m => BufferedSource m a -> ResourceT m ()
bsourceClose (BufferedSource src bufRef) = do
buf <- readRef bufRef
case buf of
OpenEmpty -> sourceClose src
OpenFull _ -> sourceClose src
ClosedEmpty -> return ()
ClosedFull _ -> return ()