-- | Sending and receiving framed 'ByteString' messages (single values or
-- lists of values) through an 'AppData' connection, with all conduits running
-- in the 'Stream' monad.
module Data.Conduit.Network.Stream
(
-- The monad every conduit in this module runs in.
Stream
-- Sending: the encoding class plus helpers that run a source into a sink.
, Sendable, send1, sendList
-- Receiving: read the next message, the last message, or just finish.
, Streamable (receive), receiveLast, close
-- The decoding conduit used by the receive functions.
, next
-- Connect a source to a sink and run the result in the base monad.
, (~~)
-- Sinks that write to the connection.
, sink1, sinkList, sinkList'
) where
import Control.Monad.Trans
import Control.Monad.Trans.Resource
import Data.ByteString (ByteString)
import Data.Conduit hiding (($$))
import Data.Conduit.Network
import qualified Data.Conduit as C
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Binary as CB
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BL
import Data.Conduit.Network.Stream.Exceptions
import Data.Conduit.Network.Stream.Header
import Data.Conduit.Network.Stream.Internal
infixr 0 ~~

-- | Run a complete pipeline in the 'Stream' monad and return its result in
-- the underlying monad @m@.
--
-- The source is connected to the sink with conduit's @$$@, and the resulting
-- 'Stream' action is unwrapped with 'stream_base'.
(~~) :: Monad m => Source (Stream m) a -> Sink a (Stream m) b -> m b
(~~) producer consumer = stream_base (producer C.$$ consumer)
-- | Values of type @a@ that can be converted into the 'ByteString' chunks
-- written to a connection whose base monad is @m@.
class Sendable a m where
  -- | Conduit that turns each incoming @a@ into 'ByteString' output.
  encode :: Conduit a (Stream m) ByteString

-- | Strict bytestrings are encoded with 'encodeBS'.
instance Monad m => Sendable ByteString m where
  encode = encodeBS

-- | Lazy bytestrings are paired with an 'Int' and encoded with
-- 'encodeLazyBS'.
--
-- NOTE(review): the 'Int' is presumably the length of the lazy bytestring;
-- confirm against 'encodeLazyBS'.
instance Monad m => Sendable (Int, BL.ByteString) m where
  encode = encodeLazyBS
-- | Run @src@ into 'sink1' for the given connection; equivalent to
-- @src ~~ sink1 ad@.
--
-- Only the first element produced by @src@ is sent (see 'sink1').
send1 :: (Monad m, Sendable a m) => AppData m -> Source (Stream m) a -> m ()
send1 ad src = stream_base (src C.$$ sink1 ad)
-- | Run @src@ into 'sinkList' for the given connection; equivalent to
-- @src ~~ sinkList ad@.
--
-- Every element produced by @src@ is sent as part of one list (see
-- 'sinkList').
sendList :: (Monad m, Sendable a m) => AppData m -> Source (Stream m) a -> m ()
sendList ad src = stream_base (src C.$$ sinkList ad)
-- | Sink that sends a single value over the connection.
--
-- 'CL.isolate' restricts the encoder to at most one upstream element, whose
-- encoding is written to @appSink ad@ (lifted into 'Stream'). Any further
-- upstream elements are then consumed and discarded by 'CL.sinkNull'.
sink1 :: (Monad m, Sendable a m) => AppData m -> Sink a (Stream m) ()
sink1 ad =
  (CL.isolate 1 =$ encode =$ transPipe lift (appSink ad))
    >> CL.sinkNull
-- | Sink that sends all upstream elements as one list: first the list-start
-- marker ('sinkListStart'), then the encoded elements ('sinkListElems'), and
-- finally the list-end marker ('sinkListEnd').
sinkList :: (Monad m, Sendable a m) => AppData m -> Sink a (Stream m) ()
sinkList ad =
  sinkListStart ad
    >> sinkListElems ad
    >> sinkListEnd ad
-- | Sources from which a message can be received.
class Streamable source m where
  -- | Decode the next message from @source@ (see 'next') and feed it to the
  -- given sink. Returns the sink's result together with a 'ResumableSource'
  -- from which subsequent messages can be read.
  receive
    :: source
    -> Sink BL.ByteString (Stream m) b
    -> m (ResumableSource (Stream m) ByteString, b)

