Copyright | (c) 2020 Composewell Technologies |
---|---|
License | Apache-2.0 |
Maintainer | streamly@composewell.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
Use OS processes as stream transformation functions.
This module provides functions to run operating system processes as stream source, sink or transformation functions. Thus OS processes can be used in the same way as Haskell functions and all the streaming combinators in streamly can be used to combine them. This allows you to seamlessly integrate external programs into your Haskell program.
We recommend that you use Streamly threads instead of system processes where possible as they have a simpler programming model and processes have a larger performance overhead.
Imports for examples
Use the following imports for examples below.
>>>
:set -XFlexibleContexts
>>>
:set -XScopedTypeVariables
>>>
import Data.Char (toUpper)
>>>
import Data.Function ((&))
>>>
import qualified Streamly.Console.Stdio as Stdio
>>>
import qualified Streamly.Data.Array.Foreign as Array
>>>
import qualified Streamly.Data.Fold as Fold
>>>
import qualified Streamly.Prelude as Stream
>>>
import qualified Streamly.System.Process as Process
>>>
import qualified Streamly.Unicode.Stream as Unicode
>>>
import qualified Streamly.Internal.FileSystem.Dir as Dir
>>>
import qualified Streamly.Internal.Data.Stream.IsStream as Stream
Executables as functions
Streamly provides powerful ways to combine streams. Processes can be
composed in a streaming pipeline just like a Posix shell command pipeline
except that we use &
instead of |
. Moreover, we can mix processes and
Haskell functions seamlessly in a processing pipeline. For example:
>>>
:{
Process.toBytes "echo" ["hello world"] & Process.processBytes "tr" ["[a-z]", "[A-Z]"] & Stream.fold Stdio.write :} HELLO WORLD
Of course, you can use a Haskell function instead of "tr":
>>>
:{
Process.toBytes "echo" ["hello world"] & Unicode.decodeLatin1 & Stream.map toUpper & Unicode.encodeLatin1 & Stream.fold Stdio.write :} HELLO WORLD
Shell commands as functions
Using a shell as the command interpreter we can use shell commands in a data processing pipeline:
>>>
:{
Process.toBytes "sh" ["-c", "echo hello | tr [a-z] [A-Z]"] & Stream.fold Stdio.write :} HELLO
Concurrent Processing
We can run executables or commands concurrently as we would run any other functions in Streamly. For example, the following program greps the word "to" in all the files in the current directory concurrently:
>>>
:{
grep file = Process.toBytes "grep" ["-H", "to", file] & Stream.handle (\(_ :: Process.ProcessFailure) -> Stream.nil) & Stream.splitWithSuffix (== 10) Array.write :}
>>>
:{
pgrep = Dir.toFiles "." & Stream.concatMapWith Stream.async grep & Stream.fold Stdio.writeChunks :}
Exception handling
Since we are composing using Streamly streaming pipeline there is nothing
special about exception handling, it works the same as in Streamly. Like
the pipefail
option in shells, exceptions are propagated if any of the
stages fail.
Process Attributes
When a new process is spawned, the following attributes are inherited from the parent process:
- Current working directory
- Environment
- Open file descriptors
- Process group
- Process uid and gid
- Signal handlers
- Terminal (Session)
Synopsis
- newtype ProcessFailure = ProcessFailure Int
- toChunks :: (IsStream t, MonadAsync m, MonadCatch m) => FilePath -> [String] -> t m (Array Word8)
- toBytes :: (IsStream t, MonadAsync m, MonadCatch m) => FilePath -> [String] -> t m Word8
- processChunks :: (IsStream t, MonadCatch m, MonadAsync m) => FilePath -> [String] -> t m (Array Word8) -> t m (Array Word8)
- processBytes :: (IsStream t, MonadCatch m, MonadAsync m) => FilePath -> [String] -> t m Word8 -> t m Word8
Documentation
newtype ProcessFailure Source #
An exception that is raised when a process fails.
Since: 0.1.0
ProcessFailure Int | The exit code of the process. |
Instances
Show ProcessFailure Source # | |
Defined in Streamly.Internal.System.Process showsPrec :: Int -> ProcessFailure -> ShowS # show :: ProcessFailure -> String # showList :: [ProcessFailure] -> ShowS # | |
Exception ProcessFailure Source # | |
Defined in Streamly.Internal.System.Process |
Generation
:: (IsStream t, MonadAsync m, MonadCatch m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> t m (Array Word8) | Output Stream |
See processChunks
. toChunks
is defined as:
>>>
toChunks path args = processChunks path args Stream.nil
The following code is equivalent to the shell command echo "hello world"
:
>>>
:{
Process.toChunks "echo" ["hello world"] & Stream.fold Stdio.writeChunks :} hello world
Since: 0.1.0
:: (IsStream t, MonadAsync m, MonadCatch m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> t m Word8 | Output Stream |
See processBytes
. toBytes
is defined as:
>>>
toBytes path args = processBytes path args Stream.nil
The following code is equivalent to the shell command echo "hello world"
:
>>>
:{
Process.toBytes "echo" ["hello world"] & Stream.fold Stdio.write :} hello world
Since: 0.1.0
Transformation
:: (IsStream t, MonadCatch m, MonadAsync m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> t m (Array Word8) | Input stream |
-> t m (Array Word8) | Output stream |
processChunks file args input
runs the executable file
specified by
its name or path using args
as arguments and input
stream as its
standard input. Returns the standard output of the executable as a stream.
If only the name of an executable file is specified instead of its path then the file name is searched in the directories specified by the PATH environment variable.
If the input stream throws an exception or if the output stream is garbage collected before it could finish then the process is sent a SIGTERM and we wait for it to terminate gracefully.
If the process terminates with a non-zero exit code then a ProcessFailure
exception is raised.
The following code is equivalent to the shell command echo "hello world" |
tr [a-z] [A-Z]
:
>>>
:{
Process.toChunks "echo" ["hello world"] & Process.processChunks "tr" ["[a-z]", "[A-Z]"] & Stream.fold Stdio.writeChunks :} HELLO WORLD
Since: 0.1.0
:: (IsStream t, MonadCatch m, MonadAsync m) | |
=> FilePath | Executable name or path |
-> [String] | Arguments |
-> t m Word8 | Input Stream |
-> t m Word8 | Output Stream |
Like processChunks
except that it works on a stream of bytes instead of
a stream of chunks.
We can write the example in processChunks
as follows. Notice how
seamlessly we can replace the tr
process with the Haskell toUpper
function:
>>>
:{
Process.toBytes "echo" ["hello world"] & Unicode.decodeLatin1 & Stream.map toUpper & Unicode.encodeLatin1 & Stream.fold Stdio.write :} HELLO WORLD
Since: 0.1.0