{-# LANGUAGE Trustworthy #-}
{-# LANGUAGE CPP #-}

-----------------------------------------------------------------------------
-- |
-- Module      :  Control.Concurrent.Chan
-- Copyright   :  (c) The University of Glasgow 2001
-- License     :  BSD-style (see the file libraries/base/LICENSE)
--
-- Maintainer  :  libraries@haskell.org
-- Stability   :  experimental
-- Portability :  non-portable (concurrency)
--
-- Unbounded channels.
--
-- The channels are implemented with @MVar@s and therefore inherit all the
-- caveats that apply to @MVar@s (possibility of races, deadlocks etc). The
-- stm (software transactional memory) library has a more robust implementation
-- of channels called @TChan@s.
--
-----------------------------------------------------------------------------

module Control.Concurrent.Chan
  (
    -- * The 'Chan' type
    Chan,                   -- abstract

    -- * Operations
    newChan,
    writeChan,
    readChan,
    dupChan,

    -- * Stream interface
    getChanContents,
    writeList2Chan,
  ) where

import System.IO.Unsafe         ( unsafeInterleaveIO )
import Control.Concurrent.MVar
import Control.Exception (mask_)

#define _UPK_(x) {-# UNPACK #-} !(x)

-- A channel is represented by two @MVar@s keeping track of the two ends
-- of the channel contents, i.e., the read and write ends. Empty @MVar@s
-- are used to handle consumers trying to read from an empty channel.

-- |'Chan' is an abstract type representing an unbounded FIFO channel.
data Chan a
 = Chan _UPK_(MVar (Stream a)) -- read end
        _UPK_(MVar (Stream a)) -- write end
                               -- Invariant: the Stream a is always an empty MVar
   deriving Eq -- ^ @since 4.4.0.0

-- A stream is a mutable cell that is either empty (a "hole" that has not
-- been written yet) or holds one item together with the rest of the stream.
type Stream a = MVar (ChItem a)

-- One channel element: the value plus the stream cell that follows it.
data ChItem a = ChItem a _UPK_(Stream a)
  -- benchmarks show that unboxing the MVar here is worthwhile, because
  -- although it leads to higher allocation, the channel data takes up
  -- less space and is therefore quicker to GC.

-- See the Concurrent Haskell paper for a diagram explaining
-- how the different channel operations proceed.
-- @newChan@ sets up the read and write end of a channel by initialising
-- these two @MVar@s with an empty @MVar@.

-- |Builds and returns a new instance of 'Chan'.
newChan :: IO (Chan a)
newChan = do
   -- Both ends start out pointing at the same empty cell, so the first
   -- reader blocks until the first writer fills it.
   hole     <- newEmptyMVar
   readVar  <- newMVar hole
   writeVar <- newMVar hole
   return (Chan readVar writeVar)

-- To put an element on a channel, a new hole at the write end is created.
-- What was previously the empty @MVar@ at the back of the channel is then
-- filled in with a new stream element holding the entered value and the
-- new hole.

-- |Write a value to a 'Chan'.
writeChan :: Chan a -> a -> IO ()
writeChan (Chan _ writeVar) val = do
  -- The fresh hole is allocated before entering the masked section.
  new_hole <- newEmptyMVar
  mask_ $ do
    old_hole <- takeMVar writeVar
    putMVar old_hole (ChItem val new_hole)
    putMVar writeVar new_hole

-- The reason we don't simply do this:
--
--    modifyMVar_ writeVar $ \old_hole -> do
--      putMVar old_hole (ChItem val new_hole)
--      return new_hole
--
-- is because if an asynchronous exception is received after the 'putMVar'
-- completes and before modifyMVar_ installs the new value, it will set the
-- Chan's write end to a filled hole.

-- |Read the next value from the 'Chan'. Blocks when the channel is empty. Since
-- the read end of a channel is an 'MVar', this operation inherits fairness
-- guarantees of 'MVar's (e.g. threads blocked in this operation are woken up in
-- FIFO order).
--
-- Throws 'Control.Exception.BlockedIndefinitelyOnMVar' when the channel is
-- empty and no other thread holds a reference to the channel.
readChan :: Chan a -> IO a
readChan (Chan readVar _) = do
  modifyMVar readVar $ \read_end -> do
    (ChItem val new_read_end) <- readMVar read_end
        -- Use readMVar here, not takeMVar,
        -- else dupChan doesn't work
    -- Advance the read end to the next cell and hand back the value.
    return (new_read_end, val)

-- |Duplicate a 'Chan': the duplicate channel begins empty, but data written to
-- either channel from then on will be available from both.  Hence this creates
-- a kind of broadcast channel, where data written by anyone is seen by
-- everyone else.
--
-- (Note that a duplicated channel is not equal to its original.
-- So: @fmap (c /=) $ dupChan c@ returns @True@ for all @c@.)
dupChan :: Chan a -> IO (Chan a)
dupChan (Chan _ writeVar) = do
   -- The new read end starts at the current write hole, so the duplicate
   -- sees nothing written earlier. The write end is shared with the
   -- original, so later writes through either channel reach both.
   hole       <- readMVar writeVar
   newReadVar <- newMVar hole
   return (Chan newReadVar writeVar)

-- Operators for interfacing with functional streams.

-- |Return a lazy list representing the contents of the supplied
-- 'Chan', much like 'System.IO.hGetContents'.
--
-- The reads are deferred with 'unsafeInterleaveIO': each element is taken
-- from the channel by 'readChan' only when the corresponding part of the
-- list is demanded.
getChanContents :: Chan a -> IO [a]
getChanContents ch = unsafeInterleaveIO $ do
  x  <- readChan ch
  xs <- getChanContents ch
  return (x : xs)

-- |Write an entire list of items to a 'Chan'.
--
-- The items are written one at a time with 'writeChan', in list order.
writeList2Chan :: Chan a -> [a] -> IO ()
writeList2Chan ch ls = mapM_ (writeChan ch) ls