{-# LANGUAGE KindSignatures
            ,RankNTypes
            ,FlexibleContexts
            ,ScopedTypeVariables
            ,BangPatterns
            ,DeriveDataTypeable #-}

-- |Monadic and General Iteratees:
-- incremental input parsers, processors and transformers

module Data.Iteratee.Iteratee (
  -- * Types
  -- ** Error handling
  throwErr
  ,throwRecoverableErr
  ,checkErr
  -- ** Basic Iteratees
  ,identity
  ,skipToEof
  ,isStreamFinished
  -- ** Chunkwise Iteratees
  ,mapChunksM_
  ,mapReduce
  ,getChunk
  ,getChunks
  -- ** Nested iteratee combinators
  ,mapChunks
  ,convStream
  ,unfoldConvStream
  ,joinI
  ,joinIM
  -- * Enumerators
  ,Enumerator
  ,Enumeratee
  -- ** Basic enumerators
  ,enumChunk
  ,enumEof
  ,enumErr
  ,enumPure1Chunk
  ,enumList
  ,enumCheckIfDone
  ,enumFromCallback
  ,enumFromCallbackCatch
  -- ** Enumerator Combinators
  ,(>>>)
  ,eneeCheckIfDone
  ,mergeEnums
  -- ** Enumeratee Combinators
  ,(><>)
  ,(<><)
  -- * Misc.
  ,seek
  ,FileOffset
  -- * Classes
  ,module Data.Iteratee.Base
)
where

import Prelude hiding (head, drop, dropWhile, take, break, foldl, foldl1, length, filter, sum, product)

import Data.Iteratee.IO.Base
import Data.Iteratee.Base

import Control.Exception
import Control.Monad.Trans.Class
import Control.Parallel
import Data.Maybe
import Data.Monoid
import Data.Typeable

-- exception helpers
excDivergent :: SomeException
excDivergent = toException DivergentException

-- ------------------------------------------------------------------------
-- Primitive iteratees

-- |Report and propagate an unrecoverable error.
--  Disregard the input first and then propagate the error.  This error
-- cannot be handled by 'enumFromCallbackCatch', although it can be cleared
-- by 'checkErr'.
throwErr :: (Monad m) => SomeException -> Iteratee s m a
throwErr e = icont (const (throwErr e)) (Just e)

-- |Report and propagate a recoverable error.  This error can be handled by
-- both 'enumFromCallbackCatch' and 'checkErr'.
throwRecoverableErr ::
 (Monad m) =>
  SomeException
  -> (Stream s -> Iteratee s m a)
  -> Iteratee s m a
throwRecoverableErr e i = icont i (Just e)


-- |Check if an iteratee produces an error.
-- Returns @Right a@ if it completes without errors, otherwise
-- @Left SomeException@. 'checkErr' is useful for iteratees that may not
-- terminate, such as @Data.Iteratee.head@ with an empty stream.
checkErr ::
 (Monad m, NullPoint s) =>
  Iteratee s m a
  -> Iteratee s m (Either SomeException a)
checkErr iter = Iteratee $ \onDone onCont ->
  let od            = onDone . Right
      oc k Nothing  = onCont (checkErr . k) Nothing
      oc _ (Just e) = onDone (Left e) (Chunk empty)
  in runIter iter od oc

-- ------------------------------------------------------------------------
-- Parser combinators

-- |The identity iteratee.  Doesn't do any processing of input.
identity :: (Monad m, NullPoint s) => Iteratee s m ()
identity = idone () (Chunk empty)

-- |Get the stream status of an iteratee.
isStreamFinished :: (Monad m, Nullable s) => Iteratee s m (Maybe SomeException)
isStreamFinished = liftI check
  where
    check s@(Chunk xs)
      | nullC xs  = isStreamFinished
      | otherwise = idone Nothing s
    check s@(EOF e) = idone (Just $ fromMaybe (toException EofException) e) s
{-# INLINE isStreamFinished #-}


-- |Skip the rest of the stream
skipToEof :: (Monad m) => Iteratee s m ()
skipToEof = icont check Nothing
  where
    check (Chunk _) = skipToEof
    check s         = idone () s


-- |Seek to a position in the stream
seek :: (Monad m, NullPoint s) => FileOffset -> Iteratee s m ()
seek o = throwRecoverableErr (toException $ SeekException o) (const identity)

-- | Map a monadic function over the chunks of the stream and ignore the
-- result.  Useful for creating efficient monadic iteratee consumers, e.g.
-- 
-- >  logger = mapChunksM_ (liftIO . putStrLn)
-- 
-- these can be efficiently run in parallel with other iteratees via
-- @Data.Iteratee.ListLike.zip@.
mapChunksM_ :: (Monad m, Nullable s) => (s -> m b) -> Iteratee s m ()
mapChunksM_ f = liftI step
  where
    step (Chunk xs)
      | nullC xs   = liftI step
      | otherwise  = lift (f xs) >> liftI step
    step s@(EOF _) = idone () s
{-# INLINE mapChunksM_ #-}

-- | Perform a parallel map/reduce.  The `bufsize` parameter controls
-- the maximum number of chunks to read at one time.  A larger bufsize
-- allows for greater parallelism, but will require more memory.
-- 
-- Implementation of `sum`
-- 
-- > sum :: (Monad m, LL.ListLike s, Nullable s) => Iteratee s m Int64
-- > sum = getSum <$> mapReduce 4 (Sum . LL.sum)
mapReduce ::
  (Monad m, Nullable s, Monoid b)
  => Int               -- ^ maximum number of chunks to read
  -> (s -> b)          -- ^ map function
  -> Iteratee s m b
mapReduce bufsize f = liftI (step (0, []))
 where
  step a@(!buf,acc) (Chunk xs)
    | nullC xs = liftI (step a)
    | buf >= bufsize =
        let acc'  = mconcat acc
            b'    = f xs
        in b' `par` acc' `pseq` liftI (step (0,[b' `mappend` acc']))
    | otherwise     =
        let b' = f xs
        in b' `par` liftI (step (succ buf,b':acc))
  step (_,acc) s@(EOF Nothing) =
    idone (mconcat acc) s
  step acc       (EOF (Just err))  =
    throwRecoverableErr err (step acc)

-- | Get the current chunk from the stream.
getChunk :: (Monad m, Nullable s, NullPoint s) => Iteratee s m s
getChunk = liftI step
 where
  step (Chunk xs)
    | nullC xs  = liftI step
    | otherwise = idone xs $ Chunk empty
  step (EOF Nothing)  = throwErr $ toException EofException
  step (EOF (Just e)) = throwErr e
{-# INLINE getChunk #-}

-- | Get a list of all chunks from the stream.
getChunks :: (Monad m, Nullable s) => Iteratee s m [s]
getChunks = liftI (step [])
 where
  step acc (Chunk xs)
    | nullC xs    = liftI (step acc)
    | otherwise   = liftI (step (xs:acc))
  step acc stream = idone (reverse acc) stream
{-# INLINE getChunks #-}

-- ---------------------------------------------------
-- The converters show a different way of composing two iteratees:
-- `vertical' rather than `horizontal'

type Enumeratee sFrom sTo (m :: * -> *) a =
  Iteratee sTo m a
  -> Iteratee sFrom m (Iteratee sTo m a)

-- The following pattern appears often in Enumeratee code
{-# INLINE eneeCheckIfDone #-}

-- | Utility function for creating enumeratees.  Typical usage is demonstrated
-- by the @breakE@ definition.
-- 
-- > breakE
-- >   :: (Monad m, LL.ListLike s el, NullPoint s)
-- >   => (el -> Bool)
-- >   -> Enumeratee s s m a
-- > breakE cpred = eneeCheckIfDone (liftI . step)
-- >  where
-- >   step k (Chunk s)
-- >       | LL.null s  = liftI (step k)
-- >       | otherwise  = case LL.break cpred s of
-- >         (str', tail')
-- >           | LL.null tail' -> eneeCheckIfDone (liftI . step) . k $ Chunk str'
-- >           | otherwise     -> idone (k $ Chunk str') (Chunk tail')
-- >   step k stream           =  idone (k stream) stream
-- 
eneeCheckIfDone ::
 (Monad m, NullPoint elo) =>
  ((Stream eli -> Iteratee eli m a) -> Iteratee elo m (Iteratee eli m a))
  -> Enumeratee elo eli m a
eneeCheckIfDone f inner = Iteratee $ \od oc -> 
  let onDone x s = od (idone x s) (Chunk empty)
      onCont k Nothing  = runIter (f k) od oc
      onCont _ (Just e) = runIter (throwErr e) od oc
  in runIter inner onDone onCont

-- | Convert one stream into another with the supplied mapping function.
-- This function operates on whole chunks at a time, contrasting to
-- @mapStream@ which operates on single elements.
-- 
-- > unpacker :: Enumeratee B.ByteString [Word8] m a
-- > unpacker = mapChunks B.unpack
-- 
mapChunks :: (Monad m, NullPoint s) => (s -> s') -> Enumeratee s s' m a
mapChunks f = eneeCheckIfDone (liftI . step)
 where
  step k (Chunk xs)     = eneeCheckIfDone (liftI . step) . k . Chunk $ f xs
  step k str@(EOF mErr) = idone (k $ EOF mErr) str
{-# INLINE mapChunks #-}


-- |Convert one stream into another, not necessarily in lockstep.
-- The transformer mapStream maps one element of the outer stream
-- to one element of the nested stream.  The transformer below is more
-- general: it may take several elements of the outer stream to produce
-- one element of the inner stream, or the other way around.
-- The transformation from one stream to the other is specified as
-- Iteratee s m s'.
convStream ::
 (Monad m, Nullable s) =>
  Iteratee s m s'
  -> Enumeratee s s' m a
convStream fi = eneeCheckIfDone check
  where
    check k = isStreamFinished >>= maybe (step k) (idone (liftI k) . EOF . Just)
    step k = fi >>= eneeCheckIfDone check . k . Chunk

-- |The most general stream converter.  Given a function to produce iteratee
-- transformers and an initial state, convert the stream using iteratees
-- generated by the function while continually updating the internal state.
unfoldConvStream ::
 (Monad m, Nullable s) =>
  (acc -> Iteratee s m (acc, s'))
  -> acc
  -> Enumeratee s s' m a
unfoldConvStream f acc0 = eneeCheckIfDone (check acc0)
  where
    check acc k = isStreamFinished >>=
                    maybe (step acc k) (idone (liftI k) . EOF . Just)
    step acc k = f acc >>= \(acc', s') ->
                    eneeCheckIfDone (check acc') . k . Chunk $ s'


-- | Collapse a nested iteratee.  The inner iteratee is terminated by @EOF@.
--   Errors are propagated through the result.
-- 
--  The stream resumes from the point of the outer iteratee; any remaining
--  input in the inner iteratee will be lost.
--  Differs from 'Control.Monad.join' in that the inner iteratee is terminated,
--  and may have a different stream type than the result.
joinI ::
 (Monad m, Nullable s) =>
  Iteratee s m (Iteratee s' m a)
  -> Iteratee s m a
joinI = (>>=
  \inner -> Iteratee $ \od oc ->
  let onDone  x _        = od x (Chunk empty)
      onCont  k Nothing  = runIter (k (EOF Nothing)) onDone onCont'
      onCont  _ (Just e) = runIter (throwErr e) od oc
      onCont' _ e        = runIter (throwErr (fromMaybe excDivergent e)) od oc
  in runIter inner onDone onCont)
{-# INLINE joinI #-}

-- | Lift an iteratee inside a monad to an iteratee.
joinIM :: (Monad m) => m (Iteratee s m a) -> Iteratee s m a
joinIM mIter = Iteratee $ \od oc -> mIter >>= \iter -> runIter iter od oc


-- ------------------------------------------------------------------------
-- Enumerators
-- |Each enumerator takes an iteratee and returns an iteratee
-- an Enumerator is an iteratee transformer.
-- The enumerator normally stops when the stream is terminated
-- or when the iteratee moves to the done state, whichever comes first.
-- When to stop is of course up to the enumerator...

type Enumerator s m a = Iteratee s m a -> m (Iteratee s m a)

-- |Applies the iteratee to the given stream.  This wraps 'enumEof',
-- 'enumErr', and 'enumPure1Chunk', calling the appropriate enumerator
-- based upon 'Stream'.
enumChunk :: (Monad m) => Stream s -> Enumerator s m a
enumChunk (Chunk xs)     = enumPure1Chunk xs
enumChunk (EOF Nothing)  = enumEof
enumChunk (EOF (Just e)) = enumErr e

-- |The most primitive enumerator: applies the iteratee to the terminated
-- stream. The result is the iteratee in the Done state.  It is an error
-- if the iteratee does not terminate on EOF.
enumEof :: (Monad m) => Enumerator s m a
enumEof iter = runIter iter onDone onCont
  where
    onDone  x _str    = return $ idone x (EOF Nothing)
    onCont  k Nothing = runIter (k (EOF Nothing)) onDone onCont'
    onCont  k e       = return $ icont k e
    onCont' _ Nothing = return $ throwErr excDivergent
    onCont' k e       = return $ icont k e

-- |Another primitive enumerator: tell the Iteratee the stream terminated
-- with an error.
enumErr :: (Exception e, Monad m) => e -> Enumerator s m a
enumErr e iter = runIter iter onDone onCont
  where
    onDone  x _       = return $ idone x (EOF . Just $ toException e)
    onCont  k Nothing = runIter (k (EOF (Just (toException e)))) onDone onCont'
    onCont  k e'      = return $ icont k e'
    onCont' _ Nothing = return $ throwErr excDivergent
    onCont' k e'      = return $ icont k e'


-- |The composition of two enumerators: essentially the functional composition
-- It is convenient to flip the order of the arguments of the composition
-- though: in e1 >>> e2, e1 is executed first

(>>>) :: (Monad m) => Enumerator s m a -> Enumerator s m a -> Enumerator s m a
(e1 >>> e2) i =  e1 i >>= e2
  -- I think (>>>) is identical to (>=>)...


-- | Enumeratee composition
-- Run the second enumeratee within the first.  In this example, stream2list
-- is run within the 'take 10', which is itself run within 'take 15', resulting
-- in 15 elements being consumed
-- 
-- >>> run =<< enumPure1Chunk [1..1000 :: Int] (joinI $ (I.take 15 ><> I.take 10) I.stream2list)
-- [1,2,3,4,5,6,7,8,9,10]
-- 
(><>) ::
 (Nullable s1, Monad m)
  => (forall x . Enumeratee s1 s2 m x)
  -> Enumeratee s2 s3 m a
  -> Enumeratee s1 s3 m a
f ><> g = joinI . f . g

-- | enumeratee composition with the arguments flipped, see '><>'
(<><) ::
 (Nullable s1, Monad m)
  => Enumeratee s2 s3 m a
  -> (forall x. Enumeratee s1 s2 m x)
  -> Enumeratee s1 s3 m a
f <>< g = joinI . g . f

-- | Combine enumeration over two streams.  The merging enumeratee would
-- typically be the result of 'Data.Iteratee.ListLike.merge' or
-- 'Data.Iteratee.ListLike.mergeByChunks' (see @merge@ for example).
mergeEnums :: 
  (Nullable s2, Nullable s1, Monad m)
  => Enumerator s1 m a                   -- ^ inner enumerator
  -> Enumerator s2 (Iteratee s1 m) a     -- ^ outer enumerator
  -> Enumeratee s2 s1 (Iteratee s1 m) a  -- ^ merging enumeratee
  -> Enumerator s1 m a
mergeEnums e1 e2 etee i = e1 $ e2 (joinI . etee $ ilift lift i) >>= run
{-# INLINE mergeEnums #-}

-- | The pure 1-chunk enumerator
-- 
-- It passes a given list of elements to the iteratee in one chunk
-- This enumerator does no IO and is useful for testing of base parsing
enumPure1Chunk :: (Monad m) => s -> Enumerator s m a
enumPure1Chunk str iter = runIter iter idoneM onCont
  where
    onCont k Nothing = return $ k $ Chunk str
    onCont k e       = return $ icont k e

-- | Enumerate chunks from a list
-- 
enumList :: (Monad m) => [s] -> Enumerator s m a
enumList chunks = go chunks
 where
  go [] i = return i
  go xs' i = runIter i idoneM (onCont xs')
   where
    onCont (x:xs) k Nothing = go xs . k $ Chunk x
    onCont _ _ (Just e) = return $ throwErr e
    onCont _ k Nothing  = return $ icont k Nothing
{-# INLINABLE enumList #-}

-- | Checks if an iteratee has finished.
-- 
-- This enumerator runs the iteratee, performing any monadic actions.
-- If the result is True, the returned iteratee is done.
enumCheckIfDone :: (Monad m) => Iteratee s m a -> m (Bool, Iteratee s m a)
enumCheckIfDone iter = runIter iter onDone onCont
  where
    onDone x str = return (True, idone x str)
    onCont k e   = return (False, icont k e)
{-# INLINE enumCheckIfDone #-}


-- |Create an enumerator from a callback function
enumFromCallback ::
 (Monad m, NullPoint s) =>
  (st -> m (Either SomeException ((Bool, st), s)))
  -> st
  -> Enumerator s m a
enumFromCallback c st =
  enumFromCallbackCatch c (\NotAnException -> return Nothing) st

-- Dummy exception to catch in enumFromCallback
-- This never gets thrown, but it lets us
-- share plumbing
data NotAnException = NotAnException
 deriving (Show, Typeable)

instance Exception NotAnException where
instance IException NotAnException where

-- |Create an enumerator from a callback function with an exception handler.
-- The exception handler is called if an iteratee reports an exception.
enumFromCallbackCatch ::
 (IException e, Monad m, NullPoint s) =>
  (st -> m (Either SomeException ((Bool, st), s)))
  -> (e -> m (Maybe EnumException))
  -> st
  -> Enumerator s m a
enumFromCallbackCatch c handler = loop
  where
    loop st iter = runIter iter idoneM (onCont st)
    onCont st k Nothing = c st >>=
        either (return . k . EOF . Just) (uncurry check)
      where
        check (b,st') = if b then loop st' . k . Chunk else return . k . Chunk
    onCont st k j@(Just e) = case fromException e of
      Just e' -> handler e' >>= maybe (loop st . k $ Chunk empty)
                                 (return . icont k . Just) . fmap toException
      Nothing -> return (icont k j)
{-# INLINE enumFromCallbackCatch #-}