-- | This "Future" module was written by Chris Kuklewicz to see if he understood the design and
-- utility of the new C++ standard's future.  In particular the ability to cleanly access either a
-- resulting value or exception.
-- There a methods to poll (with 'check'), to block (with 'wait' or 'timedWait'), and to block and
-- retrieve the actual value or rethrow the exception in the accessing thread (with 'get' or
-- 'timedGet').  Timeouts are in micro seconds, values less than or equal to zero use non-blocking
-- 'check'.  The timeout should be detected reagarless of the blocking or FFI state of the worker
-- thread.
-- On top of 'forkPromise' is 'forkPromises', 'racePromises', and 'declarePromise'.
-- One can also manage the threadBy calling 'abort', which may cause the promise to store the
-- exception from the abort as well as killing the worker thread.  The worker thread Id is a
-- secret, this is needed to ensure the running of the continuations. The 'abort' operation has the
-- same synchronous behavior as 'killThread'.
-- Note: There is no way for an outside thread to directly set the value of the promise to a
-- non-exception value.  Using 'abort' (or 'throwTo' with 'getPromiseThreadId') creates a race
-- condition in setting the result of the promise.  There is no way to change the result of promise
-- once it has been set.
-- The extension to the C++ standard is in the continuation attachment.  The 'addTodo' command
-- will, while the worker is running, add the 'todo' continuation to an internal list.  Immediately
-- upon finishing the action the worker thread will always run through the queued continuations.
-- Each 'todo' will be run in its own forkIO thread (unblocked).  If the 'addTodo' command is
-- issued after the promise value has been set then it simplify runs the 'todo' in a new thread.
-- Thus there is no way multiple continuations can interfere with each other, and there are no
-- ordering guarantees between them.  The 'todo' action will not be able to distinguish whether it
-- is being run from the stored queue or immediately.
-- The use of 'block' and 'finally' should ensure that no matter how the worker ends the stored
-- continations are run.  For instance: if 'abort' is used then the continations might be run with
-- that thread killing exception or with the custom "Promise.abort" exception if no other result is
-- already present.
-- One use case for 'addTodo' is to allow multiplexing.  Several promises could be given a
-- continuation to write the results to an MChan or MVar, allowing another process to block waiting
-- for the first one to finish.

module Control.Concurrent.Future
  ,abort,addTodo) where

import Prelude hiding (catch)
import Control.Concurrent(forkIO,killThread,ThreadId,myThreadId)
import Control.Concurrent.MVar
import Control.Concurrent.Chan
import Control.Monad(join,forM_,(>=>))
import Control.Exception(block,unblock,throw,try,finally,toException,onException,SomeException,Exception)
import Data.IORef
import System.Timeout(timeout)
import Data.Monoid(Endo(..),mempty,mappend)

type PromiseResult a = Either SomeException a

