streamly-process-0.3.1: Use OS processes as stream transformation functions
Copyright(c) 2020 Composewell Technologies
LicenseApache-2.0
Maintainerstreamly@composewell.com
Stabilityexperimental
PortabilityGHC
Safe HaskellSafe-Inferred
LanguageHaskell2010

Streamly.System.Process

Description

This module provides functions to run operating system processes as stream producers, consumers or stream transformation functions. Thus OS processes can be used in the same way as Haskell functions and all the streaming combinators in streamly can be used to combine them. This allows you to seamlessly integrate external binary executables into your Haskell program.

However, we recommend native Haskell functions with Streamly threads over using system processes whenever possible. This approach offers a simpler programming model compared to system processes, which also have a larger performance overhead.

Prefer Streamly.System.Command module as a higher level wrapper over this module.

Executables as functions

Processes can be composed in a streaming pipeline just like a Posix shell command pipeline. Moreover, we can mix processes and Haskell functions seamlessly in a processing pipeline. For example:

>>> :{
   Process.toBytes "echo" ["hello world"]
 & Process.pipeBytes "tr" ["[a-z]", "[A-Z]"]
 & Stream.fold Stdio.write
 :}
 HELLO WORLD

Of course, you can use a Haskell function instead of "tr":

>>> :{
   Process.toBytes "echo" ["hello world"]
 & Unicode.decodeLatin1 & fmap toUpper & Unicode.encodeLatin1
 & Stream.fold Stdio.write
 :}
 HELLO WORLD

Shell commands as functions

Using a shell as the command interpreter we can use shell commands in a data processing pipeline:

>>> :{
   Process.toBytes "sh" ["-c", "echo hello | tr [a-z] [A-Z]"]
 & Stream.fold Stdio.write
 :}
 HELLO

Running Commands Concurrently

We can run executables or commands concurrently as we would run any other functions in Streamly. For example, the following program greps the word "to" in all the files in the current directory concurrently:

>>> :{
grep file =
   Process.toBytes "grep" ["-H", "pattern", file]
 & Stream.handle (\(_ :: Process.ProcessFailure) -> Stream.nil)
 & Stream.foldMany (Fold.takeEndBy (== 10) Array.write)
 :}
>>> :{
pgrep =
   Dir.readFiles "."
 & Stream.parConcatMap id grep
 & Stream.fold Stdio.writeChunks
:}

Experimental APIs

See Streamly.Internal.System.Process for unreleased functions.

Synopsis

Setup

To execute the code examples provided in this module in ghci, please run the following commands first.

>>> :set -XFlexibleContexts
>>> :set -XScopedTypeVariables
>>> import Data.Char (toUpper)
>>> import Data.Function ((&))
>>> import qualified Streamly.Console.Stdio as Stdio
>>> import qualified Streamly.Data.Array as Array
>>> import qualified Streamly.Data.Fold as Fold
>>> import qualified Streamly.Data.Stream.Prelude as Stream
>>> import qualified Streamly.System.Process as Process
>>> import qualified Streamly.Unicode.Stream as Unicode

For APIs that have not been released yet.

>>> import qualified Streamly.Internal.Console.Stdio as Stdio (putChars, putChunks)
>>> import qualified Streamly.Internal.FileSystem.Dir as Dir (readFiles)
>>> import qualified Streamly.Internal.System.Process as Process
>>> import qualified Streamly.Internal.Unicode.Stream as Unicode (lines)

Exceptions

Since we are composing using Streamly's streaming pipeline there is nothing special about exception handling, it works the same as in Streamly. Like the pipefail option in shells, exceptions are propagated if any of the stages fail.

newtype ProcessFailure Source #

An exception that is raised when a process fails.

Since: 0.1.0

Constructors

ProcessFailure Int

The exit code of the process.

Process Configuration

Use the config modifiers to modify the default config.

data Config Source #

Process configuration used for creating a new process.

By default the process config is setup to inherit the following attributes from the parent process:

  • Current working directory
  • Environment variables
  • Open file descriptors
  • Process group
  • Terminal session

On POSIX:

  • Process uid and gid
  • Signal handlers

On Windows by default the parent process waits for the entire child process tree to finish.

