module Control.Concurrent.RCU.QSBR.Internal
  ( SRef(..)
  , RCUThread(..)
  , RCU(..)
  , runRCU
  , runOnRCU
  , ReadingRCU(..)
  , WritingRCU(..)
  , RCUState(..)
#if BENCHMARKS
  -- Only 'writeSRefIO' needs the flag. The other names the benchmarks use
  -- ('unRCU', 'runWritingRCU', 'runReadingRCU', 'RCUState') are already
  -- exported above through the @(..)@ items, so listing them again here
  -- only produced duplicate-export warnings.
  , writeSRefIO
#endif
  ) where
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.RCU.Class
import Control.Exception (finally)
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Primitive
import Control.Parallel
import Data.Atomics
import Data.IORef
import Data.List
import Data.Primitive
import Foreign
import Prelude hiding (read, Read)
-- | Binding to the C routine @pause@ declared in @pause.h@, imported as an
-- @unsafe@ call. 'synchronizeIO' invokes it between polls while it spins on
-- another thread counter.
--
-- NOTE(review): presumably this is a CPU spin-wait hint; confirm against
-- the C header, which is not visible from this module.
foreign import ccall unsafe "pause.h" pause :: IO ()
-- | A shared reference that RCU computations create, read and update.
--
-- It is a thin wrapper around an 'IORef'. The @s@ parameter is a phantom
-- type: it does not appear in the representation.
newtype SRef s a = SRef
  { unSRef :: IORef a  -- ^ The underlying mutable cell.
  } deriving Eq
-- | Allocate a fresh cell holding the given initial value.
newSRefIO :: a -> IO (IORef a)
newSRefIO initial = newIORef initial
-- | Fetch the value currently stored in the cell.
readSRefIO :: IORef a -> IO a
readSRefIO ref = readIORef ref
-- | Publish a new value into the cell.
--
-- The value is evaluated to weak head normal form with 'pseq' first, then a
-- 'writeBarrier' is issued, and only after that is the cell overwritten.
writeSRefIO :: IORef a -> a -> IO ()
writeSRefIO ref new =
  (new `pseq` writeBarrier) >> writeIORef ref new
-- | A counter stored in a 'MutableByteArray'. 'newCounter' allocates 8 bytes
-- for it, and every access in this module reads or writes a 'Word64' at
-- index 0.
newtype Counter = Counter (MutableByteArray RealWorld)
-- | Counters are compared by the identity of the array they wrap (via
-- 'sameMutableByteArray'), not by the number currently stored in them.
instance Eq Counter where
  (==) (Counter lhs) (Counter rhs) = sameMutableByteArray lhs rhs
-- | Counter value marking a thread as offline. 'synchronizeIO' does not wait
-- for counters holding this value; a thread stores it after a read section
-- and when a forked thread ends.
offline :: Word64
offline = 0
-- | Initial value written into every counter created by 'newCounter'.
online :: Word64
online = 1
-- | Allocate a counter: an 8-byte array whose first 'Word64' slot is set
-- to 'online'.
newCounter :: IO Counter
newCounter =
  newByteArray 8 >>= \ cell ->
    writeByteArray cell 0 online >> return (Counter cell)
-- | Read the 'Word64' stored in slot 0 of the counter.
readCounter :: Counter -> IO Word64
readCounter (Counter cell) = readByteArray cell 0
-- | Overwrite slot 0 of the counter with the given 'Word64'.
writeCounter :: Counter -> Word64 -> IO ()
writeCounter (Counter cell) value = writeByteArray cell 0 value
-- | Advance the counter by 2 and return the new value.
--
-- This is a plain read followed by a plain write, not an atomic
-- read-modify-write. Counters start at 'online' (1), so stepping by 2 keeps
-- the value odd and therefore distinct from 'offline' (0).
incCounter :: Counter -> IO Word64
incCounter counter = do
  current <- readCounter counter
  let next = current + 2
  writeCounter counter next
  return next
