streamly-process-0.1.0: Use OS processes as stream transformation functions
Copyright(c) 2020 Composewell Technologies
LicenseApache-2.0
Maintainerstreamly@composewell.com
Stabilityexperimental
PortabilityGHC
Safe HaskellNone
LanguageHaskell2010

Streamly.System.Process

Description

Use OS processes as stream transformation functions.

This module provides functions to run operating system processes as stream source, sink or transformation functions. Thus OS processes can be used in the same way as Haskell functions and all the streaming combinators in streamly can be used to combine them. This allows you to seamlessly integrate external programs into your Haskell program.

We recommend that you use Streamly threads instead of system processes where possible as they have a simpler programming model and processes have a larger performance overhead.

Imports for examples

Use the following imports for examples below.

>>> :set -XFlexibleContexts
>>> :set -XScopedTypeVariables
>>> import Data.Char (toUpper)
>>> import Data.Function ((&))
>>> import qualified Streamly.Console.Stdio as Stdio
>>> import qualified Streamly.Data.Array.Foreign as Array
>>> import qualified Streamly.Data.Fold as Fold
>>> import qualified Streamly.Prelude as Stream
>>> import qualified Streamly.System.Process as Process
>>> import qualified Streamly.Unicode.Stream as Unicode
>>> import qualified Streamly.Internal.FileSystem.Dir as Dir
>>> import qualified Streamly.Internal.Data.Stream.IsStream as Stream

Executables as functions

Streamly provides powerful ways to combine streams. Processes can be composed in a streaming pipeline just like a Posix shell command pipeline except that we use & instead of |. Moreover, we can mix processes and Haskell functions seamlessly in a processing pipeline. For example:

>>> :{
   Process.toBytes "echo" ["hello world"]
 & Process.processBytes "tr" ["[a-z]", "[A-Z]"]
 & Stream.fold Stdio.write
 :}
 HELLO WORLD

Of course, you can use a Haskell function instead of "tr":

>>> :{
   Process.toBytes "echo" ["hello world"]
 & Unicode.decodeLatin1 & Stream.map toUpper & Unicode.encodeLatin1
 & Stream.fold Stdio.write
 :}
 HELLO WORLD

Shell commands as functions

Using a shell as the command interpreter we can use shell commands in a data processing pipeline:

>>> :{
   Process.toBytes "sh" ["-c", "echo hello | tr [a-z] [A-Z]"]
 & Stream.fold Stdio.write
 :}
 HELLO

Concurrent Processing

We can run executables or commands concurrently as we would run any other functions in Streamly. For example, the following program greps the word "to" in all the files in the current directory concurrently:

>>> :{
grep file =
   Process.toBytes "grep" ["-H", "to", file]
 & Stream.handle (\(_ :: Process.ProcessFailure) -> Stream.nil)
 & Stream.splitWithSuffix (== 10) Array.write
 :}
>>> :{
pgrep =
   Dir.toFiles "."
 & Stream.concatMapWith Stream.async grep
 & Stream.fold Stdio.writeChunks
:}

Exception handling

Since we are composing using Streamly streaming pipeline there is nothing special about exception handling, it works the same as in Streamly. Like the pipefail option in shells, exceptions are propagated if any of the stages fail.

Process Attributes

When a new process is spawned, the following attributes are inherited from the parent process:

  • Current working directory
  • Environment
  • Open file descriptors
  • Process group
  • Process uid and gid
  • Signal handlers
  • Terminal (Session)
Synopsis

Documentation

newtype ProcessFailure Source #

An exception that is raised when a process fails.

Since: 0.1.0

Constructors

ProcessFailure Int

The exit code of the process.

Generation

toChunks Source #

Arguments

:: (IsStream t, MonadAsync m, MonadCatch m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> t m (Array Word8)

Output Stream

See processChunks. toChunks is defined as:

>>> toChunks path args = processChunks path args Stream.nil

The following code is equivalent to the shell command echo "hello world":

>>> :{
   Process.toChunks "echo" ["hello world"]
 & Stream.fold Stdio.writeChunks
 :}
hello world

Since: 0.1.0

toBytes Source #

Arguments

:: (IsStream t, MonadAsync m, MonadCatch m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> t m Word8

Output Stream

See processBytes. toBytes is defined as:

>>> toBytes path args = processBytes path args Stream.nil

The following code is equivalent to the shell command echo "hello world":

>>> :{
   Process.toBytes "echo" ["hello world"]
 & Stream.fold Stdio.write
 :}
hello world

Since: 0.1.0

Transformation

processChunks Source #

Arguments

:: (IsStream t, MonadCatch m, MonadAsync m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> t m (Array Word8)

Input stream

-> t m (Array Word8)

Output stream

processChunks file args input runs the executable file specified by its name or path using args as arguments and input stream as its standard input. Returns the standard output of the executable as a stream.

If only the name of an executable file is specified instead of its path then the file name is searched in the directories specified by the PATH environment variable.

If the input stream throws an exception or if the output stream is garbage collected before it could finish then the process is sent a SIGTERM and we wait for it to terminate gracefully.

If the process terminates with a non-zero exit code then a ProcessFailure exception is raised.

The following code is equivalent to the shell command echo "hello world" | tr [a-z] [A-Z]:

>>> :{
   Process.toChunks "echo" ["hello world"]
 & Process.processChunks "tr" ["[a-z]", "[A-Z]"]
 & Stream.fold Stdio.writeChunks
 :}
HELLO WORLD

Since: 0.1.0

processBytes Source #

Arguments

:: (IsStream t, MonadCatch m, MonadAsync m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> t m Word8

Input Stream

-> t m Word8

Output Stream

Like processChunks except that it works on a stream of bytes instead of a stream of chunks.

We can write the example in processChunks as follows. Notice how seamlessly we can replace the tr process with the Haskell toUpper function:

>>> :{
   Process.toBytes "echo" ["hello world"]
 & Unicode.decodeLatin1 & Stream.map toUpper & Unicode.encodeLatin1
 & Stream.fold Stdio.write
 :}
HELLO WORLD

Since: 0.1.0