Copyright | (c) 2020 Composewell Technologies |
---|---|
License | Apache-2.0 |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | Safe-Inferred |
Language | Haskell2010 |
Streamly.System.Process
Description
Use OS processes just like native Haskell functions - to generate, transform or consume streams.
See Streamly.System.Command module for a higher level wrapper over this module.
See also: Streamly.Internal.System.Process for unreleased functions.
Synopsis
- newtype ProcessFailure = ProcessFailure Int
- data Config
- toChunks :: (MonadAsync m, MonadCatch m) => FilePath -> [String] -> Stream m (Array Word8)
- toBytes :: (MonadAsync m, MonadCatch m) => FilePath -> [String] -> Stream m Word8
- pipeChunks :: (MonadCatch m, MonadAsync m) => FilePath -> [String] -> Stream m (Array Word8) -> Stream m (Array Word8)
- pipeBytes :: (MonadCatch m, MonadAsync m) => FilePath -> [String] -> Stream m Word8 -> Stream m Word8
- processChunks :: (MonadCatch m, MonadAsync m) => FilePath -> [String] -> Stream m (Array Word8) -> Stream m (Array Word8)
- processBytes :: (MonadCatch m, MonadAsync m) => FilePath -> [String] -> Stream m Word8 -> Stream m Word8
Setup
To execute the code examples provided in this module in ghci, please run the following commands first.
>>> :set -XFlexibleContexts
>>> :set -XScopedTypeVariables
>>> import Data.Char (toUpper)
>>> import Data.Function ((&))
>>> import qualified Streamly.Console.Stdio as Stdio
>>> import qualified Streamly.Data.Array as Array
>>> import qualified Streamly.Data.Fold as Fold
>>> import qualified Streamly.Data.Stream.Prelude as Stream
>>> import qualified Streamly.System.Process as Process
>>> import qualified Streamly.Unicode.Stream as Unicode
>>> import qualified Streamly.Internal.FileSystem.Dir as Dir
>>> import qualified Streamly.Internal.Data.Stream as Stream
Overview
This module provides functions to run operating system processes as stream source, sink or transformation functions. Thus OS processes can be used in the same way as Haskell functions and all the streaming combinators in streamly can be used to combine them. This allows you to seamlessly integrate external programs into your Haskell program.
We recommend using Haskell functions with Streamly threads for performing tasks whenever possible. This approach offers a simpler programming model compared to system processes, which also have a larger performance overhead.
Executables as functions
Processes can be composed in a streaming pipeline just like a Posix shell command pipeline. Moreover, we can mix processes and Haskell functions seamlessly in a processing pipeline. For example:
>>> :{
   Process.toBytes "echo" ["hello world"]
 & Process.pipeBytes "tr" ["[a-z]", "[A-Z]"]
 & Stream.fold Stdio.write
:}
HELLO WORLD
Of course, you can use a Haskell function instead of "tr":
>>> :{
   Process.toBytes "echo" ["hello world"]
 & Unicode.decodeLatin1
 & fmap toUpper
 & Unicode.encodeLatin1
 & Stream.fold Stdio.write
:}
HELLO WORLD
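The examples above write straight to stdout. As a minimal sketch, assuming the streamly, streamly-core and streamly-process packages are installed and an `echo` executable is on the PATH, the same pipeline can instead be folded into an ordinary Haskell value. The name `echoUpper` is hypothetical and exists only for this illustration.

```haskell
import Data.Char (toUpper)
import Data.Function ((&))
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.System.Process as Process
import qualified Streamly.Unicode.Stream as Unicode

-- | Hypothetical helper: run @echo@ with one argument and return its
-- output, upper-cased by a Haskell function, as a String.
echoUpper :: String -> IO String
echoUpper msg =
    Process.toBytes "echo" [msg]   -- process output as a byte stream
        & Unicode.decodeLatin1     -- bytes to characters
        & fmap toUpper             -- plain Haskell transformation
        & Stream.fold Fold.toList  -- collect the characters into a String

main :: IO ()
main = echoUpper "hello world" >>= putStr
```

Because the result is a plain `String`, it can be passed to any other Haskell function; `echo` appends a trailing newline, which is preserved.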
Shell commands as functions
Using a shell as the command interpreter we can use shell commands in a data processing pipeline:
>>> :{
   Process.toBytes "sh" ["-c", "echo hello | tr [a-z] [A-Z]"]
 & Stream.fold Stdio.write
:}
HELLO
Running Commands Concurrently
We can run executables or commands concurrently as we would run any other functions in Streamly. For example, the following program searches for the string "pattern" in all the files in the current directory concurrently:
>>> :{
grep file =
   Process.toBytes "grep" ["-H", "pattern", file]
 & Stream.handle (\(_ :: Process.ProcessFailure) -> Stream.nil)
 & Stream.foldMany (Fold.takeEndBy (== 10) Array.write)
:}
>>> :{
pgrep =
   Dir.readFiles "."
 & Stream.parConcatMap id grep
 & Stream.fold Stdio.writeChunks
:}
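A smaller variation on the same idea is sketched below, assuming the streamly, streamly-core and streamly-process packages and an `echo` executable on the PATH. Each process is run to completion inside its own effect, so the outputs of different processes cannot interleave. The names `echoOne` and `echoAll` are hypothetical.

```haskell
import Data.Function ((&))
import Data.List (sort)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.System.Process as Process
import qualified Streamly.Unicode.Stream as Unicode

-- | Hypothetical helper: run one @echo@ process and collect its whole
-- output as a String.
echoOne :: String -> IO String
echoOne word =
    Process.toBytes "echo" [word]
        & Unicode.decodeLatin1
        & Stream.fold Fold.toList

-- | Hypothetical helper: run one @echo@ per word concurrently. Results
-- may arrive in any order, so they are sorted to make the return value
-- deterministic.
echoAll :: [String] -> IO [String]
echoAll ws =
    Stream.fromList ws
        & Stream.parConcatMap id (Stream.fromEffect . echoOne)
        & Stream.fold Fold.toList
        & fmap sort

main :: IO ()
main = echoAll ["one", "two", "three"] >>= mapM_ putStr
```

Sorting is only there to give a stable result; drop it if arrival order is acceptable.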
Exceptions
Since processes are composed using Streamly's streaming pipeline, exception handling needs no special treatment: it works the same as elsewhere in Streamly. Like the pipefail option in shells, an exception is propagated if any stage of the pipeline fails.
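As a hedged sketch of what this means in practice, the exception can be caught in IO with `try` from `Control.Exception` once the stream has been folded. The helper name `exitCodeOf` is hypothetical, and the example assumes the POSIX utilities `true` and `false` are on the PATH.

```haskell
import Control.Exception (try)
import Data.Function ((&))
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.System.Process as Process

-- | Hypothetical helper: run an executable, discard its output, and
-- report its exit code. Returns 0 on success, otherwise the code
-- carried by the ProcessFailure exception.
exitCodeOf :: FilePath -> [String] -> IO Int
exitCodeOf exe args = do
    r <- try (Process.toBytes exe args & Stream.fold Fold.drain)
    pure $ case r of
        Left (Process.ProcessFailure code) -> code
        Right () -> 0

main :: IO ()
main = do
    exitCodeOf "true" [] >>= print   -- a command that succeeds
    exitCodeOf "false" [] >>= print  -- a command that exits non-zero
```

Only `ProcessFailure` is caught here; any other exception, such as a failure to find the executable, still propagates to the caller.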
newtype ProcessFailure
An exception that is raised when a process fails.
Since: 0.1.0
Constructors
ProcessFailure Int | The exit code of the process. |
Instances
Exception ProcessFailure | Defined in Streamly.Internal.System.Process. Methods: toException :: ProcessFailure -> SomeException |
Show ProcessFailure | Defined in Streamly.Internal.System.Process. Methods: showsPrec :: Int -> ProcessFailure -> ShowS; show :: ProcessFailure -> String; showList :: [ProcessFailure] -> ShowS |
Process Configuration
data Config
Process configuration used for creating a new process.
By default the process config is set up to inherit the following attributes from the parent process:
- Current working directory
- Environment variables
- Open file descriptors
- Process group
- Terminal session
On POSIX:
- Process uid and gid
- Signal handlers
On Windows by default the parent process waits for the entire child process tree to finish.
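One of these defaults, environment inheritance, can be observed with a short sketch. It assumes the streamly, streamly-core and streamly-process packages and a POSIX `sh` on the PATH; the helper name `childSees` and the variable name `DEMO_GREETING` are hypothetical.

```haskell
import Data.Function ((&))
import System.Environment (setEnv)
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.System.Process as Process
import qualified Streamly.Unicode.Stream as Unicode

-- | Hypothetical helper: set an environment variable in the parent,
-- then ask a child shell to print that variable. With the default
-- config the child is expected to inherit the parent's environment.
childSees :: String -> String -> IO String
childSees name value = do
    setEnv name value
    Process.toBytes "sh" ["-c", "printf %s \"$" ++ name ++ "\""]
        & Unicode.decodeLatin1
        & Stream.fold Fold.toList

main :: IO ()
main = childSees "DEMO_GREETING" "hello" >>= putStrLn
```

The variable name is spliced into the shell script unescaped, so this helper is only suitable for trusted, identifier-like names.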
Generation
toChunks
Arguments
:: (MonadAsync m, MonadCatch m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> Stream m (Array Word8) | Output Stream |
The following code is equivalent to the shell command echo "hello world":
>>> :{
   Process.toChunks "echo" ["hello world"]
 & Stream.fold Stdio.writeChunks
:}
hello world
>>> toChunks = toChunksWith id
Since: 0.1.0
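Chunks are ordinary arrays, so they can be inspected without flattening them to bytes. The sketch below, assuming the streamly, streamly-core and streamly-process packages and an `echo` executable on the PATH, totals the size of a command's output; the name `outputSize` is hypothetical.

```haskell
import Data.Function ((&))
import qualified Streamly.Data.Array as Array
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.System.Process as Process

-- | Hypothetical helper: run an executable and return the total number
-- of bytes it wrote to standard output.
outputSize :: FilePath -> [String] -> IO Int
outputSize exe args =
    Process.toChunks exe args
        & fmap Array.length     -- size of each chunk
        & Stream.fold Fold.sum  -- total across all chunks

main :: IO ()
main = outputSize "echo" ["hello world"] >>= print
```

For `echo "hello world"` the total is 12: eleven characters plus the trailing newline.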
toBytes
Arguments
:: (MonadAsync m, MonadCatch m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> Stream m Word8 | Output Stream |
The following code is equivalent to the shell command echo "hello world":
>>> :{
   Process.toBytes "echo" ["hello world"]
 & Stream.fold Stdio.write
:}
hello world
Since: 0.1.0
Transformation
pipeChunks
Arguments
:: (MonadCatch m, MonadAsync m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> Stream m (Array Word8) | Input stream |
-> Stream m (Array Word8) | Output stream |
pipeChunks file args input runs the executable file specified by its name or path, using args as arguments and the input stream as its standard input. Returns the standard output of the executable as a stream.
If only the name of an executable file is specified instead of its path then the file name is searched in the directories specified by the PATH environment variable.
If the input stream throws an exception or if the output stream is garbage collected before it could finish then the process is terminated with SIGTERM.
If the process terminates with a non-zero exit code then a ProcessFailure exception is raised.
The following code is equivalent to the shell command echo "hello world" | tr [a-z] [A-Z]:
>>> :{
   Process.toChunks "echo" ["hello world"]
 & Process.pipeChunks "tr" ["[a-z]", "[A-Z]"]
 & Stream.fold Stdio.writeChunks
:}
HELLO WORLD
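The input stream does not have to come from another process. As a minimal sketch, assuming the streamly, streamly-core and streamly-process packages, a `tr` executable on the PATH, and ASCII-only input, chunks built in Haskell can be fed to the process directly. The names `toChunk`, `fromChunk` and `upperChunks` are hypothetical.

```haskell
import Data.Char (chr, ord)
import Data.Function ((&))
import Data.Word (Word8)
import qualified Streamly.Data.Array as Array
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.System.Process as Process

-- | Hypothetical helper: pack an ASCII string into one chunk.
toChunk :: String -> Array.Array Word8
toChunk = Array.fromList . map (fromIntegral . ord)

-- | Hypothetical helper: unpack a chunk back into a String.
fromChunk :: Array.Array Word8 -> String
fromChunk = map (chr . fromIntegral) . Array.toList

-- | Hypothetical helper: send each piece to @tr@ as a separate chunk
-- and return the concatenated output.
upperChunks :: [String] -> IO String
upperChunks pieces = do
    out <- Stream.fromList (map toChunk pieces)
        & Process.pipeChunks "tr" ["[a-z]", "[A-Z]"]
        & Stream.fold Fold.toList
    pure (concatMap fromChunk out)

main :: IO ()
main = upperChunks ["hello ", "world\n"] >>= putStr
```

The output chunk boundaries are chosen by the process and the pipe, not by the input, which is why the result is concatenated before use.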
pre-release
pipeBytes
Arguments
:: (MonadCatch m, MonadAsync m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> Stream m Word8 | Input Stream |
-> Stream m Word8 | Output Stream |
Like pipeChunks except that it works on a stream of bytes instead of a stream of chunks.
We can write the example in pipeChunks as follows.
>>> :{
   Process.toBytes "echo" ["hello world"]
 & Process.pipeBytes "tr" ["[a-z]", "[A-Z]"]
 & Stream.fold Stdio.write
:}
HELLO WORLD
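A process that has to read all of its input before producing output, such as `sort`, fits the same shape. The sketch below assumes the streamly, streamly-core and streamly-process packages, a `sort` executable on the PATH, and Latin-1 input; the name `sortLines` is hypothetical.

```haskell
import Data.Function ((&))
import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.System.Process as Process
import qualified Streamly.Unicode.Stream as Unicode

-- | Hypothetical helper: sort the given lines with the external @sort@
-- command and return them in sorted order.
sortLines :: [String] -> IO [String]
sortLines ls =
    Stream.fromList (unlines ls)         -- characters generated in Haskell
        & Unicode.encodeLatin1           -- characters to bytes
        & Process.pipeBytes "sort" []    -- bytes through the process
        & Unicode.decodeLatin1           -- bytes back to characters
        & Stream.fold Fold.toList
        & fmap lines

main :: IO ()
main = sortLines ["b", "a", "c"] >>= mapM_ putStrLn
```

The ordering is whatever the external `sort` produces under the current locale, so results for mixed-case or non-ASCII input may differ from `Data.List.sort`.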
pre-release
Deprecated
processChunks
Arguments
:: (MonadCatch m, MonadAsync m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> Stream m (Array Word8) | Input stream |
-> Stream m (Array Word8) | Output stream |
Deprecated: Please use pipeChunks instead.
processBytes
Arguments
:: (MonadCatch m, MonadAsync m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> Stream m Word8 | Input Stream |
-> Stream m Word8 | Output Stream |
Deprecated: Please use pipeBytes instead.