-- | Everything an RCU computation needs at run time. One value is built by
-- 'runRCU' or 'runOnRCU'; 'forking' hands each child a copy whose
-- 'rcuStateMyCounter' is replaced by the counter of the child.
data RCUState = RCUState
  { rcuStateGlobalCounter       :: !Counter
    -- ^ Global counter: advanced by 'synchronizeIO' and copied into the
    -- counter of a thread when it enters a read section.
  , rcuStateThreadCountersR     :: !(IORef [Counter])
    -- ^ Counters of the forked threads that 'synchronizeIO' waits for.
  , rcuStateThreadCountersLockV :: !(MVar ())
    -- ^ Lock held while 'forking' adds or removes an entry of the list.
  , rcuStateWriterLockV         :: !(MVar ())
    -- ^ Lock held for the whole duration of a 'writing' section.
  , rcuStateMyCounter           :: !Counter
    -- ^ Counter of the thread that is running with this state.
  , rcuStatePinned              :: !(Maybe Int)
    -- ^ @Just i@: children are forked with @forkOn i@ and 'synchronizeIO'
    -- yields on every 1000th poll. @Nothing@: children are forked with
    -- 'forkIO' and it yields on every 2nd poll.
  }
-- | Read-side computations: a function from the shared 'RCUState' to an
-- 'IO' action. The instances in this module let it create and read 'SRef's
-- but not write them.
newtype ReadingRCU s a = ReadingRCU
  { runReadingRCU :: RCUState -> IO a  -- ^ Run against a given state.
  } deriving Functor
-- | Both sides receive the same 'RCUState'; effects are combined with the
-- 'Applicative' instance of 'IO'.
instance Applicative (ReadingRCU s) where
  pure a = ReadingRCU (const (pure a))
  rf <*> ra = ReadingRCU $ \ st ->
    runReadingRCU rf st <*> runReadingRCU ra st
-- | Sequencing threads the same 'RCUState' through both steps.
--
-- NOTE(review): 'fail' is defined here as a 'Monad' method. base 4.13 and
-- later no longer have that method in the class, so this instance targets
-- older compilers; confirm the supported range before changing it.
instance Monad (ReadingRCU s) where
  return = pure
  action >>= next = ReadingRCU $ \ st ->
    runReadingRCU action st >>= \ a -> runReadingRCU (next a) st
  fail msg = ReadingRCU (const (fail msg))
-- | Delegates to the 'Alternative' instance of 'IO', giving both branches
-- the same 'RCUState'.
instance Alternative (ReadingRCU s) where
  empty = ReadingRCU (const empty)
  lhs <|> rhs = ReadingRCU $ \ st ->
    runReadingRCU lhs st <|> runReadingRCU rhs st
-- | Delegates to the 'MonadPlus' instance of 'IO', giving both branches the
-- same 'RCUState'.
instance MonadPlus (ReadingRCU s) where
  mzero = ReadingRCU (const mzero)
  mplus lhs rhs = ReadingRCU $ \ st ->
    runReadingRCU lhs st `mplus` runReadingRCU rhs st
-- | New references can be allocated from a read section; the 'RCUState' is
-- not consulted.
instance MonadNew (SRef s) (ReadingRCU s) where
  newSRef initial = ReadingRCU (\ _ -> fmap SRef (newSRefIO initial))
-- | Reading is a plain 'readSRefIO' on the wrapped cell; the 'RCUState' is
-- not consulted.
instance MonadReading (SRef s) (ReadingRCU s) where
  readSRef ref = ReadingRCU (const (readSRefIO (unSRef ref)))
-- | Write-side computations: a function from the shared 'RCUState' to an
-- 'IO' action. On top of creating and reading 'SRef's, the instances in
-- this module let it write them and call 'synchronize'.
newtype WritingRCU s a = WritingRCU
  { runWritingRCU :: RCUState -> IO a  -- ^ Run against a given state.
  } deriving Functor
-- | Both sides receive the same 'RCUState'; effects are combined with the
-- 'Applicative' instance of 'IO'.
instance Applicative (WritingRCU s) where
  pure a = WritingRCU (const (pure a))
  wf <*> wa = WritingRCU $ \ st ->
    runWritingRCU wf st <*> runWritingRCU wa st
-- | Sequencing threads the same 'RCUState' through both steps.
--
-- NOTE(review): 'fail' is defined here as a 'Monad' method. base 4.13 and
-- later no longer have that method in the class, so this instance targets
-- older compilers; confirm the supported range before changing it.
instance Monad (WritingRCU s) where
  return = pure
  action >>= next = WritingRCU $ \ st ->
    runWritingRCU action st >>= \ a -> runWritingRCU (next a) st
  fail msg = WritingRCU (const (fail msg))
-- | Delegates to the 'Alternative' instance of 'IO', giving both branches
-- the same 'RCUState'.
instance Alternative (WritingRCU s) where
  empty = WritingRCU (const empty)
  lhs <|> rhs = WritingRCU $ \ st ->
    runWritingRCU lhs st <|> runWritingRCU rhs st
