{-# LANGUAGE FlexibleContexts, MultiParamTypeClasses, NamedFieldPuns,
             RankNTypes, RecordWildCards #-}

{- |
   Module      : Streaming.Process
   Description : Run system process with support for Streams
   Copyright   : (c) Ivan Lazar Miljenovic
   License     : MIT
   Maintainer  : Ivan.Miljenovic@gmail.com

   Run system commands in a streaming fashion.

   __WARNING:__ If using this module, you will need to have
   @ghc-options: -threaded@ in your @.cabal@ file, otherwise it will
   likely hang!

   These functions are typically written to be used in a
   continuation-passing style to allow for proper finalisation.  If
   you have many of these nested, it may be easier to use the
   "Streaming.Process.Lifted" module.

   These functions will all throw 'ProcessExitedUnsuccessfully' if the
   process\/command itself fails.

 -}
module Streaming.Process
  ( -- * High level functions
    withStreamingProcess
  , withStreamingCommand
  , streamInput
  , streamInputCommand
  , withStreamingOutput
  , withStreamingOutputCommand
    -- * Lower level
  , StreamProcess(..)
  , switchOutputs
  , WithStream(..)
  , WithStream'
  , SupplyStream(..)
  , withStreamProcess
  , withStreamCommand
  , withProcessHandles
  , processInput
  , withProcessOutput
    -- * Interleaved stdout and stderr
  , StdOutErr
  , withStreamOutputs
    -- * Re-exports
    -- $reexports
  , module Data.Streaming.Process
  , concurrently
  ) where

import           Data.ByteString.Streaming (ByteString)
import qualified Data.ByteString.Streaming as SB
import           Streaming                 (hoist)
import           Streaming.Concurrent      (unbounded, withMergedStreams)
import qualified Streaming.Prelude         as S

import Control.Concurrent.Async.Lifted (concurrently)
import Control.Monad.Base              (MonadBase)
import Control.Monad.Catch             (MonadMask, finally, onException, throwM)
import Control.Monad.IO.Class          (MonadIO, liftIO)
import Control.Monad.Trans.Control     (MonadBaseControl)
import Data.Streaming.Process
import Data.Streaming.Process.Internal (InputSource(..), OutputSink(..))
import System.Exit                     (ExitCode(..))
import System.IO                       (hClose)
import System.Process                  (CreateProcess(..),
                                        StdStream(CreatePipe), shell)

--------------------------------------------------------------------------------

-- | Run the given process, supplying the provided bytes on its
--   stdin while concurrently handing its stdout and stderr to the
--   continuation.
--
--   The monad of the 'StdOutErr' value passed to the continuation
--   may differ from the monad of the overall result; it is up to
--   the caller to make sure the result is reached.
withStreamingProcess :: (MonadBaseControl IO m, MonadIO m, MonadMask m
                        , MonadBase IO n)
                        => CreateProcess -> ByteString m v
                        -> (StdOutErr n () -> m r) -> m r
withStreamingProcess cp inp k =
  withStreamProcess cp (\sp -> withProcessHandles inp sp k)

-- | Shell variant of 'withStreamingProcess': the 'String' is turned
--   into a 'CreateProcess' with 'shell' before being run.
withStreamingCommand :: (MonadBaseControl IO m, MonadIO m, MonadMask m
                        , MonadBase IO n)
                        => String -> ByteString m v
                        -> (StdOutErr n () -> m r) -> m r
withStreamingCommand cmd = withStreamingProcess (shell cmd)

-- | Run a process purely for its effect on the supplied input: the
--   bytes are streamed to its stdin and no output is expected.  The
--   result is the return value of the input stream.
streamInput :: (MonadIO m, MonadMask m) => CreateProcess
               -> ByteString m r -> m r
streamInput cp inp = withStreamProcess cp (\sp -> processInput sp inp)

-- | Shell variant of 'streamInput': the command string is wrapped
--   with 'shell' before being run.
streamInputCommand :: (MonadIO m, MonadMask m) => String
                      -> ByteString m r -> m r
streamInputCommand cmd = streamInput (shell cmd)

-- | Run a process that takes no input and pass its stdout to the
--   continuation (error output is ignored).
withStreamingOutput :: (MonadIO n, MonadIO m, MonadMask m)
                       => CreateProcess
                       -> (ByteString n () -> m r) -> m r
withStreamingOutput cp k =
  withStreamProcess cp (\sp -> withProcessOutput sp k)

-- | Shell variant of 'withStreamingOutput': the command string is
--   wrapped with 'shell' before being run.
withStreamingOutputCommand :: (MonadIO n, MonadIO m, MonadMask m)
                              => String
                              -> (ByteString n () -> m r) -> m r
withStreamingOutputCommand cmd = withStreamingOutput (shell cmd)

--------------------------------------------------------------------------------

-- | Supply the provided data to the process's stdin while
--   concurrently streaming stdout and stderr into the provided
--   continuation.  Only the continuation's result is returned; the
--   return value of the input stream is discarded.
--
--   The monad of the 'StdOutErr' value passed to the continuation
--   may differ from the monad of the overall result; it is up to
--   the caller to make sure the result is reached.
withProcessHandles :: (MonadBaseControl IO m, MonadIO m, MonadMask m, MonadBase IO n)
                      => ByteString m v
                      -> StreamProcess (SupplyStream m)
                                       (WithStream' m)
                                       (WithStream' m)
                      -> (StdOutErr n () -> m r) -> m r
withProcessHandles inp sp@StreamProcess{toStdin = stdinSupply} k =
  snd <$> concurrently feedStdin drainOutputs
  where
    -- Writes the whole input stream to the process.
    feedStdin    = supplyStream stdinSupply inp

    -- Runs the continuation over the merged stdout/stderr.
    drainOutputs = withStreamOutputs sp k

-- | Send the given bytes to a process whose stdout and stderr are
--   both closed, returning the input stream's result.
processInput :: (MonadIO m, MonadMask m)
                => StreamProcess (SupplyStream m) ClosedStream ClosedStream
                -> ByteString m r -> m r
processInput sp = supplyStream (toStdin sp)

-- | Pass the stdout of a process to the continuation; the process's
--   stdin and stderr are both closed.
withProcessOutput :: (MonadIO n, MonadIO m, MonadMask m)
                     => StreamProcess ClosedStream (WithStream n m) ClosedStream
                     -> (ByteString n () -> m r) -> m r
withProcessOutput sp = withStream (fromStdout sp)

--------------------------------------------------------------------------------

-- | Represents the input and outputs for a streaming process.
--
--   Each type parameter is the representation chosen for the
--   corresponding standard stream (for example 'SupplyStream' for
--   stdin, 'WithStream' for an output, or 'ClosedStream').  All
--   fields are strict.
data StreamProcess stdin stdout stderr = StreamProcess
  { toStdin    :: !stdin -- ^ How to interact with the process's stdin.
  , fromStdout :: !stdout -- ^ How to interact with the process's stdout.
  , fromStderr :: !stderr -- ^ How to interact with the process's stderr.
  } deriving (Eq, Show)

-- | Swap the stdout and stderr fields, leaving stdin untouched.
--
--   Handy when combining 'withStreamProcess' with
--   'withProcessHandles' but wanting to deal with any potential
--   output from stderr before stdout.
switchOutputs :: StreamProcess stdin stdout stderr
                 -> StreamProcess stdin stderr stdout
switchOutputs (StreamProcess inp out err) = StreamProcess inp err out

-- | A variant of 'withCheckedProcess' that will on an exception kill
--   the child process and attempt to perform cleanup (though you
--   should also attempt to do so in your own code).
--
--   The continuation receives the process's three standard streams
--   bundled as a 'StreamProcess'.  Once it returns, the process is
--   waited for and the continuation's result is returned if the
--   process exited successfully.
--
--   If the continuation throws, the child process is sent a
--   termination request and the exception is propagated.  In every
--   case (normal return, exception, or unsuccessful exit) the
--   resources held by the underlying 'StreamingProcessHandle' are
--   released via 'closeStreamingProcessHandle'.
--
--   Will throw 'ProcessExitedUnsuccessfully' on a non-successful exit code.
--
--   Compared to @withCheckedProcessCleanup@ from @conduit-extra@,
--   this has the three parameters grouped into 'StreamProcess' to
--   make it more of a continuation.
withStreamProcess :: (InputSource stdin, OutputSink stdout, OutputSink stderr
                     , MonadIO m, MonadMask m)
                     => CreateProcess
                     -> (StreamProcess stdin stdout stderr -> m r) -> m r
withStreamProcess cp f = do
  (stdin, stdout, stderr, sph) <- streamingProcess cp
  -- The 'finally' wraps the continuation as well as the wait, so the
  -- handle's resources are released even when the continuation throws
  -- before it has taken ownership of (and closed) the pipes itself.
  runToCompletion (StreamProcess stdin stdout stderr) sph
    `finally` closeStreamingProcessHandle sph
  where
    -- Run the continuation (terminating the child if it throws), then
    -- wait for the process and check its exit code.
    runToCompletion sp sph = do
      r <- f sp `onException` terminateStreamingProcess sph
      ec <- waitForStreamingProcess sph
      case ec of
        ExitSuccess   -> return r
        ExitFailure _ -> throwM (ProcessExitedUnsuccessfully cp ec)

-- | Shell variant of 'withStreamProcess': the command string is
--   wrapped with 'shell' before being run.
withStreamCommand :: (InputSource stdin, OutputSink stdout, OutputSink stderr
                     , MonadIO m, MonadMask m)
                     => String
                     -> (StreamProcess stdin stdout stderr -> m r) -> m r
withStreamCommand cmd = withStreamProcess (shell cmd)

-- Request termination (via 'terminateProcess') of the process behind
-- the given handle.  This does not itself wait for the process to exit.
terminateStreamingProcess :: (MonadIO m) => StreamingProcessHandle -> m ()
terminateStreamingProcess sph =
  liftIO (terminateProcess (streamingProcessHandleRaw sph))

--------------------------------------------------------------------------------

-- | A representation of the concurrent streaming of both @stdout@ and
--   @stderr@ (contrast to 'SB.hGet').
--
--   As constructed by 'withStreamOutputs', the outer 'ByteString'
--   layer carries the bytes of the 'fromStdout' field and the inner
--   layer (the monad of the outer one) carries those of 'fromStderr'.
--
--   Note that if for example you wish to completely discard stderr,
--   you can do so with @'hoist' 'SB.effects'@ (or just process the
--   stdout, then run 'SB.effects' at the end to discard the stderr).
type StdOutErr m r = ByteString (ByteString m) r

-- | Read stdout and stderr concurrently, merging them into a single
--   'StdOutErr' value that is handed to the continuation.  The
--   stdin component of the 'StreamProcess' is not touched.
withStreamOutputs :: ( MonadMask m, MonadIO m, MonadBaseControl IO m
                     , MonadBase IO n)
                     => StreamProcess stdin (WithStream' m) (WithStream' m)
                     -> (StdOutErr n () -> m r) -> m r
withStreamOutputs StreamProcess{fromStdout = outSrc, fromStderr = errSrc} k =
  withStream outSrc $ \outBytes ->
  withStream errSrc $ \errBytes ->
    withMergedStreams unbounded
                      [ tagChunks Left  outBytes
                      , tagChunks Right errBytes
                      ]
                      (k . splitMerged)
  where
    -- Break a byte stream into chunks, marking each with its origin
    -- (Left for stdout, Right for stderr).
    tagChunks tag = S.map tag . SB.toChunks

    -- Separate the merged chunks again: Left chunks form the outer
    -- byte stream, Right chunks the inner one.
    splitMerged merged =
      SB.fromChunks (hoist SB.fromChunks (S.partitionEithers merged))

--------------------------------------------------------------------------------

-- | A wrapper for being able to provide a stream of bytes.
--
--   The wrapped function takes a byte stream in @m@ and yields that
--   stream's final result in @m@.  The 'InputSource' instance
--   implements it by writing the stream to the process's stdin
--   handle and closing the handle afterwards.
newtype SupplyStream m = SupplyStream { supplyStream :: forall r. ByteString m r -> m r }

-- | Requests a pipe for the process's stdin.  The resulting
--   'SupplyStream' writes the supplied bytes to that pipe and then
--   closes it, whether or not the write finished successfully.
instance (MonadMask m, MonadIO m) => InputSource (SupplyStream m) where
  isStdStream = (mkSupply, Just CreatePipe)
    where
      mkSupply (Just h) =
        return (SupplyStream (\inp -> SB.hPut h inp `finally` liftIO (hClose h)))
      -- A handle is expected here because 'CreatePipe' is requested
      -- above; fail with a descriptive message rather than an
      -- anonymous pattern-match failure if that ever does not hold.
      mkSupply Nothing  =
        error "Streaming.Process: InputSource (SupplyStream m): no stdin handle was provided despite requesting CreatePipe"

-- | A wrapper for something taking a continuation with a stream of
--   bytes as input.
--
--   The byte stream handed to the continuation lives in the monad
--   @n@, whereas the continuation itself (and the overall result)
--   runs in @m@.  The 'OutputSink' instance implements it by reading
--   the contents of the process's output handle and closing the
--   handle once the continuation has finished.
newtype WithStream n m = WithStream { withStream :: forall r. (ByteString n () -> m r) -> m r }

-- | An alias for the common case of @n ~ m@, i.e. the byte stream
--   and the continuation consuming it share the same monad.
type WithStream' m = WithStream m m

-- | Requests a pipe for the process's output.  The resulting
--   'WithStream' hands the contents of that pipe to the continuation
--   and closes the pipe once the continuation finishes, whether
--   normally or with an exception.
instance (MonadIO m, MonadMask m, MonadIO n) => OutputSink (WithStream n m) where
  osStdStream = (mkWith, Just CreatePipe)
    where
      mkWith (Just h) =
        return (WithStream (\f -> f (SB.hGetContents h) `finally` liftIO (hClose h)))
      -- A handle is expected here because 'CreatePipe' is requested
      -- above; fail with a descriptive message rather than an
      -- anonymous pattern-match failure if that ever does not hold.
      mkWith Nothing  =
        error "Streaming.Process: OutputSink (WithStream n m): no output handle was provided despite requesting CreatePipe"

--------------------------------------------------------------------------------

{- $reexports

All of "Data.Streaming.Process" is available for you to use.

The 'concurrently' function will probably be useful if manually
handling process inputs and outputs.

-}