Common Modifiers

These options apply to both POSIX and Windows.

setCwd :: Maybe FilePath -> Config -> Config Source #

Set the current working directory of the new process. When Nothing, the working directory is inherited from the parent process.

Default is Nothing - inherited from the parent process.

setEnv :: Maybe [(String, String)] -> Config -> Config Source #

Set the environment variables for the new process. When Nothing, the environment is inherited from the parent process.

Default is Nothing - inherited from the parent process.

closeFiles :: Bool -> Config -> Config Source #

Close all open file descriptors inherited from the parent process. Note, this does not apply to stdio descriptors - the behavior of those is determined by other configuration settings.

Default is False.

Note: if the number of open descriptors is large, it may take a while closing them.

newProcessGroup :: Bool -> Config -> Config Source #

If True the new process starts a new process group, becomes a process group leader, its pid becoming the process group id.

See the POSIX setpgid man page.

Default is False, the new process belongs to the parent's process group.

data Session Source #

InheritSession makes the new process inherit the terminal session from the parent process. This is the default.

NewSession makes the new process start with a new session without a controlling terminal. On POSIX, setsid is used to create a new process group and session, the pid of the new process is the session id and process group id as well. On Windows DETACHED_PROCESS flag is used to detach the process from inherited console session.

NewConsole creates a new terminal and attaches the process to the new terminal on Windows, using the CREATE_NEW_CONSOLE flag. On POSIX this does nothing.

For Windows see

For POSIX see, setsid man page.

Constructors

InheritSession

Inherit the parent session

NewSession

Detach process from the current session

NewConsole

Windows only, CREATE_NEW_CONSOLE flag

setSession :: Session -> Config -> Config Source #

Define the terminal session behavior for the new process.

Default is InheritSession.

Posix Only Modifiers

These options have no effect on Windows.

interruptChildOnly :: Bool -> Config -> Config Source #

When this is True, the parent process ignores user interrupt signals SIGINT and SIGQUIT delivered to it until the child process exits. If multiple child processes are started then the default handling in the parent is restored only after the last one exits.

When a user presses CTRL-C or CTRL- on the terminal, a SIGINT or SIGQUIT is sent to all the foreground processes in the terminal session, this includes both the child and the parent. By default, on receiving these signals, the parent process would cleanup and exit, to avoid that and let the child handle these signals we can choose to ignore these signals in the parent until the child exits.

POSIX only. Default is False.

setUserId :: Maybe Word32 -> Config -> Config Source #

Use the POSIX setuid call to set the user id of the new process before executing the command. The parent process must have sufficient privileges to set the user id.

POSIX only. See the POSIX setuid man page.

Default is Nothing - inherit from the parent.

setGroupId :: Maybe Word32 -> Config -> Config Source #

Use the POSIX setgid call to set the group id of the new process before executing the command. The parent process must have sufficient privileges to set the group id.

POSIX only. See the POSIX setgid man page.

Default is Nothing - inherit from the parent.

Windows Only Modifiers

These options have no effect on Posix.

waitForDescendants :: Bool -> Config -> Config Source #

On Windows, the parent waits for the entire descendant tree of process i.e. including processes that are spawned by the child process.

Default is True.

Generation

toChunks Source #

Arguments

