-- Extensions this module's syntax relies on: @#if@ (CPP), @!x@ bindings
-- (BangPatterns), @C {..}@ / @C {field}@ patterns (RecordWildCards,
-- NamedFieldPuns) and @v ~ v' RealWorld@ constraints (TypeFamilies).
-- Declaring them here is harmless if the build configuration enables them too.
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeFamilies #-}

-- | A fixed-capacity broadcast channel backed by a mutable vector.
--
-- Values are stored in a ring buffer by 'putKickChan'. Each 'KCReader'
-- tracks its own position and fetches values with 'readNext', which yields
-- 'Nothing' when the requested element is no longer available.
module Chan.KickChan (
  -- * Channels
    KickChan
  , newKickChan
  , kcSize
  , putKickChan
  , invalidateKickChan
  -- * Readers
  , KCReader
  , newReader
  , readNext
  , currentLag
  -- * Storage-specific aliases and type-pinning helpers
  , KickChanS
  , KickChanV
  , KickChanU
  , kcUnboxed
  , kcStorable
  , kcDefault
  , KCReaderS
  , KCReaderV
  , KCReaderU
  ) where
import Control.Concurrent.MVar
import Control.Concurrent (yield)
import Control.Exception
import Data.Bits
import Data.IORef
import Data.IntMap (IntMap)
import qualified Data.IntMap as IM
import Data.Foldable as Fold
import Data.Maybe (maybeToList)
import Data.Vector.Generic.Mutable (MVector)
import qualified Data.Vector.Generic.Mutable as M
import qualified Data.Vector.Mutable as V
import qualified Data.Vector.Storable.Mutable as S
import qualified Data.Vector.Unboxed.Mutable as U
import Control.Monad.Primitive
#if !MIN_VERSION_base(4,6,0)
-- | Compatibility definition, compiled only for base older than 4.6.
--
-- Behaves like 'atomicModifyIORef' but forces both the value stored in the
-- reference and the returned result, so no chain of thunks accumulates.
atomicModifyIORef' :: IORef a -> (a -> (a,b)) -> IO b
atomicModifyIORef' ref f = do
    out <- atomicModifyIORef ref $ \old ->
        let (new, res) = f old
        -- Tie evaluation of the new contents to evaluation of the result.
        in (new, new `seq` res)
    out `seq` return out
#endif
-- | Shared write-side state of a channel, kept in a single 'IORef' and
-- updated with atomic modifications.
--
-- Both fields are strict: the record is rebuilt on every write and on most
-- reads, so a lazy map field would let unevaluated updates pile up inside
-- the reference.
data Position a = Position
    { nSeq    :: !Int
      -- ^ The next sequence number that will be handed to a writer.
    , waiting :: !(IntMap [MVar (Maybe a)])
      -- ^ Keyed by sequence number. A key is present while that sequence
      -- number has been claimed by a writer but not yet committed, or while
      -- readers are blocked on it; the list holds the blocked readers'
      -- wake-up variables.
    }
-- | The initial channel state: sequence numbering starts at 0 and nothing
-- is pending or waiting.
emptyPosition :: Position a
emptyPosition = Position { nSeq = 0, waiting = IM.empty }
-- | Try to claim the next sequence number for a writer.
--
-- @sz@ is the number of slots in the buffer. If the lowest key still present
-- in the waiting map is @sz@ or more behind the current sequence number, the
-- claim is refused: the state is returned unchanged together with 'Nothing'.
-- Otherwise the sequence number is advanced, the claimed number is recorded
-- in the waiting map (keeping any readers already queued on it) and returned.
incrPosition :: Int -> Position a -> (Position a,Maybe Int)
incrPosition sz oldP@(Position curSeq pMap)
    | Just lowKey <- oldestKey
    , lowKey + sz <= curSeq = (oldP, Nothing)
    | otherwise             = (claimedP, Just curSeq)
  where
    oldestKey = fmap (fst . fst) (IM.minViewWithKey pMap)
    claimedP  = Position
        { nSeq    = curSeq + 1
          -- An existing entry (readers already waiting) is preserved as-is.
        , waiting = IM.insertWith (++) curSeq [] pMap
        }
-- | Jump the sequence number forward by @wrapAmount@ and drop every entry of
-- the waiting map.
--
-- Returns the new state together with all wake-up variables that were
-- registered, in ascending key order, so the caller can release them.
invalidatePosition :: Int -> Position a -> (Position a,[MVar (Maybe a)])
invalidatePosition wrapAmount (Position curSeq blocked) =
    ( Position { nSeq = curSeq + wrapAmount, waiting = IM.empty }
    , Prelude.concat (IM.elems blocked)
    )
