{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveDataTypeable #-}
------------------------------------------------------------------------------
-- | Module: Control.Pipe.Attoparsec.Stream
-- Copyright: Martin Grabmueller
-- License: BSD3
--
-- Maintainer: martin@grabmueller.de
-- Stability: provisional
-- Portability: GHC and Linux only
--
-- This module exports the single function parse, which can be used to
-- run an Attoparsec parser in a streaming fashion, which means that
-- the parser is not only run incrementally across the input (which
-- can be done with plain Attoparsec or packages like
-- pipes-attoparsec), but that the parse results are delivered
-- incrementally. This package can be seen as a kind of dual to
-- pipes-attoparsec: the latter runs parsers incrementally over their
-- input, whereas the former incrementally delivers output.
--
-- The following example (to be found with comments in
-- examples/MimeParser.hs in the source tarball) shows an example for
-- parsing e-mail messages incrementally.
--
-- > import Control.Pipe.Attoparsec.Stream
-- >
-- > import Data.Word
-- > import Control.Pipe
-- > import Control.Pipe.Combinators as Combinators
-- > import Data.Attoparsec(Parser)
-- > import qualified Data.Attoparsec as A
-- > import qualified Data.Attoparsec.Char8 as A8
-- > import Data.ByteString(ByteString)
-- > import qualified Data.ByteString as B
-- > import Control.Applicative
-- > import Control.Monad.Trans.Class(lift)
-- >
-- > isToken :: Word8 -> Bool
-- > isToken w = w <= 127 && A.notInClass "\0-\31()<>@,;:\\\"/[]?={} \t" w
-- >
-- > takeSpaces :: Parser ByteString
-- > takeSpaces = B.cons <$> A.satisfy A8.isHorizontalSpace <*> A.takeWhile A8.isHorizontalSpace
-- >
-- > endOfLine :: Parser ByteString
-- > endOfLine = (A.word8 10 >> return "\n") <|> A.string "\r\n"
-- >
-- > takeAtMost :: Int -> Parser ByteString
-- > takeAtMost n = A.scan n (\ n' c -> if n' == 0 then Nothing else (Just (n' - 1)))
-- >
-- > data Event
-- >   = Header ByteString ByteString [(ByteString, ByteString, ByteString)]
-- >   | EndOfHeader ByteString
-- >   | BodyChunk ByteString
-- >   | EndOfInput
-- >   deriving (Show)
-- >
-- > messageHeader :: Parser (PartialResult Event)
-- > messageHeader = do
-- >     header <- B.cons <$> A.satisfy isToken <*> A.takeWhile isToken
-- >     delim <- B.snoc <$> A.takeWhile A8.isHorizontalSpace <*> A8.char8 ':'
-- >     body <- (,,) <$> A.takeWhile A8.isHorizontalSpace <*>
-- >               A.takeTill A8.isEndOfLine <*> endOfLine
-- >     bodies <- many $ (,,) <$> takeSpaces <*> A.takeTill A8.isEndOfLine <*> endOfLine
-- >     return (PartialResult (Just (Header header delim (body:bodies))) (Just messageHeader))
-- >  <|>
-- >     return (PartialResult Nothing (Just headerEnd))
-- >
-- > headerEnd :: Parser (PartialResult Event)
-- > headerEnd = do
-- >   s <- endOfLine
-- >   return $!
-- >     PartialResult (Just (EndOfHeader s)) (Just bodyChunk)
-- >
-- > bodyChunk :: Parser (PartialResult Event)
-- > bodyChunk = do
-- >   s <- takeAtMost 10
-- >   if B.null s
-- >     then return $ PartialResult (Just EndOfInput) Nothing
-- >     else return $ PartialResult (Just (BodyChunk s)) (Just bodyChunk)
-- >
-- > msg :: ByteString
-- > msg =
-- >   "Received : foo\r\n cont'd\r\nDate: now\r\n\r\nblabla\r\nsecond line - longer"
-- >
-- > dump :: Pipe Event Void IO ()
-- > dump = go
-- >  where
-- >    go = do
-- >      e <- await
-- >      lift $ print e
-- >      go
-- >
-- > main :: IO ()
-- > main = do
-- >   runPipe $ fromList [msg] >+> parse messageHeader >+> dump
--
------------------------------------------------------------------------------
module Control.Pipe.Attoparsec.Stream
  ( -- * Types
    StreamException(..),
    PartialResult(..),
    -- * Parse function
    parse
  ) where

import Control.Exception
import Control.Pipe
import Control.Pipe.Combinators as Combinators
import Control.Pipe.Exception as E
import Data.Attoparsec(Parser, Result)
import qualified Data.Attoparsec as A
import Data.ByteString(ByteString)
import qualified Data.ByteString as B
import Data.Typeable

-- | This is the type of exceptions that may be thrown by the
-- streaming parser.
--
-- Currently, only one exception is defined: it reports Attoparsec
-- parse errors.  The values carried by the exception are the parse
-- context and the error message, just as in the @Fail@ case of
-- Attoparsec's @Result@ type.
--
data StreamException
  = ParseException [String] String
    -- ^ Parsing failed.
  deriving (Show, Typeable)

instance Exception StreamException

-- | Parsers which are to be used with the streaming parser Pipe must
-- return values of the following type:
--
-- * The first parameter of type @Maybe a@ is the parser result.  When
-- it is a @Just@ value, the carried value is delivered to the next
-- Pipe in the Pipeline.  When it is @Nothing@, no value is yielded.
--
-- * When the second parameter is a @Just p@, the parser @p@ is used
-- to continue the parsing process.  When it is @Nothing@, the parsing
-- process ends and the Pipeline terminates.
--
data PartialResult a
  = PartialResult (Maybe a) (Maybe (Parser (PartialResult a)))

-- | This function converts an Attoparsec parser over bytestrings
-- which returns a @PartialResult a@ into a Pipe that consumes a
-- stream of bytestrings and delivers a stream of @a@ values.
--
-- A parse failure is reported by throwing a 'ParseException' that
-- carries the context and message of Attoparsec's @Fail@ result.
--
-- Empty chunks delivered by upstream are skipped: Attoparsec treats
-- an empty bytestring fed to a continuation as end of input, so an
-- empty chunk must never reach the parser while more input may still
-- follow.
--
-- Note that the very first chunk is obtained with a plain @await@, so
-- (see the remark on @await@ in the implementation) an upstream that
-- never delivers a chunk ends the pipe before the parser is run.
--
parse :: Monad m => Parser (PartialResult a) -> Pipe ByteString a m ()
parse parser = do
  chunk <- await
  go (A.parse parser chunk)
  where
    go :: Monad m => (Result (PartialResult a)) -> Pipe ByteString a m ()
    go cont =
      case cont of
        -- Convert a parse error into an exception.
        A.Fail _ ctxt err -> E.throw (ParseException ctxt err)

        -- Partial parse: check whether more input is available, and
        -- feed it to the continuation.  Note that the following is not
        -- sufficient:
        --
        -- > do chunk <- await
        -- >    go (c chunk)
        --
        -- The reason is that the await will terminate the pipeline
        -- when no more input is available, so the parse will not see
        -- the end-of-input.  Instead, we have to feed an empty
        -- bytestring to the continuation, so that Attoparsec can
        -- finish the parsing.
        --
        A.Partial c -> do
          mbChunk <- tryAwait
          case mbChunk of
            -- Upstream is exhausted: signal end of input to Attoparsec.
            Nothing -> go (c B.empty)
            Just chunk'
              -- An empty chunk would be mistaken for end of input by
              -- the continuation, so drop it and wait for the next one.
              | B.null chunk' -> go cont
              | otherwise -> go (c chunk')

        -- We are finished with the current parser.  Yield the parsed
        -- value (if there is any) and run the continuation parser, if
        -- there is one, on the input left over by the current parser.
        --
        A.Done rest (PartialResult mbResult mbNext) -> do
          maybe (return ()) yield mbResult
          maybe (return ()) (\p -> go (A.parse p rest)) mbNext