-- | Blocking transactions over shared mutable variables ('BVar').
--
-- The export list offers three ways to work with a 'BVar': direct 'IO'
-- access ('peekBVar', 'pokeBVar', 'modifyBVar'), a monadic transaction
-- builder ('BTM', run with 'runBTM'), and an arrow transaction builder
-- ('BTA', run with 'runBTA').
--
-- 'BVar' and 'Value' are exported without their constructors.  The 'when'
-- and 'unless' exported here are this module's own transaction combinators,
-- not the ones from "Control.Monad" (those are hidden on import).
module BlockingTransactions.BlockingTransactions
(
-- Variables and direct IO access.
BVar,
newBVar,
peekBVar,
pokeBVar,
modifyBVar,
-- Monadic transactions.
runBTM,
BTM,
Value,
when, unless,
readBVar,
writeBVar,
retry,
-- Arrow transactions.
runBTA,
BTA,
fetchBVar,
storeBVar,
retryWhen,
retryUnless) where
import Prelude hiding ((.),id)
import Control.Category
import Control.Arrow
import Control.Applicative hiding (empty)
import System.IO.Unsafe
import Control.Monad hiding (when,unless)
import Control.Concurrent
import Control.Parallel
import Data.IORef
import qualified Data.Set as Set
-- | A shared mutable variable that can take part in blocking transactions.
data BVar a = BVar
  { bvar_index      :: Integer
    -- ^ Identity of the variable; the 'Eq' and 'Ord' instances compare it.
  , bvar_lock       :: MVar ()
    -- ^ Lock taken before the contents are touched and put back afterwards.
  , bvar_data       :: IORef a
    -- ^ The current contents.
  , bvar_retry_list :: IORef [MVar ()]
    -- ^ Signals of blocked transactions; each is filled with 'tryPutMVar'
    -- when the variable is written.
  }
-- | A 'BVar' whose element type has been hidden, so that variables of
-- different element types can be kept together in one list or set.
data AnonymousVar where
  Anon :: forall a. BVar a -> AnonymousVar
-- | Two variables are equal exactly when they carry the same index; the
-- contents are never inspected.
instance Eq (BVar a) where
  BVar i _ _ _ == BVar j _ _ _ = i == j
-- | Variables are ordered by their index, in agreement with the 'Eq'
-- instance.
--
-- Only 'compare' is defined.  The relational operators ('<', '<=', '>',
-- '>=') come from the class defaults, which are expressed in terms of
-- 'compare', so they can never disagree with it or with each other.
instance Ord (BVar a) where
  compare x y = compare (bvar_index x) (bvar_index y)
-- | Anonymous variables are equal when the wrapped variables share an
-- index, regardless of their (hidden) element types.
instance Eq AnonymousVar where
  Anon x == Anon y = bvar_index x == bvar_index y
-- | Anonymous variables are ordered by the index of the wrapped variable.
-- The relational operators are the class defaults derived from 'compare'.
instance Ord AnonymousVar where
  compare (Anon x) (Anon y) = compare (bvar_index x) (bvar_index y)
