module Data.STM.SBChan (
SBChan,
newSBChan,
newSBChanIO,
ItemSize(..),
SBCItem(..),
readSBChan,
writeSBChan,
peekSBChan,
unGetSBChan,
isEmptySBChan,
tryReadSBChan,
tryWriteSBChan,
tryPeekSBChan,
cramSBChan,
rollSBChan,
getLimitSBChan,
setLimitSBChan,
satisfyLimitSBChan,
clearSBChan,
) where
import Control.Concurrent.STM.TVar
import Control.Monad.STM
import Data.Typeable (Typeable)
import Data.STM.TList (TList)
import qualified Data.STM.TList as TList
data SBChan a = SBC
{ readEnd :: !(TVar (ReadEnd a))
, writeEnd :: !(TVar (WriteEnd a))
}
deriving Typeable
instance Eq (SBChan a) where
a == b = readEnd a == readEnd b
data ReadEnd a = ReadEnd
{ readPtr :: !(TList a)
, readSize :: !Int
}
data WriteEnd a = WriteEnd
{ writePtr :: !(TList a)
, writeSize :: !Int
, chanLimit :: !Int
}
class ItemSize a where
itemSize :: a -> Int
newtype SBCItem a = SBCItem { unSBCItem :: a }
deriving Typeable
instance ItemSize (SBCItem a) where
itemSize _ = 1
newSBChan :: Int -> STM (SBChan a)
newSBChan limit = do
hole <- TList.empty
rv <- newTVar $ ReadEnd hole 0
wv <- newTVar $ WriteEnd hole 0 limit
return (SBC rv wv)
newSBChanIO :: Int -> IO (SBChan a)
newSBChanIO limit = do
hole <- TList.emptyIO
rv <- newTVarIO $ ReadEnd hole 0
wv <- newTVarIO $ WriteEnd hole 0 limit
return (SBC rv wv)
clearSBChan :: SBChan a -> STM ()
clearSBChan SBC{..} = do
hole <- TList.empty
oldWriteEnd <- readTVar writeEnd
writeTVar readEnd $ ReadEnd hole 0
writeTVar writeEnd $ WriteEnd hole 0 (chanLimit oldWriteEnd)
readSBChan :: ItemSize a => SBChan a -> STM a
readSBChan sbc = do
m <- tryReadSBChan sbc
case m of
Nothing -> retry
Just x -> return x
writeSBChan :: ItemSize a => SBChan a -> a -> STM ()
writeSBChan sbc x = do
ok <- tryWriteSBChan sbc x
if ok then return ()
else retry
peekSBChan :: SBChan a -> STM a
peekSBChan sbc = do
m <- tryPeekSBChan sbc
case m of
Nothing -> retry
Just x -> return x
unGetSBChan :: ItemSize a => SBChan a -> a -> STM ()
unGetSBChan SBC{..} a = do
ReadEnd{..} <- readTVar readEnd
readPtr' <- TList.cons a readPtr
writeTVar readEnd $! ReadEnd
{ readPtr = readPtr'
, readSize = readSize itemSize a
}
isEmptySBChan :: SBChan a -> STM Bool
isEmptySBChan SBC{..} = do
ReadEnd{..} <- readTVar readEnd
TList.null readPtr
tryReadSBChan :: ItemSize a => SBChan a -> STM (Maybe a)
tryReadSBChan SBC{..} = do
ReadEnd{..} <- readTVar readEnd
let pop x xs = do
writeTVar readEnd $! ReadEnd
{ readPtr = xs
, readSize = readSize + itemSize x
}
return (Just x)
TList.uncons (return Nothing) pop readPtr
tryWriteSBChan :: ItemSize a => SBChan a -> a -> STM Bool
tryWriteSBChan SBC{..} x = do
we@WriteEnd{..} <- readTVar writeEnd
let writeSize' = writeSize + itemSize x
if writeSize' <= chanLimit
then do
appendWriteEnd writeEnd we x writeSize'
return True
else do
ReadEnd{..} <- readTVar readEnd
let chanSize' = writeSize' readSize
if chanSize' <= chanLimit || readPtr == writePtr
then do
writeTVar readEnd $! ReadEnd
{ readPtr = readPtr
, readSize = 0
}
appendWriteEnd writeEnd we x chanSize'
return True
else return False
appendWriteEnd :: TVar (WriteEnd a) -> WriteEnd a -> a -> Int -> STM ()
appendWriteEnd var WriteEnd{..} x writeSize' = do
writePtr' <- TList.append writePtr x
writeTVar var $! WriteEnd
{ writePtr = writePtr'
, writeSize = writeSize'
, chanLimit = chanLimit
}
tryPeekSBChan :: SBChan a -> STM (Maybe a)
tryPeekSBChan SBC{..} = do
ReadEnd{..} <- readTVar readEnd
TList.uncons (return Nothing) (\x _xs -> return (Just x)) readPtr
cramSBChan :: ItemSize a => SBChan a -> a -> STM ()
cramSBChan SBC{..} x = do
we@WriteEnd{..} <- readTVar writeEnd
let writeSize' = writeSize + itemSize x
if writeSize' <= chanLimit
then appendWriteEnd writeEnd we x writeSize'
else do
ReadEnd{..} <- readTVar readEnd
if readSize == 0
then appendWriteEnd writeEnd we x writeSize'
else do
writeTVar readEnd $! ReadEnd
{ readPtr = readPtr
, readSize = 0
}
appendWriteEnd writeEnd we x (writeSize' readSize)
rollSBChan :: ItemSize a => SBChan a -> a -> STM Int
rollSBChan SBC{..} x = do
we@WriteEnd{..} <- readTVar writeEnd
let writeSize' = writeSize + itemSize x
if writeSize' <= chanLimit
then do
appendWriteEnd writeEnd we x writeSize'
return 0
else do
ReadEnd{..} <- readTVar readEnd
let chanSize' = writeSize' readSize
quota = chanSize' chanLimit
(readPtr', droppedSize, droppedCount)
<- dropItems readPtr quota
writeTVar readEnd $! ReadEnd
{ readPtr = readPtr'
, readSize = 0
}
appendWriteEnd writeEnd we x (chanSize' droppedSize)
return droppedCount
dropItems :: ItemSize a => TList a -> Int -> STM (TList a, Int, Int)
dropItems start quota =
loop 0 0 start
where
loop !total !count ptr
| total >= quota = done
| otherwise = TList.uncons done next ptr
where
next x xs = loop (total + itemSize x) (count + 1) xs
done = return (ptr, total, count)
dropItemsExceptLast :: ItemSize a
=> TList a
-> TList a
-> Int
-> STM (TList a, Int, Int)
dropItemsExceptLast start end quota =
loop 0 0 start
where
loop !total !count ptr
| total >= quota = done
| otherwise = TList.uncons done next ptr
where
next x xs | xs == end = done
| otherwise = loop (total + itemSize x) (count + 1) xs
done = return (ptr, total, count)
getLimitSBChan :: SBChan a -> STM Int
getLimitSBChan SBC{..} = do
WriteEnd{..} <- readTVar writeEnd
return chanLimit
setLimitSBChan :: SBChan a -> Int -> STM ()
setLimitSBChan SBC{..} newLimit = do
WriteEnd{..} <- readTVar writeEnd
writeTVar writeEnd $! WriteEnd
{ writePtr = writePtr
, writeSize = writeSize
, chanLimit = newLimit
}
satisfyLimitSBChan :: ItemSize a => SBChan a -> STM Int
satisfyLimitSBChan SBC{..} = do
WriteEnd{..} <- readTVar writeEnd
if writeSize <= chanLimit
then return 0
else do
ReadEnd{..} <- readTVar readEnd
let chanSize = writeSize readSize
quota = chanSize chanLimit
(readPtr', droppedSize, droppedCount)
<- dropItemsExceptLast readPtr writePtr quota
writeTVar readEnd $! ReadEnd
{ readPtr = readPtr'
, readSize = 0
}
writeTVar writeEnd $! WriteEnd
{ writePtr = writePtr
, writeSize = chanSize droppedSize
, chanLimit = chanLimit
}
return droppedCount