{-# LANGUAGE FlexibleContexts, TypeFamilies, DeriveDataTypeable, Trustworthy #-} module Control.CUtils.BoundedQueue (BoundedQueue, newRB, writeRB, readRB, lengthRB, getSizeRB) where --import qualified Control.Concurrent.SSem as S import Control.Concurrent.MVar import Control.Monad.ST import Control.Exception.Assert import Control.Monad import Control.Exception import Data.IORef import Data.Vector.Generic.Mutable hiding (MVector) import qualified Data.Vector.Generic.Mutable as MV import Data.Vector.Mutable (MVector) import Data.Data import Control.Parallel.Strategies import Control.CUtils.Semaphore import Prelude hiding (length, read) data BoundedQueue t = BoundedQueue { vector_ :: !(MVector RealWorld t), range_mvar_ :: !(MVar(Int,Int)), empty_ssem_ :: !Sem, full_ssem_ :: !Sem } deriving (Typeable) -- The strictest possible constraint on these semaphores would be the equation -- empty+full = n. This equation is relaxed to the inequality empty+full <= n, -- so that producers/consumers may pass through intermediate states where the -- total is less than n. instance (Typeable t) => Data(BoundedQueue t) newRB :: (MV.MVector MVector t) => Int -> IO(BoundedQueue t) newRB n | n<=0 = throwIO$ErrorCall"newRB: require positive size" newRB n = liftM4 BoundedQueue (new n) (newMVar$!(0,0)) -- (lowest populated index, number of elements) newSem (newSem>>= \s->putSem s n>>return s)-- start out empty writeRB :: BoundedQueue t -> t -> IO() {-# INLINABLE writeRB #-} writeRB rb x = mask_$do takeSem(full_ssem_ rb) 1 -- Wait until space becomes free in the buffer. -- putStrLn"f" b <- modifyMVar(range_mvar_ rb) f putSem(empty_ssem_ rb) 1 where f (low,size) = do let nLen= length(vector_ rb) write(vector_ rb) ((low+size)`mod`nLen) x let size' = succ size assert(size'>=0 && size'<=nLen) (return()) return$!(((low,size'), size/=nLen)`using`evalTuple2(evalTuple2 r0 rseq) r0) readRB :: BoundedQueue t -> IO t {-# INLINABLE readRB #-} readRB rb = mask_$do takeSem(empty_ssem_ rb) 1 -- putStrLn"e" (b,x) <- modifyMVar(range_mvar_ rb) f putSem(full_ssem_ rb) 1 return x where f (low,size) = do let nLen = length(vector_ rb) x <- read(vector_ rb) low let low' = succ low `mod` nLen let size' = pred size assert(size'>=0 && size'<=nLen) (return()) return$!(((low',size'), (size'/=0,x))`using`evalTuple2(evalTuple2 r0 rseq) r0) lengthRB :: BoundedQueue t -> Int {-# INLINE lengthRB #-} -- | Read the maximum size of the bounded queue. lengthRB = length. vector_ getSizeRB :: BoundedQueue t -> IO Int {-# INLINE getSizeRB #-} -- | Snapshot buffer size. getSizeRB = liftM snd. readMVar. range_mvar_