-- | This "Future" module was written by Chris Kuklewicz to see if he understood the design and
-- utility of the new C++ standard's future.  In particular the ability to cleanly access either a
-- resulting value or exception.
--
-- There are methods to poll (with 'check'), to block (with 'wait' or 'timedWait'), and to block and
-- retrieve the actual value or rethrow the exception in the accessing thread (with 'get' or
-- 'timedGet').  Timeouts are in microseconds, values less than or equal to zero use non-blocking
-- 'check'.  The timeout should be detected regardless of the blocking or FFI state of the worker
-- thread.
--
-- On top of 'forkPromise' are 'forkPromises', 'racePromises', and 'declarePromise'.
--
-- One can also manage the thread by calling 'abort', which may cause the promise to store the
-- exception from the abort as well as killing the worker thread.  The worker thread Id is a
-- secret, this is needed to ensure the running of the continuations.  The 'abort' operation has the
-- same synchronous behavior as 'killThread'.
--
-- Note: There is no way for an outside thread to directly set the value of the promise to a
-- non-exception value.  Using 'abort' (or 'throwTo' with 'getPromiseThreadId') creates a race
-- condition in setting the result of the promise.  There is no way to change the result of a promise
-- once it has been set.
--
-- The extension to the C++ standard is in the continuation attachment.  The 'addTodo' command
-- will, while the worker is running, add the 'todo' continuation to an internal list.  Immediately
-- upon finishing the action the worker thread will always run through the queued continuations.
-- Each 'todo' will be run in its own forkIO thread (unblocked).  If the 'addTodo' command is
-- issued after the promise value has been set then it simply runs the 'todo' in a new thread.
-- Thus there is no way multiple continuations can interfere with each other, and there are no
-- ordering guarantees between them.
-- The 'todo' action will not be able to distinguish whether it
-- is being run from the stored queue or immediately.
--
-- The use of 'block' and 'finally' should ensure that no matter how the worker ends the stored
-- continuations are run.  For instance: if 'abort' is used then the continuations might be run with
-- that thread killing exception or with the custom "Promise.abort" exception if no other result is
-- already present.
--
-- One use case for 'addTodo' is to allow multiplexing.  Several promises could be given a
-- continuation to write the results to an MChan or MVar, allowing another process to block waiting
-- for the first one to finish.
module Control.Concurrent.Future
  (Promise,PromiseResult
  ,forkPromise,declarePromise,forkPromises,racePromises
  ,check,wait,get,timedWait,timedGet
  ,abort,addTodo) where

import Prelude hiding (catch)
import Control.Concurrent(forkIO,killThread,ThreadId,myThreadId)
import Control.Concurrent.MVar
import Control.Concurrent.Chan
import Control.Monad(join,forM_,(>=>))
import Control.Exception(block,unblock,throw,try,finally,toException,onException,SomeException,Exception)
import Data.IORef
import System.Timeout(timeout)
import Data.Monoid(Endo(..),mempty,mappend)

-- | The outcome of a promise: either the exception that ended it or the value it produced.
type PromiseResult a = Either SomeException a

