{-# LANGUAGE CPP #-}
{-# LANGUAGE Rank2Types #-}
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ForeignFunctionInterface #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# OPTIONS_HADDOCK not-home #-}
module Control.Concurrent.RCU.QSBR.Internal
  ( SRef(..)
  , RCUThread(..)
  , RCU(..)
  , runRCU
  , runOnRCU
  , ReadingRCU(..)
  , WritingRCU(..)
  , RCUState(..)
#if BENCHMARKS
    -- Benchmarks additionally need the raw write primitive. 'unRCU',
    -- 'runWritingRCU', 'runReadingRCU' and 'RCUState' are already exported
    -- above through the @(..)@ entries, so listing them again would only
    -- trigger duplicate-export warnings.
  , writeSRefIO
#endif
  ) where
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.RCU.Class
import Control.Exception (finally, mask)
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Primitive
import Control.Parallel
import Data.Atomics
import Data.IORef
import Data.List
import Data.Primitive
import Foreign
import qualified Control.Monad.Fail as Fail
import Prelude hiding (Read(..))
-- | Binding to the C function @pause@, declared via the header @pause.h@ and
-- imported as an @unsafe@ call. 'synchronizeIO' calls it between polls of a
-- thread counter while busy-waiting.
--
-- NOTE(review): presumably a CPU spin-wait hint; the C definition is not
-- visible from this module -- confirm against @pause.h@.
foreign import ccall unsafe "pause.h" pause :: IO ()
-- | A shared reference: an 'IORef' tagged with the phantom parameter @s@
-- (@s@ does not occur in the representation). Equality is the equality of
-- the wrapped 'IORef'.
newtype SRef s a = SRef
  { unSRef :: IORef a
  } deriving (Eq)
-- | Allocate the 'IORef' that backs an 'SRef'. This is a direct alias for
-- 'newIORef'.
newSRefIO :: a -> IO (IORef a)
newSRefIO = newIORef
{-# INLINE newSRefIO #-}
-- | Read the current contents of the 'IORef' that backs an 'SRef'. This is a
-- direct alias for 'readIORef'; no barrier is issued here.
readSRefIO :: IORef a -> IO a
readSRefIO = readIORef
{-# INLINE readSRefIO #-}
-- | Store a value into the 'IORef' that backs an 'SRef'.
--
-- The value is first evaluated to weak head normal form (via 'pseq'), then a
-- 'writeBarrier' is issued, and only then is the reference overwritten.
writeSRefIO :: IORef a -> a -> IO ()
writeSRefIO ref val =
  (val `pseq` writeBarrier) >> writeIORef ref val
{-# INLINE writeSRefIO #-}
-- | A mutable counter cell stored in a 'MutableByteArray'
-- ('newCounter' allocates 8 bytes and the accessors use 'Word64').
newtype Counter = Counter (MutableByteArray RealWorld)

-- | Counters are compared by identity of the underlying array
-- ('sameMutableByteArray'), not by the value they currently hold.
instance Eq Counter where
  (==) (Counter lhs) (Counter rhs) = sameMutableByteArray lhs rhs
-- | Counter value marking a thread as not being inside a read section. It is
-- written when 'reading' finishes, when a forked thread completes, and
-- temporarily by 'synchronizeIO' for the calling thread. 'synchronizeIO'
-- does not wait on counters holding this value.
offline :: Word64
offline = 0
-- | Initial value stored in every counter created by 'newCounter'. Since
-- 'incCounter' advances in steps of 2 from this odd starting value, a counter
-- that is only ever incremented never takes the value 'offline'.
online :: Word64
online = 1
-- | Allocate a fresh 8-byte counter initialised to 'online'.
newCounter :: IO Counter
newCounter =
  newByteArray 8 >>= \ arr ->
    Counter arr <$ writeByteArray arr 0 online
{-# INLINE newCounter #-}
-- | Read the 'Word64' stored at index 0 of the counter's byte array.
readCounter :: Counter -> IO Word64
readCounter (Counter c) = readByteArray c 0
{-# INLINE readCounter #-}
-- | Overwrite the 'Word64' stored at index 0 of the counter's byte array.
writeCounter :: Counter -> Word64 -> IO ()
writeCounter (Counter c) w = writeByteArray c 0 w
{-# INLINE writeCounter #-}
-- | Advance a counter by 2 and return the new value.
--
-- This is a plain read followed by a plain write, not a single atomic
-- operation. In this module it is only applied to the global counter from
-- 'synchronizeIO'.
incCounter :: Counter -> IO Word64
incCounter counter = do
  previous <- readCounter counter
  let next = previous + 2
  writeCounter counter next
  return next
{-# INLINE incCounter #-}
-- | Everything an RCU computation needs from its environment. One value is
-- built by 'runRCU' / 'runOnRCU'; 'forking' hands each child a copy whose
-- 'rcuStateMyCounter' points at the child's own counter.
data RCUState = RCUState
  { rcuStateGlobalCounter :: {-# UNPACK #-} !Counter
    -- ^ Counter advanced by 'synchronizeIO' and copied by readers when they
    -- enter 'reading'.
  , rcuStateThreadCountersR :: {-# UNPACK #-} !(IORef [Counter])
    -- ^ Counters of the threads created with 'forking'; polled by
    -- 'synchronizeIO'.
  , rcuStateThreadCountersLockV :: {-# UNPACK #-} !(MVar ())
    -- ^ Lock held while the list of thread counters is being modified.
  , rcuStateWriterLockV :: {-# UNPACK #-} !(MVar ())
    -- ^ Lock held for the duration of a 'writing' section.
  , rcuStateMyCounter :: {-# UNPACK #-} !Counter
    -- ^ The counter belonging to the thread running the computation.
  , rcuStatePinned :: !(Maybe Int)
    -- ^ When @Just i@, 'forking' uses @forkOn i@ and 'synchronizeIO' spins
    -- longer between yields; when 'Nothing', 'forkIO' is used.
  }
-- | A read-side computation: a function from the shared 'RCUState' to an
-- 'IO' action. The parameter @s@ is phantom.
newtype ReadingRCU s a = ReadingRCU
  { runReadingRCU :: RCUState -> IO a
  } deriving (Functor)
-- | Both sides receive the same 'RCUState'; effects are sequenced left to
-- right in 'IO'.
instance Applicative (ReadingRCU s) where
  pure x = ReadingRCU (const (pure x))
  ReadingRCU runF <*> ReadingRCU runX =
    ReadingRCU (\ st -> runF st <*> runX st)
-- | Sequencing threads the same 'RCUState' through both computations.
instance Monad (ReadingRCU s) where
  return a = ReadingRCU $ \ _ -> pure a
  ReadingRCU m >>= f = ReadingRCU $ \ s -> do
    a <- m s
    runReadingRCU (f a) s
#if !(MIN_VERSION_base(4,13,0))
  -- 'fail' is no longer a method of 'Monad' from base-4.13 on, so defining it
  -- there is a compile error; only older bases need this forwarding.
  fail = Fail.fail
#endif
-- | Failure is delegated to the 'Fail.MonadFail' instance of 'IO'.
instance Fail.MonadFail (ReadingRCU s) where
  fail msg = ReadingRCU (const (Fail.fail msg))
-- | Both methods defer to the 'Alternative' instance of 'IO'; each branch is
-- run against the same 'RCUState'.
instance Alternative (ReadingRCU s) where
  empty = ReadingRCU (const empty)
  ReadingRCU runA <|> ReadingRCU runB =
    ReadingRCU (\ st -> runA st <|> runB st)
-- | Both methods defer to the 'MonadPlus' instance of 'IO'; each branch is
-- run against the same 'RCUState'.
instance MonadPlus (ReadingRCU s) where
  mzero = ReadingRCU (const mzero)
  mplus (ReadingRCU runA) (ReadingRCU runB) =
    ReadingRCU (\ st -> runA st `mplus` runB st)
-- | New references may be allocated inside a read section; the 'RCUState'
-- is not consulted.
instance MonadNew (SRef s) (ReadingRCU s) where
  newSRef x = ReadingRCU (const (fmap SRef (newSRefIO x)))
-- | Reading a reference is a plain 'readSRefIO'; the 'RCUState' is not
-- consulted.
instance MonadReading (SRef s) (ReadingRCU s) where
  readSRef (SRef ref) = ReadingRCU (const (readSRefIO ref))
  {-# INLINE readSRef #-}
-- | A write-side computation: a function from the shared 'RCUState' to an
-- 'IO' action. The parameter @s@ is phantom.
newtype WritingRCU s a = WritingRCU
  { runWritingRCU :: RCUState -> IO a
  } deriving (Functor)
-- | Both sides receive the same 'RCUState'; effects are sequenced left to
-- right in 'IO'.
instance Applicative (WritingRCU s) where
  pure x = WritingRCU (const (pure x))
  WritingRCU runF <*> WritingRCU runX =
    WritingRCU (\ st -> runF st <*> runX st)
-- | Sequencing threads the same 'RCUState' through both computations.
instance Monad (WritingRCU s) where
  return a = WritingRCU $ \ _ -> pure a
  WritingRCU m >>= f = WritingRCU $ \ s -> do
    a <- m s
    runWritingRCU (f a) s
#if !(MIN_VERSION_base(4,13,0))
  -- 'fail' is no longer a method of 'Monad' from base-4.13 on, so defining it
  -- there is a compile error; only older bases need this forwarding.
  fail = Fail.fail
#endif
-- | Failure is delegated to the 'Fail.MonadFail' instance of 'IO'.
instance Fail.MonadFail (WritingRCU s) where
  fail msg = WritingRCU (const (Fail.fail msg))
-- | Both methods defer to the 'Alternative' instance of 'IO'; each branch is
-- run against the same 'RCUState'.
instance Alternative (WritingRCU s) where
  empty = WritingRCU (const empty)
  WritingRCU runA <|> WritingRCU runB =
    WritingRCU (\ st -> runA st <|> runB st)
-- | Both methods defer to the 'MonadPlus' instance of 'IO'; each branch is
-- run against the same 'RCUState'.
instance MonadPlus (WritingRCU s) where
  mzero = WritingRCU (const mzero)
  mplus (WritingRCU runA) (WritingRCU runB) =
    WritingRCU (\ st -> runA st `mplus` runB st)
-- | New references may be allocated inside a write section; the 'RCUState'
-- is not consulted.
instance MonadNew (SRef s) (WritingRCU s) where
  newSRef x = WritingRCU (const (fmap SRef (newSRefIO x)))
-- | Writers can read references too; this is a plain 'readSRefIO' and the
-- 'RCUState' is not consulted.
instance MonadReading (SRef s) (WritingRCU s) where
  readSRef (SRef ref) = WritingRCU (const (readSRefIO ref))
  {-# INLINE readSRef #-}
-- | Write-side operations: 'writeSRef' stores through 'writeSRefIO', and
-- 'synchronize' runs 'synchronizeIO' against the current 'RCUState'.
instance MonadWriting (SRef s) (WritingRCU s) where
  synchronize = WritingRCU synchronizeIO
  writeSRef (SRef ref) val = WritingRCU (const (writeSRefIO ref val))
  {-# INLINE writeSRef #-}
-- | Advance the global counter and wait until every registered thread
-- counter is either 'offline' or has caught up with the new value.
--
-- Steps, in order:
--
-- 1. If the calling thread's own counter is not 'offline', set it to
--    'offline' for the duration of the wait.
-- 2. Snapshot the list of thread counters, then increment the global counter.
-- 3. For each counter in the snapshot, spin until it reads 'offline' or the
--    new global value. Every @period@-th iteration calls 'yield' instead of
--    'pause', where @period@ is 1000 when pinned and 2 otherwise.
-- 4. If the caller was online at the start, store the new global value in its
--    counter. Finish with a 'storeLoadBarrier'.
synchronizeIO :: RCUState -> IO ()
synchronizeIO RCUState { rcuStateGlobalCounter = globalCounter
                       , rcuStateMyCounter = myCounter
                       , rcuStateThreadCountersR = countersRef
                       , rcuStatePinned = pinned } = do
  mine <- readCounter myCounter
  let wasOnline = mine /= offline
  when wasOnline $ writeCounter myCounter offline
  counters <- readSRefIO countersRef
  target <- incCounter globalCounter
  let yieldEvery :: Word64
      yieldEvery = maybe 2 (const 1000) pinned
      awaitThread !(spins :: Word64) counter = do
        seen <- readCounter counter
        unless (seen == offline || seen == target) $ do
          if spins `mod` yieldEvery == 0
            then yield
            else pause
          awaitThread (succ spins) counter
  mapM_ (awaitThread 1) counters
  when wasOnline $ writeCounter myCounter target
  storeLoadBarrier
-- | The top-level RCU computation: a function from the shared 'RCUState' to
-- an 'IO' action. The parameter @s@ is phantom.
newtype RCU s a = RCU
  { unRCU :: RCUState -> IO a
  } deriving (Functor)
-- | Both sides receive the same 'RCUState'; effects are sequenced left to
-- right in 'IO'.
instance Applicative (RCU s) where
  pure x = RCU (const (pure x))
  RCU runF <*> RCU runX = RCU (\ st -> runF st <*> runX st)
-- | Sequencing threads the same 'RCUState' through both computations.
instance Monad (RCU s) where
  return x = RCU (const (return x))
  RCU run >>= k = RCU (\ st -> run st >>= \ x -> unRCU (k x) st)
-- | New references may be allocated outside any read or write section; the
-- 'RCUState' is not consulted.
instance MonadNew (SRef s) (RCU s) where
  newSRef x = RCU (const (fmap SRef (newSRefIO x)))
-- | Handle to a thread started with 'forking'.
data RCUThread s a = RCUThread
  { rcuThreadId :: {-# UNPACK #-} !ThreadId
    -- ^ Identifier of the forked thread.
  , rcuThreadVar :: {-# UNPACK #-} !(MVar a)
    -- ^ Filled with the thread's result when its computation completes; read
    -- by 'joining'.
  }
-- | The QSBR implementation of the 'MonadRCU' interface.
instance MonadRCU (SRef s) (RCU s) where
  type Reading (RCU s) = ReadingRCU s
  type Writing (RCU s) = WritingRCU s
  type Thread (RCU s) = RCUThread s

  -- | Start a child thread with its own counter.
  --
  -- The counter is added to the shared list before the thread is forked, and
  -- is marked 'offline' and removed again when the child finishes. That
  -- retirement also runs when the child dies with an exception; otherwise a
  -- dead thread's counter would stay online and every later 'synchronizeIO'
  -- would spin on it forever.
  --
  -- NOTE(review): if the child dies with an exception the result 'MVar' is
  -- never filled, so 'joining' on that thread will not return a value.
  forking (RCU m) = RCU $ \ s@RCUState { rcuStateThreadCountersLockV
                                       , rcuStateThreadCountersR
                                       , rcuStatePinned } -> do
    result <- newEmptyMVar
    threadCounter <- newCounter
    -- All updates of the counter list happen under the list lock.
    let modifyCounters f = withMVar rcuStateThreadCountersLockV $ \ _ ->
          writeSRefIO rcuStateThreadCountersR . f =<< readSRefIO rcuStateThreadCountersR
    modifyCounters (threadCounter :)
    storeLoadBarrier
    let frk = case rcuStatePinned of
          Just i  -> forkOn i
          Nothing -> forkIO
        -- Take the child's counter offline and stop writers polling it.
        retire = do
          writeBarrier
          writeCounter threadCounter offline
          modifyCounters (delete threadCounter)
    -- Fork with asynchronous exceptions masked so the child cannot be killed
    -- before 'finally' is installed; the body itself runs with the caller's
    -- original masking state restored.
    tid <- mask $ \ restore -> frk $
      restore (m s { rcuStateMyCounter = threadCounter } >>= putMVar result)
        `finally` retire
    return (RCUThread tid result)
  {-# INLINE forking #-}

  -- | Wait for a forked thread's result. Uses 'readMVar', so the result stays
  -- available to further joins.
  joining (RCUThread _ m) = RCU $ \ _ -> readMVar m
  {-# INLINE joining #-}

  -- | Run a read section: publish the current global counter value in this
  -- thread's counter, run the body, then set the counter to 'offline'. A
  -- 'storeLoadBarrier' separates each of these steps.
  reading (ReadingRCU m) = RCU $ \ s@RCUState { rcuStateMyCounter
                                              , rcuStateGlobalCounter } -> do
    writeCounter rcuStateMyCounter =<< readCounter rcuStateGlobalCounter
    storeLoadBarrier
    x <- m s
    storeLoadBarrier
    writeCounter rcuStateMyCounter offline
    storeLoadBarrier
    return x
  {-# INLINE reading #-}

  -- | Run a write section under the writer lock, followed by
  -- 'synchronizeIO'. 'withMVar' guarantees the lock is released even if the
  -- body or the synchronisation throws, so a failing writer cannot block all
  -- later writers.
  writing (WritingRCU m) = RCU $ \ s@RCUState { rcuStateWriterLockV } ->
    withMVar rcuStateWriterLockV $ \ _ -> do
      x <- m s
      synchronizeIO s
      return x
  {-# INLINE writing #-}
-- | Arbitrary 'IO' can be embedded; the 'RCUState' is ignored.
instance MonadIO (RCU s) where
  liftIO io = RCU (const io)
  {-# INLINE liftIO #-}
-- | Run an RCU computation against a freshly created, unpinned 'RCUState':
-- new global and own counters, an empty list of thread counters, and both
-- locks available.
runRCU :: (forall s. RCU s a) -> IO a
runRCU m = do
  globalCounter <- newCounter
  threadCounters <- newIORef []
  threadCountersLock <- newMVar ()
  writerLock <- newMVar ()
  myCounter <- newCounter
  unRCU m RCUState
    { rcuStateGlobalCounter = globalCounter
    , rcuStateThreadCountersR = threadCounters
    , rcuStateThreadCountersLockV = threadCountersLock
    , rcuStateWriterLockV = writerLock
    , rcuStateMyCounter = myCounter
    , rcuStatePinned = Nothing
    }
{-# INLINE runRCU #-}
-- | Like 'runRCU', but records the capability number @i@ in the state, so
-- that 'forking' uses @forkOn i@ and 'synchronizeIO' uses its longer spin
-- period.
runOnRCU :: Int -> (forall s. RCU s a) -> IO a
runOnRCU i m = do
  globalCounter <- newCounter
  threadCounters <- newIORef []
  threadCountersLock <- newMVar ()
  writerLock <- newMVar ()
  myCounter <- newCounter
  unRCU m RCUState
    { rcuStateGlobalCounter = globalCounter
    , rcuStateThreadCountersR = threadCounters
    , rcuStateThreadCountersLockV = threadCountersLock
    , rcuStateWriterLockV = writerLock
    , rcuStateMyCounter = myCounter
    , rcuStatePinned = Just i
    }