streamly-process-0.3.0: Use OS processes as stream transformation functions
Copyright: (c) 2020 Composewell Technologies
License: Apache-2.0
Maintainer: streamly@composewell.com
Stability: experimental
Portability: GHC
Safe Haskell: Safe-Inferred
Language: Haskell2010

Streamly.System.Process

Description

Use OS processes just like native Haskell functions to generate, transform, or consume streams.

See the Streamly.System.Command module for a higher-level wrapper over this module.

See also: Streamly.Internal.System.Process for unreleased functions.

Setup

To execute the code examples provided in this module in ghci, please run the following commands first.

>>> :set -XFlexibleContexts
>>> :set -XScopedTypeVariables
>>> import Data.Char (toUpper)
>>> import Data.Function ((&))
>>> import qualified Streamly.Console.Stdio as Stdio
>>> import qualified Streamly.Data.Array as Array
>>> import qualified Streamly.Data.Fold as Fold
>>> import qualified Streamly.Data.Stream.Prelude as Stream
>>> import qualified Streamly.System.Process as Process
>>> import qualified Streamly.Unicode.Stream as Unicode
>>> import qualified Streamly.Internal.FileSystem.Dir as Dir
>>> import qualified Streamly.Internal.Data.Stream as Stream

Overview

This module provides functions to run operating system processes as stream source, sink or transformation functions. Thus OS processes can be used in the same way as Haskell functions and all the streaming combinators in streamly can be used to combine them. This allows you to seamlessly integrate external programs into your Haskell program.

We recommend using Haskell functions with Streamly threads for performing tasks whenever possible. This approach offers a simpler programming model compared to system processes, which also have a larger performance overhead.

Executables as functions

Processes can be composed in a streaming pipeline just like a POSIX shell command pipeline. Moreover, we can mix processes and Haskell functions seamlessly in a processing pipeline. For example:

>>> :{
   Process.toBytes "echo" ["hello world"]
 & Process.pipeBytes "tr" ["[a-z]", "[A-Z]"]
 & Stream.fold Stdio.write
 :}
HELLO WORLD

Of course, you can use a Haskell function instead of "tr":

>>> :{
   Process.toBytes "echo" ["hello world"]
 & Unicode.decodeLatin1 & fmap toUpper & Unicode.encodeLatin1
 & Stream.fold Stdio.write
 :}
HELLO WORLD
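Because the output of a process is an ordinary stream, it can also be reduced with a fold instead of being written to the console. The following is a minimal sketch, assuming a POSIX `printf` executable is on the PATH; `countLines` is a hypothetical helper introduced for illustration and is not part of this library.

```haskell
import Data.Function ((&))
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.System.Process as Process

-- | Count the newline bytes (value 10) in the standard output of a command.
-- 'countLines' is a hypothetical helper, not a library function.
countLines :: FilePath -> [String] -> IO Int
countLines exe args =
      Process.toBytes exe args   -- process stdout as a stream of bytes
    & Stream.filter (== 10)      -- keep only the newline bytes
    & Stream.fold Fold.length    -- count them

main :: IO ()
main = countLines "printf" ["a\\nb\\nc\\n"] >>= print
```

Here the process is the stream source and everything downstream of it is plain Haskell.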

Shell commands as functions

Using a shell as the command interpreter we can use shell commands in a data processing pipeline:

>>> :{
   Process.toBytes "sh" ["-c", "echo hello | tr [a-z] [A-Z]"]
 & Stream.fold Stdio.write
 :}
HELLO

Running Commands Concurrently

We can run executables or commands concurrently as we would run any other functions in Streamly. For example, the following program searches for the string "pattern" in all the files in the current directory concurrently:

>>> :{
grep file =
   Process.toBytes "grep" ["-H", "pattern", file]
 & Stream.handle (\(_ :: Process.ProcessFailure) -> Stream.nil)
 & Stream.foldMany (Fold.takeEndBy (== 10) Array.write)
 :}
>>> :{
pgrep =
   Dir.readFiles "."
 & Stream.parConcatMap id grep
 & Stream.fold Stdio.writeChunks
:}
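A smaller, self-contained variant of the same idea may be easier to experiment with. This is a sketch under stated assumptions: an `echo` executable is on the PATH, and `echoEach` is a hypothetical helper introduced only for illustration. Each word spawns its own process, so the order of the output lines is not guaranteed.

```haskell
import Data.Function ((&))
import Data.Word (Word8)
import Streamly.Data.Array (Array)
import qualified Streamly.Console.Stdio as Stdio
import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.System.Process as Process

-- | Run @echo@ once per word, concurrently, and merge the output chunks.
-- 'echoEach' is a hypothetical helper, not a library function.
echoEach :: [String] -> Stream.Stream IO (Array Word8)
echoEach ws =
      Stream.fromList ws
    & Stream.parConcatMap id (\w -> Process.toChunks "echo" [w])

main :: IO ()
main =
      echoEach ["one", "two", "three"]
    & Stream.fold Stdio.writeChunks
```

Passing `id` keeps the default concurrency configuration; the function argument is where limits such as the number of threads would be adjusted.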

Exceptions

Since we are composing processes using Streamly's streaming pipeline, there is nothing special about exception handling: it works the same as in the rest of Streamly. As with the pipefail option in shells, an exception is propagated if any stage of the pipeline fails.

newtype ProcessFailure

An exception that is raised when a process fails.

Since: 0.1.0

Constructors

ProcessFailure Int

The exit code of the process.
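As an illustration, a failing process can be observed with the standard `try` from `Control.Exception`. This is a minimal sketch, assuming a POSIX `sh` is on the PATH; `exitCodeOf` is a hypothetical helper introduced here, not a function of this module.

```haskell
import Control.Exception (try)
import Data.Function ((&))
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.System.Process as Process

-- | Run a command, discard its output, and report its exit code.
-- 'exitCodeOf' is a hypothetical helper, not a library function.
exitCodeOf :: FilePath -> [String] -> IO Int
exitCodeOf exe args = do
    r <- try (Process.toBytes exe args & Stream.fold Fold.drain)
    pure $ case r of
        -- A non-zero exit surfaces as a ProcessFailure carrying the code.
        Left (Process.ProcessFailure code) -> code
        -- The stream ended normally, so the process exited with 0.
        Right () -> 0

main :: IO ()
main = exitCodeOf "sh" ["-c", "exit 3"] >>= print
```

Only `ProcessFailure` is caught here; any other exception raised by the pipeline still propagates to the caller.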

Process Configuration

data Config

Process configuration used for creating a new process.

By default, the process config is set up to inherit the following attributes from the parent process:

  • Current working directory
  • Environment variables
  • Open file descriptors
  • Process group
  • Terminal session

On POSIX:

  • Process uid and gid
  • Signal handlers

On Windows by default the parent process waits for the entire child process tree to finish.

Generation

toChunks

Arguments

:: (MonadAsync m, MonadCatch m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> Stream m (Array Word8)

Output Stream

The following code is equivalent to the shell command echo "hello world":

>>> :{
   Process.toChunks "echo" ["hello world"]
 & Stream.fold Stdio.writeChunks
 :}
hello world

>>> toChunks = toChunksWith id

Since: 0.1.0

toBytes

Arguments

:: (MonadAsync m, MonadCatch m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> Stream m Word8

Output Stream

The following code is equivalent to the shell command echo "hello world":

>>> :{
   Process.toBytes "echo" ["hello world"]
 & Stream.fold Stdio.write
 :}
hello world
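The byte stream can also be decoded and collected in memory rather than written to stdout. The following is a minimal sketch, assuming the command emits UTF-8 text and that its output is small enough to hold as a `String`; `captureOutput` is a hypothetical helper introduced for illustration.

```haskell
import Data.Function ((&))
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.System.Process as Process
import qualified Streamly.Unicode.Stream as Unicode

-- | Run a command and return its standard output as a String.
-- 'captureOutput' is a hypothetical helper, not a library function.
captureOutput :: FilePath -> [String] -> IO String
captureOutput exe args =
      Process.toBytes exe args   -- raw bytes from the process
    & Unicode.decodeUtf8         -- bytes to characters
    & Stream.fold Fold.toList    -- collect all characters in memory

main :: IO ()
main = captureOutput "echo" ["hello world"] >>= putStr
```

For large outputs, a streaming fold is likely a better fit than `Fold.toList`, which buffers everything.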

Since: 0.1.0

Transformation

pipeChunks

Arguments

:: (MonadCatch m, MonadAsync m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> Stream m (Array Word8)

Input stream

-> Stream m (Array Word8)

Output stream

pipeChunks file args input runs the executable file specified by its name or path using args as arguments and input stream as its standard input. Returns the standard output of the executable as a stream.

If only the name of an executable file is specified instead of its path then the file name is searched in the directories specified by the PATH environment variable.

If the input stream throws an exception or if the output stream is garbage collected before it could finish then the process is terminated with SIGTERM.

If the process terminates with a non-zero exit code then a ProcessFailure exception is raised.

The following code is equivalent to the shell command echo "hello world" | tr [a-z] [A-Z]:

>>> :{
   Process.toChunks "echo" ["hello world"]
 & Process.pipeChunks "tr" ["[a-z]", "[A-Z]"]
 & Stream.fold Stdio.writeChunks
 :}
HELLO WORLD
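The input stream does not have to come from another process. As a sketch, the program below feeds chunks built in Haskell to the standard input of `sort`, assuming a POSIX `sort` executable is on the PATH. `asciiChunk` is a hypothetical helper introduced here, and it is only correct for ASCII text.

```haskell
import Data.Char (ord)
import Data.Function ((&))
import Data.Word (Word8)
import Streamly.Data.Array (Array)
import qualified Streamly.Console.Stdio as Stdio
import qualified Streamly.Data.Array as Array
import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.System.Process as Process

-- | Encode an ASCII string as a single chunk of bytes.
-- 'asciiChunk' is a hypothetical helper; it is not valid for non-ASCII text.
asciiChunk :: String -> Array Word8
asciiChunk = Array.fromList . map (fromIntegral . ord)

main :: IO ()
main =
      Stream.fromList (map asciiChunk ["banana\n", "apple\n", "cherry\n"])
    & Process.pipeChunks "sort" []    -- chunks become the stdin of sort
    & Stream.fold Stdio.writeChunks   -- stdout of sort goes to our stdout
```

Chunk boundaries need not coincide with line boundaries; the process sees one continuous byte stream on its standard input.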

pre-release

pipeBytes

Arguments

:: (MonadCatch m, MonadAsync m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> Stream m Word8

Input Stream

-> Stream m Word8

Output Stream

Like pipeChunks except that it works on a stream of bytes instead of a stream of chunks.

We can write the example in pipeChunks as follows.

>>> :{
   Process.toBytes "echo" ["hello world"]
 & Process.pipeBytes "tr" ["[a-z]", "[A-Z]"]
 & Stream.fold Stdio.write
 :}
HELLO WORLD

pre-release

Deprecated

processChunks

Arguments

:: (MonadCatch m, MonadAsync m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> Stream m (Array Word8)

Input stream

-> Stream m (Array Word8)

Output stream

Deprecated: Please use pipeChunks instead.

processBytes

Arguments

:: (MonadCatch m, MonadAsync m) 
=> FilePath

Executable name or path

-> [String]

Arguments

-> Stream m Word8

Input Stream

-> Stream m Word8

Output Stream

Deprecated: Please use pipeBytes instead.