process-streaming-0.0.1: Streaming interface to system processes.

Safe HaskellNone

System.Process.Streaming

Contents

Description

This module contains helper functions and types built on top of System.Process and Pipes.

They provide concurrent, buffered (to avoid deadlocks) streaming access to the inputs and outputs of system processes.

There's also an emphasis in having error conditions explicit in the types, instead of throwing exceptions.

See the functions execute and execute3 for an entry point. Then the functions separate and combineLines that handle the consumption of stdout and stderr.

Regular Consumers, Parsers from pipes-parse and folds from Pipes.Prelude can be used to consume the output streams of the external processes.

Synopsis

Execution helpers

execute :: (Show e, Typeable e) => CreateProcess -> (IOException -> e) -> (Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e a)) -> IO (Either e (ExitCode, a))Source

This function takes as arguments a CreateProcess record, an exception handler, and a consuming function for two Producers associated to stdout and stderr, respectively.

execute tries to avoid launching exceptions, and represents all errors as e values.

If the consuming function fails with e, the whole computation is immediately aborted and e is returned.

If an error or asynchronous exception happens, the external process is terminated.

This function sets the std_out and std_err fields in the CreateProcess record to CreatePipe.

execute3 :: (Show e, Typeable e) => CreateProcess -> (IOException -> e) -> (Consumer ByteString IO () -> IO (Either e a)) -> (Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e b)) -> IO (Either e (ExitCode, (a, b)))Source

Like execute3 but with an additional argument consisting in a feeding function that takes the stdin Consumer and writes to it.

Like the consuming function, the feeding function can return a value and can also fail, terminating the process.

The feeding function is executed concurrently with the consuming functions, not before them.

execute3 sets the std_in, std_out and std_err fields in the CreateProcess record to CreatePipe.

exitCode :: Functor c => (Int -> e) -> c (Either e (ExitCode, a)) -> c (Either e a)Source

Convenience function that merges ExitFailure values into the e value.

The e value is created from the exit code.

Usually composed with the execute functions.

separate :: (Show e, Typeable e) => (Producer ByteString IO () -> IO (Either e a)) -> (Producer ByteString IO () -> IO (Either e b)) -> Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e (a, b))Source

separate should be used when we want to consume stdout and stderr concurrently and independently. It constructs a function that can be plugged into execute or execute3.

If the consuming functions return with a and b, the corresponding streams keep being drained until the end. The combined value is not returned until both stdout and stderr are closed by the external process.

However, if any of the consuming functions fails with e, the whole computation fails immediately with e.

Execution with combined stdout/stderr

type LinePolicy e = (FreeT (Producer Text IO) IO (Producer ByteString IO ()) -> IO (Producer ByteString IO ())) -> Producer ByteString IO () -> IO (Either e ())Source

Type synonym for a function that takes a method to tear down a FreeT-based list of lines as first parameter, a ByteString source as second parameter, and returns a (possibly failing) computation. Presumably, the bytes are decoded into text, the text split into lines, and the tear down function applied.

See the pipes-group package for utilities on how to manipulate these FreeT-based lists. They allow you to handle individual lines without forcing you to have a whole line in memory at any given time.

See also linePolicy and combineLines.

linePolicy :: (forall r. Producer ByteString IO r -> Producer Text IO (Producer ByteString IO r)) -> (forall r. Producer Text IO r -> Producer Text IO r) -> LeftoverPolicy (Producer ByteString IO ()) e () -> LinePolicy eSource

Constructs a LinePolicy.

The first argument is a function function that decodes ByteString into Text. See the section Decoding Functions in the documentation for the Pipes.Text module.

The second argument is a function that modifies each individual line. The line is represented as a Producer to avoid having to keep it wholly in memory. If you want the lines unmodified, just pass id. Line prefixes are easy to add using applicative notation:

 (\x -> yield "prefix: " *> x)

The third argument is a LeftoverPolicy value that specifies how to handle decoding failures.

type LeftoverPolicy l e a = a -> l -> IO (Either e a)Source

In the Pipes ecosystem, leftovers from decoding operations are often stored in the result value of Producers (often as Producers themselves). This is a type synonym for a function that receives a value a and some leftovers l, and may modify the value or fail outright, depending of what the leftovers are.

ignoreLeftovers :: LeftoverPolicy l e aSource

Never fails for any leftover.

failOnLeftovers :: (a -> b -> e) -> LeftoverPolicy (Producer b IO ()) e aSource

Fails if it encounters any leftover, and constructs the error out of the first undedcoded data.

For simple error handling, just ignore the a and the undecoded data:

 (failOnLeftvoers (\_ _->"badbytes")) :: LeftoverPolicy (Producer b IO ()) String a

For more detailed error handling, you may want to include the result until the error a and/or the first undecoded values b in your custom error datatype.

combineLines :: (Show e, Typeable e) => LinePolicy e -> LinePolicy e -> (Producer Text IO () -> IO (Either e a)) -> Producer ByteString IO () -> Producer ByteString IO () -> IO (Either e a)Source

