module Network.ByteString.Lazy.Stream
(
Result(..),
Stream,
invalidate,
withStream,
write,
send,
receive,
receiveE
)
where
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.STM.TChan
import Control.Exception
import Control.Monad
import Control.Monad.IO.Class
import Data.Binary
import Data.Binary.Put
import Data.Binary.Get
import qualified Data.ByteString.Lazy as B
import qualified Data.Enumerator as E
import Data.Typeable
import Data.Word
import Network
import System.IO
-- | A single framed message written to, or read from, a stream handle.
data StreamItem
  = SBytes B.ByteString
    -- ^ A chunk of payload bytes.
  | SInvalidate
    -- ^ Marker that makes 'receiveLoop' drop the bytes it has accumulated
    -- and makes 'receiveE' start over.
  | SSuccess
    -- ^ Terminal marker: 'receiveLoop' returns the accumulated bytes.
  | SFailure
    -- ^ Terminal marker: 'receiveLoop' and 'receiveE' return 'Nothing'.
  deriving (Read, Show)
-- | Wire format: a one-byte constructor tag (0 = 'SBytes', 1 = 'SInvalidate',
-- 2 = 'SSuccess', 3 = 'SFailure'). For 'SBytes' the tag is followed by the
-- payload length and then the payload itself.
instance Binary StreamItem where
  put (SBytes bytes) = do
    put (0 :: Word8)
    -- Length prefix; 'get' below reads it back with @get :: Get Int@.
    put $ B.length bytes
    putLazyByteString bytes
  put SInvalidate = put (1 :: Word8)
  put SSuccess = put (2 :: Word8)
  put SFailure = put (3 :: Word8)

  get = do
    cons <- get :: Get Word8
    case cons of
      0 -> do
        l <- get :: Get Int
        bytes <- getLazyByteString (fromIntegral l)
        return $ SBytes bytes
      1 -> return SInvalidate
      2 -> return SSuccess
      3 -> return SFailure
      -- Report an unknown tag as a decode failure instead of falling
      -- through a non-exhaustive case.
      _ -> fail $ "StreamItem: unknown constructor tag " ++ show cons
-- | An outgoing stream: a handle paired with the queue of items that are
-- waiting to be written to it.
data Stream
  = Stream
      Handle
      -- ^ Handle that 'streamProcess' writes encoded items to.
      (TChan StreamItem)
      -- ^ Queue of pending items, consumed by 'streamProcess'.
-- | Outcome returned by the action given to 'withStream'. Both variants
-- carry the value that 'withStream' hands back to its caller.
data Result a
  = Success a
    -- ^ 'withStream' finishes the stream with 'closeStream'.
  | Failure a
    -- ^ 'withStream' finishes the stream with 'failStream'.
  deriving (Read, Show)
-- | Number of bytes that 'encode' produces for an 'Int'. 'readBytes' uses it
-- as the width of the length prefix it reads from the handle.
intSize :: Int
intSize = fromIntegral . B.length . encode $ (1 :: Int)
-- | Wrap a handle in a 'Stream' backed by a freshly created channel.
openStream :: Handle -> IO Stream
openStream h = Stream h <$> atomically newTChan
-- | Append the 'SSuccess' marker to the stream's queue, behind whatever
-- items are already waiting there.
closeStream :: Stream -> IO ()
closeStream (Stream _ queue) = atomically (writeTChan queue SSuccess)
-- | Push the 'SFailure' marker onto the front of the stream's queue, so it
-- is the next item read, ahead of anything still waiting.
failStream :: Stream -> IO ()
failStream (Stream _ queue) = atomically (unGetTChan queue SFailure)
-- | Remove every item currently in the channel, within the enclosing STM
-- transaction.
clearChan :: TChan a -> STM ()
clearChan chan = isEmptyTChan chan >>= drain
  where
    -- Stop once the channel reports empty; otherwise discard one item and
    -- check again.
    drain True = return ()
    drain False = readTChan chan >> clearChan chan
-- | Writer loop for a stream: repeatedly take the next item from the queue,
-- write its encoding to the handle, and continue until a terminal marker
-- ('SSuccess' or 'SFailure') has been written.
streamProcess :: Stream -> IO ()
streamProcess s@(Stream h chan) = do
  item <- atomically $ readTChan chan
  B.hPut h $ encode item
  case item of
    SBytes _ -> streamProcess s
    SInvalidate -> streamProcess s
    -- Flush once the terminal marker is written; otherwise a buffered
    -- handle can hold it back and the peer never sees the stream end.
    SSuccess -> hFlush h
    SFailure -> hFlush h
-- | Run an action against a 'Stream' that writes to the given handle.
--
-- A writer thread ('streamProcess') is forked to drain the stream's queue
-- onto the handle. When the action returns, the stream is finished with
-- 'closeStream' (for 'Success') or 'failStream' (for 'Failure'), and the
-- carried value is returned.
--
-- This call does not return until the writer thread has finished, so every
-- queued item has been handed to the handle before the caller regains
-- control (and can, for instance, close the handle).
withStream :: Handle -> (Stream -> IO (Result a)) -> IO a
withStream h f = do
  s <- openStream h
  done <- newEmptyMVar
  -- 'finally' signals completion even if the writer dies with an exception,
  -- so the wait below cannot block forever on a crashed writer.
  _ <- forkIO $ streamProcess s `finally` putMVar done ()
  res <- f s
  case res of
    Success r -> closeStream s >> takeMVar done >> return r
    Failure r -> failStream s >> takeMVar done >> return r
-- | In one transaction, discard everything still waiting in the stream's
-- queue and enqueue an 'SInvalidate' marker in its place.
invalidate :: Stream -> IO ()
invalidate (Stream _ queue) =
  atomically (clearChan queue >> writeTChan queue SInvalidate)
