{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE CPP #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
-- | A full tutorial for this module is available at:
-- <https://github.com/snoyberg/conduit/blob/master/PROCESS.md>.
--
-- Note that this is a very thin layer around the @Data.Streaming.Process@ module. In particular, it:
--
-- * Provides orphan instances for conduit
--
-- * Provides some useful helper functions
module Data.Conduit.Process
    ( -- * Functions
      sourceCmdWithConsumer
    , sourceProcessWithConsumer
    , sourceCmdWithStreams
    , sourceProcessWithStreams
    , withCheckedProcessCleanup
      -- * InputSource types
    , FlushInput(..)
    , BuilderInput(..)
      -- * Reexport
    , module Data.Streaming.Process
    ) where

import Data.Streaming.Process
import Data.Streaming.Process.Internal
import System.Exit (ExitCode (..))
import Control.Monad.IO.Unlift (MonadIO, liftIO, MonadUnliftIO, withRunInIO, withUnliftIO, unliftIO)
import System.IO (hClose, BufferMode (NoBuffering), hSetBuffering)
import Data.Conduit
import Data.Functor (($>))
import Data.Conduit.Binary (sourceHandle, sinkHandle, sinkHandleBuilder, sinkHandleFlush)
import Data.ByteString (ByteString)
import Data.ByteString.Builder (Builder)
import Control.Concurrent.Async (runConcurrently, Concurrently(..))
import Control.Exception (onException, throwIO, finally, bracket)
#if (__GLASGOW_HASKELL__ < 710)
import Control.Applicative ((<$>), (<*>))
#endif

instance (r ~ (), MonadIO m, i ~ ByteString) => InputSource (ConduitM i o m r) where
    isStdStream = (\(Just h) -> hSetBuffering h NoBuffering $> sinkHandle h, Just CreatePipe)
instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, i ~ ByteString) => InputSource (ConduitM i o m r, n r') where
    isStdStream = (\(Just h) -> hSetBuffering h NoBuffering $> (sinkHandle h, liftIO $ hClose h), Just CreatePipe)

-- | Wrapper for input source which accepts 'Data.ByteString.Builder.Builder's.
-- You can pass 'Data.ByteString.Builder.Extra.flush' to flush the input. Note
-- that the pipe will /not/ automatically close when the processing completes.
--
-- @since 1.3.2
newtype BuilderInput o m r = BuilderInput (ConduitM Builder o m r)

-- | Wrapper for input source  which accepts @Flush@es. Note that the pipe
-- will /not/ automatically close then processing completes.
--
-- @since 1.3.2
newtype FlushInput o m r = FlushInput (ConduitM (Flush ByteString) o m r)

instance (MonadIO m, r ~ ()) => InputSource (BuilderInput o m r) where
  isStdStream = (\(Just h) -> return $ BuilderInput $ sinkHandleBuilder h, Just CreatePipe)
instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (BuilderInput o m r, n r') where
  isStdStream = (\(Just h) -> return (BuilderInput $ sinkHandleBuilder h, liftIO $ hClose h), Just CreatePipe)
instance (MonadIO m, r ~ ()) => InputSource (FlushInput o m r) where
  isStdStream = (\(Just h) -> return $ FlushInput $ sinkHandleFlush h, Just CreatePipe)
instance (MonadIO m, MonadIO n, r ~ (), r' ~ ()) => InputSource (FlushInput o m r, n r') where
  isStdStream = (\(Just h) -> return (FlushInput $ sinkHandleFlush h, liftIO $ hClose h), Just CreatePipe)

instance (r ~ (), MonadIO m, o ~ ByteString) => OutputSink (ConduitM i o m r) where
    osStdStream = (\(Just h) -> hSetBuffering h NoBuffering $> sourceHandle h, Just CreatePipe)
instance (r ~ (), r' ~ (), MonadIO m, MonadIO n, o ~ ByteString) => OutputSink (ConduitM i o m r, n r') where
    osStdStream = (\(Just h) -> hSetBuffering h NoBuffering $> (sourceHandle h, liftIO $ hClose h), Just CreatePipe)

-- | Given a @CreateProcess@, run the process, with its output being used as a
-- @Source@ to feed the provided @Consumer@. Once the process has completed,
-- return a tuple of the @ExitCode@ from the process and the output collected
-- from the @Consumer@.
--
-- Note that, if an exception is raised by the consumer, the process is /not/
-- terminated. This behavior is different from 'sourceProcessWithStreams' due
-- to historical reasons.
--
-- Since 1.1.2
sourceProcessWithConsumer :: MonadIO m
                          => CreateProcess
                          -> ConduitT ByteString Void m a -- ^ stdout
                          -> m (ExitCode, a)
sourceProcessWithConsumer cp consumer = do
    (ClosedStream, (source, close), ClosedStream, cph) <- streamingProcess cp
    res <- runConduit $ source .| consumer
    close
    ec <- waitForStreamingProcess cph
    return (ec, res)

-- | Like @sourceProcessWithConsumer@ but providing the command to be run as
-- a @String@.
--
-- Since 1.1.2
sourceCmdWithConsumer :: MonadIO m
                      => String                  -- ^command
                      -> ConduitT ByteString Void m a -- ^stdout
                      -> m (ExitCode, a)
sourceCmdWithConsumer cmd = sourceProcessWithConsumer (shell cmd)


-- | Given a @CreateProcess@, run the process
-- and feed the provided @Producer@
-- to the stdin @Sink@ of the process.
-- Use the process outputs (stdout, stderr) as @Source@s
-- and feed it to the provided @Consumer@s.
-- Once the process has completed,
-- return a tuple of the @ExitCode@ from the process
-- and the results collected from the @Consumer@s.
--
-- If an exception is raised by any of the streams,
-- the process is terminated.
--
-- IO is required because the streams are run concurrently
-- using the <https://hackage.haskell.org/package/async async> package
--
-- @since 1.1.12
sourceProcessWithStreams
  :: MonadUnliftIO m
  => CreateProcess
  -> ConduitT () ByteString m () -- ^stdin
  -> ConduitT ByteString Void m a -- ^stdout
  -> ConduitT ByteString Void m b -- ^stderr
  -> m (ExitCode, a, b)
sourceProcessWithStreams cp producerStdin consumerStdout consumerStderr =
  withUnliftIO $ \u -> do
    (  (sinkStdin, closeStdin)
     , (sourceStdout, closeStdout)
     , (sourceStderr, closeStderr)
     , sph) <- streamingProcess cp
    (_, resStdout, resStderr) <-
      runConcurrently (
        (,,)
        <$> Concurrently ((unliftIO u $ runConduit $ producerStdin .| sinkStdin) `finally` closeStdin)
        <*> Concurrently (unliftIO u $ runConduit $ sourceStdout .| consumerStdout)
        <*> Concurrently (unliftIO u $ runConduit $ sourceStderr .| consumerStderr))
      `finally` (closeStdout >> closeStderr)
      `onException` terminateStreamingProcess sph
    ec <- waitForStreamingProcess sph
    return (ec, resStdout, resStderr)

-- | Like @sourceProcessWithStreams@ but providing the command to be run as
-- a @String@.
--
-- @since 1.1.12
sourceCmdWithStreams
  :: MonadUnliftIO m
  => String                   -- ^command
  -> ConduitT () ByteString m () -- ^stdin
  -> ConduitT ByteString Void m a -- ^stdout
  -> ConduitT ByteString Void m b -- ^stderr
  -> m (ExitCode, a, b)
sourceCmdWithStreams cmd = sourceProcessWithStreams (shell cmd)

-- | Same as 'withCheckedProcess', but kills the child process in the case of
-- an exception being thrown by the provided callback function.
--
-- @since 1.1.11
withCheckedProcessCleanup
    :: ( InputSource stdin
       , OutputSink stderr
       , OutputSink stdout
       , MonadUnliftIO m
       )
    => CreateProcess
    -> (stdin -> stdout -> stderr -> m b)
    -> m b
withCheckedProcessCleanup cp f = withRunInIO $ \run -> bracket
    (streamingProcess cp)
    (\(_, _, _, sph) -> closeStreamingProcessHandle sph)
    $ \(x, y, z, sph) -> do
        res <- run (f x y z) `onException` terminateStreamingProcess sph
        ec <- waitForStreamingProcess sph
        if ec == ExitSuccess
            then return res
            else throwIO $ ProcessExitedUnsuccessfully cp ec


terminateStreamingProcess :: MonadIO m => StreamingProcessHandle -> m ()
terminateStreamingProcess = liftIO . terminateProcess . streamingProcessHandleRaw