-- | Start reading from a connection: @appSource ad@ is lifted into 'Stream'
-- and connected with @$$+@.
instance MonadResource m => Streamable (AppData m) m where
  receive ad consumer =
    stream_base (transPipe lift (appSource ad) $$+ (next =$ consumer))

-- | Continue reading from a 'ResumableSource' (connected with @$$++@).
instance MonadResource m => Streamable (ResumableSource (Stream m) ByteString) m where
  receive resumable consumer =
    stream_base (resumable $$++ (next =$ consumer))
-- | Decode one more message from a 'ResumableSource' (see 'next'), feed it to
-- the given sink and return only the sink's result.
--
-- The source is connected with @$$+-@, so no 'ResumableSource' is handed
-- back to the caller.
receiveLast
  :: MonadResource m
  => ResumableSource (Stream m) ByteString
  -> Sink BL.ByteString (Stream m) a
  -> m a
receiveLast resumable consumer =
  stream_base (resumable $$+- (next =$ consumer))
-- | Finish a 'ResumableSource' without decoding anything from it: the source
-- is connected with @$$+-@ to a sink that returns immediately.
close
  :: MonadResource m
  => ResumableSource (Stream m) ByteString
  -> m ()
close resumable = stream_base (resumable $$+- return ())
-- | Decode exactly one message from the incoming byte stream.
--
-- A header is read with 'decodeHeader' and handled as follows:
--
-- * @VarInt len@: @len@ bytes are taken from upstream and yielded as one
--   lazy bytestring.
--
-- * @ListSTART@: list elements are read, each introduced by its own
--   @VarInt@ header and yielded like a single value, until @ListEND@.
--
-- * @EndOfInput@: the conduit finishes without yielding anything.
--
-- * Any other header, including @EndOfInput@ inside a list, is reported by
--   throwing 'UnexpectedHeader' through 'monadThrow'.
--
-- NOTE(review): 'CB.take' returns what is available, so if upstream ends
-- early the yielded chunk may be shorter than announced; confirm whether
-- callers need a length check.
next :: MonadResource m => Conduit ByteString (Stream m) BL.ByteString
next = decodeHeader >>= onHeader
  where
    -- Top-level dispatch: a single value, a list, or nothing at all.
    onHeader (VarInt len) = emit len
    onHeader ListSTART    = listItems
    onHeader EndOfInput   = return ()
    onHeader other        = monadThrow (UnexpectedHeader other)

    -- Take @len@ bytes from upstream and pass them on as one chunk.
    emit len = CB.take len >>= yield

    -- Read list elements until the end marker is seen.
    listItems = decodeHeader >>= onListHeader

    onListHeader (VarInt len) = emit len >> listItems
    onListHeader ListEND      = return ()
    onListHeader other        = monadThrow (UnexpectedHeader other)
-- | Variant of 'sinkList' that lets the caller wrap the element sink.
--
-- The function @f@ receives the element sink ('sinkListElems') and returns
-- the sink that runs between the list-start and list-end markers. Its result
-- is returned once the end marker has been sent.
sinkList'
  :: (Monad m, Sendable a m)
  => AppData m
  -> (Sink a (Stream m) () -> Sink b (Stream m) c)
  -> Sink b (Stream m) c
sinkList' ad f =
  sinkListStart ad
    >> f (sinkListElems ad)
    >>= \result -> sinkListEnd ad >> return result
-- | Write the list-start marker (the bytes of 'listStart') to the
-- connection's @appSink@. The marker is produced with 'yield'.
sinkListStart :: Monad m => AppData m -> Sink a (Stream m) ()
sinkListStart ad = yield marker =$ transPipe lift (appSink ad)
  where
    marker = BS.pack listStart

-- | Write the list-end marker (the bytes of 'listEnd') to the connection's
-- @appSink@. The marker is produced with 'yield'.
sinkListEnd :: Monad m => AppData m -> Sink a (Stream m) ()
sinkListEnd ad = yield marker =$ transPipe lift (appSink ad)
  where
    marker = BS.pack listEnd
-- | Sink that passes every upstream element through 'encode' and writes the
-- output to the connection's @appSink@, lifted into 'Stream'.
sinkListElems
  :: (Monad m, Sendable a m) => AppData m -> Sink a (Stream m) ()
sinkListElems ad = encode =$ connection
  where
    connection = transPipe lift (appSink ad)