| Safe Haskell | None |
|---|
System.Process.Streaming
Description
This module contains helper functions and types built on top of System.Process and Pipes.
They provide concurrent, buffered (to avoid deadlocks) streaming access to the inputs and outputs of system processes.
There's also an emphasis on making error conditions explicit in the types, instead of throwing exceptions.
See the functions execute and execute3 for an entry point, then the
functions separate and combineLines, which handle the consumption of
stdout and stderr.
Regular Consumers, Parsers from pipes-parse and folds from
Pipes.Prelude can be used to consume the output streams of the external
processes.
- execute :: (Show e, Typeable e) => CreateProcess -> (IOException -> e) -> (Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e a)) -> IO (Either e (ExitCode, a))
- execute3 :: (Show e, Typeable e) => CreateProcess -> (IOException -> e) -> (Consumer ByteString IO () -> IO (Either e a)) -> (Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e b)) -> IO (Either e (ExitCode, (a, b)))
- exitCode :: Functor c => (Int -> e) -> c (Either e (ExitCode, a)) -> c (Either e a)
- separate :: (Show e, Typeable e) => (Producer ByteString IO () -> IO (Either e a)) -> (Producer ByteString IO () -> IO (Either e b)) -> Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e (a, b))
- type LinePolicy e = (FreeT (Producer Text IO) IO (Producer ByteString IO ()) -> IO (Producer ByteString IO ())) -> Producer ByteString IO () -> IO (Either e ())
- linePolicy :: (forall r. Producer ByteString IO r -> Producer Text IO (Producer ByteString IO r)) -> (forall r. Producer Text IO r -> Producer Text IO r) -> LeftoverPolicy (Producer ByteString IO ()) e () -> LinePolicy e
- type LeftoverPolicy l e a = a -> l -> IO (Either e a)
- ignoreLeftovers :: LeftoverPolicy l e a
- failOnLeftovers :: (a -> b -> e) -> LeftoverPolicy (Producer b IO ()) e a
- combineLines :: (Show e, Typeable e) => LinePolicy e -> LinePolicy e -> (Producer Text IO () -> IO (Either e a)) -> Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e a)
- useConsumer :: Monad m => Consumer b m () -> Producer b m () -> m ()
- useProducer :: Monad m => Producer b m () -> Consumer b m () -> m ()
- surely :: (Functor f, Functor f') => f (f' a) -> f (f' (Either e a))
- safely :: (MFunctor t, MonadCatch m, MonadIO m) => (t (SafeT m) l -> SafeT m x) -> t m l -> m x
- fallibly :: (MFunctor t, Monad m, Error e) => (t (ErrorT e m) l -> ErrorT e m x) -> t m l -> m (Either e x)
- monoidally :: (MFunctor t, Monad m, Monoid w, Error e') => (e' -> w -> e) -> (t (ErrorT e' (WriterT w m)) l -> ErrorT e' (WriterT w m) ()) -> t m l -> m (Either e w)
- exceptionally :: (IOException -> e) -> (x -> IO (Either e a)) -> x -> IO (Either e a)
- nop :: (MFunctor t, Monad m) => t m l -> m (Either e ())
- encoding :: (Show e, Typeable e) => (Producer b IO r -> Producer t IO (Producer b IO r)) -> LeftoverPolicy (Producer b IO r) e x -> (Producer t IO () -> IO (Either e x)) -> Producer b IO r -> IO (Either e x)
- newtype Conc e a = Conc {}
- conc :: (Show e, Typeable e) => IO (Either e a) -> IO (Either e b) -> IO (Either e (a, b))
- mapConc :: (Show e, Typeable e, Traversable t) => (a -> IO (Either e b)) -> t a -> IO (Either e (t b))
- newtype ForkProd b e a = ForkProd { runForkProd :: Producer b IO () -> IO (Either e a) }
- forkProd :: (Show e, Typeable e) => (Producer b IO () -> IO (Either e x)) -> (Producer b IO () -> IO (Either e y)) -> Producer b IO () -> IO (Either e (x, y))
Execution helpers
execute :: (Show e, Typeable e) => CreateProcess -> (IOException -> e) -> (Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e a)) -> IO (Either e (ExitCode, a))
This function takes as arguments a CreateProcess record, an exception
handler, and a consuming function for two Producers associated to stdout
and stderr, respectively.
execute tries to avoid throwing exceptions, and represents all errors as
e values.
If the consuming function fails with e, the whole computation is
immediately aborted and e is returned.
If an error or asynchronous exception happens, the external process is terminated.
This function sets the std_out and std_err fields in the CreateProcess
record to CreatePipe.
execute3 :: (Show e, Typeable e) => CreateProcess -> (IOException -> e) -> (Consumer ByteString IO () -> IO (Either e a)) -> (Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e b)) -> IO (Either e (ExitCode, (a, b)))
Like execute, but with an additional argument: a feeding
function that takes the stdin Consumer and writes to it.
Like the consuming function, the feeding function can return a value and can also fail, terminating the process.
The feeding function is executed concurrently with the consuming functions, not before them.
execute3 sets the std_in, std_out and std_err fields in the
CreateProcess record to CreatePipe.
exitCode :: Functor c => (Int -> e) -> c (Either e (ExitCode, a)) -> c (Either e a)
Convenience function that merges ExitFailure values into the e value.
The e value is created from the exit code.
Usually composed with the execute functions.
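The behaviour described above can be sketched with base alone. This is a minimal stand-in written from the documented signature, not the library's source; the name exitCodeSketch and the simulated results are assumptions made for illustration, and no process is launched.

```haskell
import System.Exit (ExitCode (..))

-- Hypothetical stand-in for exitCode, derived only from its documented type.
exitCodeSketch :: Functor c => (Int -> e) -> c (Either e (ExitCode, a)) -> c (Either e a)
exitCodeSketch toError = fmap collapse
  where
    -- An earlier error is passed through untouched.
    collapse (Left e) = Left e
    -- A successful exit drops the exit code and keeps the result.
    collapse (Right (ExitSuccess, a)) = Right a
    -- A failing exit is converted into the error type.
    collapse (Right (ExitFailure n, _)) = Left (toError n)

main :: IO ()
main = do
  -- Simulated results of an execute-style call, wrapped in Maybe as the Functor.
  let succeeded = Just (Right (ExitSuccess, "output")) :: Maybe (Either String (ExitCode, String))
      failed = Just (Right (ExitFailure 2, "output")) :: Maybe (Either String (ExitCode, String))
  print (exitCodeSketch (\n -> "exit code " ++ show n) succeeded)
  print (exitCodeSketch (\n -> "exit code " ++ show n) failed)
```

Because the real function works in any Functor, it can wrap the IO action returned by execute directly.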
separate :: (Show e, Typeable e) => (Producer ByteString IO () -> IO (Either e a)) -> (Producer ByteString IO () -> IO (Either e b)) -> Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e (a, b))
separate should be used when we want to consume stdout and stderr
concurrently and independently. It constructs a function that can be plugged
into execute or execute3.
If the consuming functions return with a and b, the corresponding
streams keep being drained until the end. The combined value is not returned
until both stdout and stderr are closed by the external process.
However, if any of the consuming functions fails with e, the whole
computation fails immediately with e.
Execution with combined stdout/stderr
type LinePolicy e = (FreeT (Producer Text IO) IO (Producer ByteString IO ()) -> IO (Producer ByteString IO ())) -> Producer ByteString IO () -> IO (Either e ())
Type synonym for a function that takes a method to tear down a FreeT-based
list of lines as first parameter, a ByteString source as second parameter,
and returns a (possibly failing) computation. Presumably, the bytes are decoded
into text, the text split into lines, and the tear down function applied.
See the pipes-group package for utilities on how to manipulate these
FreeT-based lists. They allow you to handle individual lines without forcing
you to have a whole line in memory at any given time.
See also linePolicy and combineLines.
linePolicy :: (forall r. Producer ByteString IO r -> Producer Text IO (Producer ByteString IO r)) -> (forall r. Producer Text IO r -> Producer Text IO r) -> LeftoverPolicy (Producer ByteString IO ()) e () -> LinePolicy e
Constructs a LinePolicy.
The first argument is a function that decodes ByteString into
Text. See the section Decoding Functions in the documentation for the
Pipes.Text module.
The second argument is a function that modifies each individual line. The
line is represented as a Producer to avoid having to keep it wholly in
memory. If you want the lines unmodified, just pass id. Line prefixes are
easy to add using applicative notation:
(\x -> yield "prefix: " *> x)
The third argument is a LeftoverPolicy value that specifies how to handle
decoding failures.
type LeftoverPolicy l e a = a -> l -> IO (Either e a)
In the Pipes ecosystem, leftovers from decoding operations are often stored
in the result value of Producers (often as Producers themselves). This is a
type synonym for a function that receives a value a and some leftovers l,
and may modify the value or fail outright, depending on what the leftovers are.
ignoreLeftovers :: LeftoverPolicy l e a
Never fails for any leftover.
failOnLeftovers :: (a -> b -> e) -> LeftoverPolicy (Producer b IO ()) e a
Fails if it encounters any leftover, and constructs the error out of the first undecoded data.
For simple error handling, just ignore the a and the undecoded data:
(failOnLeftovers (\_ _->"badbytes")) :: LeftoverPolicy (Producer b IO ()) String a
For more detailed error handling, you may want to include the result obtained
up to the error (a) and/or the first undecoded values (b) in your custom error
datatype.
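The shape of a LeftoverPolicy can be illustrated without Pipes. In the sketch below the leftovers are a plain list rather than a Producer, which is a simplifying assumption; ignoreSketch and failOnListLeftovers are hypothetical names standing in for ignoreLeftovers and failOnLeftovers, not the library's definitions.

```haskell
-- The synonym exactly as documented above.
type LeftoverPolicy l e a = a -> l -> IO (Either e a)

-- Stand-in for ignoreLeftovers: always succeeds with the value it was given.
ignoreSketch :: LeftoverPolicy l e a
ignoreSketch a _ = return (Right a)

-- List-based analogue of failOnLeftovers (assumption: leftovers are a list).
-- An empty list means nothing was left undecoded, so the value is kept.
failOnListLeftovers :: (a -> b -> e) -> LeftoverPolicy [b] e a
failOnListLeftovers _ a [] = return (Right a)
failOnListLeftovers mkError a (b : _) = return (Left (mkError a b))

main :: IO ()
main = do
  r1 <- ignoreSketch "decoded" [0xff :: Int]
  print (r1 :: Either String String)
  r2 <- failOnListLeftovers (\_ _ -> "badbytes") "decoded" [0xff :: Int]
  print (r2 :: Either String String)
  r3 <- failOnListLeftovers (\_ _ -> "badbytes") "decoded" ([] :: [Int])
  print (r3 :: Either String String)
```

The real failOnLeftovers has to pull on the leftover Producer to find out whether it yields anything, which is why the policy runs in IO.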
combineLines :: (Show e, Typeable e) => LinePolicy e -> LinePolicy e -> (Producer Text IO () -> IO (Either e a)) -> Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e a)
The bytes from stdout and stderr are decoded into Text, split into
lines (maybe applying some transformation to each line) and then combined and
consumed by the function passed as argument.
For both stdout and stderr, a LinePolicy must be supplied.
As with separate, the streams are drained to completion if no errors
happen, but the computation is aborted immediately if any error e is
returned.
combineLines returns a function that can be plugged into execute or
execute3.
Beware! combineLines prevents a line emitted on stderr from cutting into
a long line emitted on stdout. To do so, it locks the combined text
stream while each individual line is being written. This means that if the
external program stops writing to a handle in the middle of a line,
lines coming from the other handle won't get printed either!
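The per-line locking idea can be sketched with base alone. This is not how combineLines is implemented; it is an illustration under the assumption that each line arrives as a list of chunks, with emitLine, the shared IORef sink and the sample chunks all being hypothetical.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, newMVar, putMVar, takeMVar, withMVar)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- Write every chunk of one line to the shared sink while holding the lock,
-- so that chunks from another writer cannot land in the middle of the line.
emitLine :: MVar () -> IORef [String] -> [String] -> IO ()
emitLine lock sink chunks =
  withMVar lock $ \_ -> mapM_ (\c -> modifyIORef sink (++ [c])) chunks

main :: IO ()
main = do
  lock <- newMVar ()
  sink <- newIORef []
  doneOut <- newEmptyMVar
  doneErr <- newEmptyMVar
  -- Two writers play the roles of stdout and stderr.
  _ <- forkIO (emitLine lock sink ["out: ", "a long ", "line\n"] >> putMVar doneOut ())
  _ <- forkIO (emitLine lock sink ["err: ", "oops\n"] >> putMVar doneErr ())
  takeMVar doneOut
  takeMVar doneErr
  -- The two lines may appear in either order, but each one is intact.
  readIORef sink >>= putStr . concat
```

The sketch also makes the caveat visible: a writer that acquires the lock and then stalls before finishing its line keeps every other writer waiting.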
Constructing feeding/consuming functions
useConsumer :: Monad m => Consumer b m () -> Producer b m () -> m ()
Useful for constructing stdout or stderr consuming functions from a
Consumer, to be plugged into separate or combineLines.
You may need to use surely for the types to fit.
useProducer :: Monad m => Producer b m () -> Consumer b m () -> m ()
surely :: (Functor f, Functor f') => f (f' a) -> f (f' (Either e a))
Useful when we want to plug in a handler that doesn't return an Either. For
example folds from Pipes.Prelude, or functions created from simple
Consumers with useConsumer.
surely = fmap (fmap Right)
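To see how the two nested fmaps line up, here is a small base-only sketch. The definition is copied from the line above under the name surelySketch; countChars is a hypothetical handler that takes a String instead of a Producer, so the outer Functor is the function arrow and the inner one is IO.

```haskell
-- Same definition as documented above, under a local name.
surelySketch :: (Functor f, Functor f') => f (f' a) -> f (f' (Either e a))
surelySketch = fmap (fmap Right)

-- Hypothetical handler that cannot fail and so returns no Either.
countChars :: String -> IO Int
countChars = return . length

-- The outer fmap goes under the function arrow, the inner one under IO.
countCharsE :: String -> IO (Either String Int)
countCharsE = surelySketch countChars

main :: IO ()
main = countCharsE "hello" >>= print
```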
safely :: (MFunctor t, MonadCatch m, MonadIO m) => (t (SafeT m) l -> SafeT m x) -> t m l -> m x
Useful when we want to plug in a handler that does its work in the SafeT
transformer.
fallibly :: (MFunctor t, Monad m, Error e) => (t (ErrorT e m) l -> ErrorT e m x) -> t m l -> m (Either e x)
monoidally :: (MFunctor t, Monad m, Monoid w, Error e') => (e' -> w -> e) -> (t (ErrorT e' (WriterT w m)) l -> ErrorT e' (WriterT w m) ()) -> t m l -> m (Either e w)
Usually, it is better to use a fold from Pipes.Prelude instead of this function. However, this function can return the monoidal result accumulated up to the point where the error happened.
The first argument is a function that combines the initial error with the
monoidal result to build the definitive error value. If you want to discard the
results, use const as the first argument.
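The role of the combining function can be shown with a pure analogue. The sketch below is not monoidally itself: it replaces the Producer with a list of steps and the ErrorT/WriterT stack with plain Either, and the name foldUntilError is an assumption made for illustration.

```haskell
-- Accumulate monoidal results until the first error, then let the
-- combining function build the final error from the error and the
-- results gathered so far.
foldUntilError :: Monoid w => (e' -> w -> e) -> [Either e' w] -> Either e w
foldUntilError combine = go mempty
  where
    go acc [] = Right acc
    go acc (Right w : rest) = go (acc <> w) rest
    go acc (Left err : _) = Left (combine err acc)

main :: IO ()
main = do
  let steps =
        [Right ["line 1"], Right ["line 2"], Left "decode failure", Right ["never reached"]]
          :: [Either String [String]]
  -- Keep both the error and what had been accumulated before it.
  print (foldUntilError (\err seen -> (err, seen)) steps)
  -- Discard the accumulated results, as suggested above.
  print (foldUntilError const steps)
```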
exceptionally :: (IOException -> e) -> (x -> IO (Either e a)) -> x -> IO (Either e a)
nop :: (MFunctor t, Monad m) => t m l -> m (Either e ())
Value to plug into a separate or combineLines function when we are not
interested in doing anything with the handle. It returns immediately with ().
Notice that even if nop returns immediately, separate and
combineLines drain the streams to completion before returning.
encoding :: (Show e, Typeable e) => (Producer b IO r -> Producer t IO (Producer b IO r)) -> LeftoverPolicy (Producer b IO r) e x -> (Producer t IO () -> IO (Either e x)) -> Producer b IO r -> IO (Either e x)
Adapts a function that works with Producers of decoded values so that it works with Producers of still undecoded values, by supplying a decoding function and a LeftoverPolicy.
Concurrency helpers
Conc is very similar to Concurrently from the
async package, but it has an explicit error type e.
The Applicative instance is used to run actions concurrently, wait until
they finish, and combine their results.
However, if any of the actions fails with e the other actions are
immediately cancelled and the whole computation fails with e.
To put it another way: Conc behaves like Concurrently for successes and
like race for errors.
Instances
| Functor (Conc e) | |
| (Show e, Typeable e) => Applicative (Conc e) | |
| (Show e, Typeable e) => Alternative (Conc e) | |
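The "Concurrently for successes, race for errors" behaviour can be sketched with base alone. This is not the library's implementation (which is not shown here): concSketch and the Done type are hypothetical, and the sketch does not handle exceptions thrown inside the actions.

```haskell
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)

-- Tags a result with the worker that produced it.
data Done e a b = DoneA (Either e a) | DoneB (Either e b)

-- Run both actions concurrently. The first Left cancels the other action;
-- two Rights are paired once both actions have finished.
concSketch :: IO (Either e a) -> IO (Either e b) -> IO (Either e (a, b))
concSketch actionA actionB = do
  box <- newEmptyMVar
  threadA <- forkIO (actionA >>= putMVar box . DoneA)
  threadB <- forkIO (actionB >>= putMVar box . DoneB)
  first <- takeMVar box
  case first of
    DoneA (Left e) -> killThread threadB >> return (Left e)
    DoneB (Left e) -> killThread threadA >> return (Left e)
    DoneA (Right a) -> do
      second <- takeMVar box
      case second of
        DoneB rb -> return (fmap (\b -> (a, b)) rb)
        DoneA _ -> ioError (userError "worker A reported twice")
    DoneB (Right b) -> do
      second <- takeMVar box
      case second of
        DoneA ra -> return (fmap (\a -> (a, b)) ra)
        DoneB _ -> ioError (userError "worker B reported twice")

main :: IO ()
main = do
  both <- concSketch (return (Right 'x')) (return (Right True))
  print (both :: Either String (Char, Bool))
  -- The failing action wins; the slow one is cancelled instead of awaited.
  failed <- concSketch (return (Left "boom")) (threadDelay 2000000 >> return (Right ()))
  print (failed :: Either String ((), ()))
```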
mapConc :: (Show e, Typeable e, Traversable t) => (a -> IO (Either e b)) -> t a -> IO (Either e (t b))
Works similarly to mapConcurrently from the
async package, but if any of the computations fails with e, the others are
immediately cancelled and the whole computation fails with e.
ForkProd is a newtype around a function that does something with a
Producer. The applicative instance fuses the functions, so that each one
receives its own copy of the Producer and runs concurrently with the others.
As with Conc, if any of the functions fails with e the others are
immediately cancelled and the whole computation fails with e.
ForkProd and its accompanying functions are useful to run multiple
parsers from Pipes.Parse in parallel over the same Producer.
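The way an Applicative instance can hand the same input to several functions is sketched below with base alone. This is a deliberately simplified analogue, not ForkProd: the hypothetical ForkList takes a plain list instead of a Producer, and it runs the functions one after the other rather than concurrently, so nothing is cancelled early.

```haskell
-- Hypothetical list-based analogue of ForkProd.
newtype ForkList b e a = ForkList {runForkList :: [b] -> IO (Either e a)}

instance Functor (ForkList b e) where
  fmap f (ForkList g) = ForkList (fmap (fmap f) . g)

instance Applicative (ForkList b e) where
  pure a = ForkList (\_ -> return (Right a))
  -- Both functions receive the same input. Sequential here, unlike ForkProd.
  ForkList f <*> ForkList g = ForkList $ \xs -> do
    rf <- f xs
    rg <- g xs
    return (rf <*> rg)

-- Two independent consumers fused into one that returns both results.
sumAndLength :: ForkList Int String (Int, Int)
sumAndLength =
  (,) <$> ForkList (return . Right . sum) <*> ForkList (return . Right . length)

main :: IO ()
main = runForkList sumAndLength [1, 2, 3] >>= print
```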