-- |
-- Module      : Streamly.Internal.System.Process
-- Copyright   : (c) 2020 Composewell Technologies
-- License     : Apache-2.0
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--

{-# LANGUAGE CPP #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- TODO:
--
-- Remove dependency on the "process" package. We do not need a lot of it or
-- need to reimplement significantly.
--
-- Interactive processes:
--
-- To understand process groups and sessions, see the "General Terminal
-- Interface" section in the [POSIX
-- standard](https://pubs.opengroup.org/onlinepubs/9699919799/).
--
-- For running processes interactively we need to make a new process group for
-- the new process and make it the foreground process group. When the process
-- is done we can make the parent process group the foreground process group
-- again. We need to ensure that this works properly under exceptions. We can
-- provide an "interact" function to do so.
--
-- - Need a way to specify additional parameters for process creation.
-- Possibly use something like @processBytesWith spec@ etc.
--
-- - Need a way to access the pid and manage the processes and process groups.
-- We can treat the processes in the same way as we treat threads. We can
-- compose processes in parallel, and cleanup can happen in the same way as
-- tids are cleaned up. But do we need this when we have threads anyway?
--
-- - Use unfolds for generation?
--
-- - Folds for composing process sinks? Input may be taken as input of the
-- fold and the output of the process can be consumed by another fold.
--
-- - Replace FilePath with a typed path.
--
{-# LANGUAGE FlexibleContexts #-}

module Streamly.Internal.System.Process
    (
    -- * Process Configuration
      Config

    -- * Exceptions
    , ProcessFailure (..)

    -- * Generation
    , toBytes
    , toBytes'
    , toChunks
    , toChunks'

    -- * Transformation
    , processBytes
    , processBytes'
    , processChunksWith
    , processChunks
    , processChunks'With
    , processChunks'
    )
where

-- #define USE_NATIVE

import Control.Monad.Catch (MonadCatch, throwM)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Word (Word8)
import Streamly.Data.Array.Foreign (Array)
import Streamly.Prelude (MonadAsync, parallel, IsStream, adapt)
import System.Exit (ExitCode(..))
import System.IO (hClose, Handle)

#ifdef USE_NATIVE
import Control.Exception (Exception(..), catch, throwIO, SomeException)
import System.Posix.Process (ProcessStatus(..))
import Streamly.Internal.System.Process.Posix
#else
import Control.Concurrent (forkIO)
import Control.Exception (Exception(..), catch, throwIO)
import Control.Monad (void, unless)
import Foreign.C.Error (Errno(..), ePIPE)
import GHC.IO.Exception (IOException(..), IOErrorType(..))
import System.Process
    ( ProcessHandle
    , CreateProcess(..)
    , StdStream (..)
    , createProcess
    , waitForProcess
    , CmdSpec(..)
    , terminateProcess
    )
#endif

import qualified Streamly.Data.Array.Foreign as Array
import qualified Streamly.Prelude as Stream

-- Internal imports
import Streamly.Internal.System.IO (defaultChunkSize)

import qualified Streamly.Internal.Data.Array.Stream.Foreign
    as ArrayStream (arraysOf)
import qualified Streamly.Internal.Data.Stream.IsStream as Stream (bracket')
import qualified Streamly.Internal.Data.Unfold as Unfold (either)
import qualified Streamly.Internal.FileSystem.Handle
    as Handle (toChunks, putChunks)

-- $setup
-- >>> :set -XFlexibleContexts
-- >>> import Data.Char (toUpper)
-- >>> import Data.Function ((&))
-- >>> import qualified Streamly.Console.Stdio as Stdio
-- >>> import qualified Streamly.Data.Fold as Fold
-- >>> import qualified Streamly.Prelude as Stream
-- >>> import qualified Streamly.System.Process as Process
-- >>> import qualified Streamly.Unicode.Stream as Unicode
-- >>> import qualified Streamly.Internal.Data.Stream.IsStream as Stream

-------------------------------------------------------------------------------
-- Config
-------------------------------------------------------------------------------

-- | Process configuration used for creating a new process.
--
-- By default the process config is set up to inherit the following attributes
-- from the parent process:
--
-- * Current working directory
-- * Environment
-- * Open file descriptors
-- * Process group
-- * Process uid and gid
-- * Signal handlers
-- * Terminal (Session)
--
#ifdef USE_NATIVE

-- In the native build the process type is referred to by the same name that
-- the non-native build imports from "System.Process", so the signatures in the
-- rest of this module are shared by both builds.
-- NOTE(review): 'Process' presumably comes from
-- "Streamly.Internal.System.Process.Posix" -- confirm.
type ProcessHandle = Process

-- XXX After the fork specify what code to run in parent and in child before
-- exec. Also, use config to control whether to search the binary in the PATH
-- or not.
--
-- Native process configuration. The wrapped flag is 'False' in the
-- configuration built by 'mkConfig' and is set to 'True' by 'pipeStdErr'.
newtype Config = Config Bool

-- Native default configuration. The executable path and the argument list
-- are ignored here; the configuration flag starts out as 'False'.
mkConfig :: FilePath -> [String] -> Config
mkConfig _path _args = Config False

-- Native variant: whatever the incoming configuration is, the result has the
-- flag set to 'True'.
pipeStdErr :: Config -> Config
pipeStdErr = const (Config True)
#else
-- The non-native configuration is a wrapper over the 'CreateProcess' record
-- imported from "System.Process".
newtype Config = Config CreateProcess

-- | Build the default process configuration for running the executable at
-- the given path with the given argument list.
--
-- In the default configuration the standard input and the standard output of
-- the process are connected to newly created pipes and the standard error is
-- inherited. The working directory and the environment are inherited, and no
-- new process group, session or console is requested.
--
mkConfig :: FilePath -> [String] -> Config
mkConfig path args = Config defaults

    where

    defaults = CreateProcess
        { cmdspec = RawCommand path args
        , cwd = Nothing -- inherit
        , env = Nothing -- inherit

        -- File descriptors
        , std_in = CreatePipe
        , std_out = CreatePipe
        , std_err = Inherit
        , close_fds = False

        -- Session/group/setuid/setgid
        , create_group = False
        , child_user = Nothing  -- Posix only
        , child_group = Nothing  -- Posix only

        -- Signals (Posix only) behavior
        -- Reset SIGINT (Ctrl-C) and SIGQUIT (Ctrl-\) to default handlers.
        , delegate_ctlc = False

        -- Terminal behavior
        , new_session = False  -- Posix only
        , detach_console = False -- Windows only
        , create_new_console = False -- Windows Only

        -- Added by commit 6b8ffe2ec3d115df9ccc047599545ca55c393005
        , use_process_jobs = True -- Windows only
        }

-- | Change a configuration so that the standard error of the process is
-- connected to a newly created pipe. All other attributes are left as they
-- are.
pipeStdErr :: Config -> Config
pipeStdErr (Config cfg) = Config (cfg { std_err = CreatePipe })
#endif

-------------------------------------------------------------------------------
-- Exceptions
-------------------------------------------------------------------------------
--
-- TODO Add the path of the executable and the PID of the process to the
-- exception info to aid debugging.

-- | The exception raised when a process fails. It carries the exit code
-- reported for the process.
--
-- @since 0.1.0
newtype ProcessFailure = ProcessFailure Int -- ^ The exit code of the process.
    deriving (Show)

-- 'ProcessFailure' can be thrown and caught as an exception. The message
-- shown to the user includes the exit code.
instance Exception ProcessFailure where

    displayException (ProcessFailure code) =
        showString "Process failed with exit code: " (show code)

-------------------------------------------------------------------------------
-- Transformation
-------------------------------------------------------------------------------
--
-- | Cleanup used when the stream finishes normally: wait for the process to
-- terminate and raise 'ProcessFailure' if it did not exit successfully.
--
-- The pipe handles are ignored here because they are already guaranteed to be
-- closed (we can assert that) by the time we reach this point. The process is
-- not killed, we wait for it to terminate on its own.
cleanupNormal :: MonadIO m =>
    (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> m ()
cleanupNormal (_, _, _, ph) = liftIO $ do
#ifdef USE_NATIVE
    status <- wait ph
    case status of
        Exited ExitSuccess -> pure ()
        Exited (ExitFailure code) -> failWith code
        -- A process terminated or stopped by a signal is reported with the
        -- negated signal number as its failure code.
        Terminated sig _ -> failWith (negate $ fromIntegral sig)
        Stopped sig -> failWith (negate $ fromIntegral sig)
#else
    result <- waitForProcess ph
    case result of
        ExitSuccess -> pure ()
        ExitFailure code -> failWith code
#endif

    where

    -- Raise the failure exception carrying the given code.
    failWith code = throwM $ ProcessFailure code

-- | Cleanup used on an exception or when the stream is being garbage
-- collected: send a SIGTERM to the process, close the pipe handles and wait
-- for the process in a separate thread.
--
-- Since we are using SIGTERM to kill the process, it may block forever. We can
-- possibly use a timer and send a SIGKILL after the timeout if the process is
-- still hanging around.
--
-- This function is total: any handle that is 'Nothing' is simply skipped. A
-- cleanup handler must not fail with 'error' on an unexpected handle
-- combination, because that would skip terminating the process and closing
-- the remaining handles.
cleanupException :: MonadIO m =>
    (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> m ()
cleanupException (stdinMaybe, stdoutMaybe, stderrMaybe, ph) = liftIO $ do
    -- Send a SIGTERM to the process
    terminateProcess ph

    -- Ideally we should be closing the handle without flushing the buffers so
    -- that we cannot get a SIGPIPE. But there seems to be no way to do that as
    -- of now so we just ignore the resulting EPIPE error.
    mapM_ closeIgnoringEPIPE stdinMaybe
    mapM_ hClose stdoutMaybe
    mapM_ hClose stderrMaybe

    -- Non-blocking wait for the process to go away
    void $ forkIO (void $ waitForProcess ph)

    where

    closeIgnoringEPIPE :: Handle -> IO ()
    closeIgnoringEPIPE h = hClose h `catch` ignoreEPIPE

    -- True only for a "resource vanished" IO error whose errno is EPIPE.
    isEPIPE :: IOException -> Bool
    isEPIPE e =
        case e of
            IOError
                { ioe_type = ResourceVanished
                , ioe_errno = Just ioe
                } -> Errno ioe == ePIPE
            _ -> False

    -- Swallow an EPIPE error, rethrow any other IO exception.
    ignoreEPIPE :: IOException -> IO ()
    ignoreEPIPE e = unless (isEPIPE e) $ throwIO e

-- | Create a system process from an executable path and arguments. The
-- configuration used is the one built by 'mkConfig' after the supplied
-- modifier has been applied to it.
--
createProc' ::
       (Config -> Config) -- ^ Process attribute modifier
    -> FilePath                         -- ^ Executable path
    -> [String]                         -- ^ Arguments
    -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle)
    -- ^ (Input Handle, Output Handle, Error Handle, Process Handle)
createProc' modCfg path args = do
    -- Default configuration for this executable, adjusted by the caller.
    let Config cfg = modCfg (mkConfig path args)
#ifdef USE_NATIVE
    ((inp, out, err, _excParent, _excChild), parent, child, failure) <-
        mkStdioPipes cfg
    -- XXX Pass the exChild handle to the child process action
    procHandle <- newProcess child path args Nothing
              `catch` (\(e :: SomeException) -> failure >> throwIO e)
    -- XXX Read the exception channel and reap the process if it failed before
    -- exec.
    parent
    return (Just inp, Just out, err, procHandle)
#else
    createProcess cfg
#endif

-- Write all the chunks of the input stream to the handle and then close the
-- handle. This is run as the 'Stream.before' action of an empty stream, so
-- the resulting stream yields no elements.
{-# INLINE putChunksClose #-}
putChunksClose :: (MonadIO m, IsStream t) =>
    Handle -> t m (Array Word8) -> t m a
putChunksClose h input = Stream.before writeThenClose Stream.nil

    where

    writeThenClose = do
        Handle.putChunks h (adapt input)
        liftIO (hClose h)

-- Stream the contents of the handle as chunks, with closing the handle
-- registered as the 'Stream.after' action of that stream.
{-# INLINE toChunksClose #-}
toChunksClose :: (IsStream t, MonadAsync m) => Handle -> t m (Array Word8)
toChunksClose h = liftIO (hClose h) `Stream.after` Handle.toChunks h

-- Create a process and build the output stream from its handles using the
-- supplied function, bracketing the stream with the cleanup actions.
--
-- 'cleanupNormal' is passed as the first cleanup action of 'Stream.bracket''
-- and 'cleanupException' as the other two.
-- NOTE(review): per the comments on the cleanup functions these presumably
-- correspond to normal stop, garbage collection and exception respectively --
-- confirm against the documentation of bracket'.
{-# INLINE processChunksWithAction #-}
processChunksWithAction ::
    (IsStream t, MonadCatch m, MonadAsync m)
    => ((Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> t m a)
    -> (Config -> Config)
    -> FilePath             -- ^ Path to Executable
    -> [String]             -- ^ Arguments
    -> t m a     -- ^ Output stream
processChunksWithAction run modCfg path args =
    Stream.bracket'
        (liftIO (createProc' modCfg path args))
        cleanupNormal
        cleanupException
        cleanupException
        run

-- Like 'processChunksWith' but the standard error of the process is piped as
-- well. Chunks read from the error handle are tagged 'Left' and chunks read
-- from the output handle are tagged 'Right'. Writing the input and reading
-- the two outputs are combined using 'parallel'.
--
-- 'pipeStdErr' is applied to the default configuration before the supplied
-- modifier.
{-# INLINE processChunks'With #-}
processChunks'With ::
    (IsStream t, MonadCatch m, MonadAsync m)
    => (Config -> Config)   -- ^ Config modifier
    -> FilePath             -- ^ Executable name or path
    -> [String]             -- ^ Arguments
    -> t m (Array Word8)    -- ^ Input stream
    -> t m (Either (Array Word8) (Array Word8))     -- ^ Output stream
processChunks'With modifier path args input =
    processChunksWithAction run (modifier . pipeStdErr) path args

    where

    run handles =
        case handles of
            (Just inH, Just outH, Just errH, _) ->
                putChunksClose inH input
                    `parallel` Stream.map Left (toChunksClose errH)
                    `parallel` Stream.map Right (toChunksClose outH)
            -- All three handles are expected to be pipes at this point.
            _ -> error "processChunks': Not reachable"

-- Like 'processChunks' but also streams the standard error of the process:
-- error chunks are presented as 'Left' values and output chunks as 'Right'
-- values. Uses the default configuration, i.e. 'processChunks'With' with an
-- identity modifier.
{-# INLINE processChunks' #-}
processChunks' ::
    (IsStream t, MonadCatch m, MonadAsync m)
    => FilePath             -- ^ Executable name or path
    -> [String]             -- ^ Arguments
    -> t m (Array Word8)    -- ^ Input stream
    -> t m (Either (Array Word8) (Array Word8))     -- ^ Output stream
processChunks' = processChunks'With id

-- | @processBytes' path args input@ runs the executable at @path@ with @args@
-- as its arguments, feeding it the @input@ stream as its standard input. In
-- the resulting stream the bytes of the error stream of the executable appear
-- as 'Left' values and the bytes of its output stream as 'Right' values.
--
-- Raises 'ProcessFailure' exception in case of failure.
--
-- For example, the following is equivalent to the shell command @echo "hello
-- world" | tr [:lower:] [:upper:]@:
--
-- >>> :{
--    processBytes' "echo" ["hello world"] Stream.nil
--  & Stream.rights
--  & processBytes' "tr" ["[:lower:]", "[:upper:]"]
--  & Stream.rights
--  & Stream.fold Stdio.write
--  :}
--HELLO WORLD
--
-- @since 0.1.0
{-# INLINE processBytes' #-}
processBytes' ::
    (IsStream t, MonadCatch m, MonadAsync m)
    => FilePath         -- ^ Executable name or path
    -> [String]         -- ^ Arguments
    -> t m Word8        -- ^ Input Stream
    -> t m (Either Word8 Word8) -- ^ Output Stream
processBytes' path args input =
    -- Group the input bytes into chunks, run the chunked version and then
    -- flatten the chunks on either side back into bytes.
    Stream.unfoldMany (Unfold.either Array.read)
        $ processChunks' path args
        $ ArrayStream.arraysOf defaultChunkSize input

-- Like 'processChunks' but takes a modifier that is applied to the default
-- process configuration. Writing the input to the process and reading its
-- output are combined using 'parallel'; the error handle is not used here.
{-# INLINE processChunksWith #-}
processChunksWith ::
    (IsStream t, MonadCatch m, MonadAsync m)
    => (Config -> Config)   -- ^ Config modifier
    -> FilePath             -- ^ Executable name or path
    -> [String]             -- ^ Arguments
    -> t m (Array Word8)    -- ^ Input stream
    -> t m (Array Word8)    -- ^ Output stream
processChunksWith modifier path args input =
    processChunksWithAction run modifier path args

    where

    run handles =
        case handles of
            (Just inH, Just outH, _, _) ->
                putChunksClose inH input `parallel` toChunksClose outH
            -- The input and output handles are expected to be pipes here.
            _ -> error "processChunks: Not reachable"

-- | @processChunks file args input@ runs the executable @file@, given by its
-- name or path, with @args@ as its arguments and the @input@ stream as its
-- standard input. The standard output of the executable is returned as a
-- stream of chunks.
--
-- If only the name of an executable file is specified instead of its path then
-- the file name is searched in the directories specified by the PATH
-- environment variable.
--
-- If the input stream throws an exception or if the output stream is garbage
-- collected before it could finish then the process is terminated with SIGTERM.
--
-- If the process terminates with a non-zero exit code then a 'ProcessFailure'
-- exception is raised.
--
-- The following code is equivalent to the shell command @echo "hello world" |
-- tr [a-z] [A-Z]@:
--
-- >>> :{
--    Process.toChunks "echo" ["hello world"]
--  & Process.processChunks "tr" ["[a-z]", "[A-Z]"]
--  & Stream.fold Stdio.writeChunks
--  :}
--HELLO WORLD
--
-- @since 0.1.0
{-# INLINE processChunks #-}
processChunks ::
    (IsStream t, MonadCatch m, MonadAsync m)
    => FilePath             -- ^ Executable name or path
    -> [String]             -- ^ Arguments
    -> t m (Array Word8)    -- ^ Input stream
    -> t m (Array Word8)    -- ^ Output stream
processChunks = processChunksWith id

-- | Like 'processChunks' except that the input and the output are streams of
-- bytes rather than streams of chunks.
--
-- We can write the example in 'processChunks' as follows. Notice how
-- seamlessly we can replace the @tr@ process with the Haskell @toUpper@
-- function:
--
-- >>> :{
--    Process.toBytes "echo" ["hello world"]
--  & Unicode.decodeLatin1 & Stream.map toUpper & Unicode.encodeLatin1
--  & Stream.fold Stdio.write
--  :}
--HELLO WORLD
--
-- @since 0.1.0
{-# INLINE processBytes #-}
processBytes ::
    (IsStream t, MonadCatch m, MonadAsync m)
    => FilePath     -- ^ Executable name or path
    -> [String]     -- ^ Arguments
    -> t m Word8    -- ^ Input Stream
    -> t m Word8    -- ^ Output Stream
processBytes path args input = -- rights . processBytes' path args
    -- Group the input bytes into chunks, run the chunked version and then
    -- flatten the output chunks back into bytes.
    Stream.unfoldMany Array.read
        $ processChunks path args
        $ ArrayStream.arraysOf defaultChunkSize input

-------------------------------------------------------------------------------
-- Generation
-------------------------------------------------------------------------------
--
-- | @toBytes' path args@ runs the executable at @path@ with @args@ as its
-- arguments and no input, and returns a stream of 'Either' bytes. The 'Left'
-- values are from @stderr@ and the 'Right' values are from @stdout@ of the
-- executable.
--
-- Raises 'ProcessFailure' exception in case of failure.
--
-- The following example uses @echo@ to write @hello@ to @stdout@ and @world@
-- to @stderr@, then uses folds from "Streamly.Console.Stdio" to write them
-- back to @stdout@ and @stderr@ respectively:
--
--
-- >>> :{
--   Process.toBytes' "/bin/bash" ["-c", "echo 'hello'; echo 'world' 1>&2"]
-- & Stream.fold (Fold.partition Stdio.writeErr Stdio.write)
-- :}
-- world
-- hello
-- ((),())
--
-- >>> toBytes' path args = Process.processBytes' path args Stream.nil
--
-- @since 0.1.0
{-# INLINE toBytes' #-}
toBytes' ::
    (IsStream t, MonadAsync m, MonadCatch m)
    => FilePath     -- ^ Executable name or path
    -> [String]     -- ^ Arguments
    -> t m (Either Word8 Word8)    -- ^ Output Stream
toBytes' path args = processBytes' path args Stream.nil

-- | See 'processBytes'. 'toBytes' is defined as:
--
-- >>> toBytes path args = processBytes path args Stream.nil
--
-- The following code is equivalent to the shell command @echo "hello world"@:
--
-- >>> :{
--    Process.toBytes "echo" ["hello world"]
--  & Stream.fold Stdio.write
--  :}
--hello world
--
-- @since 0.1.0
{-# INLINE toBytes #-}
toBytes ::
    (IsStream t, MonadAsync m, MonadCatch m)
    => FilePath     -- ^ Executable name or path
    -> [String]     -- ^ Arguments
    -> t m Word8    -- ^ Output Stream
-- The process is given an empty input stream.
toBytes path args = processBytes path args Stream.nil

-- | Like 'toBytes'' but generates a stream of 'Either' @Array Word8@ chunks
-- instead of a stream of 'Either' @Word8@ values.
--
-- >>> :{
--   toChunks' "bash" ["-c", "echo 'hello'; echo 'world' 1>&2"]
-- & Stream.fold (Fold.partition Stdio.writeErrChunks Stdio.writeChunks)
-- :}
-- world
-- hello
-- ((),())
--
-- >>> toChunks' path args = processChunks' path args Stream.nil
--
-- Prefer 'toChunks'' over 'toBytes'' when performance matters.
--
-- @since 0.1.0
{-# INLINE toChunks' #-}
toChunks' ::
    (IsStream t, MonadAsync m, MonadCatch m)
    => FilePath             -- ^ Executable name or path
    -> [String]             -- ^ Arguments
    -> t m (Either (Array Word8) (Array Word8))    -- ^ Output Stream
-- The process is given an empty input stream.
toChunks' path args = processChunks' path args Stream.nil

-- | See 'processChunks'. 'toChunks' is defined as:
--
-- >>> toChunks path args = processChunks path args Stream.nil
--
-- The following code is equivalent to the shell command @echo "hello world"@:
--
-- >>> :{
--    Process.toChunks "echo" ["hello world"]
--  & Stream.fold Stdio.writeChunks
--  :}
--hello world
--
-- @since 0.1.0
{-# INLINE toChunks #-}
toChunks ::
    (IsStream t, MonadAsync m, MonadCatch m)
    => FilePath             -- ^ Executable name or path
    -> [String]             -- ^ Arguments
    -> t m (Array Word8)    -- ^ Output Stream
-- The process is given an empty input stream.
toChunks path args = processChunks path args Stream.nil