{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE DeriveDataTypeable #-}
-- | Defines the types for a source, which is a producer of data.
module Data.Conduit.Types.Source
    ( SourceResult (..)
    , PreparedSource (..)
    , Source (..)
    , BufferedSource (..)
    , SourceInvariantException (..)
    , BufferSource (..)
    ) where

import Control.Monad.Trans.Resource
import Data.Monoid (Monoid (..))
import Control.Monad (liftM)
import Data.Typeable (Typeable)
import Control.Exception (Exception, throw)

-- | Result of pulling from a source. Either a new piece of data (@Open@), or
-- indicates that the source is now @Closed@.
data SourceResult a = Open a | Closed
    deriving (Show, Eq, Ord)

instance Functor SourceResult where
    fmap f (Open a) = Open (f a)
    fmap _ Closed = Closed

-- | A 'PreparedSource' has two operations on it: pull some data, and close the
-- 'PreparedSource'. Since 'PreparedSource' is built on top of 'ResourceT', all
-- acquired resources should be automatically released anyway. Closing a
-- 'PreparedSource' early
-- is merely an optimization to free scarce resources as soon as possible.
--
-- A 'PreparedSource' has three invariants:
--
-- * It is illegal to call 'sourcePull' after a previous call returns 'Closed', or after a call to 'sourceClose'.
--
-- * It is illegal to call 'sourceClose' multiple times, or after a previous
-- 'sourcePull' returns a 'Closed'.
--
-- * A 'PreparedSource' is responsible to free any resources when either 'sourceClose'
-- is called or a 'Closed' is returned. However, based on the usage of
-- 'ResourceT', this is simply an optimization.
data PreparedSource m a = PreparedSource
    { sourcePull :: ResourceT m (SourceResult a)
    , sourceClose :: ResourceT m ()
    }

instance Monad m => Functor (PreparedSource m) where
    fmap f src = src
        { sourcePull = liftM (fmap f) (sourcePull src)
        }

-- | All but the simplest of 'PreparedSource's (e.g., @repeat@) require some
-- type of state to track their current status. This may be in the form of a
-- mutable variable (e.g., @IORef@), or via opening a resource like a @Handle@.
-- While a 'PreparedSource' is given no opportunity to acquire such resources,
-- this type is.
--
-- A 'Source' is simply a monadic action that returns a 'PreparedSource'. One
-- nice consequence of this is the possibility of creating an efficient
-- 'Monoid' instance, which will only acquire one resource at a time, instead
-- of bulk acquiring all resources at the beginning of running the 'Source'.
--
-- Note that each time you \"call\" a @Source@, it is started from scratch. If
-- you want a resumable source (e.g., one which can be passed to multiple
-- @Sink@s), you likely want to use a 'BufferedSource'.
newtype Source m a = Source { prepareSource :: ResourceT m (PreparedSource m a) }

instance Monad m => Functor (Source m) where
    fmap f (Source msrc) = Source (liftM (fmap f) msrc)

instance Resource m => Monoid (Source m a) where
    mempty = Source (return PreparedSource
        { sourcePull = return Closed
        , sourceClose = return ()
        })
    mappend a b = mconcat [a, b]
    mconcat [] = mempty
    mconcat (Source mnext:rest0) = Source $ do
        -- open up the first Source...
        next0 <- mnext
        -- and place it in a mutable reference along with all of the upcoming
        -- Sources
        istate <- newRef (next0, rest0)
        return PreparedSource
            { sourcePull = pull istate
            , sourceClose = close istate
            }
      where
        pull istate =
            readRef istate >>= pull'
          where
            pull' (current, rest) = do
                res <- sourcePull current
                case res of
                    -- end of the current Source
                    Closed -> do
                        case rest of
                            -- ... and open the next one
                            Source ma:as -> do
                                a <- ma
                                writeRef istate (a, as)
                                -- continue pulling base on this new state
                                pull istate
                            -- no more source, return an EOF
                            [] -> do
                                -- give an error message if the first Source
                                -- invariant is violated (read data after EOF)
                                writeRef istate $
                                    throw $ PullAfterEOF "Source:mconcat"
                                return Closed
                    Open _ -> return res
        close istate = do
            -- we only need to close the current Source, since they are opened
            -- one at a time
            (current, _) <- readRef istate
            sourceClose current

-- | When actually interacting with 'Source's, we usually want to be able to
-- buffer the output, in case any intermediate steps return leftover data. A
-- 'BufferedSource' allows for such buffering, via the 'bsourceUnpull' function.
--
-- A 'BufferedSource', unlike a 'Source', is resumable, meaning it can be passed to
-- multiple 'Sink's without restarting.
--
-- Finally, a 'BufferedSource' relaxes one of the invariants of a 'Source': calling
-- 'bsourcePull' after an 'EOF' will simply return another 'EOF'.
--
-- A @BufferedSource@ is also known as a /resumable source/, in that it can be
-- called multiple times, and each time will provide new data. One caveat:
-- while the types will allow you to use the buffered source in multiple
-- threads, there is no guarantee that all @BufferedSource@s will handle this
-- correctly.
data BufferedSource m a = BufferedSource
    { bsourcePull :: ResourceT m (SourceResult a)
    , bsourceUnpull :: a -> ResourceT m ()
    , bsourceClose :: ResourceT m ()
    }

data SourceInvariantException = PullAfterEOF String
    deriving (Show, Typeable)
instance Exception SourceInvariantException

-- | This typeclass allows us to unify operators on 'Source' and 'BufferedSource'.
class BufferSource s where
    bufferSource :: Resource m => s m a -> ResourceT m (BufferedSource m a)

-- | Note that this instance hides the 'bsourceClose' record, so that a
-- @BufferedSource@ remains resumable. The correct way to handle closing of a
-- resumable source would be to call @bsourceClose@ on the originally
-- @BufferedSource@, e.g.:
--
-- > bsrc <- bufferSource $ sourceFile "myfile.txt"
-- > bsrc $$ drop 5
-- > rest <- bsrc $$ consume
-- > bsourceClose bsrc
--
-- Note that the call to the @$$@ operator allocates a /new/ 'BufferedSource'
-- internally, so that when @$$@ calls @bsourceClose@ the first time, it does
-- not close the actual file, thereby allowing us to pass the same @bsrc@ to
-- the @consume@ function. Afterwards, we should call @bsourceClose@ manually
-- (though @runResourceT@ will handle it for us eventually).
instance BufferSource BufferedSource where
    bufferSource bsrc = return bsrc
        { bsourceClose = return ()
        }

-- | State of a 'BufferedSource'
data BState a = BOpen [a]
              | BClosed [a]
    deriving Show

instance BufferSource PreparedSource where
    bufferSource src = do
        istate <- newRef $ BOpen []
        return BufferedSource
            { bsourcePull = do
                mresult <- modifyRef istate $ \state ->
                    case state of
                        BOpen [] -> (state, Nothing)
                        BClosed [] -> (state, Just Closed)
                        BOpen (x:xs) -> (BOpen xs, Just $ Open x)
                        BClosed (x:xs) -> (BClosed xs, Just $ Open x)
                case mresult of
                    Nothing -> do
                        result <- sourcePull src
                        case result of
                            Closed -> writeRef istate $ BClosed []
                            Open _ -> return ()
                        return result
                    Just result -> return result
            , bsourceUnpull = \x ->
                modifyRef istate $ \state ->
                    case state of
                        BOpen buffer -> (BOpen (x : buffer), ())
                        BClosed buffer -> (BClosed (x : buffer), ())
            , bsourceClose = do
                action <- modifyRef istate $ \state ->
                    case state of
                        BOpen x -> (BClosed x, sourceClose src)
                        BClosed _ -> (state, return ())
                action
            }

instance BufferSource Source where
    bufferSource (Source msrc) = msrc >>= bufferSource