module Control.Concurrent.RCU.QSBR.Internal
( SRef(..)
, RCUThread(..)
, RCU(..)
, runRCU
, ReadingRCU(..)
, WritingRCU(..)
, RCUState(..)
) where
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.RCU.Class
import Control.Exception (finally)
import Control.Monad
import Control.Monad.IO.Class
import Control.Parallel
import Data.Atomics
import Data.IORef
import Data.List
import Data.Word
import Prelude hiding (read, Read)
-- | A shared reference: a thin wrapper around an 'IORef'.
--
-- The @s@ parameter is phantom (it does not occur in the wrapped type).
-- Equality is the derived one, i.e. the 'Eq' instance of the underlying
-- 'IORef'.
newtype SRef s a = SRef { unSRef :: IORef a }
  deriving Eq
-- | Allocate a fresh 'IORef' holding the given initial value.
newSRefIO :: a -> IO (IORef a)
newSRefIO initial = newIORef initial
-- | Read the current contents of the reference.
readSRefIO :: IORef a -> IO a
readSRefIO ref = readIORef ref
-- | Store a value in the reference.
--
-- The value is first evaluated to weak head normal form, then a write
-- barrier is issued, and only afterwards is the reference overwritten.
writeSRefIO :: IORef a -> a -> IO ()
writeSRefIO ref val =
  (val `pseq` writeBarrier) >> writeIORef ref val
-- | A mutable 64-bit counter. One is kept globally and one per thread
-- (see 'RCUState').
type Counter = IORef Word64
-- | Counter value marking a thread as offline. 'synchronizeIO' never waits
-- on a counter that holds this value.
offline :: Word64
offline = 0
-- | Initial value of every counter created by 'newCounter'; it differs from
-- 'offline'.
online :: Word64
online = 1
-- | Create a new counter initialised to 'online'.
newCounter :: IO Counter
newCounter = newIORef online
-- | Fetch the current value of a counter (a plain 'readIORef').
readCounter :: Counter -> IO Word64
readCounter counter = readIORef counter
-- | Overwrite a counter. The new value is forced before the write so that no
-- thunk is ever stored in the reference.
writeCounter :: Counter -> Word64 -> IO ()
writeCounter counter value = value `seq` writeIORef counter value
-- | Advance a counter by one and return the new value.
--
-- This is a plain read followed by a write, not an atomic update. The only
-- caller in this module, 'synchronizeIO', invokes it while holding the
-- thread-counter 'MVar'.
incCounter :: Counter -> IO Word64
incCounter counter = do
  previous <- readIORef counter
  let next = succ previous
  -- Force the successor before it is stored or returned.
  writeCounter counter $! next
  return next
-- | The environment handed to every RCU computation in this module.
data RCUState = RCUState
  { rcuStateGlobalCounter :: !Counter
    -- ^ Global counter: incremented by 'synchronizeIO' and copied into the
    -- per-thread counter by @reading@.
  , rcuStateMyCounter :: !Counter
    -- ^ Counter of the thread running the computation; @forking@ replaces
    -- it with a fresh counter for the child thread.
  , rcuStateThreadCountersV :: !(MVar [Counter])
    -- ^ Counters registered by @forking@; 'synchronizeIO' polls each of
    -- them while holding this 'MVar'.
  , rcuStateWriterLockV :: !(MVar ())
    -- ^ Lock held by @writing@ for the duration of a write-side critical
    -- section and the synchronisation that follows it.
  }
-- | A read-side critical section: an 'IO' action that is given the current
-- 'RCUState'. The @s@ parameter is phantom. Values of this type are run by
-- @reading@.
newtype ReadingRCU s a = ReadingRCU { runReadingRCU :: RCUState -> IO a }
  deriving Functor
-- | Both sides receive the same 'RCUState'; effects are sequenced left to
-- right in 'IO'.
instance Applicative (ReadingRCU s) where
  pure a = ReadingRCU (const (pure a))
  ff <*> fa = ReadingRCU $ \st ->
    runReadingRCU ff st <*> runReadingRCU fa st
-- | Reader-over-'IO' monad: the 'RCUState' is passed unchanged to every step.
instance Monad (ReadingRCU s) where
  return = ReadingRCU . const . pure
  action >>= k = ReadingRCU $ \st ->
    runReadingRCU action st >>= \a -> runReadingRCU (k a) st
  -- Failure is delegated to 'IO'.
  fail msg = ReadingRCU (const (fail msg))
-- | Lifts the 'Alternative' instance of 'IO' pointwise over the 'RCUState'.
instance Alternative (ReadingRCU s) where
  empty = ReadingRCU (const empty)
  lhs <|> rhs = ReadingRCU $ \st ->
    runReadingRCU lhs st <|> runReadingRCU rhs st
