{-# LANGUAGE CPP #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FunctionalDependencies #-}
{-# LANGUAGE UndecidableInstances #-}
-- | Note: This module is experimental, and might be modified at any time.
-- Caveat emptor!
module Data.Conduit.Classy
    ( module Data.Conduit.Classy
    , C.ResumableSource
    , C.runResourceT
    , C.Flush (..)
    , C.ResourceT
    , C.unwrapResumable
    ) where

import Prelude (Monad (..), Functor (..), ($), const, IO, Maybe, Either, Bool, (.), either)
import Data.Void (Void)
import Control.Applicative (Applicative (..))
import qualified Data.Conduit as C
import Data.Conduit.Internal (Pipe (PipeM))
import Control.Monad.Trans.Class (MonadTrans (..))
import Control.Monad.Trans.Resource (allocate, release, MonadThrow, MonadResource, ResourceT)
import Control.Monad.Trans.Control (liftWith, restoreT, MonadTransControl)
import Control.Monad.IO.Class (MonadIO)
import Data.Monoid (Monoid (..))

import Control.Monad.Trans.Identity ( IdentityT)
import Control.Monad.Trans.List     ( ListT    )
import Control.Monad.Trans.Maybe    ( MaybeT   )
import Control.Monad.Trans.Error    ( ErrorT, Error)
import Control.Monad.Trans.Reader   ( ReaderT  )
import Control.Monad.Trans.State    ( StateT   )
import Control.Monad.Trans.Writer   ( WriterT  )
import Control.Monad.Trans.RWS      ( RWST     )

import qualified Control.Monad.Trans.RWS.Strict    as Strict ( RWST   )
import qualified Control.Monad.Trans.State.Strict  as Strict ( StateT )
import qualified Control.Monad.Trans.Writer.Strict as Strict ( WriterT )

-- | Provides a stream of output values, without consuming any input or
-- producing a final result.
--
-- Since 0.6.0
type Source m o = SourceM o m ()

newtype SourceM o m r = SourceM { unSourceM :: Pipe () () o () m r }
    deriving (Functor, Applicative, Monad, MonadTrans, MonadIO, ResourcePipe, MonadThrow)

instance Monad m => Monoid (SourceM o m ()) where
    mempty = return ()
    mappend = (>>)

-- | Consumes a stream of input values and produces a stream of output values,
-- without producing a final result.
--
-- Since 0.6.0
type Conduit i m o = ConduitM i o m ()

newtype ConduitM i o m r = ConduitM { unConduitM :: Pipe i i o () m r }
    deriving (Functor, Applicative, Monad, MonadTrans, MonadIO, ResourcePipe, MonadThrow)

instance Monad m => Monoid (ConduitM i o m ()) where
    mempty = return ()
    mappend = (>>)

-- | Consumes a stream of input values and produces a final result, without
-- producing any output.
--
-- Since 0.6.0
newtype Sink i m r = Sink { unSink :: Pipe i i Void () m r }
    deriving (Functor, Applicative, Monad, MonadTrans, MonadIO, ResourcePipe, MonadThrow)

instance Monad m => Monoid (Sink i m ()) where
    mempty = return ()
    mappend = (>>)

class (Monad m, Monad (PipeMonad m)) => IsPipe m where
    type PipeInput m
    type PipeTerm m
    type PipeOutput m
    type PipeMonad m :: * -> *

    -- | Wait for a single input value from upstream, terminating immediately if no
    -- data is available.
    --
    -- Since 0.5.0
    await :: m (Maybe (PipeInput m))

    -- | This is similar to @await@, but will return the upstream result value as
    -- @Left@ if available.
    --
    -- Since 0.5.0
    awaitE :: m (Either (PipeTerm m) (PipeInput m))

    -- | Provide a single piece of leftover input to be consumed by the next pipe
    -- in the current monadic binding.
    --
    -- /Note/: it is highly encouraged to only return leftover values from input
    -- already consumed from upstream.
    --
    -- Since 0.5.0
    leftover :: PipeInput m -> m ()

    -- | Send a single output value downstream. If the downstream @Pipe@
    -- terminates, this @Pipe@ will terminate as well.
    --
    -- Since 0.5.0
    yield :: PipeOutput m -> m ()

    -- | Similar to @yield@, but additionally takes a finalizer to be run if the
    -- downstream @Pipe@ terminates.
    --
    -- Since 0.5.0
    yieldOr :: PipeOutput m -> PipeMonad m () -> m ()

    liftPipeMonad :: PipeMonad m a -> m a

    -- | Add some code to be run when the given @Pipe@ cleans up.
    --
    -- Since 0.4.1
    addCleanup :: (Bool -> PipeMonad m ()) -- ^ @True@ if @Pipe@ ran to completion, @False@ for early termination.
               -> m r
               -> m r

instance (Monad m, l ~ i) => IsPipe (Pipe l i o u m) where
    type PipeInput (Pipe l i o u m) = i
    type PipeTerm (Pipe l i o u m) = u
    type PipeOutput (Pipe l i o u m) = o
    type PipeMonad (Pipe l i o u m) = m

    await = C.await
    {-# INLINE [1] await #-}

    awaitE = C.awaitE
    {-# INLINE [1] awaitE #-}

    leftover = C.leftover
    {-# INLINE [1] leftover #-}

    yield = C.yield
    {-# INLINE yield #-}

    yieldOr = C.yieldOr
    {-# INLINE yieldOr #-}

    liftPipeMonad = lift

    addCleanup = C.addCleanup

instance Monad m => IsPipe (SourceM o m) where
    type PipeInput (SourceM o m) = ()
    type PipeTerm (SourceM o m) = ()
    type PipeOutput (SourceM o m) = o
    type PipeMonad (SourceM o m) = m

    await = SourceM await
    {-# INLINE await #-}

    awaitE = SourceM awaitE
    {-# INLINE awaitE #-}

    leftover = SourceM . leftover
    {-# INLINE leftover #-}

    yield = SourceM . yield
    {-# INLINE yield #-}

    yieldOr a = SourceM . yieldOr a
    {-# INLINE yieldOr #-}

    liftPipeMonad = lift
    {-# INLINE liftPipeMonad #-}

    addCleanup c (SourceM p) = SourceM (addCleanup c p)
    {-# INLINE addCleanup #-}

instance Monad m => IsPipe (ConduitM i o m) where
    type PipeInput (ConduitM i o m) = i
    type PipeTerm (ConduitM i o m) = ()
    type PipeOutput (ConduitM i o m) = o
    type PipeMonad (ConduitM i o m) = m

    await = ConduitM await
    {-# INLINE await #-}

    awaitE = ConduitM awaitE
    {-# INLINE awaitE #-}

    leftover = ConduitM . leftover
    {-# INLINE leftover #-}

    yield = ConduitM . yield
    {-# INLINE yield #-}

    yieldOr a = ConduitM . yieldOr a
    {-# INLINE yieldOr #-}

    liftPipeMonad = lift
    {-# INLINE liftPipeMonad #-}

    addCleanup c (ConduitM p) = ConduitM (addCleanup c p)
    {-# INLINE addCleanup #-}

instance Monad m => IsPipe (Sink i m) where
    type PipeInput (Sink i m) = i
    type PipeTerm (Sink i m) = ()
    type PipeOutput (Sink i m) = Void
    type PipeMonad (Sink i m) = m

    await = Sink await
    {-# INLINE await #-}

    awaitE = Sink awaitE
    {-# INLINE awaitE #-}

    leftover = Sink . leftover
    {-# INLINE leftover #-}

    yield = Sink . yield
    {-# INLINE yield #-}

    yieldOr a = Sink . yieldOr a
    {-# INLINE yieldOr #-}

    liftPipeMonad = lift
    {-# INLINE liftPipeMonad #-}

    addCleanup c (Sink p) = Sink (addCleanup c p)
    {-# INLINE addCleanup #-}

class (IsPipe m, MonadResource (PipeMonad m), MonadIO m) => ResourcePipe m where
    -- | Perform some allocation and run an inner @Pipe@. Two guarantees are given
    -- about resource finalization:
    --
    -- 1. It will be /prompt/. The finalization will be run as early as possible.
    --
    -- 2. It is exception safe. Due to usage of @resourcet@, the finalization will
    --    be run in the event of any exceptions.
    --
    -- Since 0.5.0
    bracketP :: IO a -> (a -> IO ()) -> (a -> m r) -> m r

instance (l ~ i, MonadResource m) => ResourcePipe (Pipe l i o u m) where
    bracketP alloc free inside = PipeM $ do
        (key, seed) <- allocate alloc free
        return $ addCleanup (const $ release key) (inside seed)

#define GOALL(C, C2, T) instance C => IsPipe (T) where { type PipeInput (T) = PipeInput m; type PipeMonad (T) = PipeMonad m; type PipeTerm (T) = PipeTerm m; type PipeOutput (T) = PipeOutput m; await = lift await; awaitE = lift awaitE; leftover = lift . leftover; yield = lift . yield; yieldOr a = lift . yieldOr a; liftPipeMonad = lift . liftPipeMonad; addCleanup c r = liftWith (\run -> run $ addCleanup c r) >>= restoreT . return}; instance C2 => ResourcePipe (T) where { bracketP = controlBracketP }
#define GO(T) GOALL(IsPipe m, ResourcePipe m, T m)
#define GOX(X, T) GOALL((IsPipe m, X), (ResourcePipe m, X), T m)
GO(IdentityT)
GO(ListT)
GO(MaybeT)
GOX(Error e, ErrorT e)
GO(ReaderT r)
GO(StateT s)
GOX(Monoid w, WriterT w)
GOX(Monoid w, RWST r w s)
GOX(Monoid w, Strict.RWST r w s)
GO(Strict.StateT s)
GOX(Monoid w, Strict.WriterT w)
GO(ResourceT)
#undef GO
#undef GOX
#undef GOALL

controlBracketP :: (ResourcePipe m, Monad (t m), MonadTransControl t)
                => IO a -> (a -> IO ()) -> (a -> t m r) -> t m r
controlBracketP alloc free inside = liftWith (\run -> bracketP alloc free (run . inside)) >>= restoreT . return

-- | Wait for input forever, calling the given inner @Pipe@ for each piece of
-- new input. Returns the upstream result type.
--
-- Since 0.5.0
awaitForever :: IsPipe m
             => (PipeInput m -> m r')
             -> m (PipeTerm m)
awaitForever inner =
    self
  where
    self = awaitE >>= either return (\i -> inner i >> self)
{-# INLINE [1] awaitForever #-}

infixr 0 $$
infixl 1 $=
infixr 2 =$
infixr 2 =$=
infixr 0 $$+
infixr 0 $$++
infixr 0 $$+-

($$) :: Monad m => Source m a -> Sink a m b -> m b
SourceM src $$ Sink sink = src C.$$ sink
{-# INLINE ($$) #-}

($=) :: Monad m => Source m a -> Conduit a m b -> Source m b
SourceM src $= ConduitM con = SourceM $ src C.$= con
{-# INLINE ($=) #-}

(=$=) :: Monad m => Conduit a m b -> Conduit b m c -> Conduit a m c
ConduitM l =$= ConduitM r = ConduitM $ l C.=$= r
{-# INLINE (=$=) #-}

(=$) :: Monad m => Conduit a m b -> Sink b m c -> Sink a m c
ConduitM l =$ Sink r = Sink $ l C.=$ r
{-# INLINE (=$) #-}

($$+) :: Monad m => Source m a -> Sink a m b -> m (C.ResumableSource m a, b)
SourceM src $$+ Sink sink = src C.$$+ sink
{-# INLINE ($$+) #-}

($$++) :: Monad m => C.ResumableSource m a -> Sink a m b -> m (C.ResumableSource m a, b)
rsrc $$++ Sink sink = rsrc C.$$++ sink
{-# INLINE ($$++) #-}

($$+-) :: Monad m => C.ResumableSource m a -> Sink a m b -> m b
rsrc $$+- Sink sink = rsrc C.$$+- sink
{-# INLINE ($$+-) #-}