-- | Delegates to the 'MonadPlus' instance of 'IO', giving both branches the
-- same 'RCUState'.
instance MonadPlus (WritingRCU s) where
  mzero = WritingRCU (const mzero)
  mplus lhs rhs = WritingRCU $ \ st ->
    runWritingRCU lhs st `mplus` runWritingRCU rhs st
-- | New references can be allocated from a write section; the 'RCUState' is
-- not consulted.
instance MonadNew (SRef s) (WritingRCU s) where
  newSRef initial = WritingRCU (\ _ -> fmap SRef (newSRefIO initial))
-- | Reading inside a write section is a plain 'readSRefIO' on the wrapped
-- cell; the 'RCUState' is not consulted.
instance MonadReading (SRef s) (WritingRCU s) where
  readSRef ref = WritingRCU (const (readSRefIO (unSRef ref)))
-- | Writes go through 'writeSRefIO'; 'synchronize' runs 'synchronizeIO'
-- against the state of the current computation.
instance MonadWriting (SRef s) (WritingRCU s) where
  writeSRef ref new = WritingRCU (const (writeSRefIO (unSRef ref) new))
  synchronize = WritingRCU (\ st -> synchronizeIO st)
-- | Advance the global counter and wait until every registered thread
-- counter is either 'offline' or equal to the new global value.
--
-- While waiting, the counter of the calling thread is parked at 'offline'
-- (if it was not already) and set to the new global value afterwards. The
-- list of thread counters is read once, before the global counter is
-- advanced. The function ends with a 'storeLoadBarrier'.
synchronizeIO :: RCUState -> IO ()
synchronizeIO st = do
  -- Remember whether this thread was online so it can be restored below.
  mine <- readCounter myCounter
  let wasOnline = mine /= offline
  when wasOnline $ writeCounter myCounter offline
  peers <- readSRefIO (rcuStateThreadCountersR st)
  target <- incCounter (rcuStateGlobalCounter st)
  mapM_ (awaitPeer target 1) peers
  when wasOnline $ writeCounter myCounter target
  storeLoadBarrier
  where
    myCounter = rcuStateMyCounter st

    -- How often a spinning poll yields to the scheduler instead of
    -- calling 'pause': every 1000th poll when pinned, every 2nd otherwise.
    yieldEvery :: Word64
    yieldEvery = maybe 2 (const 1000) (rcuStatePinned st)

    -- Poll one peer until it is offline or has reached the target value.
    awaitPeer :: Word64 -> Word64 -> Counter -> IO ()
    awaitPeer target !polls peer = do
      seen <- readCounter peer
      unless (seen == offline || seen == target) $ do
        if polls `mod` yieldEvery == 0
          then yield
          else pause
        awaitPeer target (succ polls) peer
-- | Top-level RCU computations: a function from the shared 'RCUState' to an
-- 'IO' action. Threads are forked and joined, and read and write sections
-- are entered, from this monad.
newtype RCU s a = RCU
  { unRCU :: RCUState -> IO a  -- ^ Run against a given state.
  } deriving Functor
-- | Both sides receive the same 'RCUState'; the function side runs first,
-- then the argument side, exactly as monadic sequencing would.
instance Applicative (RCU s) where
  pure a = RCU (const (pure a))
  rf <*> ra = RCU $ \ st -> unRCU rf st <*> unRCU ra st
-- | Sequencing threads the same 'RCUState' through both steps.
instance Monad (RCU s) where
  -- Defined directly on 'IO' rather than through 'pure', so this instance
  -- does not depend on how the 'Applicative' instance is written.
  return a = RCU (const (return a))
  action >>= next = RCU $ \ st ->
    unRCU action st >>= \ a -> unRCU (next a) st
-- | New references can be allocated outside of any read or write section;
-- the 'RCUState' is not consulted.
instance MonadNew (SRef s) (RCU s) where
  newSRef initial = RCU (\ _ -> fmap SRef (newSRefIO initial))
-- | Handle of a thread started by 'forking'.
data RCUThread s a = RCUThread
  { rcuThreadId  :: !ThreadId
    -- ^ Identifier returned by the fork primitive.
  , rcuThreadVar :: !(MVar a)
    -- ^ Filled with the result of the thread once its body has returned;
    -- 'joining' reads it.
  }