-- | Lifts the 'MonadPlus' instance of 'IO' pointwise over the 'RCUState'.
instance MonadPlus (ReadingRCU s) where
  mzero = ReadingRCU (const mzero)
  mplus lhs rhs = ReadingRCU $ \st ->
    runReadingRCU lhs st `mplus` runReadingRCU rhs st
-- | New references can be allocated inside a read-side critical section; the
-- 'RCUState' is not consulted.
instance MonadNew (SRef s) (ReadingRCU s) where
  newSRef initial = ReadingRCU (const (fmap SRef (newSRefIO initial)))
-- | Reading a reference is a plain 'readSRefIO'; the 'RCUState' is not
-- consulted.
instance MonadReading (SRef s) (ReadingRCU s) where
  readSRef ref = ReadingRCU (const (readSRefIO (unSRef ref)))
-- | A write-side critical section: an 'IO' action that is given the current
-- 'RCUState'. The @s@ parameter is phantom. Values of this type are run by
-- @writing@.
newtype WritingRCU s a = WritingRCU { runWritingRCU :: RCUState -> IO a }
  deriving Functor
-- | Both sides receive the same 'RCUState'; effects are sequenced left to
-- right in 'IO'.
instance Applicative (WritingRCU s) where
  pure a = WritingRCU (const (pure a))
  ff <*> fa = WritingRCU $ \st ->
    runWritingRCU ff st <*> runWritingRCU fa st
-- | Reader-over-'IO' monad: the 'RCUState' is passed unchanged to every step.
instance Monad (WritingRCU s) where
  return = WritingRCU . const . pure
  action >>= k = WritingRCU $ \st ->
    runWritingRCU action st >>= \a -> runWritingRCU (k a) st
  -- Failure is delegated to 'IO'.
  fail msg = WritingRCU (const (fail msg))
-- | Lifts the 'Alternative' instance of 'IO' pointwise over the 'RCUState'.
instance Alternative (WritingRCU s) where
  empty = WritingRCU (const empty)
  lhs <|> rhs = WritingRCU $ \st ->
    runWritingRCU lhs st <|> runWritingRCU rhs st
-- | Lifts the 'MonadPlus' instance of 'IO' pointwise over the 'RCUState'.
instance MonadPlus (WritingRCU s) where
  mzero = WritingRCU (const mzero)
  mplus lhs rhs = WritingRCU $ \st ->
    runWritingRCU lhs st `mplus` runWritingRCU rhs st
-- | New references can be allocated inside a write-side critical section; the
-- 'RCUState' is not consulted.
instance MonadNew (SRef s) (WritingRCU s) where
  newSRef initial = WritingRCU (const (fmap SRef (newSRefIO initial)))
-- | Writers may read references too; this is a plain 'readSRefIO'.
instance MonadReading (SRef s) (WritingRCU s) where
  readSRef ref = WritingRCU (const (readSRefIO (unSRef ref)))
-- | Write-side operations: storing into a reference goes through
-- 'writeSRefIO', and 'synchronize' runs 'synchronizeIO' on the current state.
instance MonadWriting (SRef s) (WritingRCU s) where
  writeSRef ref val = WritingRCU (const (writeSRefIO (unSRef ref) val))
  synchronize = WritingRCU synchronizeIO
-- | Advance the global counter and block until every registered thread
-- counter is either 'offline' or has reached the new global value.
--
-- If the calling thread's own counter was online it is set to 'offline'
-- while waiting (so the polling loop never waits on the caller itself) and
-- is set to the new global value afterwards.
synchronizeIO :: RCUState -> IO ()
synchronizeIO st = do
  mine <- readCounter myCounter
  storeLoadBarrier
  let wasOnline = mine /= offline
  when wasOnline $ writeCounter myCounter offline

  -- Holding the MVar keeps the list of counters fixed while it is polled.
  newGlobal <- withMVar (rcuStateThreadCountersV st) $ \counters -> do
    target <- incCounter (rcuStateGlobalCounter st)
    writeBarrier
    mapM_ (awaitCounter target) counters
    return target

  when wasOnline $ writeCounter myCounter newGlobal
  storeLoadBarrier
  where
    myCounter = rcuStateMyCounter st

    -- Poll a single counter until it is offline or has caught up with the
    -- target value, sleeping briefly between attempts.
    awaitCounter target counter = poll
      where
        poll = do
          seen <- readCounter counter
          when (seen /= offline && seen < target) $ do
            threadDelay 1
            storeLoadBarrier
            poll