-- | Handle to a computation running in a worker thread.  The constructor and fields are not
-- exported.
--
-- Field roles, as used by the routines in this module:
--
--   * p_tid    : created empty; the worker stores (Just its own ThreadId) when it starts, and both
--                the worker's clean-up and 'abort' reset it to Nothing.
--   * p_write  : created full with (); the first setPromise to take it is the only writer of the
--                result.
--   * p_notify : created empty; filled right after the result is written, so 'wait' blocks on it
--                until a result exists.
--   * p_result : Nothing until the result has been written, then (Just result).
--   * p_todo   : continuations queued by 'addTodo', kept as a difference list ('Endo') so that
--                appending is cheap and queue order is preserved.
data Promise a = Promise
  { p_tid    :: {-# UNPACK #-} !(MVar (Maybe ThreadId))
  , p_write  :: {-# UNPACK #-} !(MVar ())
  , p_notify :: {-# UNPACK #-} !(MVar ())
  , p_result :: {-# UNPACK #-} !(IORef (Maybe (PromiseResult a)))
  , p_todo   :: {-# UNPACK #-} !(MVar (Endo [PromiseResult a -> IO ()]))
  }

-- Two promises are equal exactly when they share the same p_tid MVar (MVar identity).
instance Eq (Promise a) where
  a == b = p_tid a == p_tid b

-- NOTE(review): every promise is rendered as the two characters "" (an empty quoted string);
-- presumably a placeholder or a label that was lost -- confirm the intended text.
instance Show (Promise a) where
  showsPrec _ _ = shows ""

-- internal routine, consumes the p_write token once, when consumed then sets the result once.
-- always returns the value in the result.
setPromise :: Promise a -> PromiseResult a -> IO (PromiseResult a)
setPromise p new = block $ do
  -- Whoever takes the p_write token is the single writer; the token is never put back.
  mayWrite <- tryTakeMVar (p_write p)
  case mayWrite of
    -- Someone else is (or was) the writer: return their result instead of 'new'.
    Nothing -> wait p -- should not block for long
    Just {} -> do
      -- Write the result before filling p_notify so that a woken 'wait' always sees it.
      writeIORef (p_result p) (Just new)
      putMVar (p_notify p) ()
      return new

-- internal routine, takes the result and a continuation to run on it
launchWith :: PromiseResult a -> (PromiseResult a -> IO ()) -> IO ThreadId
launchWith val todo = forkIO . unblock . todo $ val

-- | forkPromise takes an action to run, and runs it in a new thread.  This is run in an "unblock"
-- context.  If the action succeeds it will store its result as (Right {}).  If the action throws
-- an exception, or the promise is aborted before the action stores a result, the result is stored
-- as (Left {}) holding that exception.  After the action ends, however it ends, the worker
-- launches every continuation queued with 'addTodo', each in its own thread.
forkPromise :: IO a -> IO (Promise a)
forkPromise act = do
  msTid <- newEmptyMVar
  write <- newMVar () -- unique token, to be taken precisely once
  notify <- newEmptyMVar
  result <- newIORef Nothing
  msTodo <- newMVar mempty
  let p = Promise msTid write notify result msTodo
  let performAct = do
        -- Publish the worker's ThreadId; 'abort' blocks on p_tid until this has happened.
        putMVar msTid . Just =<< myThreadId
        val <- setPromise p =<< try (unblock act)
        -- Rethrow a stored exception so the worker thread itself also ends with it.
        case val of
          Left err -> throw err
          Right {} -> return ()
      processTodo = block $ do
        -- NOTE(review): this modifyMVar_ can block while 'abort' holds p_tid around its
        -- killThread; a blocked MVar operation is interruptible even under 'block', so the kill
        -- may be delivered here and skip the continuations below -- confirm.
        modifyMVar_ msTid (const (return Nothing)) -- disable 'abort'
        let paranoid = Left (toException (userError "Promise.forkPromise.processTodo"))
        val <- setPromise p paranoid -- ensure promise is set
        -- addTodo will no longer alter msTodo
        sTodo <- swapMVar msTodo mempty
        mapM_ (launchWith val) (appEndo sTodo [])
  -- block is used to ensure the finally gets setup, and performAct sets msTid
  block $ forkIO (finally performAct processTodo)
  return p

-- | forkPromises is built on top of forkPromise.  It converts a list of actions into a list of
-- promises, and additionally collects the results, in completion order, into the returned Chan.
forkPromises :: [IO a] -> IO ([Promise a],Chan (PromiseResult a))
forkPromises acts = do
  c <- newChan
  ps <- mapM forkPromise acts
  -- Each promise gets a continuation that pushes its result onto the shared channel.
  forM_ ps $ flip addTodo (writeChan c)
  return (ps,c)

-- | racePromises is built on top of forkPromise.  It runs a list of actions as promises and waits
-- for the first result (which may be an exception).  Once the result is found it asynchronously
-- kills the threads.
--
-- With an empty list of actions nothing ever fills the result MVar, so the call blocks on it.
racePromises :: [IO a] -> IO (PromiseResult a)
racePromises acts = do
  v <- newEmptyMVar
  -- tryPutMVar: only the first finished promise fills v, later results are dropped.
  let withPromise p = addTodo p (\ r -> tryPutMVar v r >> return ())
  ps <- mapM forkPromise acts
  mapM_ withPromise ps
  a <- takeMVar v
  -- 'abort' can block (it uses killThread), so each abort runs in its own thread.
  mapM_ (forkIO . abort) ps
  return a

-- | declarePromise is built on top of forkPromise.  It creates a promise and a function to fulfill
-- the promise with an action.  The first time the fulfill function is used it gives the action to
-- the promise and returns True.  All additional usages of the fulfill function will do nothing and
-- return False.  Note that the Promise may be aborted before the fulfill function is used, and in
-- this case the fulfill function will appear to succeed but achieve nothing.
declarePromise :: IO (Promise a, IO a -> IO Bool)
declarePromise = do
  let fulfill takeOnce = \ act -> block $ do
        -- takeOnce holds the hand-off MVar exactly once; taking it marks the promise as fulfilled.
        maybeUseOnce <- tryTakeMVar takeOnce
        case maybeUseOnce of
          Nothing -> return False
          Just useOnce -> putMVar useOnce act >> return True
  useOnce <- newEmptyMVar
  takeOnce <- newMVar useOnce
  -- The worker blocks on useOnce until an action is handed over, then runs that action.
  promise <- forkPromise (join (takeMVar useOnce))
  return (promise,fulfill takeOnce)

-- | 'check' is a non-blocking read.  Like 'timedWait' with 0 delay.
check :: Promise a -> IO (Maybe (PromiseResult a))
check p = readIORef (p_result p)

-- | 'wait' is a blocking read.
wait :: Promise a -> IO (PromiseResult a)
wait p = withMVar (p_notify p) $ \ _ -> do
  -- p_notify is only filled after the result has been written, so the result is present here.
  -- withMVar puts the token back afterwards, which lets every later waiter through as well.
  mr <- readIORef (p_result p)
  case mr of
    Nothing -> error "Control.Concurrent.Future.wait with result of Nothing is impossible"
    Just r -> return r

-- | 'timedWait' with a positive value in microseconds is a blocking read with timeout.
-- A value less than or equal to zero degrades to the non-blocking 'check'.
timedWait :: Int -> Promise a -> IO (Maybe (PromiseResult a))
timedWait microSeconds | microSeconds <= 0 = check
                       | otherwise = timeout microSeconds . wait

-- | 'get' is wait which rethrows a SomeException in the calling thread
get :: Promise a -> IO a
get p = wait p >>= either throw return

-- | 'timedGet' is a 'timedWait' which rethrows a SomeException in the calling thread.
-- The result is Nothing when no result is available within the timeout.
timedGet :: Int -> Promise a -> IO (Maybe a)
timedGet microSeconds p =
  timedWait microSeconds p >>= maybe (return Nothing) (either throw (return . Just))

-- | If the abort occurs before the action has stored a result then the result is set to an
-- exception.  The first call to abort gets the threadId and performs the, possibly blocking,
-- killThread.  If it completes then the ThreadId is forgotten (so the thread can be garbage
-- collected).
abort :: Promise t -> IO ()
abort p = block $ do
  -- The value returned by setPromise (ours or an earlier result) is not needed here.
  _ <- setPromise p (Left (toException (userError "Promise.abort")))
  -- p_tid stays taken for the duration of killThread; it is left holding Nothing afterwards.
  modifyMVar_ (p_tid p) $ \ mTid -> do
    case mTid of
      Nothing -> return ()
      Just tid -> killThread tid
    return Nothing

-- | Post an action to perform in a new thread with the result of the promise.  All are run
-- unblocked in a fresh thread.
addTodo :: Promise a -> (PromiseResult a -> IO ()) -> IO ()
addTodo p todo = do
  -- The check happens while p_todo is held: either the result is not set yet and the todo is
  -- queued for the worker, or it is already set and the todo is launched below.
  ma <- modifyMVar (p_todo p) $ \ sTodo -> do
    current <- check p
    case current of
      Nothing -> return (sTodo `mappend` Endo (todo:),current)
      Just {} -> return (sTodo,current)
  case ma of
    Nothing -> return ()
    Just val -> launchWith val todo >> return ()