{-# LANGUAGE DeriveGeneric              #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase                 #-}
{-# LANGUAGE OverloadedStrings          #-}
{-# LANGUAGE ScopedTypeVariables        #-}
{-# LANGUAGE StaticPointers             #-}
{-# LANGUAGE TypeApplications           #-}
module Control.Distributed.Fork.Internal where
import           Control.Concurrent
import           Control.Concurrent.STM
import           Control.Distributed.Closure
import           Control.Monad.Catch
import           Control.Monad.IO.Class
import           Control.Monad.Trans.Reader
import           Data.Binary
import qualified Data.ByteString             as BS
import qualified Data.ByteString.Lazy        as BL
import           Data.Monoid                 ((<>))
import           Data.Text                   (Text)
import qualified Data.Text                   as T
import           Data.Void
import           GHC.Generics
import           System.Environment
import           System.Exit
import           System.IO                   (stdin)
-- | Command-line argument that switches the process into executor mode.
--
-- 'initDistributedFork' compares the program arguments against this value;
-- when it is the sole argument the process runs 'runExecutor'.
argExecutorMode :: String
argExecutorMode = "DISTRIBUTED_FORK_EXECUTOR_MODE"
-- | Entry-point hook: inspect the program arguments and, when the only
-- argument is 'argExecutorMode', hand control over to 'runExecutor'.
--
-- For any other argument list this is a no-op and returns normally.
initDistributedFork :: IO ()
initDistributedFork = do
  args <- getArgs
  if args == [argExecutorMode]
    -- 'runExecutor' has result type 'Void'; 'vacuous' adapts it to @IO ()@.
    then vacuous runExecutor
    else return ()
-- | Executor main loop: read an 'ExecutorClosure' from standard input,
-- run the action it resolves to, then terminate with 'exitSuccess'.
--
-- The result type is 'Void' because the final step is 'exitSuccess'.
runExecutor :: IO Void
runExecutor = do
  input <- BL.hGetContents stdin
  -- 'decode' yields the closure; 'unclosure' turns it back into the IO action.
  unclosure (decode @ExecutorClosure input)
  exitSuccess
-- | The closure shape exchanged with an executor process: 'runBackend'
-- encodes a value of this type for the backend, and 'runExecutor' decodes
-- one from standard input and runs it.
type ExecutorClosure = Closure (IO ())
-- | A backend is a single function that runs a serialised executor payload
-- and returns the executor's raw answer.
newtype Backend = Backend
  { bExecute :: BS.ByteString -> BackendM BS.ByteString
  -- ^ 'runBackend' passes the strict encoding of an 'ExecutorClosure' as the
  -- argument and feeds the returned bytes to 'parseAnswer', so the result
  -- must be an encoded 'ExecutorFinalStatus'.
  -- NOTE(review): presumably the argument is what the remote process should
  -- receive on stdin and the result is its stdout; confirm with backends.
  }
-- | Monad in which a 'Backend' runs.
--
-- It is 'IO' with a read-only environment holding a callback of type
-- @'ExecutorPendingStatus' -> IO ()@. 'runBackend' supplies a callback that
-- stores the reported status (wrapped in 'ExecutorPending') in the handle's
-- 'TVar'.
newtype BackendM a =
  BackendM (ReaderT (ExecutorPendingStatus -> IO ()) IO a)
  deriving (Functor, Applicative, Monad, MonadIO, MonadCatch, MonadThrow)
-- | Wire format for executor answers. No methods are given, so the class
-- defaults are used (the type derives 'Generic').
instance Binary a => Binary (ExecutorFinalStatus a)
-- | Submit a closure to a 'Backend' and return a 'Handle' tracking it.
--
-- The closure is wrapped with 'toExecutorClosure', encoded, and handed to the
-- backend on a freshly forked thread. The handle starts out as
-- @'ExecutorPending' ('ExecutorWaiting' Nothing)@; status reports from the
-- backend overwrite it, and once the backend returns (or throws) it is set to
-- 'ExecutorFinished' with the decoded answer or a failure message.
runBackend ::
     Closure (Dict (Serializable i))
  -> Closure (IO i)
  -> Backend
  -> IO (Handle i)
runBackend dict cls (Backend execute) =
  case unclosure dict of
    Dict -> do
      statusVar <- newTVarIO (ExecutorPending (ExecutorWaiting Nothing))
      let payload =
            BL.toStrict (encode @ExecutorClosure (toExecutorClosure dict cls))
          BackendM action = execute payload
          -- Overwrite the handle's status with the given value.
          publish = atomically . writeTVar statusVar
      _ <-
        forkIO $ do
          outcome <- try (runReaderT action (publish . ExecutorPending))
          publish . ExecutorFinished $
            case outcome of
              Left (err :: SomeException) ->
                ExecutorFailed
                  ("Backend threw an exception: " <> T.pack (show err))
              Right answer -> parseAnswer answer
      return (Handle statusVar)
-- | Wrap a result-producing closure into an 'ExecutorClosure' that writes its
-- outcome to standard output as an encoded 'ExecutorFinalStatus'.
--
-- On success the result is written as 'ExecutorSucceeded'. If the action (or
-- serialising its result) throws, an 'ExecutorFailed' frame carrying the
-- rendered exception is written instead.
toExecutorClosure :: Closure (Dict (Serializable a)) -> Closure (IO a) -> Closure (IO ())
toExecutorClosure dict cls =
  case unclosure dict of
    Dict -> static run `cap` dict `cap` cls
  where
    run :: forall a. Dict (Serializable a) -> IO a -> IO ()
    run Dict a =
      (a >>= \result ->
         -- 'encode' is lazy: forcing a large result could throw after the
         -- first chunks were already written, leaving a truncated success
         -- frame followed by the failure frame. Walking every chunk with
         -- 'BL.length' first makes such exceptions surface before any byte
         -- reaches stdout, so exactly one well-formed frame is emitted.
         let payload = encode (ExecutorSucceeded result)
          in BL.length payload `seq` BL.putStr payload)
        `catch` (\(ex :: SomeException) ->
          BL.putStr . encode . ExecutorFailed @a $
            "Exception from executor: " <> T.pack (show ex))
-- | Decode the raw bytes returned by a backend into an 'ExecutorFinalStatus'.
--
-- A decoding error is reported as 'ExecutorFailed' with the decoder's
-- message; any input left over after a successful decode is ignored.
parseAnswer :: Binary a => BS.ByteString -> ExecutorFinalStatus a
parseAnswer = either undecodable decoded . decodeOrFail . BL.fromStrict
  where
    undecodable (_, _, msg) =
      ExecutorFailed ("Error decoding answer: " <> T.pack msg)
    decoded (_, _, status) = status
-- | Current state of a submitted computation, as stored in a 'Handle'.
data ExecutorStatus a
  = ExecutorPending ExecutorPendingStatus
  -- ^ Not finished yet; 'tryAwait' keeps blocking while in this state.
  | ExecutorFinished (ExecutorFinalStatus a)
  -- ^ Finished, either with a failure message or with a result.
-- | Progress states a backend can report through the callback carried by
-- 'BackendM'.
--
-- NOTE(review): the optional 'Text' looks like a free-form detail message;
-- its exact meaning is not visible in this module, so confirm with the
-- backends.
data ExecutorPendingStatus
  = ExecutorWaiting (Maybe Text)
  -- ^ Initial state set by 'runBackend' (with 'Nothing').
  | ExecutorSubmitted (Maybe Text)
  | ExecutorStarted (Maybe Text)
-- | Final outcome of a computation. This is the value the executor encodes
-- to standard output and 'parseAnswer' decodes on the submitting side.
data ExecutorFinalStatus a
  = ExecutorFailed Text
  -- ^ Failure with an error message; surfaced as 'Left' by 'tryAwait'.
  | ExecutorSucceeded a
  -- ^ Success carrying the result; surfaced as 'Right' by 'tryAwait'.
  deriving (Generic)
-- | Handle to a computation started with 'runBackend'. It wraps the 'TVar'
-- holding the computation's current 'ExecutorStatus'.
newtype Handle a = Handle (TVar (ExecutorStatus a))
-- | Block until the computation behind the handle has finished.
--
-- Returns 'Left' with the error message when it failed and 'Right' with the
-- result when it succeeded.
tryAwait :: Handle a -> IO (Either Text a)
tryAwait (Handle statusVar) = toEither <$> atomically awaitFinal
  where
    -- Retries the transaction until the status is no longer pending.
    awaitFinal = do
      status <- readTVar statusVar
      case status of
        ExecutorPending _      -> retry
        ExecutorFinished final -> return final
    toEither (ExecutorFailed err)      = Left err
    toEither (ExecutorSucceeded value) = Right value