-- | This "Future" module was written by Chris Kuklewicz to see if he understood the design and -- utility of the new C++ standard's future. In particular the ability to cleanly access either a -- resulting value or exception. -- -- There a methods to poll (with 'check'), to block (with 'wait' or 'timedWait'), and to block and -- retrieve the actual value or rethrow the exception in the accessing thread (with 'get' or -- 'timedGet'). Timeouts are in micro seconds, values less than or equal to zero use non-blocking -- 'check'. The timeout should be detected reagarless of the blocking or FFI state of the worker -- thread. -- -- One can also manage the threadBy calling 'abort', which may cause the promise to store the -- exception from the abort as well as killing the worker thread. The worker thread Id is a -- secret, this is needed to ensure the running of the continuations. The 'abort' operation has the -- same synchronous behavior as 'killThread'. -- -- Note: There is no way for an outside thread to directly set the value of the promise to a -- non-exception value. Using 'abort' (or 'throwTo' with 'getPromiseThreadId') creates a race -- condition in setting the result of the promise. There is no way to change the result of promise -- once it has been set. -- -- The extension to the C++ standard is in the continuation attachment. The 'addTodo' command -- will, while the worker is running, add the 'todo' continuation to an internal list. Immediately -- upon finishing the action the worker thread will always run through the queued continuations. -- Each 'todo' will be run in its own forkIO thread (unblocked). If the 'addTodo' command is -- issued after the promise value has been set then it simplify runs the 'todo' in a new thread. -- Thus there is no way multiple continuations can interfere with each other, and there are no -- ordering guarantees between them. The 'todo' action will not be able to distinguish whether it -- is being run from the stored queue or immediately. -- -- The use of 'block' and 'finally' should ensure that no matter how the worker ends the stored -- continations are run. For instnace: if 'abort' is used then the continations might be run with -- that thread killing exception or with the custom "Promise.abort" exception if no other result is -- already present. -- -- One use case for 'addTodo' is to allow multiplexing. Several promises could be given a -- continuation to write the results to an MChan or MVar, allowing another process to block waiting -- for the first one to finish. module Control.Concurrent.Future (Promise,PromiseResult ,forkPromise,forkPromises,racePromises ,check,wait,get,timedWait,timedGet ,abort,addTodo) where import Prelude hiding (catch) import Control.Concurrent(forkIO,killThread,ThreadId) import Control.Concurrent.MVar import Control.Concurrent.Chan import Control.Monad(forM_,(>=>)) import Control.Exception(block,unblock,throw,try,finally,toException,SomeException,Exception) import System.Timeout(timeout) import Data.Monoid(Endo(..),mempty,mappend) type PromiseResult a = Either SomeException a data Promise a = Promise { p_tid :: {-# UNPACK #-} !ThreadId , p_write :: {-# UNPACK #-} !(MVar ()) , p_result :: {-# UNPACK #-} !(MVar (PromiseResult a)) , p_todo :: {-# UNPACK #-} !(MVar (Endo [PromiseResult a -> IO ()])) } instance Eq (Promise a) where a == b = p_tid a == p_tid b instance Ord (Promise a) where compare a b = compare (p_tid a) (p_tid b) instance Show (Promise a) where showsPrec _ p = shows "(Promise " . shows (p_tid p) . shows ')' -- internal routine, consumes the p_write token once, when consumed then sets the result once. -- always returns the value in the result. setPromise :: MVar () -> MVar (PromiseResult a) -> PromiseResult a -> IO (PromiseResult a) setPromise write result new = block $ do mayWrite <- tryTakeMVar write case mayWrite of Nothing -> readMVar result Just {} -> putMVar result new >> return new -- internal routine, takes the result and a continuation to run on it launchWith :: PromiseResult a -> (PromiseResult a -> IO ()) -> IO ThreadId launchWith val todo = forkIO . unblock . todo $ val forkPromises :: [IO a] -> IO ([Promise a],Chan (PromiseResult a)) forkPromises acts = do c <- newChan ps <- mapM forkPromise acts forM_ ps $ flip addTodo (writeChan c) return (ps,c) racePromises :: [IO a] -> IO (PromiseResult a) racePromises acts = do v <- newEmptyMVar let withPromise p = addTodo p (\ r -> tryPutMVar v r >> return ()) ps <- mapM forkPromise acts mapM_ withPromise ps a <- takeMVar v mapM_ abort ps return a -- | forkPromise take an action to run, and runs it in a new thread. This is run in an "unblock" -- context. If the action succeeds it will store its result as (Right {}). If the action throws -- an exception, or the forkPromise :: IO a -> IO (Promise a) forkPromise act = do write <- newMVar () -- unique token, to be taken precisely once result <- newEmptyMVar msTodo <- newMVar mempty let performAct = do val <- setPromise write result =<< try (unblock act) case val of Left err -> throw err Right {} -> return () processTodo = modifyMVar_ msTodo $ \ sTodo -> do let paranoid = Left (toException (userError "Promise.forkPromise.processTodo")) val <- setPromise write result paranoid mapM_ (launchWith val) (appEndo sTodo []) return mempty -- block is used to ensure the finally gets setup. tid <- block $ forkIO (finally performAct processTodo) return (Promise tid write result msTodo) -- | 'check' is a non-blocking read. Like 'timedWait' with 0 delay. check :: Promise a -> IO (Maybe (PromiseResult a)) check p = block $ do ma <- tryTakeMVar (p_result p) case ma of Nothing -> return () Just val -> putMVar (p_result p) val return ma -- | 'wait' is a blocking read. wait :: Promise a -> IO (PromiseResult a) wait p = readMVar (p_result p) -- | 'timedWait' with a positive value in micro seconds is a blocking read with timeout. timedWait :: Int -> Promise a -> IO (Maybe (PromiseResult a)) timedWait microSeconds | microSeconds <= 0 = check | otherwise = timeout microSeconds . wait -- | 'get' is wait which rethrows a SomeException in the calling thread get :: Promise a -> IO a get p = wait p >>= either throw return -- | 'timedGet' is a 'timedWait' which rethrows a SomeException in the calling thread timedGet :: Int -> Promise a -> IO (Maybe a) timedGet microSeconds p = timedWait microSeconds p >>= maybe (return Nothing) (either throw (return . Just)) -- | If the abort occurs before the act has stored a result then the result is set to (userError -- "Promise.abort" :: IOError), or the killThread exception. abort :: Promise t -> IO () abort p = block $ do setPromise (p_write p) (p_result p) (Left (toException (userError "Promise.abort"))) -- PARANOID: The killThread is done while holding msTodo, thus it cannot interfere with -- processTodo's launching of the continuation threads. withMVar (p_todo p) $ \ _ -> killThread (p_tid p) -- | Post an action to perform in a new thread with the reasult of the promise. All are -- run unblocked in a fresh thread. addTodo :: Promise a -> (PromiseResult a -> IO ()) -> IO () addTodo p todo = do ma <- modifyMVar (p_todo p) $ \ sTodo -> do ma <- check p case ma of Nothing -> return (sTodo `mappend` Endo (todo:),ma) Just {} -> return (sTodo,ma) case ma of Nothing -> return () Just val -> launchWith val todo >> return ()