-- | Enqueue a chunk of bytes on the stream as an 'SBytes' item.
write :: Stream -> B.ByteString -> IO ()
write (Stream _ queue) = atomically . writeTChan queue . SBytes
-- | Encode a value with 'encode' and send it over the handle as a single
-- successful stream.
send :: Binary a => Handle -> a -> IO ()
send h x = withStream h sendEncoded
  where
    -- Queue the encoded value, then report success so the stream is closed.
    sendEncoded s = do
      write s (encode x)
      return (Success ())
-- | Read one length-prefixed payload from the handle: first 'intSize' bytes
-- holding an encoded 'Int' length, then that many payload bytes.
--
-- Returns 'Nothing' when the input is truncated (the length prefix or the
-- payload is shorter than expected) or when the decoded length is negative.
readBytes :: Handle -> IO (Maybe B.ByteString)
readBytes h = do
  sizeB <- B.hGet h intSize
  -- 'B.hGet' can return fewer bytes than requested at end of input; decoding
  -- a short prefix would throw, so treat it as a failed read instead.
  if B.length sizeB /= fromIntegral intSize
    then return Nothing
    else do
      let size = decode sizeB :: Int
      if size < 0
        then return Nothing
        else do
          bytes <- B.hGet h size
          -- A payload shorter than announced means the stream was cut off.
          return $
            if B.length bytes == fromIntegral size
              then Just bytes
              else Nothing
-- | Read one 'StreamItem' from the handle: a one-byte tag, followed for
-- tag 0 by a length-prefixed payload (see 'readBytes').
--
-- Returns 'Nothing' when no tag byte is available (end of input), when the
-- tag is not one of 0-3, or when 'readBytes' fails to read the payload.
readStreamItem :: Handle -> IO (Maybe StreamItem)
readStreamItem h = do
  codeB <- B.hGet h 1
  -- 'B.uncons' yields the tag byte directly and covers the empty read that
  -- occurs at end of input.
  case B.uncons codeB of
    Nothing -> return Nothing
    Just (code, _) ->
      case code of
        0 -> fmap SBytes <$> readBytes h
        1 -> return $ Just SInvalidate
        2 -> return $ Just SSuccess
        3 -> return $ Just SFailure
        -- Unknown tag: report a failed read rather than crash.
        _ -> return Nothing
-- | Read stream items from the handle until a terminal marker arrives,
-- starting from an empty accumulator. See 'receiveLoop' for the result.
receive :: Handle -> IO (Maybe B.ByteString)
receive h = receiveLoop h B.empty
-- | Read stream items from the handle, appending 'SBytes' payloads to the
-- accumulator passed as the second argument.
--
-- 'SInvalidate' resets the accumulator to empty; 'SSuccess' returns the
-- accumulated bytes; 'SFailure' or a failed read returns 'Nothing'.
receiveLoop :: Handle -> B.ByteString -> IO (Maybe B.ByteString)
receiveLoop h acc = readStreamItem h >>= step
  where
    step Nothing = return Nothing
    step (Just (SBytes more)) = receiveLoop h (acc `B.append` more)
    step (Just SInvalidate) = receiveLoop h B.empty
    step (Just SSuccess) = return (Just acc)
    step (Just SFailure) = return Nothing
-- | Conditions that 'streamEnum' reports as iteratee errors and that
-- 'receiveE' interprets.
data StreamEnumException
  = InvalidateException
    -- ^ An 'SInvalidate' item was read.
  | FailureException
    -- ^ An 'SFailure' item was read.
  | DecodeException
    -- ^ No stream item could be read.
  deriving (Read, Show, Typeable)

instance Exception StreamEnumException
-- | Feed the stream read from the handle into an iteratee and run it.
--
-- Returns @Just@ the iteratee's result on success. If the run ends with an
-- 'InvalidateException' the whole run is started again with the same
-- iteratee; a 'FailureException' or any other exception yields 'Nothing';
-- a 'DecodeException' is raised through 'error'.
receiveE :: MonadIO m =>
            Handle
         -> E.Iteratee B.ByteString m b
         -> m (Maybe b)
receiveE handle iter =
  E.run (streamEnum handle E.$$ iter) >>= either onError (return . Just)
  where
    -- Interpret the exception that ended the run.
    onError exception =
      case fromException exception of
        Just InvalidateException -> receiveE handle iter
        Just FailureException -> return Nothing
        Just DecodeException -> error "DecodeException"
        Nothing -> return Nothing
-- | Build an iteratee that is already in the error state, carrying the
-- given exception wrapped with 'toException'.
returnError :: (Exception e, Monad m) => e -> E.Iteratee B.ByteString m b
returnError = E.returnI . E.Error . toException
-- | Enumerator that reads stream items from the handle and feeds each
-- 'SBytes' payload to the iteratee as a single chunk.
--
-- 'SSuccess' stops reading and leaves the iteratee in its continue state;
-- 'SInvalidate', 'SFailure' and a failed read end with the corresponding
-- 'StreamEnumException'. A step that is not a continuation is returned
-- unchanged without reading from the handle.
streamEnum :: MonadIO m => Handle -> E.Enumerator B.ByteString m b
streamEnum h (E.Continue k) = liftIO (readStreamItem h) >>= dispatch
  where
    dispatch Nothing = returnError DecodeException
    dispatch (Just (SBytes bytes)) = k (E.Chunks [bytes]) E.>>== streamEnum h
    dispatch (Just SInvalidate) = returnError InvalidateException
    dispatch (Just SSuccess) = E.continue k
    dispatch (Just SFailure) = returnError FailureException
streamEnum _ step = E.returnI step