broadcast-chan-0.2: Closable, fair, single-wakeup channel type that avoids 0 reader space leaks.

Copyright: (C) 2014-2018 Merijn Verstraaten
License: BSD-style (see the file LICENSE)
Maintainer: Merijn Verstraaten <merijn@inconsistent.nl>
Stability: experimental
Portability: haha
Safe Haskell: Safe
Language: Haskell2010

BroadcastChan.Extra

Description

Functions in this module are *NOT* intended to be used by regular users of the library. Rather, they are intended for implementing parallel processing libraries on top of broadcast-chan, such as broadcast-chan-conduit.

This module, while not for end users, is considered part of the public API, so users can rely on PVP bounds to avoid breakage due to changes to this module.

Documentation

data Action

Action to take when an exception occurs while processing an element.

Constructors

Drop

Drop the current element and continue processing.

Retry

Retry by appending the current element to the queue of remaining elements.

Terminate

Stop all processing and reraise the exception.

Instances

Eq Action

Methods

(==) :: Action -> Action -> Bool

(/=) :: Action -> Action -> Bool

Show Action

data BracketOnError m r

Allocation, cleanup, and work actions for parallel processing. These should be passed to an appropriate bracketOnError function.

Constructors

Bracket 

Fields

  • allocate :: IO [Weak ThreadId]

    Allocation action that spawns threads and sets up handlers.

  • cleanup :: [Weak ThreadId] -> IO ()

    Cleanup action that handles exceptional termination.

  • action :: m r

    Action that performs actual processing and waits for processing to finish and threads to terminate.
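
For the case m = IO, base's Control.Exception.bracketOnError is one appropriate function to pass these fields to. The sketch below is illustrative only: it redefines the record locally (mirroring the fields documented above) so that it compiles with base alone, and toyBracket and runBracket are hypothetical names, not part of the library. In real use the record comes from runParallel or runParallel_.

```haskell
module Main where

import Control.Concurrent (ThreadId, forkIO, killThread, mkWeakThreadId)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Exception (bracketOnError)
import System.Mem.Weak (Weak, deRefWeak)

-- Local stand-in mirroring the documented record (assumption: with the
-- library installed you would import BroadcastChan.Extra instead).
data BracketOnError m r = Bracket
    { allocate :: IO [Weak ThreadId]
    , cleanup  :: [Weak ThreadId] -> IO ()
    , action   :: m r
    }

-- Hypothetical hand-built value: one worker thread that delivers a result.
toyBracket :: MVar Int -> BracketOnError IO Int
toyBracket done = Bracket
    { allocate = do
        tid <- forkIO (putMVar done 42)
        weak <- mkWeakThreadId tid
        pure [weak]
    -- Kill every worker that has not been garbage collected yet.
    , cleanup = mapM_ $ \weak -> deRefWeak weak >>= maybe (pure ()) killThread
    -- Wait for the worker's result.
    , action = takeMVar done
    }

-- Run the three parts; cleanup only runs if the action throws.
runBracket :: BracketOnError IO r -> IO r
runBracket (Bracket alloc clean act) = bracketOnError alloc clean (\_ -> act)

main :: IO ()
main = do
    done <- newEmptyMVar
    result <- runBracket (toyBracket done)
    print result
```

For monads other than IO, a lifted equivalent of bracketOnError for that monad would take the place of the base function.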

data Handler m a

Exception handler for parallel processing.

Constructors

Simple Action

Always take the specified Action.

Handle (a -> SomeException -> m Action)

Allow inspection of the element, exception, and execution of monadic actions before deciding the Action to take.
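
As an illustration of the two constructors, the sketch below builds a Handle that retries on IO errors and drops the element on anything else. Action and Handler are redefined locally (mirroring the constructors documented above) so the example needs only base; retryIOErrors and decide are hypothetical names introduced for this example and are not part of the library.

```haskell
module Main where

import Control.Exception
    (ErrorCall (..), IOException, SomeException, fromException, toException)

-- Local stand-ins mirroring the documented constructors (assumption: with
-- the library installed you would import BroadcastChan.Extra instead).
data Action = Drop | Retry | Terminate
    deriving (Eq, Show)

data Handler m a
    = Simple Action
    | Handle (a -> SomeException -> m Action)

-- Hypothetical policy: retry on IO errors, log and drop everything else.
retryIOErrors :: Show a => Handler IO a
retryIOErrors = Handle $ \element exc ->
    case fromException exc :: Maybe IOException of
        Just _  -> pure Retry
        Nothing -> do
            putStrLn ("dropping " ++ show element ++ ": " ++ show exc)
            pure Drop

-- Hypothetical helper that resolves a Handler to an Action for one failure.
decide :: Applicative m => Handler m a -> a -> SomeException -> m Action
decide (Simple act) _ _   = pure act
decide (Handle f)   x exc = f x exc

main :: IO ()
main = do
    a1 <- decide retryIOErrors (1 :: Int) (toException (userError "disk full"))
    a2 <- decide retryIOErrors (2 :: Int) (toException (ErrorCall "bug"))
    a3 <- decide (Simple Terminate) (3 :: Int) (toException (ErrorCall "bug"))
    print (a1, a2, a3)
```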

mapHandler :: (m Action -> n Action) -> Handler m a -> Handler n a

Convenience function for changing the monad the exception handler runs in.
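
A minimal sketch of what a function with this type can do, assuming local copies of Action and Handler so that it compiles with base alone. The mapHandler defined here is one way to satisfy the documented signature and is not claimed to be the library's source; purePolicy, ioPolicy, and decide are hypothetical names.

```haskell
module Main where

import Control.Exception (ErrorCall (..), SomeException, toException)
import Data.Functor.Identity (Identity (..))

-- Local stand-ins mirroring the documented types.
data Action = Drop | Retry | Terminate
    deriving (Eq, Show)

data Handler m a
    = Simple Action
    | Handle (a -> SomeException -> m Action)

-- Local definition matching the documented signature: Simple needs no
-- change, Handle gets its result pushed through the monad morphism.
mapHandler :: (m Action -> n Action) -> Handler m a -> Handler n a
mapHandler _     (Simple act) = Simple act
mapHandler morph (Handle f)   = Handle (\x exc -> morph (f x exc))

-- Hypothetical pure policy: drop negative elements, retry the rest.
purePolicy :: Handler Identity Int
purePolicy = Handle $ \x _ -> Identity (if x < 0 then Drop else Retry)

-- The same policy, moved from Identity into IO.
ioPolicy :: Handler IO Int
ioPolicy = mapHandler (pure . runIdentity) purePolicy

-- Hypothetical helper that resolves a Handler to an Action for one failure.
decide :: Applicative m => Handler m a -> a -> SomeException -> m Action
decide (Simple act) _ _   = pure act
decide (Handle f)   x exc = f x exc

main :: IO ()
main = do
    let boom = toException (ErrorCall "boom")
    a1 <- decide ioPolicy (-1) boom
    a2 <- decide ioPolicy 5 boom
    print (a1, a2)
```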

runParallel

Arguments

:: (MonadIO m, MonadIO n)
=> Either (b -> n r) (r -> b -> n r)        -- Output yielder
-> Handler IO a                             -- Parallel processing exception handler
-> Int                                      -- Number of threads to use
-> (a -> IO b)                              -- Function to run in parallel
-> ((a -> m ()) -> (a -> m b) -> n r)       -- "Stream" processing function
-> n (BracketOnError n r)

Sets up parallel processing.

The workhorses of this function are the output yielder and "stream" processing functions.

The output yielder is responsible for handling the produced b values, which it can either yield downstream (Left) when used with something like conduit or pipes, or fold into a single result (Right) when used to run IO in parallel.

The stream processing function gets two arguments:

a -> m ()
    Should be used to buffer a number of elements equal to the number of threads.

a -> m b
    Should be used to process the remainder of the element stream via, for example, mapM.

See BroadcastChan or broadcast-chan-conduit for examples.
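
The shape of such a stream processing function for a plain list input can be sketched as follows. This is a simplified illustration, assuming m and n are the same monad; streamList is a hypothetical name, and the buffer and process functions used in main are stubs that only record or transform their argument. They do not model the library's scheduling or the order in which results become available.

```haskell
module Main where

import Data.IORef (modifyIORef', newIORef, readIORef)

-- Buffer as many elements as there are threads, then process the
-- remainder with mapM, as described for the two arguments above.
streamList
    :: Monad m
    => Int          -- number of threads
    -> [a]          -- input elements
    -> (a -> m ())  -- buffering function
    -> (a -> m b)   -- processing function
    -> m [b]
streamList threads input buffer process = do
    let (initial, remainder) = splitAt threads input
    mapM_ buffer initial
    mapM process remainder

main :: IO ()
main = do
    buffered <- newIORef []
    -- Stubs standing in for the functions the library would supply.
    let buffer x  = modifyIORef' buffered (x :)
        process x = pure (x * 10)
    results <- streamList 2 [1, 2, 3, 4, 5 :: Int] buffer process
    seen <- readIORef buffered
    putStrLn ("buffered:  " ++ show (reverse seen))
    putStrLn ("processed: " ++ show results)
```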

The returned BracketOnError has an allocate action that takes care of setting up forkIO threads and exception handlers. The cleanup action ensures all threads are terminated in case of an exception. Finally, action performs the actual parallel processing of elements.

runParallel_

Arguments

:: (MonadIO m, MonadIO n)
=> Handler IO a                 -- Parallel processing exception handler
-> Int                          -- Number of threads to use
-> (a -> IO ())                 -- Function to run in parallel
-> ((a -> m ()) -> n r)         -- "Stream" processing function
-> n (BracketOnError n r)

Sets up parallel processing for functions where we ignore the result.

The stream processing argument is the workhorse of this function. It gets a (rate-limited) function a -> m () that queues a values for processing. This function should be applied to all a elements that should be processed. This would be either a partially applied forM_ for parallel processing, or something like conduit's mapM_ to construct a "sink" for a values. See BroadcastChan or broadcast-chan-conduit for examples.

The returned BracketOnError has an allocate action that takes care of setting up forkIO threads and exception handlers. The cleanup action ensures all threads are terminated in case of an exception. Finally, action performs the actual parallel processing of elements.