-- |Implements bounded channels. These channels differ from normal 'Chan's in -- that they are guaranteed to contain no more than a certain number of -- elements. This is ideal when you may be writing to a channel faster than you -- are able to read from it. -- -- This module supports all the functions of "Control.Concurrent.Chan" except -- 'unGetChan' and 'dupChan', which are not supported for bounded channels. -- -- Extra consitency: This version enforces that if thread Alice writes -- e1 followed by e2 then e1 will be returned by readChan before e2. -- Conversely, if thead Bob reads e1 followed by e2 then it was true that -- writeChan e1 preceded writeChan e2. -- -- Previous versions did not enforce this consistency: if writeChan were -- preempted between putMVars or killThread arrived between putMVars then it -- can fail. Similarly it might fail if readChan were stopped after putMVar -- and before the second takeMVar. An unlucky pattern of several such deaths -- might actually break the invariants of the array in an unrecoverable way -- causing all future reads and writes to block. module Control.Concurrent.BoundedChan( BoundedChan , newBoundedChan , writeChan , readChan , isEmptyChan , getChanContents , writeList2Chan ) where import Control.Concurrent.MVar (MVar, isEmptyMVar, newEmptyMVar, newMVar, putMVar, takeMVar) import Control.Exception (mask_, onException) import Control.Monad (replicateM) import Data.Array (Array, (!), listArray) import System.IO.Unsafe (unsafeInterleaveIO) -- |'BoundedChan' is an abstract data type representing a bounded channel. data BoundedChan a = BC { _size :: Int , _contents :: Array Int (MVar a) , _writePos :: MVar Int , _readPos :: MVar Int } -- Versions of modifyMVar and withMVar that do not 'restore' the previous mask state when running -- 'io', with added modification strictness. The lack of 'restore' may make these perform better -- than the normal version. Moving strictness here makes using them more pleasant. {-# INLINE modifyMVar_mask #-} modifyMVar_mask :: MVar a -> (a -> IO (a,b)) -> IO b modifyMVar_mask m io = mask_ $ do a <- takeMVar m (a',b) <- io a `onException` putMVar m a putMVar m $! a' return b {-# INLINE modifyMVar_mask_ #-} modifyMVar_mask_ :: MVar a -> (a -> IO a) -> IO () modifyMVar_mask_ m io = mask_ $ do a <- takeMVar m a' <- io a `onException` putMVar m a putMVar m $! a' {-# INLINE withMVar_mask #-} withMVar_mask :: MVar a -> (a -> IO b) -> IO b withMVar_mask m io = mask_ $ do a <- takeMVar m b <- io a `onException` putMVar m a putMVar m a return b -- |@newBoundedChan n@ returns a channel than can contain no more than @n@ -- elements. newBoundedChan :: Int -> IO (BoundedChan a) newBoundedChan x = do entls <- replicateM x newEmptyMVar wpos <- newMVar 0 rpos <- newMVar 0 let entries = listArray (0, x - 1) entls return (BC x entries wpos rpos) -- |Write an element to the channel. If the channel is full, this routine will -- block until it is able to write. Blockers wait in a fair FIFO queue. writeChan :: BoundedChan a -> a -> IO () writeChan (BC size contents wposMV _) x = modifyMVar_mask_ wposMV $ \wpos -> do putMVar (contents ! wpos) x return ((succ wpos) `mod` size) -- only advance when putMVar succeeds -- |Read an element from the channel. If the channel is empty, this routine -- will block until it is able to read. Blockers wait in a fair FIFO queue. readChan :: BoundedChan a -> IO a readChan (BC size contents _ rposMV) = modifyMVar_mask rposMV $ \rpos -> do a <- takeMVar (contents ! rpos) return ((succ rpos) `mod` size, a) -- only advance when takeMVar succeeds -- |DANGER: This may block on an empty channel if there is already a blocked reader. -- Returns 'True' if the supplied channel is empty. -- -- DEPRECATED {-# DEPRECATED isEmptyChan "This isEmptyChan can block, no non-blocking substitute yet" #-} isEmptyChan :: BoundedChan a -> IO Bool isEmptyChan (BC _ contents _ rposMV) = withMVar_mask rposMV $ \rpos -> isEmptyMVar (contents ! rpos) -- |Return a lazy list representing the contents of the supplied channel. Competing -- readers might steal from this list. getChanContents :: BoundedChan a -> IO [a] getChanContents ch = unsafeInterleaveIO $ do x <- readChan ch xs <- getChanContents ch return (x:xs) -- |Write a list of elements to the channel. If the channel becomes full, this -- routine will block until it is able to write. Competing writers may interleave with -- this one. writeList2Chan :: BoundedChan a -> [a] -> IO () writeList2Chan = mapM_ . writeChan