-- | Mark the write with sequence number @seqNum@ as finished.
--
-- The entry for @seqNum@ is removed from the waiting map; the readers that
-- were queued on it (none if the entry is absent) are returned so the caller
-- can hand them the value. The sequence counter itself is left untouched.
commit :: Int -> Position a -> (Position a,[MVar (Maybe a)])
commit seqNum (Position curSeq pMap) =
    ( Position { nSeq = curSeq, waiting = IM.delete seqNum pMap }
    , maybe [] id (IM.lookup seqNum pMap)
    )
-- | Verdict produced by 'checkWithPosition' after a slot has been read.
data CheckResult
    = Ok       -- ^ 'getKickChan' returns the value it read.
    | Invalid  -- ^ 'getKickChan' discards the value and returns 'Nothing'.
-- | Decide whether a reader at position @readP@ may read its slot right away.
--
-- * If the waiting map already has an entry for @readP@, @await@ is added to
--   that entry and the result is 'False'.
-- * Otherwise, if @readP@ has not been handed to a writer yet
--   (@readP >= nSeq@), a new entry containing only @await@ is created and
--   the result is 'False'.
-- * Otherwise the state is unchanged and the result is 'True'.
--
-- On 'False' the caller is expected to block on @await@.
readyOrWait :: MVar (Maybe a) -> Int -> Position a -> (Position a, Bool)
readyOrWait await readP p@(Position nextP pMap)
    | IM.member readP pMap = (p { waiting = IM.adjust (await :) readP pMap }, False)
    | readP >= nextP       = (p { waiting = IM.insert readP [await] pMap }, False)
    | otherwise            = (p, True)
-- | Validate a read of position @readP@ against the current write position.
--
-- @sz@ is the number of slots in the buffer. The read is 'Ok' when the
-- distance from @readP@ to the next sequence number is between 1 and @sz@
-- inclusive, i.e. the element has been claimed by a writer and no later
-- writer has since claimed the same slot. Any other distance is 'Invalid'.
--
-- The state is returned unchanged; the function has this shape only so it
-- can be run through an atomic modification of the position reference.
checkWithPosition :: Int -> Int -> Position a -> (Position a, CheckResult)
checkWithPosition sz readP pos@(Position nextP _pMap)
    | lag > 0 && lag <= sz = (pos, Ok)
    | otherwise            = (pos, Invalid)
  where
    -- How many sequence numbers have been claimed from @readP@ onwards.
    lag = nextP - readP
-- | A channel holding elements of type @a@ in a mutable vector of type @v@.
data KickChan v a = KickChan
    { kcSz  :: !Int
      -- ^ Number of slots minus one; also used as the bit mask that maps a
      -- sequence number to a slot index.
    , kcPos :: IORef (Position a)
      -- ^ Shared sequence counter and waiting map.
    , kcV   :: v a
      -- ^ Element storage.
    }
-- | Create a channel with room for at least @sz@ elements.
--
-- The slot count is @sz@ rounded up to a power of two, so a sequence number
-- can be mapped to a slot with a bit mask. Requests below 1 are treated as 1,
-- because the logarithm used for rounding is undefined for them.
newKickChan :: (MVector v' a, v ~ v' RealWorld) => Int -> IO (KickChan v a)
newKickChan sz = do
    posRef <- newIORef emptyPosition
    slots  <- M.new (idxMask + 1)
    return KickChan { kcSz = idxMask, kcPos = posRef, kcV = slots }
  where
    wanted  = max 1 sz
    -- (smallest power of two >= wanted) - 1
    idxMask = 2 ^ (ceiling (logBase 2 (fromIntegral wanted) :: Double) :: Int) - 1
-- | Number of slots in the channel's buffer (the stored mask plus one).
kcSize :: KickChan v a -> Int
kcSize chan = kcSz chan + 1
-- | Write a value into the channel.
--
-- Runs with asynchronous exceptions masked. The writer first claims a
-- sequence number, yielding and retrying for as long as 'incrPosition'
-- refuses the claim. It then stores the value in the corresponding slot,
-- commits, and passes the value to every reader that was queued on that
-- sequence number.
putKickChan :: (MVector v' a, v ~ v' RealWorld) => KickChan v a -> a -> IO ()
putKickChan KickChan { kcSz = idxMask, kcPos = posRef, kcV = slots } x = mask_ $ do
    seqNum <- claimSlot
    M.unsafeWrite slots (seqNum .&. idxMask) x
    blocked <- atomicModifyIORef' posRef (commit seqNum)
    Fold.forM_ blocked $ \box -> putMVar box (Just x)
  where
    claimSlot = do
        claimed <- atomicModifyIORef' posRef (incrPosition (idxMask + 1))
        case claimed of
            Just n  -> return n
            Nothing -> yield >> claimSlot
