{- | Module : Description : This module implements a channel concurrency primitive using Buffers. Maintainer : willig@ki.informatik.uni-frankfurt.de Stability : experimental Portability : non-portable (requires Futures) This module implements a channel synchronisation primitive using buffers that block on futures. A channel is a linked list of buffers. It has a read-end at one side and a write-end at the other. Elements put into the channel can be read out in a first in, first out order. A read and a write operation can be executed in parallel by several threads. A channel has no capacity bounding. The module contains similar functions as 'Control.Concurrent.Futures.Chan'. Warning: All operations on channels should only be used within the global wrapper function 'Futures.withFuturesDo'! -} module Control.Concurrent.Futures.Chan ( Chan, ChanType, newChan, writeChan, readChan, writeChanContents, getChanContents, mergeChan ) where -- qualified, because channel functions are also defined in -- Control.Concurrent import qualified Control.Concurrent import System.IO.Unsafe ( unsafeInterleaveIO ) import Control.Concurrent.Futures.Buffer import Control.Concurrent.Futures.Futures -- | A channel consists of a read-end buffer and a write-end buffer. -- The Itemtype is required to link the buffers in the channel. data Chan a = Chan (Buffer (ItemType a)) (Buffer (ItemType a)) type ChanType a = ((Buffer (ItemType a)), (Buffer (ItemType a))) type ItemType a = (Buffer(Item a)) data Item a = Item a (ItemType a) -- | Creates a new empty channel. newChan :: IO (Chan a) newChan = do hole <- newBuf read_end <- newBuf write_end <- newBuf putBuf read_end hole putBuf write_end hole return (Chan read_end write_end) -- | Writes one value to a channel. A 'writeChan' never blocks, since channels have -- no bounding limiters. writeChan :: Chan a -> a -> IO () writeChan (Chan read_end write_end) val = do new_hole <- newBuf old_hole <- getBuf write_end putBuf write_end new_hole putBuf old_hole (Item val new_hole) -- | Reads out an item from the read-head of the channel. -- It blocks on a empty channel. readChan :: Chan a -> IO a readChan (Chan read_end write_end) = do chan_head <- getBuf read_end (Item val content) <- getBuf chan_head putBuf read_end content return val -- | Implements the same behaviour as writeChanContents from the module Control.Concurrent.Chan. writeChanContents :: Chan a -> [a] -> IO () writeChanContents chan (x:xs) = do Control.Concurrent.forkIO (writeChan chan x) writeChanContents chan xs >>= return writeChanContents chan [] = return () -- | Implements the same behaviour as getChanContents from the module Control.Concurrent.Chan. -- It reads permanently from the channel. getChanContents :: Chan a -> IO [a] getChanContents ch = unsafeInterleaveIO ( do x <- readChan ch xs <- getChanContents ch return (x:xs) ) -- | Writes two equally typed lists to a given channel in parallel. mergeChan :: [a] -> [a] -> Chan a -> IO (Chan a) mergeChan l1 l2 cm = do Control.Concurrent.forkIO (merge l1 cm) Control.Concurrent.forkIO (merge l2 cm) return cm -- internal function merge (x:xs) c = do Control.Concurrent.forkIO (writeChan c x) (merge xs c) merge [] c = return ()