{-# LANGUAGE FlexibleContexts, TypeFamilies, DeriveDataTypeable, Trustworthy #-} module Control.CUtils.BoundedQueue (BoundedQueue, newRB, writeRB, readRB, lengthRB, getSizeRB) where import qualified Control.Concurrent.QSem as S import Control.Concurrent.MVar import Control.Monad.ST import Control.Exception.Assert import Control.Monad import Control.Exception import Data.IORef import Data.Vector.Generic.Mutable hiding (MVector) import qualified Data.Vector.Generic.Mutable as MV import Data.Vector.Mutable (MVector) import Data.Data import Prelude hiding (length, read) data BoundedQueue t = BoundedQueue { vector_ :: !(MVector RealWorld t), range_mvar_ :: !(MVar(Int,Int)), empty_ssem_ :: !S.QSem, full_ssem_ :: !S.QSem } deriving (Typeable) -- The strictest possible constraint on these semaphores would be the equation -- empty+full = n. This equation is relaxed to the inequality empty+full <= n, -- so that producers/consumers may pass through intermediate states where the -- total is less than n. instance (Typeable t) => Data(BoundedQueue t) newRB :: (MV.MVector MVector t) => Int -> IO(BoundedQueue t) newRB n | n<=0 = throwIO$ErrorCall"newRB: size must be greater than zero" newRB n = liftM4 BoundedQueue (new n) (newMVar$!(0,0)) -- (lowest populated index, number of elements) (S.newQSem 0) (S.newQSem n)-- start out empty writeRB :: BoundedQueue t -> t -> IO() {-# INLINABLE writeRB #-} writeRB rb x = do S.waitQSem(full_ssem_ rb) -- Wait until space becomes free in the buffer. -- putStrLn"f" b <- modifyMVar(range_mvar_ rb) f S.signalQSem(empty_ssem_ rb) where f (low,size) = do let nLen= length(vector_ rb) write(vector_ rb) ((low+size)`mod`nLen) x let size' = succ size assert(size'>=0 && size'<=nLen) (return()) return$!((low,size'), size/=nLen) readRB :: BoundedQueue t -> IO t {-# INLINABLE readRB #-} readRB rb = do S.waitQSem(empty_ssem_ rb) -- putStrLn"e" (b,x) <- modifyMVar(range_mvar_ rb) f S.signalQSem(full_ssem_ rb) return x where f (low,size) = do let nLen = length(vector_ rb) x <- read(vector_ rb) low let low' = succ low `mod` nLen let size' = pred size assert(size'>=0 && size'<=nLen) (return()) return$!((low',size'), (size'/=0,x)) lengthRB :: BoundedQueue t -> Int {-# INLINE lengthRB #-} -- | Read the maximum size of the bounded queue. lengthRB = length. vector_ getSizeRB :: BoundedQueue t -> IO Int {-# INLINE getSizeRB #-} -- | Do not rely on this information being a timely reflection of what happens to -- the buffer. getSizeRB = liftM snd. readMVar. range_mvar_