module Data.Enumerator.NetLines
(
netLine,
netLineEmpty,
netWord,
netWordEmpty,
netLines,
netLinesEmpty,
netWords,
netWordsEmpty,
netSplitBy,
netSplitsBy,
TimeoutError(..),
enumHandleSession,
enumHandleTimeout,
iterHandleTimeout
)
where
import qualified Data.ByteString as B
import Control.ContStuff as Monad
import Control.Exception as Ex
import Data.ByteString (ByteString)
import Data.Enumerator as E
import Data.Time.Clock
import Data.Typeable
import Data.Word
import System.IO
import System.IO.Error as IOErr
import System.Timeout
newtype TimeoutError = TimeoutError { timeoutErrorMessage :: String }
deriving (Typeable)
instance Ex.Exception TimeoutError
instance Show TimeoutError where
show (TimeoutError msg) = "Operation timed out: " ++ msg
enumHandleSession ::
forall b m. MonadIO m =>
Int -> Int -> Int -> Handle -> Enumerator ByteString m b
enumHandleSession bufSize readTime sessionTime h step = do
startTime <- liftIO getCurrentTime
loop startTime step
where
loop :: UTCTime -> Enumerator ByteString m b
loop startTime (Continue k) = do
now <- liftIO getCurrentTime
let timeoutErr = TimeoutError "Read timeout"
diff = sessionTime round (1000 * diffUTCTime now startTime)
timeout = min diff readTime
when (timeout <= 0) $ throwError timeoutErr
mHaveInput <- liftIO $ IOErr.try (hWaitForInput h timeout)
case mHaveInput of
Left err
| isEOFError err -> continue k
| otherwise -> throwError err
Right False -> throwError timeoutErr
Right True -> do
str <- tryIO $ B.hGetNonBlocking h bufSize
if B.null str
then continue k
else k (Chunks [str]) >>== loop startTime
loop _ step = returnI step
enumHandleTimeout ::
forall b m. MonadIO m =>
Int -> Int -> Handle -> Enumerator ByteString m b
enumHandleTimeout bufSize timeout h = loop
where
loop :: Enumerator ByteString m b
loop (Continue k) = do
mHaveInput <- liftIO $ IOErr.try (hWaitForInput h timeout)
case mHaveInput of
Left err
| isEOFError err -> continue k
| otherwise -> throwError err
Right False -> throwError $ TimeoutError "Read timeout"
Right True -> do
str <- tryIO $ B.hGetNonBlocking h bufSize
if B.null str
then continue k
else k (Chunks [str]) >>== loop
loop step = returnI step
isSpace :: Word8 -> Bool
isSpace n = n == 32 || (n >= 9 && n <= 13)
iterHandleTimeout ::
forall m. MonadIO m => Int -> Handle -> Iteratee ByteString m ()
iterHandleTimeout maxTime h = do
startTime <- tryIO getCurrentTime
continue (loop startTime)
where
loop :: UTCTime -> Stream ByteString -> Iteratee ByteString m ()
loop _ EOF = yield () EOF
loop startTime (Chunks []) = continue (loop startTime)
loop startTime (Chunks strs) = do
let timeoutErr = TimeoutError "Write timeout"
now <- tryIO getCurrentTime
let restMs = 1000*maxTime round (1000000 * diffUTCTime now startTime)
when (restMs <= 0) (throwError timeoutErr)
tryIO (timeout restMs (mapM_ (B.hPutStr h) strs)) >>=
maybe (throwError timeoutErr) return
continue (loop startTime)
netLine :: forall m. Monad m => Int -> Iteratee ByteString m (Maybe ByteString)
netLine = nonEmpty . netLineEmpty
netLineEmpty :: Monad m => Int -> Iteratee ByteString m (Maybe ByteString)
netLineEmpty = netSplitBy (== 10) (/= 13)
netLines :: Monad m => Int -> Enumeratee ByteString ByteString m b
netLines = netSplitsBy . netLine
netLinesEmpty :: Monad m => Int -> Enumeratee ByteString ByteString m b
netLinesEmpty = netSplitsBy . netLineEmpty
netSplitBy ::
forall m. Monad m =>
(Word8 -> Bool) -> (Word8 -> Bool) -> Int ->
Iteratee ByteString m (Maybe ByteString)
netSplitBy breakP filterP n =
continue (loop B.empty)
where
loop :: ByteString -> Stream ByteString ->
Iteratee ByteString m (Maybe ByteString)
loop line' EOF = yield (if B.null line' then Nothing else Just line') EOF
loop line' (Chunks []) = continue (loop line')
loop line' (Chunks (str:strs)) =
if B.null line2'
then line `seq` loop line (Chunks strs)
else yield (Just line) (Chunks (line2:strs))
where
(line1', line2') = B.break breakP str
line1 = B.filter filterP line1'
line2 = B.tail line2'
line = B.take n $ B.append line' line1
netSplitsBy ::
forall b m. Monad m =>
Iteratee ByteString m (Maybe ByteString) -> Enumeratee ByteString ByteString m b
netSplitsBy getLine = loop
where
loop :: Enumeratee ByteString ByteString m b
loop (Continue k) = do
mLine <- getLine
case mLine of
Just line -> k (Chunks [line]) >>== loop
Nothing -> k EOF >>== loop
loop step = return step
netWord :: Monad m => Int -> Iteratee ByteString m (Maybe ByteString)
netWord = nonEmpty . netWordEmpty
netWordEmpty :: Monad m => Int -> Iteratee ByteString m (Maybe ByteString)
netWordEmpty = netSplitBy isSpace (const True)
netWords :: Monad m => Int -> Enumeratee ByteString ByteString m b
netWords = netSplitsBy . netWord
netWordsEmpty :: Monad m => Int -> Enumeratee ByteString ByteString m b
netWordsEmpty = netSplitsBy . netWordEmpty
nonEmpty ::
forall a m. Monad m =>
Iteratee a m (Maybe ByteString) -> Iteratee a m (Maybe ByteString)
nonEmpty getStr = evalMaybeT loop
where
loop :: MaybeT r (Iteratee a m) ByteString
loop = do
line <- liftF getStr
if B.null line then loop else return line