Safe Haskell | None |
---|---|
Language | Haskell2010 |
Synopsis
- buffer :: (CCatable c1 c2 c3, CRunnable c3, RunConstraints c3 m) => Natural -> c1 () x m () -> c2 x Void m r -> m r
- ($$&) :: (CCatable c1 c2 c3, CRunnable c3, RunConstraints c3 m) => c1 () x m () -> c2 x Void m r -> m r
- (=$=&) :: CCatable c1 c2 c3 => c1 i x m () -> c2 x o m r -> c3 i o m r
- ($=&) :: CCatable c1 c2 c3 => c1 i x m () -> c2 x o m r -> c3 i o m r
- (=$&) :: CCatable c1 c2 c3 => c1 i x m () -> c2 x o m r -> c3 i o m r
- class CCatable c1 c2 (c3 :: * -> * -> (* -> *) -> * -> *) | c1 c2 -> c3 where
- bufferToFile :: (CFConduitLike c1, CFConduitLike c2, Serialize x, MonadBaseControl IO m, MonadResource m, MonadThrow m) => Natural -> Maybe Int -> FilePath -> c1 () x m () -> c2 x Void m r -> m r
- bufferToFile' :: (CFConduitLike c1, CFConduitLike c2, Serialize x) => Natural -> Maybe Int -> FilePath -> c1 i x m () -> c2 x o m r -> CFConduit i o m r
- class CRunnable c where
- type RunConstraints c (m :: * -> *) :: Constraint
- runCConduit :: RunConstraints c m => c () Void m r -> m r
- data CConduit i o m r where
- link2 :: MonadBase IO m => Async a -> Async b -> m ()
- sender :: MonadIO m => TBQueue (Maybe o) -> ConduitT () o m () -> m ()
- stage :: (MonadBaseControl IO m, MonadIO m) => TBQueue (Maybe i) -> Async x -> CConduit i Void m r -> m r
- receiver :: MonadIO m => TBQueue (Maybe o) -> ConduitT () o m ()
- data CFConduit i o m r where
- class CFConduitLike a where
- asCFConduit :: a i o m r -> CFConduit i o m r
- data BufferContext m a = BufferContext {}
- fsender :: (MonadIO m, MonadResource m, Serialize x, MonadThrow m) => BufferContext m x -> ConduitT () x m () -> m ()
- fstage :: (MonadBaseControl IO m, MonadResource m, MonadThrow m) => ConduitT () i m () -> Async x -> CFConduit i Void m r -> m r
- freceiver :: MonadIO m => BufferContext m o -> ConduitT () o m ()
- persistChan :: (MonadIO m, MonadResource m, Serialize o, MonadThrow m) => BufferContext m o -> STM (m ())
- exhaust :: TBQueue a -> STM [a]
- recv :: MonadIO m => TBQueue a -> m a
- send :: MonadIO m => TBQueue a -> a -> m ()
- ccMap :: (forall i1. ConduitT i1 o1 m a -> ConduitT i1 o2 m a) -> CConduit i o1 m a -> CConduit i o2 m a
Documentation
:: (CCatable c1 c2 c3, CRunnable c3, RunConstraints c3 m) | |
=> Natural | Size of the bounded queue in memory. |
-> c1 () x m () | |
-> c2 x Void m r | |
-> m r |
Concurrently join the producer and consumer, using a bounded queue of the given size. The producer will block when the queue is full, if it is producing faster than the consumers is taking from it. Likewise, if the consumer races ahead, it will block until more input is available.
Exceptions are properly managed and propagated between the two sides, so the net effect should be equivalent to not using buffer at all, save for the concurrent interleaving of effects.
The underlying monad must always be an instance of
'MonadBaseControl IO'. If at least one of the two conduits is a
CFConduit
, it must additionally be a in instance of
MonadResource
.
This function is similar to $$
; for one more like =$=
, see
buffer'
.
>>>
buffer 1 (CL.sourceList [1,2,3]) CL.consume
[1,2,3]
($$&) :: (CCatable c1 c2 c3, CRunnable c3, RunConstraints c3 m) => c1 () x m () -> c2 x Void m r -> m r infixr 0 Source #
An operator form of buffer
. In general you should be able to replace
any use of $$
with $$&
and suddenly reap the benefit of
concurrency, if your conduits were spending time waiting on each other.
The underlying monad must always be an instance of
'MonadBaseControl IO'. If at least one of the two conduits is a
CFConduit
, it must additionally be a in instance of
MonadResource
.
>>>
CL.sourceList [1,2,3] $$& CL.consume
[1,2,3]
It can be combined with $=&
and $=
. This creates two threads;
the first thread produces the list and the second thread does the
map and the consume:
>>>
CL.sourceList [1,2,3] $$& mapC (*2) $= CL.consume
[2,4,6]
This creates three threads. The three conduits all run in their own threads:
>>>
CL.sourceList [1,2,3] $$& mapC (*2) $=& CL.consume
[2,4,6]
>>>
CL.sourceList [1,2,3] $$& (mapC (*2) $= mapC (+1)) $=& CL.consume
[3,5,7]
(=$=&) :: CCatable c1 c2 c3 => c1 i x m () -> c2 x o m r -> c3 i o m r infixr 2 Source #
An operator form of buffer'
. In general you should be able to replace
any use of =$=
with =$=&
and $$
either with $$&
or =$=
and runCConduit
and suddenly reap the benefit of concurrency, if
your conduits were spending time waiting on each other.
>>>
runCConduit $ CL.sourceList [1,2,3] =$=& CL.consume
[1,2,3]
class CCatable c1 c2 (c3 :: * -> * -> (* -> *) -> * -> *) | c1 c2 -> c3 where Source #
Conduits are concatenable; this class describes how. class CCatable (c1 :: * -> * -> (* -> *) -> * -> *) (c2 :: * -> * -> (* -> *) -> * -> *) (c3 :: * -> * -> (* -> *) -> * -> *) | c1 c2 -> c3 where
:: Natural | Size of the bounded queue in memory |
-> c1 i x m () | |
-> c2 x o m r | |
-> c3 i o m r |
Concurrently join the producer and consumer, using a bounded queue of the given size. The producer will block when the queue is full, if it is producing faster than the consumers is taking from it. Likewise, if the consumer races ahead, it will block until more input is available.
Exceptions are properly managed and propagated between the two sides, so the net effect should be equivalent to not using buffer at all, save for the concurrent interleaving of effects.
This function is similar to =$=
; for one more like $$
, see
buffer
.
>>>
runCConduit $ buffer' 1 (CL.sourceList [1,2,3]) CL.consume
[1,2,3]
Instances
:: (CFConduitLike c1, CFConduitLike c2, Serialize x, MonadBaseControl IO m, MonadResource m, MonadThrow m) | |
=> Natural | Size of the bounded queue in memory |
-> Maybe Int | Max elements to keep on disk at one time |
-> FilePath | Directory to write temp files to |
-> c1 () x m () | |
-> c2 x Void m r | |
-> m r |
Like buffer
, except that when the bounded queue is overflowed, the
excess is cached in a local file so that consumption from upstream may
continue. When the queue becomes exhausted by yielding, it is filled
from the cache until all elements have been yielded.
Note that the maximum amount of memory consumed is equal to (2 * memorySize + 1), so take this into account when picking a chunking size.
This function is similar to $$
; for one more like =$=
, see
bufferToFile'
.
>>>
runResourceT $ bufferToFile 1 Nothing "/tmp" (CL.sourceList [1,2,3]) CL.consume
[1,2,3]
:: (CFConduitLike c1, CFConduitLike c2, Serialize x) | |
=> Natural | Size of the bounded queue in memory |
-> Maybe Int | Max elements to keep on disk at one time |
-> FilePath | Directory to write temp files to |
-> c1 i x m () | |
-> c2 x o m r | |
-> CFConduit i o m r |
Like buffer'
, except that when the bounded queue is overflowed, the
excess is cached in a local file so that consumption from upstream may
continue. When the queue becomes exhausted by yielding, it is filled
from the cache until all elements have been yielded.
Note that the maximum amount of memory consumed is equal to (2 * memorySize + 1), so take this into account when picking a chunking size.
This function is similar to =$=
; for one more like $$
, see
bufferToFile
.
>>>
runResourceT $ runCConduit $ bufferToFile' 1 Nothing "/tmp" (CL.sourceList [1,2,3]) CL.consume
[1,2,3]
It is frequently convenient to define local function to use this in operator form:
>>>
:{
runResourceT $ do let buf c = bufferToFile' 10 Nothing "/tmp" c -- eta-conversion to avoid monomorphism restriction runCConduit $ CL.sourceList [0x30, 0x31, 0x32] `buf` mapC (toEnum :: Int -> Char) `buf` CL.consume :} "012"
class CRunnable c where Source #
Conduits are, once there's a producer on one end and a consumer on the other, runnable.
type RunConstraints c (m :: * -> *) :: Constraint Source #
runCConduit :: RunConstraints c m => c () Void m r -> m r Source #
Execute a conduit concurrently. This is the concurrent
equivalent of runConduit
.
The underlying monad must always be an instance of
'MonadBaseControl IO'. If the conduits is a CFConduit
, it must
additionally be a in instance of MonadResource
.
Instances
CRunnable ConduitT Source # | |
Defined in Ribosome.Data.Conduit.Composition type RunConstraints ConduitT m Source # runCConduit :: RunConstraints ConduitT m => ConduitT () Void m r -> m r Source # | |
CRunnable CFConduit Source # | |
Defined in Ribosome.Data.Conduit.Composition type RunConstraints CFConduit m Source # runCConduit :: RunConstraints CFConduit m => CFConduit () Void m r -> m r Source # | |
CRunnable CConduit Source # | |
Defined in Ribosome.Data.Conduit.Composition type RunConstraints CConduit m Source # runCConduit :: RunConstraints CConduit m => CConduit () Void m r -> m r Source # |
data CConduit i o m r where Source #
A "concurrent conduit", in which the stages run in parallel with a buffering queue between them.
Single :: ConduitT i o m r -> CConduit i o m r | |
Multiple :: Natural -> ConduitT i x m () -> CConduit x o m r -> CConduit i o m r |
Instances
CFConduitLike CConduit Source # | |
Defined in Ribosome.Data.Conduit.Composition | |
CRunnable CConduit Source # | |
Defined in Ribosome.Data.Conduit.Composition type RunConstraints CConduit m Source # runCConduit :: RunConstraints CConduit m => CConduit () Void m r -> m r Source # | |
CCatable ConduitT ConduitT CConduit Source # | |
CCatable ConduitT CConduit CConduit Source # | |
CCatable CFConduit CConduit CFConduit Source # | |
CCatable CConduit ConduitT CConduit Source # | |
CCatable CConduit CFConduit CFConduit Source # | |
CCatable CConduit CConduit CConduit Source # | |
type RunConstraints CConduit m Source # | |
Defined in Ribosome.Data.Conduit.Composition |
stage :: (MonadBaseControl IO m, MonadIO m) => TBQueue (Maybe i) -> Async x -> CConduit i Void m r -> m r Source #
data CFConduit i o m r where Source #
A "concurrent conduit", in which the stages run in parallel with a buffering queue and possibly a disk file between them.
FSingle :: ConduitT i o m r -> CFConduit i o m r | |
FMultiple :: Natural -> ConduitT i x m () -> CFConduit x o m r -> CFConduit i o m r | |
FMultipleF :: Serialize x => Natural -> Maybe Int -> FilePath -> ConduitT i x m () -> CFConduit x o m r -> CFConduit i o m r |
Instances
CFConduitLike CFConduit Source # | |
Defined in Ribosome.Data.Conduit.Composition | |
CRunnable CFConduit Source # | |
Defined in Ribosome.Data.Conduit.Composition type RunConstraints CFConduit m Source # runCConduit :: RunConstraints CFConduit m => CFConduit () Void m r -> m r Source # | |
CCatable ConduitT CFConduit CFConduit Source # | |
CCatable CFConduit ConduitT CFConduit Source # | |
CCatable CFConduit CFConduit CFConduit Source # | |
CCatable CFConduit CConduit CFConduit Source # | |
CCatable CConduit CFConduit CFConduit Source # | |
type RunConstraints CFConduit m Source # | |
Defined in Ribosome.Data.Conduit.Composition |
class CFConduitLike a where Source #
asCFConduit :: a i o m r -> CFConduit i o m r Source #
Instances
CFConduitLike ConduitT Source # | |
Defined in Ribosome.Data.Conduit.Composition | |
CFConduitLike CFConduit Source # | |
Defined in Ribosome.Data.Conduit.Composition | |
CFConduitLike CConduit Source # | |
Defined in Ribosome.Data.Conduit.Composition |
data BufferContext m a Source #
fsender :: (MonadIO m, MonadResource m, Serialize x, MonadThrow m) => BufferContext m x -> ConduitT () x m () -> m () Source #
fstage :: (MonadBaseControl IO m, MonadResource m, MonadThrow m) => ConduitT () i m () -> Async x -> CFConduit i Void m r -> m r Source #
persistChan :: (MonadIO m, MonadResource m, Serialize o, MonadThrow m) => BufferContext m o -> STM (m ()) Source #