{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DeriveDataTypeable #-}
------------------------------------------------------------------------------
-- | Module: Control.Pipe.Attoparsec.Stream
-- Copyright: Martin Grabmueller
-- License: BSD3
--
-- Maintainer: martin@grabmueller.de
-- Stability: provisional
-- Portability: GHC and Linux only
--
-- This module exports the single function parse, which can be used to
-- run an Attoparsec parser in a streaming fashion, which means that
-- the parser is not only run incrementally across the input (which
-- can be done with plain Attoparsec or packages like
-- pipes-attoparsec), but that the parse results are delivered
-- incrementally.  This package can be seen as a kind of dual to
-- pipes-attoparsec: the latter runs parser incrementally over their
-- input, whereas the former incrementally delivers output.
--
-- The following example (to be found with comments in
-- examples/MimeParser.hs in the source tarball) shows an example for
-- parsing e-mail messages incrementally.
--
-- > import Control.Pipe.Attoparsec.Stream
-- > 
-- > import Data.Word
-- > import Control.Pipe
-- > import Control.Pipe.Combinators as Combinators
-- > import Data.Attoparsec(Parser)
-- > import qualified Data.Attoparsec as A
-- > import qualified Data.Attoparsec.Char8 as A8
-- > import Data.ByteString(ByteString)
-- > import qualified Data.ByteString as B
-- > import Control.Applicative
-- > import Control.Monad.Trans.Class(lift)
-- >
-- > isToken :: Word8 -> Bool
-- > isToken w = w <= 127 && A.notInClass "\0-\31()<>@,;:\\\"/[]?={} \t" w
-- > 
-- > takeSpaces :: Parser ByteString
-- > takeSpaces = B.cons <$> A.satisfy A8.isHorizontalSpace <*> A.takeWhile A8.isHorizontalSpace
-- > 
-- > endOfLine :: Parser ByteString
-- > endOfLine = (A.word8 10 >> return "\n") <|> A.string "\r\n"
-- > 
-- > takeAtMost :: Int -> Parser ByteString
-- > takeAtMost n = A.scan n (\ n' c -> if n' == 0 then Nothing else (Just (n' - 1)))
-- > 
-- > data Event
-- >   = Header ByteString ByteString [(ByteString, ByteString, ByteString)]
-- >   | EndOfHeader ByteString
-- >   | BodyChunk ByteString
-- >   | EndOfInput
-- >   deriving (Show)
-- > 
-- > messageHeader :: Parser (PartialResult Event)
-- > messageHeader = do
-- >   header <- B.cons <$> A.satisfy isToken <*> A.takeWhile isToken
-- >   delim <- B.snoc <$> A.takeWhile A8.isHorizontalSpace <*> A8.char8 ':'
-- >   body <-  (,,) <$> A.takeWhile A8.isHorizontalSpace <*>
-- >                     A.takeTill A8.isEndOfLine <*> endOfLine
-- >   bodies <- many $ (,,) <$> takeSpaces <*> A.takeTill A8.isEndOfLine <*> endOfLine
-- >   return (PartialResult (Just (Header header delim (body:bodies))) (Just messageHeader))
-- >  <|>
-- >   return (PartialResult Nothing (Just headerEnd))
-- > 
-- > headerEnd :: Parser (PartialResult Event)
-- > headerEnd = do
-- >   s <- endOfLine
-- >   return $! PartialResult (Just (EndOfHeader s)) (Just bodyChunk)
-- > 
-- > bodyChunk :: Parser (PartialResult Event)
-- > bodyChunk = do
-- >   s <- takeAtMost 10
-- >   if B.null s
-- >     then return $ PartialResult (Just EndOfInput) Nothing
-- >     else return $ PartialResult (Just (BodyChunk s)) (Just bodyChunk)
-- > 
-- > msg :: ByteString
-- > msg =
-- >   "Received  : foo\r\n  cont'd\r\nDate: now\r\n\r\nblabla\r\nsecond line - longer"
-- > 
-- > dump :: Pipe Event Void IO ()
-- > dump = go
-- >  where
-- >    go = do
-- >      e <- await
-- >      lift $ print e
-- >      go
-- > 
-- > main :: IO ()
-- > main = do
-- >   runPipe $ fromList [msg] >+> parse messageHeader >+> dump
--
------------------------------------------------------------------------------
module Control.Pipe.Attoparsec.Stream
  (
    -- * Types
    StreamException(..),
    PartialResult(..),
    -- * Parse function
    parse
  ) where

import Control.Exception
import Control.Pipe
import Control.Pipe.Combinators as Combinators
import Control.Pipe.Exception as E
import Data.Attoparsec(Parser, Result)
import qualified Data.Attoparsec as A
import Data.ByteString(ByteString)
import qualified Data.ByteString as B
import Data.Typeable

-- | This is the type of exceptions that may be thrown by the
-- streaming parser.
--
-- Currently, only one exception i defined: it reports Attoparsec
-- parse errors.  The values carried by the exceptions are the parse
-- context and the error message, just as in the @Fail@ case of
-- Attoparsec's @Result@ type.
--
data StreamException
  = ParseException [String] String -- ^ Parsing failed.
  deriving (Show, Typeable)

instance Exception StreamException

-- | Parsers which are to be used with the streaming parser Pipe must
-- return values of the following type:
--
-- * The first parameter of type @Maybe a@ is the parser result. When
-- it is a @Just@ value, the carried value is delivered to the next
-- Pipe in the Pipeline.  When it is @Nothing@, no value is yield.
--
-- * When the second parameter is a @Just p@, the parser @p@ is used
-- to continue the parsing process. When it is @Nothing@, the parsing
-- process ends and the Pipeline terminates.
--
data PartialResult a = PartialResult (Maybe a) (Maybe (Parser (PartialResult a)))


-- | This function converts an Attoparsec parser over bytestrings
-- which returns a @PartialResult a@ into a Pipe that consumes a
-- stream of bytestrings and delivers a stream of @a@ values.
--
parse :: Monad m => Parser (PartialResult a) -> Pipe ByteString a m ()
parse parser = do
  chunk <- await
  go (A.parse parser chunk)
 where
   go :: Monad m => (Result (PartialResult a)) -> Pipe ByteString a m ()
   go cont =
     case cont of
       -- Convert a parse error into an exception.
       A.Fail _ ctxt err -> 
         E.throw (ParseException ctxt err)

       -- Partial parse: check whether more input is available, and
       -- feed it to the continuation.  Note that the following is not
       -- sufficient:
       --
       -- > do chunk <- await
       -- >    go (c chunk)
       --
       -- The reason is that the await will terminate the pipeline
       -- when no more input is available, so the parse will not see
       -- the end-of-input.  Instead, we have to feed an empty
       -- bytestring to the continuation, so that Attoparsec can
       -- finish the parsing.
       --
       A.Partial c -> do
         chunk <- tryAwait
         case chunk of
           Nothing -> go (c B.empty)
           Just chunk' -> go (c chunk')
       
       -- We are finished with the current parser. Yield the parsed
       -- value (if there is any) and run the continuation parser, if
       -- there is one.
       --
       A.Done rest (PartialResult mbResult mbNext) -> do
         case mbResult of
           Nothing -> return ()
           Just a -> yield a
         case mbNext of
           Nothing -> return ()
           Just p -> go (A.parse p rest)