-- | Invalidate the channel's current contents.
--
-- With asynchronous exceptions masked, the sequence counter is moved forward
-- by the slot count plus one and the waiting map is emptied; every reader
-- that was registered in it is woken with 'Nothing'.
invalidateKickChan :: KickChan v a -> IO ()
invalidateKickChan KickChan { kcSz = idxMask, kcPos = posRef } = mask_ $ do
    blocked <- atomicModifyIORef' posRef (invalidatePosition (2 + idxMask))
    Fold.forM_ blocked $ \box -> putMVar box Nothing
-- | Fetch the element with sequence number @readP@.
--
-- If 'readyOrWait' registers the reader instead of letting it proceed, this
-- blocks until a writer or 'invalidateKickChan' fills the wake-up variable
-- and returns whatever was put there. Otherwise the slot is read directly
-- and then re-validated with 'checkWithPosition'; the value is returned only
-- if that check says 'Ok'.
getKickChan :: (MVector v' a, v ~ v' RealWorld) => KickChan v a -> Int -> IO (Maybe a)
getKickChan KickChan { kcSz = idxMask, kcPos = posRef, kcV = slots } readP = do
    wakeup  <- newEmptyMVar
    canRead <- atomicModifyIORef' posRef (readyOrWait wakeup readP)
    if not canRead
        then takeMVar wakeup
        else do
            -- Read first, validate afterwards: the check tells us whether
            -- the slot could have been reused while we were reading it.
            val     <- M.unsafeRead slots (readP .&. idxMask)
            verdict <- atomicModifyIORef' posRef (checkWithPosition (idxMask + 1) readP)
            case verdict of
                Invalid -> return Nothing
                Ok      -> return (Just val)
-- | A read cursor into a 'KickChan'.
data KCReader v a = KCReader
    { kcrChan :: !(KickChan v a)
      -- ^ The channel being read.
    , kcrPos  :: IORef Int
      -- ^ Sequence number most recently requested through this reader.
    }
-- | Create a reader positioned at the channel's current write position.
--
-- 'readNext' increments the stored position before fetching, so the cursor
-- starts one below the next sequence number to be written; the first
-- 'readNext' therefore asks for the next element to be written.
newReader :: KickChan v a -> IO (KCReader v a)
newReader chan = do
    Position writeP _pMap <- readIORef (kcPos chan)
    posRef <- newIORef (writeP - 1)
    return KCReader { kcrChan = chan, kcrPos = posRef }
-- | Advance the reader by one and fetch the element at the new position.
--
-- The position is advanced atomically before the fetch, and stays advanced
-- whether the fetch produces a value or 'Nothing'.
readNext :: (MVector v' a, v ~ v' RealWorld) => KCReader v a -> IO (Maybe a)
readNext reader = do
    readP <- atomicModifyIORef' (kcrPos reader) advance
    getKickChan (kcrChan reader) readP
  where
    -- Store the incremented position and also hand it back.
    advance lastP = let p = lastP + 1 in (p, p)
-- | How far this reader is behind the channel's writers.
--
-- Computed as the next write sequence number, minus the reader's last
-- requested position, minus the number of entries currently in the waiting
-- map, minus one. The reader position and the channel state are read
-- separately, so the result is a snapshot that may be stale by the time it
-- is returned.
currentLag :: KCReader v a -> IO Int
currentLag reader = do
    lastRead <- readIORef (kcrPos reader)
    Position nextWrite pMap <- readIORef (kcPos (kcrChan reader))
    return $! nextWrite - lastRead - IM.size pMap - 1
-- | Channel stored in a boxed mutable vector.
type KickChanV a = KickChan (V.MVector RealWorld) a
-- | Channel stored in a 'Data.Vector.Storable.Mutable' vector.
type KickChanS a = KickChan (S.MVector RealWorld) a
-- | Channel stored in an unboxed mutable vector.
type KickChanU a = KickChan (U.MVector RealWorld) a

-- | Reader for a 'KickChanV'.
type KCReaderV a = KCReader (V.MVector RealWorld) a
-- | Reader for a 'KickChanS'.
type KCReaderS a = KCReader (S.MVector RealWorld) a
-- | Reader for a 'KickChanU'.
type KCReaderU a = KCReader (U.MVector RealWorld) a
-- | Identity function whose type fixes the channel's storage to an unboxed
-- mutable vector; useful for resolving an otherwise ambiguous vector type.
kcUnboxed :: KickChanU a -> KickChanU a
kcUnboxed chan = chan
-- | Identity function whose type fixes the channel's storage to a boxed
-- mutable vector; useful for resolving an otherwise ambiguous vector type.
kcDefault :: KickChanV a -> KickChanV a
kcDefault chan = chan
-- | Identity function whose type fixes the channel's storage to a storable
-- mutable vector; useful for resolving an otherwise ambiguous vector type.
kcStorable :: KickChanS a -> KickChanS a
kcStorable chan = chan