{-# LANGUAGE DeriveGeneric              #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase                 #-}
{-# LANGUAGE OverloadedStrings          #-}
{-# LANGUAGE ScopedTypeVariables        #-}
{-# LANGUAGE StaticPointers             #-}
{-# LANGUAGE TypeApplications           #-}

module Control.Distributed.Fork.Internal where

--------------------------------------------------------------------------------
import           Control.Concurrent
import           Control.Concurrent.STM
import           Control.Distributed.Closure
import           Control.Monad.Catch
import           Control.Monad.IO.Class
import           Control.Monad.Trans.Reader
import           Data.Binary
import qualified Data.ByteString             as BS
import qualified Data.ByteString.Lazy        as BL
import           Data.Monoid                 ((<>))
import           Data.Text                   (Text)
import qualified Data.Text                   as T
import           Data.Void
import           GHC.Generics
import           System.Environment
import           System.Exit
import           System.IO                   (stdin)
--------------------------------------------------------------------------------

-- |
-- Sentinel command-line argument that selects executor mode.
--
-- 'initDistributedFork' switches to executor mode only when the program's
-- argument list consists of exactly this one string.
argExecutorMode :: String
argExecutorMode = "DISTRIBUTED_FORK_EXECUTOR_MODE"

-- |
-- With distributed-fork the very same binary runs both on your machine (the
-- "driver") and in the remote environment (the "executor"). This function
-- decides which of the two roles the current process plays, so call it as the
-- first thing in your @main@:
--
-- @
-- main = do
--   initDistributedFork
--   ...
-- @
--
-- When the process was started with 'argExecutorMode' as its only argument,
-- control is handed to 'runExecutor', which never returns. In every other
-- case this is a no-op and the program carries on as the driver.
initDistributedFork :: IO ()
initDistributedFork = do
  args <- getArgs
  case args of
    -- Exactly one argument, and it is the executor sentinel.
    [mode] | mode == argExecutorMode -> vacuous runExecutor
    _ -> return ()

-- |
-- Entry point for executor mode: reads a serialised 'ExecutorClosure' from
-- standard input, runs the action it contains and then terminates the process
-- with a success exit code. The 'Void' result reflects that this action never
-- returns normally.
runExecutor :: IO Void
runExecutor = do
  payload <- BL.hGetContents stdin
  unclosure (decode @ExecutorClosure payload)
  exitSuccess

-- |
-- An 'ExecutorClosure' is a serialisable @IO ()@ action: the payload that
-- 'runBackend' encodes for the backend and that 'runExecutor' decodes from
-- standard input and runs.
type ExecutorClosure = Closure (IO ())

-- |
-- 'Backend' is responsible for running your functions in a remote environment.
-- It wraps a single function from the bytes given to the executor to the bytes
-- the executor answers with.
--
-- See:
--
--   * 'Control.Distributed.Fork.LocalProcessBackend.localProcessBackend'
--
--   * <http://hackage.haskell.org/package/distributed-fork-aws-lambda distributed-fork-aws-lambda>
newtype Backend = Backend
  { bExecute :: BS.ByteString -> BackendM BS.ByteString
  -- ^ Should run the current binary in the target environment, feed it the
  -- given string on standard input and return what the executable writes to
  -- standard output.
  }

-- |
-- 'BackendM' is essentially 'IO', but it also carries a callback of type
-- @'ExecutorPendingStatus' -> 'IO' ()@ in its reader environment, through
-- which a backend can report the status of the executor while the run is
-- still pending.
newtype BackendM a =
  BackendM (ReaderT (ExecutorPendingStatus -> IO ()) IO a)
  deriving (Functor, Applicative, Monad, MonadIO, MonadCatch, MonadThrow)

-- The instance body is intentionally empty: 'put' and 'get' fall back to the
-- Generic-based defaults of "Data.Binary", using the 'Generic' instance that
-- 'ExecutorFinalStatus' derives.
instance Binary a => Binary (ExecutorFinalStatus a)

-- |
-- Runs a closure on the given 'Backend' without blocking the caller.
--
-- Takes a static proof that the result type is 'Serializable', the action to
-- run and the backend to run it on. The encoded 'ExecutorClosure' is handed to
-- the backend on a freshly forked thread and a 'Handle' is returned right
-- away. The handle starts out as @'ExecutorPending' ('ExecutorWaiting'
-- 'Nothing')@, follows whatever pending status the backend reports, and ends
-- up as 'ExecutorFinished' holding either the decoded answer or an
-- 'ExecutorFailed' describing the exception the backend threw.
runBackend ::
     Closure (Dict (Serializable i))
  -> Closure (IO i)
  -> Backend
  -> IO (Handle i)
runBackend dict cls (Backend backend) =
  case unclosure dict of
    -- Matching on 'Dict' brings the 'Binary' instance for the result type into
    -- scope, which 'parseAnswer' needs below.
    Dict -> do
      statusVar <- newTVarIO (ExecutorPending (ExecutorWaiting Nothing))
      let payload =
            BL.toStrict (encode @ExecutorClosure (toExecutorClosure dict cls))
          BackendM job = backend payload
          -- Callback given to the backend for publishing progress.
          reportPending = atomically . writeTVar statusVar . ExecutorPending
      _ <- forkIO $ do
        outcome <- try (runReaderT job reportPending)
        atomically . writeTVar statusVar . ExecutorFinished $
          case outcome of
            Left (err :: SomeException) ->
              ExecutorFailed
                ("Backend threw an exception: " <> T.pack (show err))
            Right answer -> parseAnswer answer
      return (Handle statusVar)

-- |
-- Turns a closure producing a serialisable value into an 'ExecutorClosure'.
--
-- The resulting action runs the original one and writes exactly one encoded
-- 'ExecutorFinalStatus' to standard output: 'ExecutorSucceeded' with the
-- result, or 'ExecutorFailed' with a description of the exception that was
-- thrown. The first argument is the static proof that the result type is
-- 'Serializable'; it is captured in the closure so the executor can encode
-- the result.
toExecutorClosure :: Closure (Dict (Serializable a)) -> Closure (IO a) -> Closure (IO ())
toExecutorClosure dict cls =
  case unclosure dict of
    -- Matching on 'Dict' provides the 'Typeable' evidence that 'cap' requires.
    Dict -> static run `cap` dict `cap` cls
  where
    run :: forall a. Dict (Serializable a) -> IO a -> IO ()
    run Dict a =
      -- The success payload is fully materialised ('BL.toStrict') before the
      -- first byte is written. Encoding is lazy, so an exception hidden inside
      -- the result could otherwise surface after part of the payload had
      -- already reached stdout, and the handler below would then append a
      -- failure payload to a truncated success payload.
      (a >>= BS.putStr . BL.toStrict . encode . ExecutorSucceeded)
        `catch` (\(ex :: SomeException) ->
          BL.putStr . encode . ExecutorFailed @a $
            "Exception from executor: " <> T.pack (show ex))

-- |
-- Decodes the bytes returned by a backend into an 'ExecutorFinalStatus'.
-- Bytes that cannot be decoded yield an 'ExecutorFailed' carrying the
-- decoder's error message; any input left over after a successful decode is
-- ignored.
parseAnswer :: Binary a => BS.ByteString -> ExecutorFinalStatus a
parseAnswer = either failed decoded . decodeOrFail . BL.fromStrict
  where
    failed (_, _, msg) =
      ExecutorFailed ("Error decoding answer: " <> T.pack msg)
    decoded (_, _, status) = status

-- |
-- Status of a forked computation, as stored inside a 'Handle'.
data ExecutorStatus a
  = ExecutorPending ExecutorPendingStatus
  -- ^ No final result yet; 'tryAwait' keeps blocking while this is the status.
  | ExecutorFinished (ExecutorFinalStatus a)
  -- ^ The run is over, with either a failure message or a result.

-- |
-- Progress of an executor that has not finished yet. 'runBackend' starts every
-- run at @'ExecutorWaiting' 'Nothing'@; later values are whatever the backend
-- reports through the callback carried by 'BackendM'.
--
-- NOTE(review): the optional 'Text' on each constructor is presumably a
-- human-readable detail message -- confirm against the backends.
data ExecutorPendingStatus
  = ExecutorWaiting (Maybe Text)
  | ExecutorSubmitted (Maybe Text)
  | ExecutorStarted (Maybe Text)

-- |
-- Final outcome of an executor run. This is the value the executor writes to
-- its standard output (see 'toExecutorClosure') and that the driver decodes
-- with 'parseAnswer'.
data ExecutorFinalStatus a
  = ExecutorFailed Text
  -- ^ The run failed; carries an error message.
  | ExecutorSucceeded a
  -- ^ The run succeeded with this result.
  deriving (Generic)

-- |
-- Result of a 'fork' is a 'Handle' on which you can 'await' a result. It wraps
-- the 'TVar' in which 'runBackend' publishes the 'ExecutorStatus'.
newtype Handle a = Handle (TVar (ExecutorStatus a))

-- |
-- Blocks until the computation behind the 'Handle' has finished, then returns
-- its outcome: 'Left' with the error message when it failed, 'Right' with the
-- result when it succeeded.
tryAwait :: Handle a -> IO (Either Text a)
tryAwait (Handle statusVar) = toEither <$> atomically waitFinished
  where
    -- Retries the transaction for as long as the executor is still pending.
    waitFinished = do
      status <- readTVar statusVar
      case status of
        ExecutorPending _      -> retry
        ExecutorFinished final -> return final

    toEither (ExecutorFailed err)     = Left err
    toEither (ExecutorSucceeded done) = Right done