pipes-async-0.1.0: A higher-level interface to using concurrency with pipes

Safe HaskellNone
LanguageHaskell2010

Pipes.Async

Synopsis

Documentation

buffer Source

Arguments

:: (MonadBaseControl IO m, MonadBaseControl IO (Base m), MonadSafe m, MonadIO m, MonadMask m) 
=> Int

Number of slots in the bounded queue

-> Proxy a' a () b m r

Upstream producer

-> Proxy () b c' c m r

Downstream consumer

-> Proxy a' a c' c m r 

A substitute for >-> that executes both the upstream producer and downstream consumer in separate threads (see >&> for an operator version, with a default queue size of 16 slots). The reason separate threads are used for both sides is so that the current thread (running runEffect or toListM, for example) can manage the bidirectional semantics for the resulting Proxy. That is:

Upstream is executed in task A, downstream in task B, and runEffect in the parent thread. Tasks A and B are connected so that b values produced in A are immediately enqueued and available to B. runEffect does not manage passing b values from A to B, as it normally would; rather they flow directly through a TBQueue side-channel.

If upstream should attempt to send an a' value further upstream, expecting an a in return, this will block task A as runEffect sends the request further up the chain. Or, should downstream send a c value downstream and expect a c', it will block task B as runEffect sends the response further down the chain.

If upstream exits, its result value is enqueued until downstream sees it, at which point runEffect terminates with this value. However, if downstream should exit first, this result is communicated directly to runEffect, which returns it immediately, canceling both threads. Thus, execution lifetime is biased toward the downstream consumer, since it is more likely that downstream will consume elements until there are none left, than that upstream would produce elements while waiting for downstream to terminate.

If an exception occurs in either upstream or downstream, it is re-thrown in the runEffect thread. Also, no matter what happens, both the upstream and downstream threads are canceled at the conclusion of the enclosing MonadSafe block.

Note: Using >&> should be a drop-in replacement for >-> anywhere it is used, without changing the meaning of the pipeline; however, how the composition is associated has an effect on the concurrency. For example, a >-> (b >&> c) causes b and c to be executed concurrently, with effects from a occuring in the parent thread (while b blocks waiting on the response). By contrast, (a >-> b) >&> c executes a >-> b and c concurrently, with nothing happening in the parent thread except to wait on the final result. This will generally be faster since value passing through MVar will not be necessary. This is also the default interpretation of a >-> b >&> c, since both operators left-associate at the same level.

(>&>) :: (MonadBaseControl IO m, MonadBaseControl IO (Base m), MonadSafe m, MonadIO m, MonadMask m) => Proxy a' a () b m r -> Proxy () b c' c m r -> Proxy a' a c' c m r infixl 7 Source