-- | The QSBR implementation of the 'MonadRCU' interface.
--
-- * 'forking' registers a fresh counter for the child before it starts and
--   retires that counter when the child ends, whether it returned or threw.
-- * 'joining' blocks on the result 'MVar' of the child.
-- * 'reading' brackets the read-side action with counter updates so that
--   'synchronizeIO' can tell when the reader has moved on.
-- * 'writing' serialises writers on the writer lock and finishes with
--   'synchronizeIO'; the lock is released even if the section throws.
instance MonadRCU (SRef s) (RCU s) where
  type Reading (RCU s) = ReadingRCU s
  type Writing (RCU s) = WritingRCU s
  type Thread (RCU s) = RCUThread s

  forking (RCU m) = RCU $ \ s@RCUState { rcuStateThreadCountersLockV
                                       , rcuStateThreadCountersR
                                       , rcuStatePinned } -> do
    result <- newEmptyMVar
    threadCounter <- newCounter
    let -- Replace the shared list of thread counters under its lock.
        updateCounters f =
          withMVar rcuStateThreadCountersLockV $ \ _ ->
            writeSRefIO rcuStateThreadCountersR . f
              =<< readSRefIO rcuStateThreadCountersR
        -- Take the child offline and drop its counter from the list.
        retire = do
          writeBarrier
          writeCounter threadCounter offline
          updateCounters (delete threadCounter)
        frk = case rcuStatePinned of
          Just i  -> forkOn i
          Nothing -> forkIO
    -- Register the counter before the child can run.
    updateCounters (threadCounter :)
    storeLoadBarrier
    -- 'finally' makes sure the counter is retired when the body throws.
    -- Otherwise it would stay online in the list and every later
    -- 'synchronizeIO' would spin on it forever.
    tid <- frk $
      (m s { rcuStateMyCounter = threadCounter } >>= putMVar result)
        `finally` retire
    return (RCUThread tid result)

  -- NOTE(review): when the child ended with an exception the result is
  -- never stored, so this read cannot complete. Reporting the failure to
  -- the joiner would need a different result type.
  joining (RCUThread _ m) = RCU $ \ _ -> readMVar m

  reading (ReadingRCU m) = RCU $ \ s@RCUState { rcuStateMyCounter
                                              , rcuStateGlobalCounter } -> do
    -- Go online at the current global counter value.
    writeCounter rcuStateMyCounter =<< readCounter rcuStateGlobalCounter
    storeLoadBarrier
    -- NOTE(review): no exception handler is installed around the read-side
    -- action, which keeps this path cheap. If it throws, the counter is
    -- left online here; a forked thread is still retired by 'forking'.
    x <- m s
    storeLoadBarrier
    -- Announce that the read section is over.
    writeCounter rcuStateMyCounter offline
    storeLoadBarrier
    return x

  writing (WritingRCU m) = RCU $ \ s@RCUState { rcuStateWriterLockV } ->
    -- 'withMVar' puts the lock back on every exit path. With a bare
    -- take/put pair an exception in the section left the lock held and
    -- blocked all later writers.
    withMVar rcuStateWriterLockV $ \ _ -> do
      x <- m s
      synchronizeIO s
      return x
-- | Arbitrary 'IO' actions are run as they are; the 'RCUState' is ignored.
instance MonadIO (RCU s) where
  liftIO io = RCU (const io)
-- | Run an RCU computation with a freshly allocated state and no pinning:
-- new global counter, empty list of thread counters, both locks free, and
-- a new counter for the calling thread.
runRCU :: (forall s. RCU s a) -> IO a
runRCU m = do
  globalCounter <- newCounter
  threadCounters <- newIORef []
  threadCountersLock <- newMVar ()
  writerLock <- newMVar ()
  myCounter <- newCounter
  unRCU m RCUState
    { rcuStateGlobalCounter       = globalCounter
    , rcuStateThreadCountersR     = threadCounters
    , rcuStateThreadCountersLockV = threadCountersLock
    , rcuStateWriterLockV         = writerLock
    , rcuStateMyCounter           = myCounter
    , rcuStatePinned              = Nothing
    }
-- | Like 'runRCU', but the state records @Just i@ as its pinning, so
-- 'forking' starts children with @forkOn i@ and 'synchronizeIO' uses the
-- longer spin period.
runOnRCU :: Int -> (forall s. RCU s a) -> IO a
runOnRCU i m = do
  globalCounter <- newCounter
  threadCounters <- newIORef []
  threadCountersLock <- newMVar ()
  writerLock <- newMVar ()
  myCounter <- newCounter
  unRCU m RCUState
    { rcuStateGlobalCounter       = globalCounter
    , rcuStateThreadCountersR     = threadCounters
    , rcuStateThreadCountersLockV = threadCountersLock
    , rcuStateWriterLockV         = writerLock
    , rcuStateMyCounter           = myCounter
    , rcuStatePinned              = Just i
    }