:: (MonadAsync m, MonadCatch m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> Stream m (Array Word8)

Output Stream

The following code is equivalent to the shell command echo "hello world":

>>> :{
   Process.toChunks "echo" ["hello world"]
 & Stream.fold Stdio.writeChunks
 :}
hello world
>>> toChunks = toChunksWith id

Since: 0.1.0

toChunksWith Source #

Arguments

:: (MonadCatch m, MonadAsync m) 
=> (Config -> Config)

Config modifier

-> FilePath

Executable name or path

-> [String]

Arguments

-> Stream m (Array Word8)

Output stream

Like toChunks but use the specified configuration to run the process.

toBytes Source #

Arguments

:: (MonadAsync m, MonadCatch m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> Stream m Word8

Output Stream

The following code is equivalent to the shell command echo "hello world":

>>> :{
   Process.toBytes "echo" ["hello world"]
 & Stream.fold Stdio.write
 :}
hello world

Since: 0.1.0

toChars Source #

Arguments

:: (MonadAsync m, MonadCatch m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> Stream m Char

Output Stream

>>> toChars path args = toBytes path args & Unicode.decodeUtf8

toLines Source #

Arguments

:: (MonadAsync m, MonadCatch m) 
=> Fold m Char a 
-> FilePath

Executable name or path

-> [String]

Arguments

-> Stream m a

Output Stream

>>> toLines path args f = toChars path args & Unicode.lines f

Effects

toString Source #

Arguments

:: (MonadAsync m, MonadCatch m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> m String 
>>> toString path args = toChars path args & Stream.fold Fold.toList

toStdout Source #

Arguments

:: (MonadAsync m, MonadCatch m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> m () 
>>> toStdout path args = toChunks path args & Stdio.putChunks

toNull Source #

Arguments

:: (MonadAsync m, MonadCatch m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> m () 
>>> toNull path args = toChunks path args & Stream.fold Fold.drain

Transformation

pipeChunks Source #

Arguments

:: (MonadCatch m, MonadAsync m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> Stream m (Array Word8)

Input stream

-> Stream m (Array Word8)

Output stream

pipeChunks file args input runs the executable file specified by its name or path using args as arguments and input stream as its standard input. Returns the standard output of the executable as a stream.

If only the name of an executable file is specified instead of its path then the file name is searched in the directories specified by the PATH environment variable.

If the input stream throws an exception or if the output stream is garbage collected before it could finish then the process is terminated with SIGTERM.

If the process terminates with a non-zero exit code then a ProcessFailure exception is raised.

The following code is equivalent to the shell command echo "hello world" | tr [a-z] [A-Z]:

>>> :{
   Process.toChunks "echo" ["hello world"]
 & Process.pipeChunks "tr" ["[a-z]", "[A-Z]"]
 & Stream.fold Stdio.writeChunks
 :}
HELLO WORLD

pre-release

pipeChunksWith Source #

Arguments

:: (MonadCatch m, MonadAsync m) 
=> (Config -> Config)

Config modifier

-> FilePath

Executable name or path

-> [String]

Arguments

-> Stream m (Array Word8)

Input stream

-> Stream m (Array Word8)

Output stream

Like pipeChunks but use the specified configuration to run the process.

pipeBytes Source #

Arguments

:: (MonadCatch m, MonadAsync m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> Stream m Word8

Input Stream

-> Stream m Word8

Output Stream

Like pipeChunks except that it works on a stream of bytes instead of a stream of chunks.

We can write the example in pipeChunks as follows.

>>> :{
   Process.toBytes "echo" ["hello world"]
 & Process.pipeBytes "tr" ["[a-z]", "[A-Z]"]
 & Stream.fold Stdio.write
 :}
HELLO WORLD

pre-release

Including Stderr Stream

Like other Generation routines but along with stdout, stderr is also included in the output stream. stdout is converted to Right values in the output stream and stderr is converted to Left values.

toBytesEither Source #

Arguments

:: (MonadAsync m, MonadCatch m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> Stream m (Either Word8 Word8)

Output Stream

toBytesEither path args runs the executable at path using args as arguments and returns a stream of Either bytes. The Left values are from stderr and the Right values are from stdout of the executable.

Raises ProcessFailure exception in case of failure.

The following example uses echo to write hello to stdout and world to stderr, then uses folds from Streamly.Console.Stdio to write them back to stdout and stderr respectively:

>>> :{
  Process.toBytesEither "/bin/bash" ["-c", "echo 'hello'; echo 'world' 1>&2"]
& Stream.fold (Fold.partition Stdio.writeErr Stdio.write)
:}
world
hello
((),())

Since: 0.1.0

toChunksEither Source #

Arguments

:: (MonadAsync m, MonadCatch m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> Stream m (Either (Array Word8) (Array Word8))

Output Stream

Like toBytesEither but generates a stream of Array Word8 instead of a stream of Word8.

>>> :{
  toChunksEither "bash" ["-c", "echo 'hello'; echo 'world' 1>&2"]
& Stream.fold (Fold.partition Stdio.writeErrChunks Stdio.writeChunks)
:}
world
hello
((),())
>>> toChunksEither = toChunksEitherWith id

Prefer toChunksEither over toBytesEither when performance matters.

Pre-release

toChunksEitherWith Source #

Arguments

:: (MonadCatch m, MonadAsync m) 
=> (Config -> Config)

Config modifier

-> FilePath

Executable name or path

-> [String]

Arguments

-> Stream m (Either (Array Word8) (Array Word8))

Output stream

Like toChunksEither but use the specified configuration to run the process.

pipeBytesEither Source #

Arguments

:: (MonadCatch m, MonadAsync m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> Stream m Word8

Input Stream

-> Stream m (Either Word8 Word8)

Output Stream

pipeBytesEither path args input runs the executable at path using args as arguments and input stream as its standard input. The error stream of the executable is presented as Left values in the resulting stream and output stream as Right values.

Raises ProcessFailure exception in case of failure.

For example, the following is equivalent to the shell command echo "hello world" | tr [:lower:] [:upper:]:

>>> :{
   pipeBytesEither "echo" ["hello world"] Stream.nil
 & Stream.catRights
 & pipeBytesEither "tr" ["[:lower:]", "[:upper:]"]
 & Stream.catRights
 & Stream.fold Stdio.write
 :}
HELLO WORLD

Since: 0.1.0

pipeChunksEither Source #

Arguments

:: (MonadCatch m, MonadAsync m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> Stream m (Array Word8)

Input stream

-> Stream m (Either (Array Word8) (Array Word8))

Output stream

Like pipeChunks but also includes stderr as Left stream in the Either output.

pipeChunksEitherWith Source #

Arguments

:: (MonadCatch m, MonadAsync m) 
=> (Config -> Config)

Config modifier

-> FilePath

Executable name or path

-> [String]

Arguments

-> Stream m (Array Word8)

Input stream

-> Stream m (Either (Array Word8) (Array Word8))

Output stream

Like pipeChunksEither but use the specified configuration to run the process.

Non-streaming Processes

These processes do not attach the IO streams with other processes.

foreground Source #

Arguments

:: (Config -> Config) 
-> FilePath

Executable name or path

-> [String]

Arguments

-> IO ExitCode 

Launch a process interfacing with the user. User interrupts are sent to the launched process and ignored by the parent process. The launched process inherits stdin, stdout, and stderr from the parent, so that the user can interact with the process. The parent waits for the child process to exit, an ExitCode is returned when the process finishes.

This is the same as the common system function found in other libraries used to execute commands.

On Windows you can pass setSession NewConsole to create a new console.

daemon Source #

Arguments

:: (Config -> Config) 
-> FilePath

Executable name or path

-> [String]

Arguments

-> IO ProcessHandle 

Launch a daemon process. Closes stdin, stdout and stderr, creates a new session, detached from the terminal, the parent does not wait for the process to finish.

The ProcessHandle returned can be used to terminate the daemon or send signals to it.

standalone Source #

Arguments

:: Bool

Wait for process to finish?

-> (Bool, Bool, Bool)

close (stdin, stdout, stderr)

-> (Config -> Config) 
-> FilePath

Executable name or path

-> [String]

Arguments

-> IO (Either ExitCode ProcessHandle) 

Launch a standalone process i.e. the process does not have a way to attach the IO streams with other processes. The IO streams stdin, stdout, stderr can either be inherited from the parent or closed.

This API is more powerful than interactive and daemon and can be used to implement both of these. However, it should be used carefully e.g. if you inherit the IO streams and parent is not waiting for the child process to finish then both parent and child may use the IO streams resulting in garbled IO if both are reading/writing simultaneously.

If the parent chooses to wait for the process an ExitCode is returned otherwise a ProcessHandle is returned which can be used to terminate the process, send signals to it or wait for it to finish.

Deprecated

processChunks Source #

Arguments

:: (MonadCatch m, MonadAsync m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> Stream m (Array Word8)

Input stream

-> Stream m (Array Word8)

Output stream

Deprecated: Please use pipeChunks instead.

processBytes Source #

Arguments

:: (MonadCatch m, MonadAsync m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> Stream m Word8

Input Stream

-> Stream m Word8

Output Stream

Deprecated: Please use pipeBytes instead.