-- | The top-level RCU computation type: an 'IO' action that is given the
-- current 'RCUState'. The @s@ parameter is phantom; 'runRCU' requires the
-- computation to be polymorphic in it.
newtype RCU s a = RCU { unRCU :: RCUState -> IO a }
  deriving Functor
-- | Both sides receive the same 'RCUState'; effects are sequenced left to
-- right in 'IO'.
instance Applicative (RCU s) where
  pure a = RCU (const (return a))
  ff <*> fa = RCU $ \st -> unRCU ff st <*> unRCU fa st
-- | Reader-over-'IO' monad: the 'RCUState' is passed unchanged to every step.
instance Monad (RCU s) where
  return = RCU . const . return
  action >>= k = RCU $ \st ->
    unRCU action st >>= \a -> unRCU (k a) st
-- | References can be allocated directly in 'RCU'; the 'RCUState' is not
-- consulted.
instance MonadNew (SRef s) (RCU s) where
  newSRef initial = RCU (const (fmap SRef (newSRefIO initial)))
-- | Handle to a thread started by @forking@. The @s@ parameter is phantom.
data RCUThread s a = RCUThread
  { rcuThreadId :: !ThreadId
    -- ^ Identifier of the forked thread.
  , rcuThreadVar :: !(MVar a)
    -- ^ Filled with the thread's result when its computation completes;
    -- @joining@ reads it.
  }
-- | 'MonadRCU' instance for 'RCU': thread management plus the entry points
-- for read-side and write-side critical sections.
instance MonadRCU (SRef s) (RCU s) where
  type Reading (RCU s) = ReadingRCU s
  type Writing (RCU s) = WritingRCU s
  type Thread (RCU s) = RCUThread s

  -- Run a computation in a new thread that has its own counter.
  forking (RCU m) = RCU $ \ s@RCUState { rcuStateThreadCountersV } -> do
    result <- newEmptyMVar
    threadCounter <- newCounter
    -- Register the counter so that 'synchronizeIO' polls it.
    modifyMVar_ rcuStateThreadCountersV $ return . (threadCounter :)
    let body = do
          x <- m $ s { rcuStateMyCounter = threadCounter }
          putMVar result x
        -- Mark the thread offline and unregister its counter.
        retire = do
          writeBarrier
          writeCounter threadCounter offline
          modifyMVar_ rcuStateThreadCountersV $ return . delete threadCounter
    -- Retire the counter even when the computation throws; otherwise it
    -- would stay in the list 'synchronizeIO' polls and a writer could wait
    -- on it indefinitely. The exception itself is re-thrown by 'finally'.
    tid <- forkIO (body `finally` retire)
    return (RCUThread tid result)

  -- Wait for the thread's result. The result variable is only filled when
  -- the thread's computation completes normally.
  joining (RCUThread _ m) = RCU $ \ _ -> readMVar m

  -- Run a read-side critical section on the current thread.
  reading (ReadingRCU m) = RCU $ \ s@RCUState { rcuStateMyCounter
                                               , rcuStateGlobalCounter } -> do
    mc <- readCounter rcuStateMyCounter
    -- An offline thread first goes online by copying the global counter,
    -- so that writers will wait for it.
    when (mc == offline) $ do
      writeCounter rcuStateMyCounter =<< readCounter rcuStateGlobalCounter
      storeLoadBarrier
    x <- m s
    -- After the section, copy the current global counter again so that
    -- 'synchronizeIO' sees this thread as caught up.
    storeLoadBarrier
    writeCounter rcuStateMyCounter =<< readCounter rcuStateGlobalCounter
    storeLoadBarrier
    return x

  -- Run a write-side critical section followed by a synchronisation, all
  -- under the writer lock. 'withMVar' releases the lock even if the section
  -- or the synchronisation throws, so one failing writer cannot block every
  -- later writer.
  writing (WritingRCU m) = RCU $ \ s@RCUState { rcuStateWriterLockV } ->
    withMVar rcuStateWriterLockV $ \ _ -> do
      x <- m s
      synchronizeIO s
      return x
-- | Arbitrary 'IO' actions can be embedded; they ignore the 'RCUState'.
instance MonadIO (RCU s) where
  liftIO io = RCU (const io)
-- | Run an 'RCU' computation in 'IO'.
--
-- A fresh 'RCUState' is built: a new global counter, a new counter for the
-- calling thread, an initially empty list of registered thread counters
-- (entries are added by @forking@) and an unlocked writer lock.
runRCU :: (forall s. RCU s a) -> IO a
runRCU action = do
  globalCounter  <- newCounter
  callerCounter  <- newCounter
  threadCounters <- newMVar []
  writerLock     <- newMVar ()
  unRCU action (RCUState globalCounter callerCounter threadCounters writerLock)