-- | This "Future" module was written by Chris Kuklewicz to see if he understood the design and
-- utility of the new C++ standard's future.  In particular the ability to cleanly access either a
-- resulting value or exception.
-- There are methods to poll (with 'check'), to block (with 'wait' or 'timedWait'), and to block and
-- retrieve the actual value or rethrow the exception in the accessing thread (with 'get' or
-- 'timedGet').  Timeouts are in microseconds; values less than or equal to zero use non-blocking
-- 'check'.  The timeout should be detected regardless of the blocking or FFI state of the worker
-- thread.
-- One can also manage the thread by calling 'abort', which may cause the promise to store the
-- exception from the abort as well as killing the worker thread.  The worker thread Id is a
-- secret, this is needed to ensure the running of the continuations. The 'abort' operation has the
-- same synchronous behavior as 'killThread'.
-- Note: There is no way for an outside thread to directly set the value of the promise to a
-- non-exception value.  Using 'abort' (or 'throwTo' with 'getPromiseThreadId') creates a race
-- condition in setting the result of the promise.  There is no way to change the result of promise
-- once it has been set.
-- The extension to the C++ standard is in the continuation attachment.  The 'addTodo' command
-- will, while the worker is running, add the 'todo' continuation to an internal list.  Immediately
-- upon finishing the action the worker thread will always run through the queued continuations.
-- Each 'todo' will be run in its own forkIO thread (unblocked).  If the 'addTodo' command is
-- issued after the promise value has been set then it simply runs the 'todo' in a new thread.
-- Thus there is no way multiple continuations can interfere with each other, and there are no
-- ordering guarantees between them.  The 'todo' action will not be able to distinguish whether it
-- is being run from the stored queue or immediately.
-- The use of 'block' and 'finally' should ensure that no matter how the worker ends the stored
-- continuations are run.  For instance: if 'abort' is used then the continuations might be run with
-- that thread killing exception or with the custom "Promise.abort" exception if no other result is
-- already present.
-- One use case for 'addTodo' is to allow multiplexing.  Several promises could be given a
-- continuation to write the results to a Chan or MVar, allowing another process to block waiting
-- for the first one to finish.

module Control.Concurrent.Future
  (Promise(),PromiseResult
  ,forkPromise,forkPromises,racePromises
  ,check,wait,timedWait,get,timedGet
  ,abort,addTodo) where

import Prelude hiding (catch)
import Control.Concurrent(forkIO,killThread,ThreadId)
import Control.Concurrent.MVar
import Control.Concurrent.Chan
import Control.Monad(forM_,(>=>))
import Control.Exception(block,unblock,throw,try,finally,toException,SomeException,Exception)
import System.Timeout(timeout)
import Data.Monoid(Endo(..),mempty,mappend)

-- | The outcome stored in a 'Promise': either the exception that ended the work ('Left') or the
-- value the action produced ('Right').
type PromiseResult a = Either SomeException a

