{-# LANGUAGE Arrows #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE Rank2Types #-}
{-# OPTIONS_GHC -Wall #-}

-- | Composable blocking transactions, based on the
-- blog post:
--
module BlockingTransactions.BlockingTransactions
    ( -- * Transactional Variables
      BVar
    , newBVar
    , peekBVar
    , pokeBVar
    , modifyBVar
      -- * Blocking Transaction Monad
    , runBTM
    , BTM
    , Value
    , when
    , unless
    , readBVar
    , writeBVar
    , retry
      -- * Blocking Transaction Arrow
    , runBTA
    , BTA
    , fetchBVar
    , storeBVar
    , retryWhen
    , retryUnless
    ) where

import Prelude hiding ((.),id)

import Control.Category
import Control.Arrow
import Control.Applicative hiding (empty)
import System.IO.Unsafe
import Control.Monad hiding (when,unless)
import Control.Concurrent
import Control.Parallel
import Data.IORef
import qualified Data.Set as Set

-- | A transactional variable with a blocking implementation.
data BVar a = BVar
    { bvar_index :: Integer
      -- ^ The unique value for ordering purposes.
    , bvar_lock :: MVar ()
      -- ^ The mutex.
    , bvar_data :: IORef a
      -- ^ User data.
    , bvar_retry_list :: IORef [MVar ()]
      -- ^ Single-use modification listeners.  Whenever a variable is
      -- modified, these will all be written to and then discarded.
    }

-- | An untyped BVar.
data AnonymousVar where
    -- Hides the content type of a 'BVar' so that variables of different
    -- types can be stored in one collection.
    Anon :: BVar a -> AnonymousVar

-- | Two variables are equal exactly when their unique indices are equal.
instance Eq (BVar a) where
    a == b = bvar_index a == bvar_index b

-- | Variables are ordered by their unique index.  Only 'compare' is
-- defined here; the relational operators come from the class defaults and
-- therefore always agree with it.
instance Ord (BVar a) where
    compare a b = compare (bvar_index a) (bvar_index b)

-- | Equality on the unique index of the wrapped variable.
instance Eq AnonymousVar where
    Anon a == Anon b = bvar_index a == bvar_index b

-- | Ordering on the unique index of the wrapped variable.
instance Ord AnonymousVar where
    compare (Anon a) (Anon b) = compare (bvar_index a) (bvar_index b)

{-# NOINLINE unique_source #-}
-- | Global counter from which 'newBVar' draws the index of each new
-- variable.
unique_source :: IORef Integer
unique_source = unsafePerformIO $ newIORef 0

-- | Construct a new transactional variable.
--
-- The variable receives the next index from 'unique_source', an unlocked
-- mutex, the given initial contents, and an empty listener list.
newBVar :: a -> IO (BVar a)
newBVar a = do
    index <- atomicModifyIORef unique_source (\n -> (succ n, n))
    lock <- newMVar ()
    contents <- newIORef a
    listeners <- newIORef []
    return $ BVar index lock contents listeners

-- | Observe the contents of a transactional variable.
--
-- The variable's lock is held while the contents are read.
peekBVar :: BVar a -> IO a
peekBVar bvar = do
    takeMVar $ bvar_lock bvar
    result <- readIORef $ bvar_data bvar
    putMVar (bvar_lock bvar) ()
    return result

-- | One-off write to a transactional variable.
--
-- The registered listeners are removed while the lock is held and are
-- signalled after the lock has been released.
pokeBVar :: BVar a -> a -> IO ()
pokeBVar bvar value = do
    takeMVar $ bvar_lock bvar
    writeIORef (bvar_data bvar) value
    signal_list <- readIORef (bvar_retry_list bvar)
    writeIORef (bvar_retry_list bvar) []
    putMVar (bvar_lock bvar) ()
    forM_ signal_list $ \v -> tryPutMVar v ()

-- | Perform a transaction using only a single variable.
-- The function receives the current contents and returns the new contents
-- paired with a result, which is handed back to the caller.  As in
-- 'pokeBVar', every registered retry listener is signalled and discarded.
modifyBVar :: BVar a -> (a -> (a,b)) -> IO b
modifyBVar bvar f = do
    takeMVar $ bvar_lock bvar
    a <- readIORef (bvar_data bvar)
    let ab = f a
    writeIORef (bvar_data bvar) $ fst ab
    signal_list <- readIORef (bvar_retry_list bvar)
    -- Listeners are single-use: drop them while the lock is still held so
    -- that they are not retained and signalled again by every later write.
    writeIORef (bvar_retry_list bvar) []
    putMVar (bvar_lock bvar) ()
    forM_ signal_list $ \v -> tryPutMVar v ()
    return $ snd ab

{------------------------------------------------------------------------------}
-- Monadic Interface
{------------------------------------------------------------------------------}

-- | State during the progress of a blocking transaction.
data BlockingStatus = BlockingStatus {
    should_retry :: IORef Bool,
    -- ^ Set to true if the user decides to retry a transaction.
    -- Initially this value is false.
    is_active_branch :: IORef Bool
    -- ^ Set to indicate conditional sections.  When false, all
    -- writes are suppressed.
    }

-- | An opaque value.  It can be modified and combined with other opaque
-- values, but not observed.
--
-- The type variable @e@ binds the value to the monadic context in which it
-- occurs (this is identical to the @runST@ existential type trick).
data Value e a = Value { fromValue :: a }

-- | The blocking transaction monad.
data BTM e a =
    BTReturn a
    -- Consists of: a static return value, a constructor for the working set
    -- of transactional variables, and the transactional behavior.
  | BTM a ([AnonymousVar] -> [AnonymousVar]) (BlockingStatus -> IO a)

-- | Unsafe: get the result of a transaction without running it.
-- (Doesn't preserve type system properties of BTM, and any values
-- returned will be internally undefined.)
staticResult :: BTM e a -> a
staticResult (BTReturn a) = a
staticResult (BTM a _ _) = a

-- | Unsafe: get the transactional behavior.
-- (Doesn't preserve type system properties of BTM.)
operation :: BTM e a -> BlockingStatus -> IO a
operation (BTReturn a) _ = return a
operation (BTM _ _ op) x = op x

-- | Construct the working set of a transaction.
workingSet :: BTM e a -> [AnonymousVar] -> [AnonymousVar]
workingSet btm = case btm of
    BTReturn _          -> id
    BTM _ working_set _ -> working_set

instance Functor (BTM e) where
    fmap = liftM

instance Functor (Value e) where
    fmap f (Value x) = Value (f x)

instance Applicative (BTM e) where
    pure = BTReturn
    (<*>) = ap

instance Applicative (Value e) where
    pure = Value
    Value f <*> Value x = Value (f x)

instance Monad (BTM e) where
    return = pure

    BTReturn k >>= m = m k
    -- The key here is that we bind the static result of the previous
    -- operation to the static result of the subsequent operation, and
    -- the monadic IO result to the subsequent monadic IO result.
    --
    -- For example, if we read a variable that contains the number 23,
    -- the static result will be @Value undefined@ while the monadic
    -- result will be @Value 23@.
    --
    -- The monadic result is unobservable without running the transaction
    -- but the static result, wrapped in a 'Value', is simply unobservable.
    --
    -- However, whenever we return a constant value, which is unwrapped,
    -- the value is observable.
    k >>= m =
        case m (staticResult k) of
            BTReturn j -> BTM j (workingSet k) run
            BTM j ws _ -> BTM j (ws . workingSet k) run
      where
        -- Run the first transaction, then the continuation applied to its
        -- actual (not static) result, under the same status.
        run status = do
            k' <- operation k status
            operation (m k') status

instance Monad (Value e) where
    return = pure
    Value k >>= m = m k

-- | Placeholder used as the static result of a read; its payload raises an
-- error if it is ever forced.
invalid_value :: Value e a
invalid_value = Value $ error "BlockingTransaction (Value): Inaccessable value."

-- | Flow control.  Skip the critical section if the predicate is false.
when :: Value e Bool -> BTM e (Value e ()) -> BTM e (Value e ())
when predicate section = do
    _ <- switch predicate section
    return (Value ())

-- | Flow control.  Skip the critical section if the predicate is true.
unless :: Value e Bool -> BTM e (Value e ()) -> BTM e (Value e ())
unless predicate = when (not <$> predicate)

-- | Flow control for the BTM monad.  If the predicate is false,
-- then all writes during the critical section are suppressed.
-- Since reads always happen, the return value is available
-- even in a suppressed branch.
switch :: Value e Bool -> BTM e (Value e a) -> BTM e (Value e a)
switch _ (BTReturn a) = return a
switch v action = BTM (staticResult action) (workingSet action) guarded
  where
    guarded status = do
        enclosing <- readIORef (is_active_branch status)
        -- The section is active only if the enclosing branch is active
        -- and the predicate holds.
        writeIORef (is_active_branch status) (enclosing && fromValue v)
        outcome <- operation action status
        -- Put back the enclosing branch's flag.
        writeIORef (is_active_branch status) enclosing
        return outcome

-- | Write to a variable.  The write is suppressed when the current
-- branch is inactive.
writeBVar :: BVar a -> Value e a -> BTM e (Value e ())
writeBVar bv@(BVar _ _ ref _) (Value i) =
    bv `seq` BTM (Value ()) (Anon bv :) commit
  where
    commit status = do
        is_active <- readIORef (is_active_branch status)
        modifyIORef ref (if is_active then const i else id)
        return (Value ())

{-# NOINLINE readBVar #-}
-- | Read from a variable.
readBVar :: BVar a -> BTM e (Value e a)
readBVar bv@(BVar _ _ ref _) =
    bv `seq` BTM invalid_value (Anon bv :) (\_ -> fmap Value (readIORef ref))

-- | Electively retry.  This will restore all variables to their
-- state before the transaction began, and listen for a change
-- to any variable in the working set before trying the
-- transaction again.
retry :: BTM e (Value e ())
retry = BTM (Value ()) id request
  where
    -- Only an active branch may request a retry.
    request status = do
        is_active <- readIORef (is_active_branch status)
        modifyIORef (should_retry status) (is_active ||)
        return (Value ())

-- | Commit a blocking transaction.
runBTM :: (forall e. BTM e (Value e a)) -> IO a
-- The argument is applied explicitly so that the rank-2 argument is
-- instantiated at the call to 'runBlockingTransaction'.
runBTM action = runBlockingTransaction action

{------------------------------------------------------------------------------}
-- Evaluation --
{------------------------------------------------------------------------------}

-- | Run a blocking transaction without type system tricks.
runBlockingTransaction :: forall a. BTM () (Value () a) -> IO a
runBlockingTransaction bm = do
    -- Construct the working set: deduplicated and in ascending order.
    let working_set = Set.toAscList (Set.fromList (workingSet bm []))
    -- Create state variables.
    should_retry_var <- newIORef False
    is_active_var <- newIORef True
    -- Acquire locks on the working set.
    restore <- openVariables working_set
    -- Perform the transaction.
    Value pre_result <-
        operation bm (BlockingStatus should_retry_var is_active_var)
    -- In the event of a retry, store a signal in the working set,
    -- so that we can wait for modification.
    retry_signal <- newEmptyMVar
    wants_retry <- readIORef should_retry_var
    restore retry_signal wants_retry
    -- Release the locks on the working set.
    closeVariables wants_retry working_set
    -- This is the first place where we actually force a result of
    -- the transaction: conditionally wait on the retry signal and re-run
    -- the transaction.
    if wants_retry
        then takeMVar retry_signal >> runBlockingTransaction bm
        else return pre_result

-- | Acquire locks on the working set: returns an operation to restore the
-- working set to its initial state.  The restoration function
-- requires a retry signal, and a value indicating whether
-- or not the user elected to retry.  (If that value is false,
-- the entire restoration function is suppressed.)
openVariables :: [AnonymousVar] -> IO (MVar () -> Bool -> IO ())
openVariables vars = do
    restorers <- forM vars lockOne
    return $ \retry_signal should_rollback ->
        mapM_ (\restoreOne -> restoreOne retry_signal should_rollback)
              restorers
  where
    -- Lock one variable and remember its contents at that moment.
    lockOne :: AnonymousVar -> IO (MVar () -> Bool -> IO ())
    lockOne (Anon (BVar _ lock ref listeners)) = do
        takeMVar lock
        saved <- readIORef ref
        return $ \retry_signal should_rollback -> do
            modifyIORef ref
                (if should_rollback then const saved else id)
            modifyIORef listeners
                (if should_rollback then (retry_signal :) else id)

-- | Release locks on the working set.
closeVariables :: Bool -> [AnonymousVar] -> IO ()
closeVariables b s = do
    pending <- forM s releaseOne
    -- The collected listeners are signalled only when the transaction
    -- is not being retried.
    if b
        then return ()
        else forM_ (concat pending) $ \signal -> tryPutMVar signal ()
  where
    -- Unlock one variable and hand back the listeners that were
    -- registered on it.
    releaseOne :: AnonymousVar -> IO [MVar ()]
    releaseOne (Anon bv) = do
        listeners <- readIORef (bvar_retry_list bv)
        -- On a retry the listeners stay registered; otherwise they are
        -- removed from the variable.
        modifyIORef (bvar_retry_list bv) (if b then id else const [])
        contents <- readIORef (bvar_data bv)
        contents `par` return ()
        _ <- tryPutMVar (bvar_lock bv) ()
        return listeners

{------------------------------------------------------------------------------}
-- Arrow Interface --
{------------------------------------------------------------------------------}

-- | The blocking transaction arrow.  The semantics are
-- identical to the equivalent operations on the
-- monadic interface.
newtype BTA a b = BTA ((Value () a) -> BTM () (Value () b))

-- | Commit a blocking transaction arrow, supplying its input.
runBTA :: BTA a b -> a -> IO b
runBTA (BTA action) input = runBlockingTransaction $ action $ Value input

instance Functor (BTA a) where
    fmap f arrow = arr f <<< arrow

instance Applicative (BTA a) where
    pure x = arr (const x)
    fs <*> xs = proc input -> do
        f <- fs -< input
        x <- xs -< input
        returnA -< f x

instance Category BTA where
    BTA outer . BTA inner = BTA $ \input -> inner input >>= outer
    id = BTA return

instance Arrow BTA where
    arr f = BTA $ \input -> return (fmap f input)
    first (BTA action) = BTA $ \pair -> do
        replaced <- action (fmap fst pair)
        -- The inner block runs in the 'Value' monad.
        return $ do
            pair' <- pair
            replaced' <- replaced
            return (first (const replaced') pair')
    second (BTA action) = BTA $ \pair -> do
        replaced <- action (fmap snd pair)
        return $ do
            pair' <- pair
            replaced' <- replaced
            return (second (const replaced') pair')

instance ArrowChoice BTA where
    left (BTA action) = BTA $ \input -> do
        let takes_left = fmap (either (const True) (const False)) input
            -- For a 'Right' input the action still runs, on an
            -- inaccessible payload, with its writes suppressed by 'switch'.
            left_payload =
                fmap (either id (const $ fromValue invalid_value)) input
        result <- switch takes_left (action left_payload)
        return $ fmap (either (Left . const (fromValue result)) Right) input

-- | As 'writeBVar'.
storeBVar :: BVar a -> BTA a ()
storeBVar bv = BTA (writeBVar bv)

-- | As 'readBVar'.
fetchBVar :: BVar a -> BTA () a
fetchBVar bv = BTA (\_ -> readBVar bv)

-- | As 'retry'.
-- The transaction is retried when the input is 'True'.
retryWhen :: BTA Bool ()
retryWhen = BTA (`when` retry)

-- | As 'retry', but the transaction is retried when the input is 'False'.
retryUnless :: BTA Bool ()
retryUnless = BTA (`unless` retry)