The bytes from stdout and stderr are decoded into Text, splitted into lines (maybe applying some transformation to each line) and then combined and consumed by the function passed as argument.

For both stdout and stderr, a LinePolicy must be supplied.

Like with separate, the streams are drained to completion if no errors happen, but the computation is aborted immediately if any error e is returned.

combineLines returns a function that can be plugged into execute or execute3.

Beware! combineLines avoids situations in which a line emitted in stderr cuts a long line emitted in stdout, see here for a description of the problem. To avoid this, the combined text stream is locked while writing each individual line. But this means that if the external program stops writing to a handle while in the middle of a line, lines coming from the other handles won't get printed, either!

Constructing feeding/consuming functions

useConsumer :: Monad m => Consumer b m () -> Producer b m () -> m ()Source

Useful for constructing stdout or stderr consuming functions from a Consumer, to be plugged into separated or combineLines.

You may need to use surely for the types to fit.

useProducer :: Monad m => Producer b m () -> Consumer b m () -> m ()Source

Useful for constructing stdin feeding functions from a Producer, to be plugged into execute3.

You may need to use surely for the types to fit.

surely :: (Functor f, Functor f') => f (f' a) -> f (f' (Either e a))Source

Useful when we want to plug in a handler that doesn't return an Either. For example folds from Pipes.Prelude, or functions created from simple Consumers with useConsumer.

 surely = fmap (fmap Right)

safely :: (MFunctor t, MonadCatch m, MonadIO m) => (t (SafeT m) l -> SafeT m x) -> t m l -> m xSource

Useful when we want to plug in a handler that does its work in the SafeT transformer.

fallibly :: (MFunctor t, Monad m, Error e) => (t (ErrorT e m) l -> ErrorT e m x) -> t m l -> m (Either e x)Source

monoidally :: (MFunctor t, Monad m, Monoid w, Error e') => (e' -> w -> e) -> (t (ErrorT e' (WriterT w m)) l -> ErrorT e' (WriterT w m) ()) -> t m l -> m (Either e w)Source

Usually, it is better to use a fold form Pipes.Prelude instead of this function. But this function has the ability to return the monoidal result accumulated up until the error happened.

The first argument is a function that combines the initial error with the monoidal result to build the definitive error value. If you want to discard the results, use const as the first argument.

exceptionally :: (IOException -> e) -> (x -> IO (Either e a)) -> x -> IO (Either e a)Source

Useful when we want to construct different error values e depending on what feeding/consuming function throws an exeption, instead of relying in the catch-all error callback supplied in execute or execute3.

nop :: (MFunctor t, Monad m) => t m l -> m (Either e ())Source

Value to plug into a separate or combineLines function when we are not interested in doing anything with the handle. It returns immediately with ().

Notice that even if nop returns immediately, separate and combineLines drain the streams to completion before returning.

encoding :: (Show e, Typeable e) => (Producer b IO r -> Producer t IO (Producer b IO r)) -> LeftoverPolicy (Producer b IO r) e x -> (Producer t IO () -> IO (Either e x)) -> Producer b IO r -> IO (Either e x)Source

Adapts a function that works with Producers of decoded values so that it works with Producers of still undecoded values, by supplying a decoding function and a LeftoverPolicy.

Concurrency helpers

newtype Conc e a Source

Conc is very similar to Concurrently from the async package, but it has an explicit error type e.

The Applicative instance is used to run actions concurrently, wait until they finish, and combine their results.

However, if any of the actions fails with e the other actions are immediately cancelled and the whole computation fails with e.

To put it another way: Conc behaves like Concurrently for successes and like race for errors.

Constructors

Conc 

Fields

runConc :: IO (Either e a)
 

Instances

Functor (Conc e) 
(Show e, Typeable e) => Applicative (Conc e) 
(Show e, Typeable e) => Alternative (Conc e) 

conc :: (Show e, Typeable e) => IO (Either e a) -> IO (Either e b) -> IO (Either e (a, b))Source

mapConc :: (Show e, Typeable e, Traversable t) => (a -> IO (Either e b)) -> t a -> IO (Either e (t b))Source

Works similarly to mapConcurrently from the async package, but if any of the computations fails with e, the others are immediately cancelled and the whole computation fails with e.

newtype ForkProd b e a Source

ForkProd is a newtype around a function that does something with a Producer. The applicative instance fuses the functions, so that each one receives its own copy of the Producer and runs concurrently with the others. Like with Conc, if any of the functions fails with e the others are immediately cancelled and the whole computation fails with e.

ForkProd and its accompanying functions are useful to run multiple parsers from Pipes.Parse in parallel over the same Producer.

Constructors

ForkProd 

Fields

runForkProd :: Producer b IO () -> IO (Either e a)
 

Instances

forkProd :: (Show e, Typeable e) => (Producer b IO () -> IO (Either e x)) -> (Producer b IO () -> IO (Either e y)) -> Producer b IO () -> IO (Either e (x, y))Source