unagi-chan-0.4.1.4: Fast concurrent queues with a Chan-like API, and more
Safe HaskellNone
LanguageHaskell2010

Control.Concurrent.Chan.Unagi.NoBlocking

Synopsis

Documentation

General-purpose concurrent FIFO queue without blocking reads, and with optimized variants for single-threaded producers and/or consumers. This variant, and even more so the SP/SC variants, offer the lowest latency of all of the implementations in this library.

Creating channels

newChan :: IO (InChan a, OutChan a) Source #

Create a new channel, returning its write and read ends.

data InChan a Source #

The write end of a channel created with newChan.

Instances

Instances details
Eq (InChan a) Source # 
Instance details

Defined in Control.Concurrent.Chan.Unagi.NoBlocking.Internal

Methods

(==) :: InChan a -> InChan a -> Bool #

(/=) :: InChan a -> InChan a -> Bool #

data OutChan a Source #

The read end of a channel created with newChan.

Instances

Instances details
Eq (OutChan a) Source # 
Instance details

Defined in Control.Concurrent.Chan.Unagi.NoBlocking.Internal

Methods

(==) :: OutChan a -> OutChan a -> Bool #

(/=) :: OutChan a -> OutChan a -> Bool #

Channel operations

Reading

tryReadChan :: OutChan a -> IO (Element a) Source #

Returns immediately with an Element a future, which returns one unique element when it becomes available via tryRead.

Note: This is a destructive operation. See Element for more details.

Note re. exceptions: When an async exception is raised during a tryReadChan the message that the read would have returned is likely to be lost, just as it would be when raised directly after this function returns.

readChan :: IO () -> OutChan a -> IO a Source #

readChan io c returns the next element from c, calling tryReadChan and looping on the Element returned, and calling io at each iteration when the element is not yet available. It throws BlockedIndefinitelyOnMVar when isActive determines that a value will never be returned.

When used like readChan yield or readChan (threadDelay 10) this is the semantic equivalent to the blocking readChan in the other implementations.

newtype Element a Source #

An IO action that returns a particular enqueued element when and if it becomes available.

Each Element corresponds to a particular enqueued element, i.e. a returned Element always offers the only means to access one particular enqueued item. The value returned by tryRead moves monotonically from Nothing to Just a when and if an element becomes available, and is idempotent at that point.

So for instance:

   (in, out) <- newChan
   (el, _) <- tryReadChan out  -- READ FROM EMPTY CHAN
   writeChan in "msg1"
   writeChan in "msg2"
   readChan out        -- RETURNS "msg2"
   tryRead el          -- RETURNS "msg1" (which would otherwise be lost)

Constructors

Element 

Fields

Instances

Instances details
Monad Element Source # 
Instance details

Defined in Control.Concurrent.Chan.Unagi.NoBlocking.Types

Methods

(>>=) :: Element a -> (a -> Element b) -> Element b #

(>>) :: Element a -> Element b -> Element b #

return :: a -> Element a #

Functor Element Source # 
Instance details

Defined in Control.Concurrent.Chan.Unagi.NoBlocking.Types

Methods

fmap :: (a -> b) -> Element a -> Element b #

(<$) :: a -> Element b -> Element a #

MonadFix Element Source # 
Instance details

Defined in Control.Concurrent.Chan.Unagi.NoBlocking.Types

Methods

mfix :: (a -> Element a) -> Element a #

MonadFail Element Source # 
Instance details

Defined in Control.Concurrent.Chan.Unagi.NoBlocking.Types

Methods

fail :: String -> Element a #

Applicative Element Source # 
Instance details

Defined in Control.Concurrent.Chan.Unagi.NoBlocking.Types

Methods

pure :: a -> Element a #

(<*>) :: Element (a -> b) -> Element a -> Element b #

liftA2 :: (a -> b -> c) -> Element a -> Element b -> Element c #

(*>) :: Element a -> Element b -> Element b #

(<*) :: Element a -> Element b -> Element a #

Alternative Element Source # 
Instance details

Defined in Control.Concurrent.Chan.Unagi.NoBlocking.Types

Methods

empty :: Element a #

(<|>) :: Element a -> Element a -> Element a #

some :: Element a -> Element [a] #

many :: Element a -> Element [a] #

MonadPlus Element Source # 
Instance details

Defined in Control.Concurrent.Chan.Unagi.NoBlocking.Types

Methods

mzero :: Element a #

mplus :: Element a -> Element a -> Element a #

Utilities

isActive :: OutChan a -> IO Bool Source #

An action that returns False sometime after the chan no longer has any writers.

After False is returned, any tryRead which returns Nothing can be considered to be dead. Likewise for tryReadNext. Note that in the blocking implementations a BlockedIndefinitelyOnMVar exception is raised, so this function is unnecessary.

Writing

writeChan :: InChan a -> a -> IO () Source #

Write a value to the channel.

writeList2Chan :: InChan a -> [a] -> IO () Source #

Write an entire list of items to a chan type. Writes here from multiple threads may be interleaved, and infinite lists are supported.

Broadcasting

dupChan :: InChan a -> IO (OutChan a) Source #

Duplicate a chan: the returned OutChan begins empty, but data written to the argument InChan from then on will be available from both the original OutChan and the one returned here, creating a kind of broadcast channel.

See also streamChan for a faster alternative that might be appropriate.

Streaming

newtype Stream a Source #

An infinite stream of elements. tryReadNext can be called any number of times from multiple threads, and returns a value which moves monotonically from Pending to Next if and when a head element becomes available. isActive can be used to determine if the stream has expired.

Constructors

Stream 

Fields

data Next a Source #

Constructors

Next a (Stream a)

The next head element along with the tail Stream.

Pending

The next element is not yet in the queue; you can retry tryReadNext until a Next is returned.

streamChan :: Int -> OutChan a -> IO [Stream a] Source #

Produce the specified number of interleaved "streams" from a chan. Nextuming a Stream is much faster than calling tryReadChan, and might be useful when an MPSC queue is needed, or when multiple consumers should be load-balanced in a round-robin fashion.

Usage example:

  do mapM_ (writeChan i) [1..9]
     [str1, str2, str2] <- streamChan 3 o
     forkIO $ printStream str1   -- prints: 1,4,7
     forkIO $ printStream str2   -- prints: 2,5,8
     forkIO $ printStream str3   -- prints: 3,6,9
   where 
     printStream str = do
       h <- tryReadNext str
       case h of
         Next a str' -> print a >> printStream str'
         -- We know that all values were already written, so a Pending tells 
         -- us we can exit; in other cases we might call yield and then 
         -- retry that same tryReadNext str:
         Pending -> return ()

Be aware: if one stream consumer falls behind another (e.g. because it is slower) the number of elements in the queue which can't be GC'd will grow. You may want to do some coordination of Stream consumers to prevent this.