-- | Process-wide counter that supplies the index of every new 'BVar'.
--
-- This is a top-level mutable cell created with 'unsafePerformIO'.  The
-- NOINLINE pragma is essential: if the compiler inlined the binding, each
-- use site could allocate its own 'IORef' and indices would stop being
-- unique.
{-# NOINLINE unique_source #-}
unique_source :: IORef Integer
unique_source = unsafePerformIO $ newIORef 0
-- | Create a variable holding the given initial value.
--
-- The variable gets the next index from 'unique_source', an available
-- lock, and an empty list of retry signals.
newBVar :: a -> IO (BVar a)
newBVar a = do
  index <- atomicModifyIORef unique_source (\n -> (succ n, n))
  lock <- newMVar ()
  contents <- newIORef a
  waiters <- newIORef []
  return (BVar index lock contents waiters)
-- | Read the current contents of a variable while holding its lock.
peekBVar :: BVar a -> IO a
peekBVar bvar = do
  takeMVar lock
  contents <- readIORef (bvar_data bvar)
  putMVar lock ()
  return contents
  where
    lock = bvar_lock bvar
-- | Overwrite the contents of a variable.
--
-- While the lock is held the new value is stored and the list of retry
-- signals is emptied; once the lock has been released, every signal that
-- was registered is filled so the transactions waiting on it can re-run.
pokeBVar :: BVar a -> a -> IO ()
pokeBVar bvar value = do
  takeMVar lock
  writeIORef (bvar_data bvar) value
  waiters <- readIORef retry_list
  writeIORef retry_list []
  putMVar lock ()
  -- tryPutMVar: a signal that is already full is simply left alone.
  mapM_ (\signal -> tryPutMVar signal ()) waiters
  where
    lock = bvar_lock bvar
    retry_list = bvar_retry_list bvar
-- | Update a variable with a function and return a by-product.
--
-- The function receives the current contents and returns the new contents
-- paired with a result, which is handed back to the caller.
--
-- While the lock is held the new contents are stored and the list of retry
-- signals is emptied (as 'pokeBVar' does); once the lock has been released,
-- every signal that was registered is filled so the waiting transactions
-- re-run.  Emptying the list matters: a woken transaction registers a fresh
-- signal if it blocks again, so signals left behind would only pile up and
-- be poked again on every later write.
modifyBVar :: BVar a -> (a -> (a,b)) -> IO b
modifyBVar bvar f = do
  takeMVar lock
  old <- readIORef (bvar_data bvar)
  -- Lazy pattern: 'f' is not forced here, only when a component is used.
  let (new, result) = f old
  writeIORef (bvar_data bvar) new
  waiters <- readIORef retry_list
  writeIORef retry_list []
  putMVar lock ()
  forM_ waiters $ \signal -> tryPutMVar signal ()
  return result
  where
    lock = bvar_lock bvar
    retry_list = bvar_retry_list bvar
-- | Mutable flags shared by all steps of one run of a transaction.
data BlockingStatus = BlockingStatus
  { should_retry     :: IORef Bool
    -- ^ Becomes 'True' once an active 'retry' step has run.
  , is_active_branch :: IORef Bool
    -- ^ Whether the steps currently running are in a taken branch; writes
    -- and retries consult it before having any effect.
  }
-- | A value produced inside a transaction.  The type parameter @e@ is a
-- phantom: it does not appear in the payload.
data Value e a = Value
  { fromValue :: a  -- ^ Unwrap the payload.
  }
-- | A transaction under construction.
--
-- 'BTReturn' is a pure result with no variables and no effects.
--
-- 'BTM' carries three things: a result that is available without running
-- anything (see 'staticResult'), a function that prepends the variables the
-- transaction touches to a list (see 'workingSet'), and the 'IO' action that
-- actually performs the transaction given the shared 'BlockingStatus'.
data BTM e a
  = BTReturn a
  | BTM a ([AnonymousVar] -> [AnonymousVar]) (BlockingStatus -> IO a)
-- | The result stored in the transaction itself, obtained without running
-- its 'IO' action.
staticResult :: BTM e a -> a
staticResult btm = case btm of
  BTReturn a -> a
  BTM a _ _  -> a
-- | The 'IO' action of a transaction.  A 'BTReturn' just returns its value
-- and ignores the status.
operation :: BTM e a -> BlockingStatus -> IO a
operation btm status = case btm of
  BTReturn a  -> return a
  BTM _ _ run -> run status
-- | The variables a transaction touches, as a function that prepends them
-- to a list.  A 'BTReturn' touches none, so it yields the identity.
workingSet :: BTM e a -> [AnonymousVar] -> [AnonymousVar]
workingSet btm = case btm of
  BTReturn _   -> id
  BTM _ vars _ -> vars
-- | Map over the result of a transaction; defined through the 'Monad'
-- instance.
instance Functor (BTM e) where
  fmap f action = action >>= \a -> return (f a)
-- | Apply a function to the payload.
instance Functor (Value e) where
  fmap f wrapped = case wrapped of
    Value a -> Value (f a)
-- | Applicative structure derived from the 'Monad' instance: the function
-- transaction is sequenced before the argument transaction.
instance Applicative (BTM e) where
  pure = BTReturn
  (<*>) = ap
-- | Apply a wrapped function to a wrapped argument.
instance Applicative (Value e) where
  pure = Value
  Value f <*> Value x = Value (f x)
-- | Sequencing of transactions.
--
-- Binding after a 'BTReturn' just applies the continuation.  Otherwise the
-- continuation is applied once to the first transaction's static result in
-- order to learn the static result and the working set of what follows; the
-- combined working set lists the continuation's variables ahead of the first
-- transaction's.  When run, the first operation is executed and the
-- continuation is applied again, this time to the real result.
instance Monad (BTM e) where
  return = BTReturn

  BTReturn a >>= f = f a
  action >>= f =
    case f (staticResult action) of
      BTReturn b   -> BTM b (workingSet action) run_both
      BTM b vars _ -> BTM b (vars . workingSet action) run_both
    where
      -- Run the first transaction, then the one its real result selects,
      -- both against the same status.
      run_both status = do
        a <- operation action status
        operation (f a) status
-- | 'Value' is an identity-like monad: bind unwraps the payload and passes
-- it on.
instance Monad (Value e) where
  return = Value
  wrapped >>= f = case wrapped of
    Value a -> f a
-- | A placeholder 'Value' whose payload raises an error if it is ever
-- forced.  The wrapper itself can be passed around safely.
invalid_value :: Value e a
invalid_value =
  Value (error "BlockingTransaction (Value): Inaccessable value.")
-- | Run a transaction as a conditional branch (see 'switch') and discard
-- its result.
when
  :: Value e Bool
  -> BTM e (Value e ())
  -> BTM e (Value e ())
when cond action = do
  _ <- switch cond action
  return (Value ())
-- | 'when' with the condition negated.
unless
  :: Value e Bool
  -> BTM e (Value e ())
  -> BTM e (Value e ())
unless cond = when (not <$> cond)
-- | Run a transaction as a conditional branch.
--
-- The transaction's operation is always executed.  While it runs, the
-- active-branch flag is set to the conjunction of its previous state and
-- the condition, and the previous state is put back afterwards.  Steps
-- such as 'writeBVar' and 'retry' consult that flag before taking effect.
--
-- A 'BTReturn' has no operation, so its value is returned unchanged.
switch
  :: Value e Bool
  -> BTM e (Value e a)
  -> BTM e (Value e a)
switch _ (BTReturn a) = return a
switch cond (BTM static vars run) = BTM static vars guarded
  where
    guarded status = do
      let flag = is_active_branch status
      outer <- readIORef flag
      -- The condition is only inspected when the enclosing branch is active.
      writeIORef flag (outer && fromValue cond)
      result <- run status
      writeIORef flag outer
      return result
-- | Transaction step that stores a value in a variable.
--
-- The variable is added to the working set.  When run, the contents are
-- replaced only if the current branch is active; otherwise they are left
-- as they were.
writeBVar :: BVar a -> Value e a -> BTM e (Value e ())
writeBVar bv@(BVar _ _ ref _) (Value new) = BTM (Value ()) (Anon bv :) store
  where
    store status = do
      active <- readIORef (is_active_branch status)
      modifyIORef ref (if active then const new else id)
      return (Value ())
-- | Transaction step that reads a variable.
--
-- The variable is added to the working set.  The static result is
-- 'invalid_value', because the contents are only known once the
-- transaction runs.  The read happens whether or not the current branch
-- is active.
readBVar :: BVar a -> BTM e (Value e a)
readBVar bv@(BVar _ _ ref _) =
  BTM invalid_value (Anon bv :) (const (Value <$> readIORef ref))
-- | Transaction step that requests a retry.
--
-- When run in an active branch it raises the 'should_retry' flag; in an
-- inactive branch the flag keeps whatever value it had.  It touches no
-- variables.
retry :: BTM e (Value e ())
retry = BTM (Value ()) id request
  where
    request status = do
      active <- readIORef (is_active_branch status)
      modifyIORef (should_retry status) (active ||)
      return (Value ())
-- | Run a monadic transaction and return its unwrapped result.
--
-- The transaction must be polymorphic in @e@.  NOTE(review): presumably
-- this plays the role of the @s@ parameter of @ST@, keeping 'Value's from
-- leaking between transactions; confirm with the author.
--
-- The definition is kept eta-expanded so that the polymorphic argument is
-- instantiated at @e ~ ()@ at the call to 'runBlockingTransaction'.
runBTM :: (forall e. BTM e (Value e a)) -> IO a
runBTM transaction = runBlockingTransaction transaction
-- | Execute a transaction, blocking and re-running it for as long as it
-- asks to retry.
--
-- One attempt proceeds as follows:
--
-- 1. The working set is de-duplicated and sorted in ascending order.
-- 2. 'openVariables' takes the lock of every variable in that order and
--    remembers its contents.
-- 3. The transaction's operation runs with fresh status flags.
-- 4. If a retry was requested, the remembered contents are restored and a
--    fresh signal is registered on every variable; otherwise nothing is
--    restored.
-- 5. 'closeVariables' releases the locks (and, on success, wakes waiters).
-- 6. On retry the thread blocks on its signal and then starts over; on
--    success the result is returned.
runBlockingTransaction :: forall a. BTM () (Value () a) -> IO a
runBlockingTransaction bm = do
  let variables = Set.toAscList (Set.fromList (workingSet bm []))
  retry_ref <- newIORef False
  active_ref <- newIORef True
  restore <- openVariables variables
  Value outcome <- operation bm (BlockingStatus retry_ref active_ref)
  wakeup <- newEmptyMVar
  must_retry <- readIORef retry_ref
  restore wakeup must_retry
  closeVariables must_retry variables
  if must_retry
    then takeMVar wakeup >> runBlockingTransaction bm
    else return outcome
-- | Lock every variable in the list, in list order, and remember its
-- contents.
--
-- The returned action takes a retry signal and a flag.  When the flag is
-- 'True' it puts the remembered contents back into each variable and adds
-- the signal to each variable's retry list; when the flag is 'False' it
-- leaves both untouched.  It does not release any lock.
openVariables :: [AnonymousVar] -> IO (MVar () -> Bool -> IO ())
openVariables vars = do
  restorers <- forM vars open_one
  return $ \retry_flag b -> mapM_ (\restore -> restore retry_flag b) restorers
  where
    open_one :: AnonymousVar -> IO (MVar () -> Bool -> IO ())
    open_one (Anon (BVar _ lock ref waiters)) = do
      takeMVar lock
      snapshot <- readIORef ref
      return $ \retry_flag b -> do
        modifyIORef ref (if b then const snapshot else id)
        modifyIORef waiters (if b then (retry_flag :) else id)
-- | Release the lock of every variable in the list.
--
-- The flag says whether the transaction is about to retry.  If it is not,
-- each variable's retry list is emptied and, after all locks have been
-- released, every signal that was collected from those lists is filled.
-- If it is, the retry lists are kept and nobody is woken.
closeVariables :: Bool -> [AnonymousVar] -> IO ()
closeVariables b vars = do
  collected <- forM vars release
  if b
    then return ()
    else mapM_ (\signal -> tryPutMVar signal ()) (concat collected)
  where
    release :: AnonymousVar -> IO [MVar ()]
    release (Anon bv) = do
      waiters <- readIORef (bvar_retry_list bv)
      modifyIORef (bvar_retry_list bv) (if b then id else const [])
      contents <- readIORef (bvar_data bv)
      -- Spark evaluation of the stored contents before giving up the lock.
      contents `par` return ()
      _ <- tryPutMVar (bvar_lock bv) ()
      return waiters
-- | Arrow interface to transactions: a function from an input 'Value' to a
-- transaction that yields an output 'Value', with the phantom parameter
-- fixed to @()@.
newtype BTA a b = BTA (Value () a -> BTM () (Value () b))
-- | Run an arrow transaction on the given input and return its result.
runBTA :: BTA a b -> a -> IO b
runBTA (BTA build) input = runBlockingTransaction $ build $ Value input
-- | Post-process the output of an arrow transaction with a pure function.
instance Functor (BTA a) where
  fmap f arrow = arr f <<< arrow
-- | Feed the same input to two arrow transactions, the function-producing
-- one first, and apply the first output to the second.
instance Applicative (BTA a) where
  pure = arr . const
  fs <*> xs = proc input -> do
    f <- fs -< input
    x <- xs -< input
    returnA -< f x
-- | Composition runs the right-hand transaction first and feeds its output
-- to the left-hand one; the identity returns its input unchanged.
instance Category BTA where
  id = BTA return
  BTA g . BTA f = BTA $ \input -> f input >>= g
-- | Pure functions lift to transactions with no effects.  'first' and
-- 'second' run the wrapped transaction on one component of a pair and
-- splice its output back in, leaving the other component as it was.
instance Arrow BTA where
  arr f = BTA $ \input -> return (fmap f input)

  first (BTA action) = BTA $ \pair -> do
    new_fst <- action (fst <$> pair)
    return (replace_fst <$> pair <*> new_fst)
    where
      replace_fst ab a' = (a', snd ab)

  second (BTA action) = BTA $ \pair -> do
    new_snd <- action (snd <$> pair)
    return (replace_snd <$> pair <*> new_snd)
    where
      replace_snd ab b' = (fst ab, b')
-- | Choice is built on 'switch': the wrapped transaction always runs, but
-- it is only an active branch when the input is a 'Left'.  For a 'Right'
-- input the wrapped transaction is handed the payload of 'invalid_value',
-- and the 'Right' is passed through untouched.
instance ArrowChoice BTA where
  left (BTA action) = BTA $ \input -> do
    let is_left = either (const True) (const False) <$> input
        left_payload = either id (const (fromValue invalid_value)) <$> input
    result <- switch is_left (action left_payload)
    return $ either (Left . const (fromValue result)) Right <$> input
-- | Arrow form of 'writeBVar': store the arrow's input in the variable.
storeBVar :: BVar a -> BTA a ()
storeBVar = BTA . writeBVar
-- | Arrow form of 'readBVar': ignore the input and read the variable.
fetchBVar :: BVar a -> BTA () a
fetchBVar bv = BTA (\_ -> readBVar bv)
-- | Request a retry when the input is 'True'.
retryWhen :: BTA Bool ()
retryWhen = BTA (`when` retry)
-- | Request a retry when the input is 'False'.
retryUnless :: BTA Bool ()
retryUnless = BTA (`unless` retry)