Copyright | (c) 2020 Composewell Technologies |
---|---|
License | Apache-2.0 |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | Safe-Inferred |
Language | Haskell2010 |
This module provides functions to run operating system processes as stream producers, consumers or stream transformation functions. Thus OS processes can be used in the same way as Haskell functions and all the streaming combinators in streamly can be used to combine them. This allows you to seamlessly integrate external binary executables into your Haskell program.
However, we recommend using native Haskell functions with Streamly threads instead of system processes whenever possible. Native functions offer a simpler programming model, and system processes carry a larger performance overhead.
Prefer the Streamly.System.Command module, a higher-level wrapper over this module.
Executables as functions
Processes can be composed in a streaming pipeline just like a Posix shell command pipeline. Moreover, we can mix processes and Haskell functions seamlessly in a processing pipeline. For example:
>>> :{
    Process.toBytes "echo" ["hello world"]
  & Process.pipeBytes "tr" ["[a-z]", "[A-Z]"]
  & Stream.fold Stdio.write
:}
HELLO WORLD
Of course, you can use a Haskell function instead of "tr":
>>> :{
    Process.toBytes "echo" ["hello world"]
  & Unicode.decodeLatin1
  & fmap toUpper
  & Unicode.encodeLatin1
  & Stream.fold Stdio.write
:}
HELLO WORLD
Shell commands as functions
Using a shell as the command interpreter we can use shell commands in a data processing pipeline:
>>> :{
    Process.toBytes "sh" ["-c", "echo hello | tr [a-z] [A-Z]"]
  & Stream.fold Stdio.write
:}
HELLO
Running Commands Concurrently
We can run executables or commands concurrently as we would run any other functions in Streamly. For example, the following program greps for the string "pattern" in all the files in the current directory concurrently:
>>> :{
grep file =
    Process.toBytes "grep" ["-H", "pattern", file]
  & Stream.handle (\(_ :: Process.ProcessFailure) -> Stream.nil)
  & Stream.foldMany (Fold.takeEndBy (== 10) Array.write)
:}
>>> :{
pgrep =
    Dir.readFiles "."
  & Stream.parConcatMap id grep
  & Stream.fold Stdio.writeChunks
:}
Experimental APIs
See Streamly.Internal.System.Process for unreleased functions.
Synopsis
- newtype ProcessFailure = ProcessFailure Int
- data Config
- setCwd :: Maybe FilePath -> Config -> Config
- setEnv :: Maybe [(String, String)] -> Config -> Config
- closeFiles :: Bool -> Config -> Config
- newProcessGroup :: Bool -> Config -> Config
- data Session
- setSession :: Session -> Config -> Config
- interruptChildOnly :: Bool -> Config -> Config
- setUserId :: Maybe Word32 -> Config -> Config
- setGroupId :: Maybe Word32 -> Config -> Config
- waitForDescendants :: Bool -> Config -> Config
- toChunks :: (MonadAsync m, MonadCatch m) => FilePath -> [String] -> Stream m (Array Word8)
- toChunksWith :: (MonadCatch m, MonadAsync m) => (Config -> Config) -> FilePath -> [String] -> Stream m (Array Word8)
- toBytes :: (MonadAsync m, MonadCatch m) => FilePath -> [String] -> Stream m Word8
- toChars :: (MonadAsync m, MonadCatch m) => FilePath -> [String] -> Stream m Char
- toLines :: (MonadAsync m, MonadCatch m) => Fold m Char a -> FilePath -> [String] -> Stream m a
- toString :: (MonadAsync m, MonadCatch m) => FilePath -> [String] -> m String
- toStdout :: (MonadAsync m, MonadCatch m) => FilePath -> [String] -> m ()
- toNull :: (MonadAsync m, MonadCatch m) => FilePath -> [String] -> m ()
- pipeChunks :: (MonadCatch m, MonadAsync m) => FilePath -> [String] -> Stream m (Array Word8) -> Stream m (Array Word8)
- pipeChunksWith :: (MonadCatch m, MonadAsync m) => (Config -> Config) -> FilePath -> [String] -> Stream m (Array Word8) -> Stream m (Array Word8)
- pipeBytes :: (MonadCatch m, MonadAsync m) => FilePath -> [String] -> Stream m Word8 -> Stream m Word8
- toBytesEither :: (MonadAsync m, MonadCatch m) => FilePath -> [String] -> Stream m (Either Word8 Word8)
- toChunksEither :: (MonadAsync m, MonadCatch m) => FilePath -> [String] -> Stream m (Either (Array Word8) (Array Word8))
- toChunksEitherWith :: (MonadCatch m, MonadAsync m) => (Config -> Config) -> FilePath -> [String] -> Stream m (Either (Array Word8) (Array Word8))
- pipeBytesEither :: (MonadCatch m, MonadAsync m) => FilePath -> [String] -> Stream m Word8 -> Stream m (Either Word8 Word8)
- pipeChunksEither :: (MonadCatch m, MonadAsync m) => FilePath -> [String] -> Stream m (Array Word8) -> Stream m (Either (Array Word8) (Array Word8))
- pipeChunksEitherWith :: (MonadCatch m, MonadAsync m) => (Config -> Config) -> FilePath -> [String] -> Stream m (Array Word8) -> Stream m (Either (Array Word8) (Array Word8))
- foreground :: (Config -> Config) -> FilePath -> [String] -> IO ExitCode
- daemon :: (Config -> Config) -> FilePath -> [String] -> IO ProcessHandle
- standalone :: Bool -> (Bool, Bool, Bool) -> (Config -> Config) -> FilePath -> [String] -> IO (Either ExitCode ProcessHandle)
- processChunks :: (MonadCatch m, MonadAsync m) => FilePath -> [String] -> Stream m (Array Word8) -> Stream m (Array Word8)
- processBytes :: (MonadCatch m, MonadAsync m) => FilePath -> [String] -> Stream m Word8 -> Stream m Word8
Setup
To execute the code examples provided in this module in ghci, please run the following commands first.
>>> :set -XFlexibleContexts
>>> :set -XScopedTypeVariables
>>> import Data.Char (toUpper)
>>> import Data.Function ((&))
>>> import qualified Streamly.Console.Stdio as Stdio
>>> import qualified Streamly.Data.Array as Array
>>> import qualified Streamly.Data.Fold as Fold
>>> import qualified Streamly.Data.Stream.Prelude as Stream
>>> import qualified Streamly.System.Process as Process
>>> import qualified Streamly.Unicode.Stream as Unicode
For APIs that have not been released yet.
>>> import qualified Streamly.Internal.Console.Stdio as Stdio (putChars, putChunks)
>>> import qualified Streamly.Internal.FileSystem.Dir as Dir (readFiles)
>>> import qualified Streamly.Internal.System.Process as Process
>>> import qualified Streamly.Internal.Unicode.Stream as Unicode (lines)
Exceptions
Since we compose processes using Streamly's streaming pipeline, exception handling needs nothing special: it works the same way as elsewhere in Streamly. Like the pipefail option in shells, exceptions are propagated if any of the stages fails.
newtype ProcessFailure
An exception that is raised when a process fails.
Since: 0.1.0
ProcessFailure Int | The exit code of the process. |
Instances
Exception ProcessFailure | Defined in Streamly.Internal.System.Process |
Show ProcessFailure | Defined in Streamly.Internal.System.Process |
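As a minimal sketch of handling this exception at the point where a stream is consumed, the following uses `try` from `Control.Exception` around `toNull`. The helper name `exitCodeOf` is hypothetical and not part of this module; the sketch assumes the `streamly-process` package is installed and that a POSIX `sh` is on the PATH.

```haskell
import Control.Exception (try)
import qualified Streamly.System.Process as Process

-- | Hypothetical helper: run a command, discard its output, and report
-- the exit code. Returns 0 when no 'ProcessFailure' is raised, otherwise
-- the code carried by the exception.
exitCodeOf :: FilePath -> [String] -> IO Int
exitCodeOf path args = do
    r <- try (Process.toNull path args)
    pure $ case r of
        Left (Process.ProcessFailure code) -> code
        Right () -> 0

main :: IO ()
main = do
    exitCodeOf "sh" ["-c", "exit 3"] >>= print
    exitCodeOf "sh" ["-c", "exit 0"] >>= print
```

Inside a pipeline, `Stream.handle` can be used instead, as in the concurrent grep example in the module introduction.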
Process Configuration
Use the config modifiers to modify the default config.
data Config
Process configuration used for creating a new process.
By default the process config is set up to inherit the following attributes from the parent process:
- Current working directory
- Environment variables
- Open file descriptors
- Process group
- Terminal session
On POSIX:
- Process uid and gid
- Signal handlers
On Windows by default the parent process waits for the entire child process tree to finish.
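Config modifiers are ordinary `Config -> Config` functions, so several of them can be combined with function composition and passed to any of the `With` variants. The sketch below is illustrative only: the helper names `inDir` and `pwdIn` are hypothetical, and it assumes a `pwd` executable on the PATH and that `setCwd (Just dir)` runs the child in `dir`, as the name suggests.

```haskell
import Data.Function ((&))
import qualified Streamly.Console.Stdio as Stdio
import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.System.Process as Process

-- | Hypothetical helper: compose two modifiers into one.
-- The child runs in @dir@ and does not inherit non-stdio descriptors.
inDir :: FilePath -> Process.Config -> Process.Config
inDir dir = Process.setCwd (Just dir) . Process.closeFiles True

-- | Hypothetical helper: run @pwd@ under the modified config and copy
-- its output to our stdout.
pwdIn :: FilePath -> IO ()
pwdIn dir =
    Process.toChunksWith (inDir dir) "pwd" []
        & Stream.fold Stdio.writeChunks

main :: IO ()
main = pwdIn "/"
```

Passing `id` as the modifier leaves the default config unchanged.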
Common Modifiers
These options apply to both POSIX and Windows.
closeFiles :: Bool -> Config -> Config
Close all open file descriptors inherited from the parent process. Note that this does not apply to stdio descriptors; their behavior is determined by other configuration settings.
Default is False.
Note: if the number of open descriptors is large, closing them may take a while.
data Session
InheritSession makes the new process inherit the terminal session from the parent process. This is the default.
NewSession makes the new process start with a new session without a controlling terminal. On POSIX, setsid is used to create a new process group and session; the pid of the new process is the session id and the process group id as well. On Windows, the DETACHED_PROCESS flag is used to detach the process from the inherited console session.
NewConsole creates a new terminal and attaches the process to the new terminal on Windows, using the CREATE_NEW_CONSOLE flag. On POSIX this does nothing.
For Windows see
- https://learn.microsoft.com/en-us/windows/win32/procthread/process-creation-flags
- https://learn.microsoft.com/en-us/windows/console/creation-of-a-console .
For POSIX, see the setsid man page.
InheritSession | Inherit the parent session |
NewSession | Detach process from the current session |
NewConsole | Windows only, CREATE_NEW_CONSOLE flag |
setSession :: Session -> Config -> Config
Define the terminal session behavior for the new process.
Default is InheritSession.
Posix Only Modifiers
These options have no effect on Windows.
interruptChildOnly :: Bool -> Config -> Config
When this is True, the parent process ignores user interrupt signals SIGINT and SIGQUIT delivered to it until the child process exits. If multiple child processes are started then the default handling in the parent is restored only after the last one exits.
When a user presses CTRL-C or CTRL-\ on the terminal, a SIGINT or SIGQUIT is sent to all the foreground processes in the terminal session, which includes both the child and the parent. By default, on receiving these signals, the parent process would clean up and exit. To avoid that and let the child handle these signals, we can choose to ignore them in the parent until the child exits.
POSIX only. Default is False.
Windows Only Modifiers
These options have no effect on Posix.
waitForDescendants :: Bool -> Config -> Config
On Windows, the parent waits for the entire descendant tree of the process, i.e. including processes that are spawned by the child process.
Default is True.
Generation
toChunks
:: (MonadAsync m, MonadCatch m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> Stream m (Array Word8) | Output Stream |
The following code is equivalent to the shell command echo "hello world":
>>> :{
    Process.toChunks "echo" ["hello world"]
  & Stream.fold Stdio.writeChunks
:}
hello world
>>> toChunks = toChunksWith id
Since: 0.1.0
toChunksWith
:: (MonadCatch m, MonadAsync m) | |
=> (Config -> Config) | Config modifier |
-> FilePath | Executable name or path |
-> [String] | Arguments |
-> Stream m (Array Word8) | Output stream |
Like toChunks but use the specified configuration to run the process.
toBytes
:: (MonadAsync m, MonadCatch m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> Stream m Word8 | Output Stream |
The following code is equivalent to the shell command echo "hello world":
>>> :{
    Process.toBytes "echo" ["hello world"]
  & Stream.fold Stdio.write
:}
hello world
Since: 0.1.0
toChars
:: (MonadAsync m, MonadCatch m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> Stream m Char | Output Stream |
>>> toChars path args = toBytes path args & Unicode.decodeUtf8
toLines
:: (MonadAsync m, MonadCatch m) | |
=> Fold m Char a | |
-> FilePath | Executable name or path |
-> [String] | Arguments |
-> Stream m a | Output Stream |
>>> toLines f path args = toChars path args & Unicode.lines f
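The fold passed to `toLines` decides how each line is represented. As a small sketch, collecting each line with `Fold.toList` yields a stream of `String` values. The helper name `outputLines` is hypothetical, and the example assumes a POSIX `sh` with `printf` available.

```haskell
import Data.Function ((&))
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.System.Process as Process

-- | Hypothetical helper: run a command and return its output as a list
-- of lines. The inner 'Fold.toList' builds one String per line; the
-- outer one collects the lines themselves.
outputLines :: FilePath -> [String] -> IO [String]
outputLines path args =
    Process.toLines Fold.toList path args
        & Stream.fold Fold.toList

main :: IO ()
main = outputLines "sh" ["-c", "printf 'a\\nb\\n'"] >>= mapM_ putStrLn
```

For large outputs, consuming the stream incrementally instead of collecting it into a list keeps memory use bounded.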
Effects
toString
:: (MonadAsync m, MonadCatch m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> m String |
>>> toString path args = toChars path args & Stream.fold Fold.toList
toStdout
:: (MonadAsync m, MonadCatch m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> m () |
>>> toStdout path args = toChunks path args & Stdio.putChunks
toNull
:: (MonadAsync m, MonadCatch m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> m () |
>>> toNull path args = toChunks path args & Stream.fold Fold.drain
Transformation
pipeChunks
:: (MonadCatch m, MonadAsync m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> Stream m (Array Word8) | Input stream |
-> Stream m (Array Word8) | Output stream |
pipeChunks file args input runs the executable file specified by its name or path using args as arguments and input stream as its standard input. Returns the standard output of the executable as a stream.
If only the name of an executable file is specified instead of its path then the file name is searched in the directories specified by the PATH environment variable.
If the input stream throws an exception or if the output stream is garbage collected before it could finish then the process is terminated with SIGTERM.
If the process terminates with a non-zero exit code then a ProcessFailure exception is raised.
The following code is equivalent to the shell command echo "hello world" | tr [a-z] [A-Z]:
>>> :{
    Process.toChunks "echo" ["hello world"]
  & Process.pipeChunks "tr" ["[a-z]", "[A-Z]"]
  & Stream.fold Stdio.writeChunks
:}
HELLO WORLD
pre-release
pipeChunksWith
:: (MonadCatch m, MonadAsync m) | |
=> (Config -> Config) | Config modifier |
-> FilePath | Executable name or path |
-> [String] | Arguments |
-> Stream m (Array Word8) | Input stream |
-> Stream m (Array Word8) | Output stream |
Like pipeChunks but use the specified configuration to run the process.
pipeBytes
:: (MonadCatch m, MonadAsync m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> Stream m Word8 | Input Stream |
-> Stream m Word8 | Output Stream |
Like pipeChunks except that it works on a stream of bytes instead of a stream of chunks.
We can write the example in pipeChunks as follows.
>>> :{
    Process.toBytes "echo" ["hello world"]
  & Process.pipeBytes "tr" ["[a-z]", "[A-Z]"]
  & Stream.fold Stdio.write
:}
HELLO WORLD
pre-release
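The input of a pipe does not have to come from another process. As a rough sketch, the following feeds a Haskell-generated stream to the standard input of `sort` and decodes its output back to a `String`. The helper name `sortLines` is hypothetical, and a `sort` executable on the PATH is assumed.

```haskell
import Data.Function ((&))
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.System.Process as Process
import qualified Streamly.Unicode.Stream as Unicode

-- | Hypothetical helper: sort lines by piping them through the external
-- @sort@ command. The list is encoded to UTF-8 bytes, written to the
-- process stdin, and the process stdout is decoded back to characters.
sortLines :: [String] -> IO String
sortLines xs =
    Stream.fromList (unlines xs)
        & Unicode.encodeUtf8
        & Process.pipeBytes "sort" []
        & Unicode.decodeUtf8
        & Stream.fold Fold.toList

main :: IO ()
main = sortLines ["pear", "apple", "fig"] >>= putStr
```

When throughput matters, the chunked variant `pipeChunks` avoids per-byte overhead.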
Including Stderr Stream
Like other Generation routines but along with stdout, stderr is also included in the output stream. stdout is converted to Right values in the output stream and stderr is converted to Left values.
toBytesEither
:: (MonadAsync m, MonadCatch m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> Stream m (Either Word8 Word8) | Output Stream |
toBytesEither path args runs the executable at path using args as arguments and returns a stream of Either bytes. The Left values are from stderr and the Right values are from stdout of the executable.
Raises ProcessFailure exception in case of failure.
The following example uses echo to write hello to stdout and world to stderr, then uses folds from Streamly.Console.Stdio to write them back to stdout and stderr respectively:
>>> :{
    Process.toBytesEither "/bin/bash" ["-c", "echo 'hello'; echo 'world' 1>&2"]
  & Stream.fold (Fold.partition Stdio.writeErr Stdio.write)
:}
world
hello
((),())
Since: 0.1.0
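Because stderr and stdout arrive tagged as Left and Right, either side can be selected with the usual Either stream combinators. The sketch below keeps only stderr using `Stream.catLefts`; the helper name `stderrOf` is hypothetical, and a POSIX `sh` is assumed.

```haskell
import Data.Function ((&))
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.System.Process as Process
import qualified Streamly.Unicode.Stream as Unicode

-- | Hypothetical helper: run a command and return only what it wrote to
-- stderr. The Right (stdout) bytes are dropped by 'Stream.catLefts'.
stderrOf :: FilePath -> [String] -> IO String
stderrOf path args =
    Process.toBytesEither path args
        & Stream.catLefts
        & Unicode.decodeUtf8
        & Stream.fold Fold.toList

main :: IO ()
main = stderrOf "sh" ["-c", "echo out; echo err 1>&2"] >>= putStr
```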
toChunksEither
:: (MonadAsync m, MonadCatch m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> Stream m (Either (Array Word8) (Array Word8)) | Output Stream |
Like toBytesEither but generates a stream of Array Word8 instead of a stream of Word8.
>>> :{
    Process.toChunksEither "bash" ["-c", "echo 'hello'; echo 'world' 1>&2"]
  & Stream.fold (Fold.partition Stdio.writeErrChunks Stdio.writeChunks)
:}
world
hello
((),())
>>> toChunksEither = toChunksEitherWith id
Prefer toChunksEither over toBytesEither when performance matters.
Pre-release
toChunksEitherWith
:: (MonadCatch m, MonadAsync m) | |
=> (Config -> Config) | Config modifier |
-> FilePath | Executable name or path |
-> [String] | Arguments |
-> Stream m (Either (Array Word8) (Array Word8)) | Output stream |
Like toChunksEither but use the specified configuration to run the process.
pipeBytesEither
:: (MonadCatch m, MonadAsync m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> Stream m Word8 | Input Stream |
-> Stream m (Either Word8 Word8) | Output Stream |
pipeBytesEither path args input runs the executable at path using args as arguments and input stream as its standard input. The error stream of the executable is presented as Left values in the resulting stream and output stream as Right values.
Raises ProcessFailure exception in case of failure.
For example, the following is equivalent to the shell command echo "hello world" | tr [:lower:] [:upper:]:
>>> :{
    Process.pipeBytesEither "echo" ["hello world"] Stream.nil
  & Stream.catRights
  & Process.pipeBytesEither "tr" ["[:lower:]", "[:upper:]"]
  & Stream.catRights
  & Stream.fold Stdio.write
:}
HELLO WORLD
Since: 0.1.0
pipeChunksEither
:: (MonadCatch m, MonadAsync m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> Stream m (Array Word8) | Input stream |
-> Stream m (Either (Array Word8) (Array Word8)) | Output stream |
Like pipeChunks but also includes stderr as the Left stream in the Either output.
pipeChunksEitherWith
:: (MonadCatch m, MonadAsync m) | |
=> (Config -> Config) | Config modifier |
-> FilePath | Executable name or path |
-> [String] | Arguments |
-> Stream m (Array Word8) | Input stream |
-> Stream m (Either (Array Word8) (Array Word8)) | Output stream |
Like pipeChunksEither but use the specified configuration to run the process.
Non-streaming Processes
These processes do not attach the IO streams with other processes.
foreground :: (Config -> Config) -> FilePath -> [String] -> IO ExitCode
Launch a process interfacing with the user. User interrupts are sent to the launched process and ignored by the parent process. The launched process inherits stdin, stdout, and stderr from the parent, so that the user can interact with the process. The parent waits for the child process to exit; an ExitCode is returned when the process finishes.
This is the same as the common system function found in other libraries used to execute commands.
On Windows you can pass setSession NewConsole to create a new console.
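A minimal sketch of calling `foreground` with the default configuration and inspecting the returned `ExitCode` follows. The helper name `describeExit` is hypothetical, and a POSIX `sh` on the PATH is assumed.

```haskell
import System.Exit (ExitCode(..))
import qualified Streamly.System.Process as Process

-- | Hypothetical helper: render an exit code for display.
describeExit :: ExitCode -> String
describeExit ExitSuccess = "succeeded"
describeExit (ExitFailure n) = "failed with exit code " ++ show n

main :: IO ()
main = do
    -- 'id' leaves the default process configuration unchanged.
    code <- Process.foreground id "sh" ["-c", "echo running in foreground"]
    putStrLn (describeExit code)
```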
daemon :: (Config -> Config) -> FilePath -> [String] -> IO ProcessHandle
Launch a daemon process. Closes stdin, stdout and stderr and creates a new session, detached from the terminal; the parent does not wait for the process to finish.
The ProcessHandle returned can be used to terminate the daemon or send signals to it.
standalone
:: Bool | Wait for process to finish? |
-> (Bool, Bool, Bool) | close (stdin, stdout, stderr) |
-> (Config -> Config) | |
-> FilePath | Executable name or path |
-> [String] | Arguments |
-> IO (Either ExitCode ProcessHandle) |
Launch a standalone process, i.e. the process does not have a way to attach the IO streams with other processes. The IO streams stdin, stdout, stderr can either be inherited from the parent or closed.
This API is more powerful than foreground and daemon and can be used to implement both of these. However, it should be used carefully, e.g. if you inherit the IO streams and the parent is not waiting for the child process to finish then both parent and child may use the IO streams, resulting in garbled IO if both are reading/writing simultaneously.
If the parent chooses to wait for the process, an ExitCode is returned; otherwise a ProcessHandle is returned which can be used to terminate the process, send signals to it or wait for it to finish.
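As an illustrative sketch of the two result shapes, the following waits for the child and keeps all three stdio streams open (inherited), so an `ExitCode` is expected in the Left position as described above. The helper name `runAndWait` is hypothetical, and an `echo` executable on the PATH is assumed.

```haskell
import System.Exit (ExitCode)
import qualified Streamly.System.Process as Process

-- | Hypothetical helper: run a command to completion with inherited
-- stdio. Returns Nothing if a process handle came back instead of an
-- exit code, which is not expected when waiting.
runAndWait :: FilePath -> [String] -> IO (Maybe ExitCode)
runAndWait path args = do
    -- True: wait for the child. (False, False, False): close none of
    -- stdin, stdout, stderr. 'id': default configuration.
    r <- Process.standalone True (False, False, False) id path args
    pure $ case r of
        Left code -> Just code
        Right _handle -> Nothing

main :: IO ()
main = runAndWait "echo" ["hello"] >>= print
```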
Deprecated
processChunks
:: (MonadCatch m, MonadAsync m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> Stream m (Array Word8) | Input stream |
-> Stream m (Array Word8) | Output stream |
Deprecated: Please use pipeChunks instead.
processBytes
:: (MonadCatch m, MonadAsync m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> Stream m Word8 | Input Stream |
-> Stream m Word8 | Output Stream |
Deprecated: Please use pipeBytes instead.