data Promise a = Promise { p_tid    :: {-# UNPACK #-} !(MVar (Maybe ThreadId))
                         , p_write  :: {-# UNPACK #-} !(MVar ())
                         , p_notify :: {-# UNPACK #-} !(MVar ())
                         , p_result :: {-# UNPACK #-} !(IORef (Maybe (PromiseResult a)))
                         , p_todo   :: {-# UNPACK #-} !(MVar (Endo [PromiseResult a -> IO ()]))

instance Eq (Promise a) where
  a == b = p_tid a == p_tid b

instance Show (Promise a) where
  showsPrec _ p = shows "<Promise>"

-- internal routine, consumes the p_write token once, when consumed then sets the result once.
-- always returns the value in the result.
setPromise :: Promise a -> PromiseResult a -> IO (PromiseResult a)
setPromise p new = block $ do
  mayWrite <- tryTakeMVar (p_write p)
  case mayWrite of
    Nothing -> wait p -- should not block for long
    Just {} -> do
      writeIORef (p_result p) (Just new)
      putMVar (p_notify p) ()
      return new

-- internal routine, takes the result and a continuation to run on it
launchWith :: PromiseResult a -> (PromiseResult a -> IO ()) -> IO ThreadId
launchWith val todo = forkIO . unblock . todo $ val

-- | forkPromise take an action to run, and runs it in a new thread.  This is run in an "unblock"
-- context.  If the action succeeds it will store its result as (Right {}).  If the action throws
-- an exception, or the
forkPromise :: IO a -> IO (Promise a)
forkPromise act = do
  msTid <- newEmptyMVar
  write <- newMVar () -- unique token, to be taken precisely once
  notify <- newEmptyMVar
  result <- newIORef Nothing
  msTodo <- newMVar mempty
  let p = Promise msTid write notify result msTodo
  let performAct = do
        putMVar msTid . Just =<< myThreadId
        val <- setPromise p =<< try (unblock act)
        case val of Left err -> throw err
                    Right {} -> return ()
      processTodo = block $ do
        modifyMVar_ msTid (const (return Nothing)) -- disable 'abort'
        let paranoid = Left (toException (userError "Promise.forkPromise.processTodo"))
        val <- setPromise p paranoid  -- ensure promise is set
        -- addTodo will no longer alter msTodo
        sTodo <- swapMVar msTodo mempty
        mapM_ (launchWith val) (appEndo sTodo [])
  -- block is used to ensure the finally gets setup, and performAct sets msTid
  block $ forkIO (finally performAct processTodo)
  return p

-- | forkPromises is build on top of forkPromise.  It converts a list of actions into a list of
-- promises, and additionally collects the results, in completion order, into the returned Chan.
forkPromises :: [IO a] -> IO ([Promise a],Chan (PromiseResult a))
forkPromises acts = do
  c <- newChan
  ps <- mapM forkPromise acts
  forM_ ps $ flip addTodo (writeChan c)
  return (ps,c)

-- | racePromises is build on top of forkPromise.  It runs a list of actions as promises and waits
-- for the first result (which may be an exception).  Once the result is found it asynchronously
-- kills the threads.
racePromises :: [IO a] -> IO (PromiseResult a)
racePromises acts = do
  v <- newEmptyMVar
  let withPromise p = addTodo p (\ r -> tryPutMVar v r >> return ()) 
  ps <- mapM forkPromise acts
  mapM_ withPromise ps
  a <- takeMVar v
  mapM_ (forkIO . abort) ps
  return a

-- | declarePromise is built on top of forkPromise.  It creates a promise and an function to fulfill
-- the promise with an action.  The first time the fulfull function is used it gives the action to
-- the promise and returns True.  All additional usages of the fulfill function will do nothing and
-- return False.  Note that the Promise may be aborted before the fulfill function is used, and in
-- this case the fulfill function will appear to succeed but achieve nothing.
declarePromise :: IO (Promise a, IO a -> IO Bool)
declarePromise = do
  let fulfill takeOnce = \ act -> block $ do
        maybeUseOnce <- tryTakeMVar takeOnce
        case maybeUseOnce of
          Nothing -> return False
          Just useOnce -> putMVar useOnce act >> return True
  useOnce <- newEmptyMVar
  takeOnce <- newMVar useOnce
  promise <- forkPromise (join (takeMVar useOnce))
  return (promise,fulfill takeOnce)

-- | 'check' is a non-blocking read.  Like 'timedWait' with 0 delay.
check :: Promise a -> IO (Maybe (PromiseResult a))
check p = readIORef (p_result p)

-- | 'wait' is a blocking read.
wait :: Promise a -> IO (PromiseResult a)
wait p =  withMVar (p_notify p) $ \ _ -> do
  mr <- readIORef (p_result p)
  case mr of
    Nothing -> error "Control.Monad.Future.wait with result of Nothing is impossible"
    Just r -> return r

-- | 'timedWait' with a positive value in micro seconds is a blocking read with timeout.
timedWait :: Int -> Promise a -> IO (Maybe (PromiseResult a))
timedWait microSeconds | microSeconds <= 0 = check
                       | otherwise = timeout microSeconds . wait

-- | 'get' is wait which rethrows a SomeException in the calling thread
get :: Promise a -> IO a
get p = wait p >>= either throw return

-- | 'timedGet' is a 'timedWait' which rethrows a SomeException in the calling thread
timedGet :: Int -> Promise a -> IO (Maybe a)
timedGet microSeconds p = timedWait microSeconds p >>= maybe (return Nothing) (either throw (return . Just))

-- | If the abort occurs before the action has stored a result then the result is set to an
-- exception.  The first call to abort gets the threadId and performs the, possibly blocking,
-- killThread.  If it completes then the ThreadId is forgotten (so the thread can be garbage
-- collected).
abort :: Promise t -> IO ()
abort p = block $ do
  setPromise p (Left (toException (userError "Promise.abort")))
  modifyMVar_ (p_tid p) $ \ mTid -> do
    case mTid of
      Nothing -> return ()
      Just tid -> killThread tid 
    return Nothing

-- | Post an action to perform in a new thread with the reasult of the promise.  All are run
-- unblocked in a fresh thread.
addTodo :: Promise a -> (PromiseResult a -> IO ()) -> IO ()
addTodo p todo = do
  ma <- modifyMVar (p_todo p) $ \ sTodo -> do
           ma <- check p
           case ma of Nothing -> return (sTodo `mappend` Endo (todo:),ma)
                      Just {} -> return (sTodo,ma)
  case ma of Nothing -> return ()
             Just val -> launchWith val todo >> return ()