Z-IO-1.0.0.0: Simple and high performance IO toolkit for Haskell
Copyright(c) Dong Han 2017-2020
LicenseBSD
Maintainerwinterland1989@gmail.com
Stabilityexperimental
Portabilitynon-portable
Safe HaskellNone
LanguageHaskell2010

Z.IO.BIO.Concurrent

Description

This module provides some concurrent BIO node, to ease the implementation of producer-consumer model. All sources and sinks return by this module are safe to be used in multiple threads.

  • Use newTQueuePair for common cases.
  • Use newTBQueuePair if you have a fast producer and you don't want input get piled up in memory.
  • Use newBroadcastTChanPair if you want messages get broadcasted, i.e. every message written by producers will be received by every consumers.

It's important to correctly set the numebr of producers, internally it keeps a counter on how many producers reached their ends, and send EOF to all consumers when last producer ends. So it's a good idea to catch exceptions and pull the sink(which indicate EOF) on producer side.

(sink, src) <- newTQueuePair 2  -- it's important to correctly set the numebr of producers

--------------------------------------------------------------------------------
-- producers

forkIO $ do
    ...
    push x sink             -- producer using push
    ...
    pull sink               -- when EOF is reached, manually pull, you may consider put it in a bracket.

forkIO $ do
    ...
    (runBIO $ ... . sink) -- producer using BIO
        onException (pull sink)

--------------------------------------------------------------------------------
-- consumers

forkIO $ do
    ...
    r <- pull src           -- consumer using pull
    case r of Just r' -> ...
              _ -> ...      -- EOF indicate all producers reached EOF

forkIO $ do
    ...
    runBIO $ src . ...    -- consumer using BIO
Synopsis

Documentation

zip :: BIO a b -> BIO a c -> BIO a (b, c) Source #

Zip two BIO node by running them concurrently.

This implementation use MVar to synchronize two BIO's output, which has some implications:

  • Two node should output same numebr of results.
  • If the number differs, one node maybe

newTQueuePair Source #

Arguments

:: Int

number of producers

-> IO (Sink a, Source a) 

Make an unbounded queue and a pair of sink and souce connected to it.

newTBQueuePair Source #

Arguments

:: Int

number of producers

-> Natural

queue buffer bound

-> IO (Sink a, Source a) 

Make an bounded queue and a pair of sink and souce connected to it.

newBroadcastTChanPair Source #

Arguments

:: Int

number of producers

-> IO (Sink a, IO (Source a))

(Sink, IO Source)

Make a broadcast chan and a sink connected to it, and a function return sources to receive broadcast message.