hadoop-streaming-0.1.0.0: A simple Hadoop streaming library

Safe Haskell: None
Language: Haskell2010

HadoopStreaming


Documentation

data Mapper e m

A Mapper consists of a decoder, an encoder, and a stream that transforms each input into zero or more (key, value) pairs.

Constructors

Mapper 

Fields

  • (Text -> Either e input)

    Decoder for mapper input.

  • (k -> v -> Text)

    Encoder for mapper output.

  • (ConduitT input (k, v) m ())

    A stream transforming input into (k, v) pairs.
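As a sketch of how the three fields fit together, the word-count mapper below accepts every line as input, splits it into words, and encodes each (word, 1) pair as a tab-separated line. The names decodeLine, encodeCount, wordsWithOne, and wordCountMapper are hypothetical, and the tab separator is an assumption based on Hadoop streaming's default key/value separator; the constructor arguments follow the field order listed above.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Data.Conduit (ConduitT)
import qualified Data.Conduit.Combinators as C
import Data.Text (Text)
import qualified Data.Text as T
import HadoopStreaming (Mapper (..))

-- Decoder: every line is valid input, so decoding never fails (e = ()).
decodeLine :: Text -> Either () Text
decodeLine = Right

-- Encoder: key and value separated by a tab (assumed Hadoop streaming default).
encodeCount :: Text -> Int -> Text
encodeCount k v = k <> "\t" <> T.pack (show v)

-- Stream: emit one (word, 1) pair for every word on the line.
wordsWithOne :: Monad m => ConduitT Text (Text, Int) m ()
wordsWithOne = C.concatMap (\line -> [(w, 1) | w <- T.words line])

-- Hypothetical mapper assembled from the three fields, in constructor order.
wordCountMapper :: Monad m => Mapper () m
wordCountMapper = Mapper decodeLine encodeCount wordsWithOne
```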

data Reducer e m

A Reducer consists of a decoder, an encoder, and a stream that transforms each key and all values associated with the key into zero or more results of type res.

Constructors

Eq k => Reducer 

Fields

  • (Text -> Either e (k, v))

    Decoder for reducer input.

  • (res -> Text)

    Encoder for reducer output.

  • (k -> v -> ConduitT v res m ())

    A stream processing a key and all values associated with the key. The parameter v is the first value associated with the key (since a key always has one or more values), and the remaining values are processed by the conduit.

    Examples:

    {-# LANGUAGE BlockArguments, OverloadedStrings, TupleSections #-}

    import Control.Monad.IO.Class (MonadIO)
    import Data.Conduit (ConduitT)
    import qualified Data.Conduit as C
    import qualified Data.Conduit.Combinators as C
    import HadoopStreaming (incCounter)
    
    -- Sum up all values associated with the key and emit a (key, sum) pair.
    sumValues :: (Monad m, Num v) => k -> v -> ConduitT v (k, v) m ()
    sumValues k v0 = C.foldl (+) v0 >>= C.yield . (k,)
    
    -- Increment a counter for each (key, value) pair, and emit the (key, value) pair.
    -- The first value is pushed back with leftover so that mapM also sees it.
    incCounterAndEmit :: MonadIO m => k -> v -> ConduitT v (k, v) m ()
    incCounterAndEmit k v0 = C.leftover v0 <> C.mapM \v ->
      incCounter "reducer" "key-value pairs" >> pure (k, v)
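Putting the three Reducer fields together, here is a sketch of a reducer that keeps the maximum integer value per key. The input format (key, a tab, then an integer) and all names (decodeKV, encodeKV, maxValue, maxReducer) are hypothetical; the stream follows the same pattern as sumValues above but folds with max.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Data.Conduit (ConduitT)
import qualified Data.Conduit as C
import qualified Data.Conduit.Combinators as C
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Read as TR
import HadoopStreaming (Reducer (..))

-- Decoder: parse "key<TAB>integer" records (hypothetical input format).
decodeKV :: Text -> Either Text (Text, Int)
decodeKV line = case T.breakOn "\t" line of
  (k, rest)
    | Just v <- T.stripPrefix "\t" rest
    , Right (n, leftover) <- TR.signed TR.decimal v
    , T.null leftover -> Right (k, n)
  _ -> Left "expected key<TAB>integer"

-- Encoder: render a (key, result) pair as a tab-separated line.
encodeKV :: (Text, Int) -> Text
encodeKV (k, n) = k <> "\t" <> T.pack (show n)

-- Stream: the first value seeds the fold; the remaining values arrive as input.
maxValue :: Monad m => Text -> Int -> ConduitT Int (Text, Int) m ()
maxValue k v0 = C.foldl max v0 >>= \best -> C.yield (k, best)

-- Hypothetical reducer assembled from the three fields (Eq Text holds).
maxReducer :: Monad m => Reducer Text m
maxReducer = Reducer decodeKV encodeKV maxValue
```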
    

runMapper

Arguments

:: MonadIO m 
=> (Text -> e -> m ())

An action to be executed for each input that cannot be decoded. The first parameter is the input and the second parameter is the decoding error. One may, for instance, increment a counter and print an error message with println.

-> Mapper e m 
-> m () 

Run a Mapper. Takes input from stdin and emits the result to stdout.

runMapper = runMapperWith (sourceHandle stdin) (sinkHandle stdout)
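A sketch of a complete mapper built for runMapper, with a decode-error handler that increments a counter and logs via println, as suggested for the first argument. The blank-line decoder, the counter group and name, and all function names are hypothetical.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad.IO.Class (MonadIO)
import qualified Data.Conduit.Combinators as C
import Data.Text (Text)
import qualified Data.Text as T
import HadoopStreaming (Mapper (..), incCounter, println, runMapper)

-- Hypothetical decoder: reject blank lines, pass everything else through.
decodeNonBlank :: Text -> Either Text Text
decodeNonBlank line
  | T.null (T.strip line) = Left "blank line"
  | otherwise = Right line

-- Error handler: bump a counter and log the bad input to stderr.
onDecodeError :: MonadIO m => Text -> Text -> m ()
onDecodeError input err = do
  incCounter "mapper" "decode errors"
  println ("could not decode " <> T.pack (show input) <> ": " <> err)

-- Emit each non-blank line as a key with the value 1.
lineCountMapper :: Monad m => Mapper Text m
lineCountMapper =
  Mapper
    decodeNonBlank
    (\k v -> k <> "\t" <> T.pack (show (v :: Int)))
    (C.map (\l -> (l, 1)))

-- Reads lines from stdin and writes "line<TAB>1" records to stdout.
runLineCount :: IO ()
runLineCount = runMapper onDecodeError lineCountMapper
```

To use this in a Hadoop streaming job, one would compile a program with `main = runLineCount` and pass the resulting binary to the streaming jar's `-mapper` option.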

runMapperWith :: MonadIO m => ConduitT () Text m () -> ConduitT Text Void m () -> (Text -> e -> m ()) -> Mapper e m -> m ()

Like runMapper, but allows specifying a source and a sink.

runMapper = runMapperWith (sourceHandle stdin) (sinkHandle stdout)
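Because runMapperWith accepts any source and sink, a mapper can be exercised on in-memory input, for example in a unit test. The sketch below feeds a list of lines through a hypothetical parity mapper and collects the output in a `StateT [Text] IO` state from the transformers package; lines that fail to decode go to a no-op handler. The mapper and helper names are assumptions for illustration.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad.Trans.State.Strict (StateT, execStateT, modify')
import qualified Data.Conduit.Combinators as C
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Read as TR
import HadoopStreaming (Mapper (..), runMapperWith)

-- Hypothetical decoder: each line must be a (possibly signed) integer.
decodeInt :: Text -> Either String Int
decodeInt line = case TR.signed TR.decimal line of
  Right (n, rest) | T.null rest -> Right n
  _ -> Left "not an integer"

-- Hypothetical mapper: key each number by its parity.
parityMapper :: Monad m => Mapper String m
parityMapper =
  Mapper
    decodeInt
    (\k v -> k <> "\t" <> T.pack (show (v :: Int)))
    (C.map (\n -> (if even n then "even" else "odd", n)))

-- Run the mapper over in-memory lines and return the output lines in order.
runMapperOnLines :: [Text] -> IO [Text]
runMapperOnLines inputs = reverse <$> execStateT job []
  where
    job :: StateT [Text] IO ()
    job =
      runMapperWith
        (C.yieldMany inputs)                -- source: the given lines
        (C.mapM_ (\out -> modify' (out :))) -- sink: prepend each output line
        (\_ _ -> pure ())                   -- ignore lines that fail to decode
        parityMapper
```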

runReducer

Arguments

:: MonadIO m 
=> (Text -> e -> m ())

An action to be executed for each input that cannot be decoded. The first parameter is the input and the second parameter is the decoding error. One may, for instance, increment a counter and print an error message with println.

-> Reducer e m 
-> m () 

Run a Reducer. Takes input from stdin and emits the result to stdout.

runReducer = runReducerWith (sourceHandle stdin) (sinkHandle stdout)
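Hadoop sorts reducer input by key, so all records for a given key arrive next to each other. Assuming runReducer relies on this and uses the `Eq k` constraint to detect where one key's group ends (the documentation implies this but does not spell it out), the contract of the Reducer stream can be modeled with plain lists: each group is split into its key, its first value, and the remaining values. This is a pure model for intuition, not the library's implementation; groupAdjacent and sumGroups are hypothetical names.

```haskell
import Data.Function (on)
import Data.List (groupBy)

-- Model of the reducer contract: adjacent records with equal keys form one
-- group, passed on as (key, first value, remaining values).
groupAdjacent :: Eq k => [(k, v)] -> [(k, v, [v])]
groupAdjacent records =
  [ (k, v0, map snd rest)
  | (k, v0) : rest <- groupBy ((==) `on` fst) records
  ]

-- The sumValues example expressed in this model: fold the remaining values
-- into the first one.
sumGroups :: (Eq k, Num v) => [(k, v)] -> [(k, v)]
sumGroups records = [(k, foldl (+) v0 vs) | (k, v0, vs) <- groupAdjacent records]
```

In this model, equal keys that are not adjacent form separate groups, which is why Hadoop's sort-by-key step matters.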

runReducerWith :: MonadIO m => ConduitT () Text m () -> ConduitT Text Void m () -> (Text -> e -> m ()) -> Reducer e m -> m ()

Like runReducer, but allows specifying a source and a sink.

runReducer = runReducerWith (sourceHandle stdin) (sinkHandle stdout)
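Similarly, runReducerWith makes it possible to run a reducer on in-memory input. The sketch below sums integer counts per key over lines that are already sorted by key, as Hadoop would deliver them. The decoder, encoder, and all names are hypothetical, and collecting the output this way assumes each encoded result reaches the sink as one Text value.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad.Trans.State.Strict (StateT, execStateT, modify')
import qualified Data.Conduit as C
import qualified Data.Conduit.Combinators as C
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Read as TR
import HadoopStreaming (Reducer (..), runReducerWith)

-- Hypothetical decoder for "key<TAB>count" lines.
decodeCount :: Text -> Either Text (Text, Int)
decodeCount line = case T.splitOn "\t" line of
  [k, v] | Right (n, rest) <- TR.decimal v, T.null rest -> Right (k, n)
  _ -> Left "expected key<TAB>count"

-- Sum all counts for a key; the first count seeds the fold.
sumReducer :: Monad m => Reducer Text m
sumReducer = Reducer decodeCount encode sumStream
  where
    encode :: (Text, Int) -> Text
    encode (k, total) = k <> "\t" <> T.pack (show total)
    sumStream :: Monad m => Text -> Int -> C.ConduitT Int (Text, Int) m ()
    sumStream k v0 = C.foldl (+) v0 >>= \total -> C.yield (k, total)

-- Run the reducer over in-memory, key-sorted lines; return output in order.
runReducerOnLines :: [Text] -> IO [Text]
runReducerOnLines inputs = reverse <$> execStateT job []
  where
    job :: StateT [Text] IO ()
    job =
      runReducerWith
        (C.yieldMany inputs)                -- source: the given lines
        (C.mapM_ (\out -> modify' (out :))) -- sink: prepend each output line
        (\_ _ -> pure ())                   -- ignore lines that fail to decode
        sumReducer
```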

println :: MonadIO m => Text -> m ()

Like putStrLn, but writes to stderr.

incCounter

Arguments

:: MonadIO m 
=> Text

Group name. Must not contain commas.

-> Text

Counter name. Must not contain commas.

-> m () 

Increment a counter by 1.

incCounterBy

Arguments

:: MonadIO m 
=> Int 
-> Text

Group name. Must not contain commas.

-> Text

Counter name. Must not contain commas.

-> m () 

Increment a counter by the given amount (the Int argument).
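The comma restriction matches Hadoop streaming's counter protocol: a task updates a counter by writing a line of the form `reporter:counter:<group>,<counter>,<amount>` to stderr, so a comma inside a name would be read as a field separator. Assuming incCounter and incCounterBy emit this format (this page does not say so explicitly), the pure sketch below builds such a line and rejects names containing commas; counterLine is a hypothetical helper, not part of this library.

```haskell
import Data.List (intercalate)

-- Hypothetical helper: the stderr line a Hadoop streaming task writes to add
-- n to counter `name` in group `grp`. Commas would break the field layout.
counterLine :: String -> String -> Int -> Either String String
counterLine grp name n
  | ',' `elem` grp || ',' `elem` name =
      Left "group and counter names must not contain commas"
  | otherwise =
      Right ("reporter:counter:" ++ intercalate "," [grp, name, show n])
```

Under this assumption, incCounter corresponds to an amount of 1.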

sourceHandle :: MonadIO m => Handle -> ConduitT i Text m ()

Stream the contents of a Handle one line at a time as Text.

sinkHandle :: MonadIO m => Handle -> ConduitT Text o m ()

Stream Text values to a Handle, separated by \n.
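sourceHandle and sinkHandle can also be used on their own, outside a mapper or reducer. The sketch below copies a text file line by line, upper-casing each line on the way; upperCaseFile and the file paths are hypothetical.

```haskell
import Data.Conduit (runConduit, (.|))
import qualified Data.Conduit.Combinators as C
import qualified Data.Text as T
import HadoopStreaming (sinkHandle, sourceHandle)
import System.IO (IOMode (..), withFile)

-- Copy a text file line by line, upper-casing each line.
upperCaseFile :: FilePath -> FilePath -> IO ()
upperCaseFile inPath outPath =
  withFile inPath ReadMode $ \hIn ->
    withFile outPath WriteMode $ \hOut ->
      runConduit (sourceHandle hIn .| C.map T.toUpper .| sinkHandle hOut)
```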