# Documentation

data Mapper i o e m

A Mapper consists of a decoder, an encoder, and a stream transforming input into (key, value) pairs.

Constructors

Mapper

Fields

(i -> Either e j)
    Decoder for mapper input.

(k -> v -> o)
    Encoder for mapper output.

(ConduitT j (k, v) m ())
    A stream transforming input into (k, v) pairs.
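As a rough illustration of the three fields, here is a minimal word-count sketch that depends only on the conduit and text packages. The names decodeLine, encodePair and wordPairs are hypothetical, and the choice of Text for input and output and String for the error type is an assumption. The three values have the shapes of the decoder, encoder and stream fields, in that order.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main (main) where

import Data.Conduit (ConduitT, runConduitPure, (.|))
import qualified Data.Conduit.Combinators as C
import Data.Text (Text)
import qualified Data.Text as T

-- Decoder, shape (i -> Either e j): reject blank lines, pass others through.
decodeLine :: Text -> Either String Text
decodeLine line
  | T.null (T.strip line) = Left "blank line"
  | otherwise             = Right line

-- Encoder, shape (k -> v -> o): key and value separated by a tab.
encodePair :: Text -> Int -> Text
encodePair word n = word <> "\t" <> T.pack (show n)

-- Stream, shape (ConduitT j (k, v) m ()): emit (word, 1) for every word.
wordPairs :: Monad m => ConduitT Text (Text, Int) m ()
wordPairs = C.concatMap T.words .| C.map (\w -> (w, 1))

-- Exercise the stream on two in-memory lines instead of stdin.
main :: IO ()
main = do
  let pairs = runConduitPure (C.yieldMany ["a b", "a"] .| wordPairs .| C.sinkList)
  mapM_ (putStrLn . T.unpack . uncurry encodePair) pairs
```

The tab separator matches Hadoop streaming's default key/value convention. Running the stream with runConduitPure makes it easy to check the conduit in isolation before wiring it into a job.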

data Reducer i o e m

A Reducer consists of a decoder, an encoder, and a stream transforming each key and all values associated with the key into some result values.

Constructors

Eq k => Reducer

Fields

(i -> Either e (k, v))
    Decoder for reducer input.

(r -> o)
    Encoder for reducer output.

(k -> v -> ConduitT v r m ())
    A stream processing a key and all values associated with the key. The parameter v is the first value associated with the key (since a key always has one or more values), and the remaining values are processed by the conduit.

Examples:

    {-# LANGUAGE BlockArguments, OverloadedStrings, TupleSections #-}

    import Control.Monad.IO.Class (MonadIO)
    import Data.Conduit (ConduitT)
    import qualified Data.Conduit as C
    import qualified Data.Conduit.Combinators as C

    -- Sum up all values associated with the key and emit a (key, sum) pair.
    sumValues :: (Monad m, Num v) => k -> v -> ConduitT v (k, v) m ()
    sumValues k v0 = C.foldl (+) v0 >>= C.yield . (k,)

    -- Increment a counter for each (key, value) pair, and emit the (key, value) pair.
    -- The first value is pushed back with leftover so that mapM sees every value.
    incCounterAndEmit :: MonadIO m => k -> v -> ConduitT v (k, v) m ()
    incCounterAndEmit k v0 = C.leftover v0 <> C.mapM \v ->
      incCounter "reducer" "key-value pairs" >> pure (k, v)
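To make the "first value as a parameter, remaining values from the stream" convention concrete, the following sketch runs a reducer conduit on a single in-memory key group. The helper runGroup is hypothetical and only meant for experimentation; sumValues is repeated from the example above so that the block stands alone.

```haskell
{-# LANGUAGE TupleSections #-}
module Main (main) where

import Data.Conduit (ConduitT, runConduitPure, yield, (.|))
import qualified Data.Conduit.Combinators as C
import Data.Functor.Identity (Identity)

-- Sum up all values associated with the key and emit a (key, sum) pair.
sumValues :: (Monad m, Num v) => k -> v -> ConduitT v (k, v) m ()
sumValues k v0 = C.foldl (+) v0 >>= yield . (k,)

-- Hypothetical helper: feed one key group to a reducer conduit.
-- The first value is passed as an argument; the rest arrive as the stream.
runGroup :: (k -> v -> ConduitT v r Identity ()) -> k -> v -> [v] -> [r]
runGroup reducer k v0 rest =
  runConduitPure (C.yieldMany rest .| reducer k v0 .| C.sinkList)

main :: IO ()
main = print (runGroup sumValues "a" 1 [2, 3])  -- prints [("a",6)]
```

A key with a single value corresponds to an empty stream, in which case sumValues emits the first value unchanged.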

Arguments

:: MonadIO m

=> ConduitT () i m ()
    Mapper source. The source should generally stream from stdin, and produce a value for each line of the input, as that is how Hadoop streaming is supposed to work, but this is not enforced. For example, if you really want to, you can produce a single value for the entire input split (which can be done using Data.Conduit.Combinators.stdin as the source). Note that regardless of what the source does, the values of Hadoop counters "Map input records" and "Reduce input records" are always the number of lines of the mapper and reducer input, because these counters are managed by the Hadoop framework.
    An example is stdinLn, which streams from stdin as Text, one line at a time.

-> ConduitT o Void m ()
    Mapper sink. It should generally stream to stdout. An example is stdoutLn.

-> (i -> e -> m ())
    An action to be executed for each input that cannot be decoded. The first parameter is the input and the second parameter is the decoding error. One may choose to, for instance, increment a counter and println an error message.

-> Mapper i o e m

-> m ()

Run a Mapper.
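The sketch below illustrates one way the decoder and the action for undecodable input could fit together in a pipeline. It is an assumption about the general shape, not the library's actual implementation; decodeOrReport, decodeInt and reportBad are hypothetical names, and only the conduit package and base are used.

```haskell
module Main (main) where

import Control.Monad.Trans.Class (lift)
import Data.Conduit (ConduitT, awaitForever, runConduit, yield, (.|))
import qualified Data.Conduit.Combinators as C
import System.IO (hPutStrLn, stderr)
import Text.Read (readMaybe)

-- Hypothetical helper: decode each input, report failures, forward successes.
decodeOrReport
  :: Monad m
  => (i -> Either e j)   -- decoder, same shape as the Mapper's decoder field
  -> (i -> e -> m ())    -- action for inputs that cannot be decoded
  -> ConduitT i j m ()
decodeOrReport decode onError = awaitForever $ \input ->
  case decode input of
    Left err    -> lift (onError input err)
    Right value -> yield value

-- Example decoder: accept only integers.
decodeInt :: String -> Either String Int
decodeInt s = maybe (Left "not an integer") Right (readMaybe s)

-- Example action: log the offending input and the error to stderr.
reportBad :: String -> String -> IO ()
reportBad input err = hPutStrLn stderr ("skipping " ++ show input ++ ": " ++ err)

main :: IO ()
main = do
  decoded <- runConduit $
    C.yieldMany ["1", "x", "3"] .| decodeOrReport decodeInt reportBad .| C.sinkList
  print decoded  -- prints [1,3] on stdout; the "x" record is reported on stderr
```

Writing diagnostics to stderr keeps stdout free for the encoded output that Hadoop streaming reads from the process.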

Arguments

:: MonadIO m

=> ConduitT () i m ()
    Reducer source.

-> ConduitT o Void m ()
    Reducer sink.

-> (i -> e -> m ())
    An action to be executed for each input that cannot be decoded.

-> Reducer i o e m

-> m ()

Run a Reducer.

println :: MonadIO m => Text -> m ()

Like putStrLn, but writes to stderr.

Arguments

:: MonadIO m

=> Text
    Group name. Must not contain comma.

-> Text
    Counter name. Must not contain comma.

-> m ()

Increment a counter by 1.

Arguments

:: MonadIO m

=> Int

-> Text
    Group name. Must not contain comma.

-> Text
    Counter name. Must not contain comma.

-> m ()

Increment a counter by n.
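For context on the comma restriction: Hadoop streaming accepts counter updates as stderr lines of the form reporter:counter:<group>,<counter>,<amount>, so a comma inside a name would be read as a field separator. The sketch below builds such a line; counterLine is a hypothetical helper, rejecting names with commas is a choice made for this sketch, and whether the library emits exactly this text is an assumption.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main (main) where

import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.IO as TIO
import System.IO (stderr)

-- Hypothetical helper: build a Hadoop streaming counter update line.
-- Returns Nothing if either name contains a comma, since the comma is the
-- field separator in the reporter:counter protocol.
counterLine :: Int -> Text -> Text -> Maybe Text
counterLine n groupName counterName
  | hasComma groupName || hasComma counterName = Nothing
  | otherwise = Just $
      T.intercalate "," ["reporter:counter:" <> groupName, counterName, T.pack (show n)]
  where
    hasComma = T.any (== ',')

-- Write one counter update to stderr, where Hadoop streaming looks for it.
main :: IO ()
main = mapM_ (TIO.hPutStrLn stderr) (counterLine 1 "reducer" "key-value pairs")
```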