pipes-attoparsec-streaming- Streaming parsing in the pipes-core framework with Attoparsec.

Portability: GHC and Linux only
Safe Haskell: Safe-Inferred




This module exports the single function parse, which runs an Attoparsec parser in a streaming fashion: the parser is not only run incrementally across the input (which can be done with plain Attoparsec or packages like pipes-attoparsec), but the parse results are also delivered incrementally. This package can be seen as a kind of dual to pipes-attoparsec: the latter runs parsers incrementally over their input, whereas the former incrementally delivers output.

The following example (to be found with comments in examples/MimeParser.hs in the source tarball) parses e-mail messages incrementally.

 {-# LANGUAGE OverloadedStrings #-}
 import Control.Pipe.Attoparsec.Stream
 import Data.Word
 import Control.Pipe
 import Control.Pipe.Combinators as Combinators
 import Data.Attoparsec(Parser)
 import qualified Data.Attoparsec as A
 import qualified Data.Attoparsec.Char8 as A8
 import Data.ByteString(ByteString)
 import qualified Data.ByteString as B
 import Control.Applicative
 import Control.Monad.Trans.Class(lift)

 isToken :: Word8 -> Bool
 isToken w = w <= 127 && A.notInClass "\0-\31()<>@,;:\\\"/[]?={} \t" w
 takeSpaces :: Parser ByteString
 takeSpaces = B.cons <$> A.satisfy A8.isHorizontalSpace <*> A.takeWhile A8.isHorizontalSpace
 endOfLine :: Parser ByteString
 endOfLine = (A.word8 10 >> return "\n") <|> A.string "\r\n"
 takeAtMost :: Int -> Parser ByteString
 takeAtMost n = A.scan n (\ n' c -> if n' == 0 then Nothing else (Just (n' - 1)))
 data Event
   = Header ByteString ByteString [(ByteString, ByteString, ByteString)]
   | EndOfHeader ByteString
   | BodyChunk ByteString
   | EndOfInput
   deriving (Show)
 messageHeader :: Parser (PartialResult Event)
 messageHeader = parseHeader <|> return (PartialResult Nothing (Just headerEnd))
   where
     -- Parse one header field, including its folded continuation lines.
     -- If no header field matches, the alternative above yields no event
     -- and continues with headerEnd.
     parseHeader = do
       header <- B.cons <$> A.satisfy isToken <*> A.takeWhile isToken
       delim <- B.snoc <$> A.takeWhile A8.isHorizontalSpace <*> A8.char8 ':'
       body <-  (,,) <$> A.takeWhile A8.isHorizontalSpace <*>
                         A.takeTill A8.isEndOfLine <*> endOfLine
       bodies <- many $ (,,) <$> takeSpaces <*> A.takeTill A8.isEndOfLine <*> endOfLine
       return (PartialResult (Just (Header header delim (body:bodies))) (Just messageHeader))
 headerEnd :: Parser (PartialResult Event)
 headerEnd = do
   s <- endOfLine
   return $! PartialResult (Just (EndOfHeader s)) (Just bodyChunk)
 bodyChunk :: Parser (PartialResult Event)
 bodyChunk = do
   s <- takeAtMost 10
   if B.null s
     then return $ PartialResult (Just EndOfInput) Nothing
     else return $ PartialResult (Just (BodyChunk s)) (Just bodyChunk)
 msg :: ByteString
 msg =
   "Received  : foo\r\n  cont'd\r\nDate: now\r\n\r\nblabla\r\nsecond line - longer"
 dump :: Pipe Event Void IO ()
 dump = go
   where
     -- Print each incoming event, then wait for the next one.
     go = do
       e <- await
       lift $ print e
       go
 main :: IO ()
 main = do
   runPipe $ fromList [msg] >+> parse messageHeader >+> dump



data StreamException

This is the type of exceptions that may be thrown by the streaming parser.

Currently, only one exception is defined: it reports Attoparsec parse errors. The values carried by the exception are the parse context and the error message, just as in the Fail case of Attoparsec's Result type.


ParseException [String] String

Parsing failed.
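
As a rough illustration of how the two fields of ParseException might be turned into a readable report, the sketch below uses a local mirror of the constructor so that it compiles with base alone. The Exception instance, the helper names describe, failing and report, and the output format are assumptions made for this example; how the real exception surfaces from a pipeline is not shown here.

```haskell
import Control.Exception (Exception, throwIO, try)
import Data.List (intercalate)

-- Local mirror of the documented constructor (an assumption for this sketch);
-- the real type is exported by Control.Pipe.Attoparsec.Stream.
data StreamException = ParseException [String] String
  deriving Show

-- Assumed instance, so the mirror can be thrown and caught in IO.
instance Exception StreamException

-- Hypothetical helper: join the parse contexts and append the message.
describe :: StreamException -> String
describe (ParseException [] msg)   = msg
describe (ParseException ctxs msg) = intercalate " > " ctxs ++ ": " ++ msg

-- Hypothetical action that fails the way a parser might.
failing :: IO ()
failing = throwIO (ParseException ["messageHeader", "delimiter"] "expected ':'")

-- Run an action and report either the formatted parse error or "ok".
report :: IO () -> IO String
report act = either describe (const "ok") <$> try act
```

With these definitions, report failing returns "messageHeader > delimiter: expected ':'".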

data PartialResult a

Parsers which are to be used with the streaming parser Pipe must return values of the following type:

  • The first parameter of type Maybe a is the parser result. When it is a Just value, the carried value is delivered to the next Pipe in the Pipeline. When it is Nothing, no value is yielded.
  • When the second parameter is a Just p, the parser p is used to continue the parsing process. When it is Nothing, the parsing process ends and the Pipeline terminates.


PartialResult (Maybe a) (Maybe (Parser (PartialResult a))) 
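
The two fields described above can be pictured with a toy model of the driving loop. This is only a sketch of the semantics, not the library's implementation: ToyParser, ToyResult, drive and word are hypothetical names, the parser type is a simple function over String rather than an Attoparsec parser, and a parse failure just ends the output instead of raising a ParseException.

```haskell
import Data.Char (isSpace)

-- Hypothetical stand-in for a parser: consume a prefix of the input and
-- return a result plus the unconsumed rest, or Nothing on failure.
newtype ToyParser r = ToyParser { runToy :: String -> Maybe (r, String) }

-- Mirrors the shape of PartialResult: an optional output value and an
-- optional parser to continue with.
data ToyResult a = ToyResult (Maybe a) (Maybe (ToyParser (ToyResult a)))

-- Run one parser after another. Each Just value is emitted as soon as the
-- parser that produced it has finished; a Nothing continuation ends the run.
drive :: ToyParser (ToyResult a) -> String -> [a]
drive p input = case runToy p input of
  Nothing -> []  -- toy model only: failure simply stops the output
  Just (ToyResult out next, rest) ->
    maybe id (:) out $ case next of
      Nothing -> []
      Just p' -> drive p' rest

-- Example parser: emit one word per step and stop at the end of the input.
word :: ToyParser (ToyResult String)
word = ToyParser $ \s ->
  case break isSpace (dropWhile isSpace s) of
    ("", _)   -> Just (ToyResult Nothing Nothing, "")
    (w, rest) -> Just (ToyResult (Just w) (Just word), rest)
```

In this model, drive word "stream  all the words" evaluates to ["stream","all","the","words"]. Because the output list is built lazily, the first results are available before the rest of the input has been examined, which is the behaviour the streaming parser aims for.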

Parse function

parse :: Monad m => Parser (PartialResult a) -> Pipe ByteString a m ()

This function converts an Attoparsec parser over bytestrings which returns a PartialResult a into a Pipe that consumes a stream of bytestrings and delivers a stream of a values.