-- | Handle on a computation forked by 'forkPromise'.  All fields are strict.
data Promise a = Promise { p_tid    :: {-# UNPACK #-} !ThreadId
                           -- ^ the worker thread running the action
                         , p_write  :: {-# UNPACK #-} !(MVar ())
                           -- ^ write token; whoever takes it is the one allowed to set the result
                         , p_result :: {-# UNPACK #-} !(MVar (PromiseResult a))
                           -- ^ the result, empty until it has been set
                         , p_todo   :: {-# UNPACK #-} !(MVar (Endo [PromiseResult a -> IO ()]))
                           -- ^ continuations queued while the result is not yet set
                         }

-- | Promises are identified by their worker thread: two promises are equal exactly when their
-- 'ThreadId's are equal.
instance Eq (Promise a) where
  (Promise { p_tid = s }) == (Promise { p_tid = t }) = s == t

-- | Promises are ordered by the 'ThreadId' of their worker thread.
instance Ord (Promise a) where
  compare (Promise { p_tid = s }) (Promise { p_tid = t }) = compare s t

-- | Renders as @(Promise \<thread id\>)@.  The literal pieces are emitted with 'showString' and
-- 'showChar'; using 'shows' on them would print them as quoted Haskell literals instead.
instance Show (Promise a) where
  showsPrec _ p = showString "(Promise " . shows (p_tid p) . showChar ')'

-- Internal: try to claim the one-shot write token.  The caller that gets the token stores 'new'
-- as the result; every other caller reads the result that was (or is being) stored.  Either way
-- the value that ends up in the result MVar is returned.  Runs inside 'block'.
setPromise :: MVar () -> MVar (PromiseResult a) -> PromiseResult a -> IO (PromiseResult a)
setPromise write result new = block (tryTakeMVar write >>= maybe keepExisting (const storeNew))
  where
    -- token already consumed: report whatever result is there
    keepExisting = readMVar result
    -- we hold the token: publish our value
    storeNew = putMVar result new >> return new

-- Internal: fork a fresh thread that applies the continuation to the given result, running it
-- inside 'unblock'.  Returns the id of the new thread.
launchWith :: PromiseResult a -> (PromiseResult a -> IO ()) -> IO ThreadId
launchWith val todo = forkIO (unblock (todo val))

-- | Fork every action as its own promise and attach to each one a continuation that writes the
-- promise's result to a shared 'Chan'.  Returns the promises (in the order of the actions)
-- together with that channel.
forkPromises :: [IO a] -> IO ([Promise a],Chan (PromiseResult a))
forkPromises acts = do
  results <- newChan
  promises <- mapM forkPromise acts
  mapM_ (\ p -> addTodo p (writeChan results)) promises
  return (promises, results)

-- | Fork every action as its own promise and return the first result to arrive, whether it is a
-- value ('Right') or an exception ('Left').  Once a result has been received, 'abort' is issued
-- on every promise, each from its own 'forkIO' thread.
--
-- With an empty list of actions there is nothing that could ever deliver a result, so instead of
-- blocking on an MVar that nobody will fill, a 'Left' holding a 'userError' is returned.
racePromises :: [IO a] -> IO (PromiseResult a)
racePromises [] =
  return (Left (toException (userError "Promise.racePromises: no actions to race")))
racePromises acts = do
  v <- newEmptyMVar
  -- tryPutMVar: only the first finisher fills the MVar, later ones must not block
  let withPromise p = addTodo p (\ r -> tryPutMVar v r >> return ())
  ps <- mapM forkPromise acts
  mapM_ withPromise ps
  a <- takeMVar v
  mapM_ (forkIO . abort) ps
  return a

-- | 'forkPromise' takes an action to run, and runs it in a new thread.  The action is run in an
-- "unblock" context.  If the action succeeds it will store its result as (Right {}).  If the
-- action throws an exception, the exception is stored as (Left {}).  If a result had already
-- been stored by someone else (for example by 'abort') then that stored result is kept and the
-- action's own outcome is discarded.  A stored (Left {}) is rethrown in the worker thread.
-- When the worker ends, the 'finally' handler launches the queued continuations with the
-- stored result.
forkPromise :: IO a -> IO (Promise a)
forkPromise act = do
  write <- newMVar () -- unique token, to be taken precisely once
  result <- newEmptyMVar
  msTodo <- newMVar mempty
  -- performAct: run the action, try to publish its outcome, and rethrow whatever exception
  -- ended up being the stored result.
  let performAct = do
        val <- setPromise write result =<< try (unblock act)
        case val of Left err -> throw err
                    Right {} -> return ()
      -- processTodo: while holding msTodo, make sure some result is stored (the "paranoid"
      -- value is only used if nothing has been stored yet), launch every queued continuation
      -- with the stored result, and leave the queue empty.
      processTodo = modifyMVar_ msTodo $ \ sTodo -> do
        let paranoid = Left (toException (userError "Promise.forkPromise.processTodo"))
        val <- setPromise write result paranoid
        mapM_ (launchWith val) (appEndo sTodo [])
        return mempty
  -- block is used to ensure the finally gets setup.
  tid <- block $ forkIO (finally performAct processTodo)
  return (Promise tid write result msTodo)

-- | 'check' is a non-blocking read.  Like 'timedWait' with 0 delay.  Returns 'Nothing' when the
-- result MVar is found empty, otherwise 'Just' the result.
check :: Promise a -> IO (Maybe (PromiseResult a))
check p = block $ do
  -- The read is done as take-then-put-back; the MVar is briefly empty in between.
  found <- tryTakeMVar slot
  maybe (return ()) (putMVar slot) found
  return found
  where
    slot = p_result p

-- | 'wait' blocks until the promise has a result and returns it, without rethrowing a stored
-- exception.
wait :: Promise a -> IO (PromiseResult a)
wait = readMVar . p_result

-- | 'timedWait' with a positive number of microseconds is a blocking read that gives up with
-- 'Nothing' after the timeout.  With a zero or negative argument it is the non-blocking 'check'.
timedWait :: Int -> Promise a -> IO (Maybe (PromiseResult a))
timedWait microSeconds =
  if microSeconds <= 0
    then check
    else timeout microSeconds . wait

-- | 'get' is 'wait' followed by unwrapping: a stored value is returned, a stored exception is
-- rethrown in the calling thread.
get :: Promise a -> IO a
get p = do
  outcome <- wait p
  case outcome of
    Left err  -> throw err
    Right val -> return val

-- | 'timedGet' is 'timedWait' followed by unwrapping: 'Nothing' if no result was available in
-- time, 'Just' the value if one was stored, and a stored exception is rethrown in the calling
-- thread.
timedGet :: Int -> Promise a -> IO (Maybe a)
timedGet microSeconds p = do
  outcome <- timedWait microSeconds p
  case outcome of
    Nothing          -> return Nothing
    Just (Left err)  -> throw err
    Just (Right val) -> return (Just val)

-- | Abort the promise.  If no result has been stored yet, the result is set to
-- (userError "Promise.abort" :: IOError) wrapped as an exception; an already stored result is
-- left as it is.  The worker thread is then killed with 'killThread'.
abort :: Promise t -> IO ()
abort p = block $ do
  let aborted = Left (toException (userError "Promise.abort"))
  _ <- setPromise (p_write p) (p_result p) aborted
  -- PARANOID: the p_todo MVar is held for the duration of the killThread, with the intent that
  -- the kill cannot interfere with processTodo's launching of the continuation threads.
  withMVar (p_todo p) (const (killThread (p_tid p)))

-- | Post an action to perform in a new thread with the result of the promise.  While no result
-- is available the action is appended to the promise's queue of continuations; once a result
-- is available the action is launched right away.  All are run unblocked in a fresh thread.
addTodo :: Promise a -> (PromiseResult a -> IO ()) -> IO ()
addTodo p todo = do
  -- Decide while holding the queue: enqueue if there is no result yet, otherwise leave the
  -- queue alone and hand the result back out.
  settled <- modifyMVar (p_todo p) $ \ pending -> do
    current <- check p
    case current of
      Just _  -> return (pending, current)
      Nothing -> return (pending `mappend` Endo (todo:), current)
  -- A result was already there: run the continuation now, outside the queue lock.
  maybe (return ()) (\ val -> launchWith val todo >> return ()) settled