{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StaticPointers #-}
{-# LANGUAGE TypeApplications #-}
module Control.Distributed.Fork.Internal where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Distributed.Closure
import Control.Monad.Catch
import Control.Monad.IO.Class
import Control.Monad.Trans.Reader
import Data.Binary
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BL
import Data.Monoid ((<>))
import Data.Text (Text)
import qualified Data.Text as T
import Data.Void
import GHC.Generics
import System.Environment
import System.Exit
import System.IO (stdin)
-- | Command-line argument that 'initDistributedFork' compares against: when
-- it is the sole argument of the process, the process runs 'runExecutor'.
argExecutorMode :: String
argExecutorMode = "DISTRIBUTED_FORK_EXECUTOR_MODE"
-- | Start-up hook. When the process was launched with exactly one
-- command-line argument and that argument equals 'argExecutorMode', control
-- is handed to 'runExecutor', which never returns normally. With any other
-- argument list this is a no-op.
initDistributedFork :: IO ()
initDistributedFork = do
  args <- getArgs
  case args of
    [mode]
      | mode == argExecutorMode -> vacuous runExecutor
    _ -> pure ()
-- | Executor entry point: read a serialised 'ExecutorClosure' from standard
-- input, run it, and exit successfully.
--
-- If the input cannot be decoded, the process terminates through 'die' with
-- a message giving the byte offset and the decoder's error, instead of
-- crashing inside the partial 'decode'. Bytes left over after the closure
-- are ignored.
--
-- The result type is 'Void' because every path either ends the process or
-- propagates an exception.
runExecutor :: IO Void
runExecutor = do
  input <- BL.hGetContents stdin
  case decodeOrFail @ExecutorClosure input of
    Left (_, offset, err) ->
      die $
        "Error decoding executor closure at byte "
          <> show offset
          <> ": "
          <> err
    Right (_, _, closure) -> unclosure closure >> exitSuccess
-- | The closure type exchanged with executors: 'runBackend' encodes a value
-- of this type and 'runExecutor' decodes and runs one.
type ExecutorClosure = Closure (IO ())
-- | A backend is a single function from bytes to bytes, run in 'BackendM'.
--
-- As used by 'runBackend', the argument is the encoded 'ExecutorClosure' and
-- the returned bytes are decoded with 'parseAnswer'.
newtype Backend = Backend
  { bExecute :: BS.ByteString -> BackendM BS.ByteString
  }
-- | Monad in which a 'Backend' runs: 'IO' with a reader environment holding
-- a callback of type @'ExecutorPendingStatus' -> 'IO' ()@.
--
-- 'runBackend' supplies a callback that stores the reported status, wrapped
-- in 'ExecutorPending', in the 'TVar' behind the returned 'Handle'.
newtype BackendM a =
  BackendM (ReaderT (ExecutorPendingStatus -> IO ()) IO a)
  deriving (Functor, Applicative, Monad, MonadIO, MonadCatch, MonadThrow)
-- | No methods are defined here, so the class defaults are used (presumably
-- the 'Generic'-based ones from "Data.Binary", given that
-- 'ExecutorFinalStatus' derives 'Generic').
instance Binary a => Binary (ExecutorFinalStatus a)
-- | Submit a closure to a 'Backend' and return a 'Handle' for its result.
--
-- The closure is wrapped with 'toExecutorClosure', encoded, and passed to
-- the backend as a strict 'BS.ByteString'. The backend action runs on a
-- thread started with 'forkIO'; the handle is returned without waiting for
-- that thread.
--
-- The status starts as @'ExecutorPending' ('ExecutorWaiting' 'Nothing')@.
-- The backend can publish further pending statuses through the callback in
-- its reader environment. Once the backend action ends, the status becomes
-- 'ExecutorFinished': either the answer decoded by 'parseAnswer', or an
-- 'ExecutorFailed' describing the exception the backend threw.
runBackend ::
     Closure (Dict (Serializable i))
  -> Closure (IO i)
  -> Backend
  -> IO (Handle i)
runBackend dict cls (Backend execute) =
  case unclosure dict of
    -- Matching on 'Dict' brings the @Serializable i@ instance into scope.
    Dict -> do
      statusVar <- newTVarIO (ExecutorPending (ExecutorWaiting Nothing))
      let payload =
            BL.toStrict (encode @ExecutorClosure (toExecutorClosure dict cls))
          BackendM action = execute payload
          reportPending = atomically . writeTVar statusVar . ExecutorPending
      _ <-
        forkIO $ do
          outcome <- try (runReaderT action reportPending)
          atomically . writeTVar statusVar . ExecutorFinished $
            case outcome of
              Left (err :: SomeException) ->
                ExecutorFailed $
                  "Backend threw an exception: " <> T.pack (show err)
              Right answer -> parseAnswer answer
      pure (Handle statusVar)
-- | Turn a closure producing a serialisable value into an 'IO' @()@ closure
-- that reports its outcome on standard output.
--
-- The resulting action runs the original one and writes the encoded
-- 'ExecutorSucceeded' value with 'BL.putStr'. If anything inside that step
-- throws, the exception is caught and an encoded 'ExecutorFailed' carrying
-- its 'show'n text is written instead.
toExecutorClosure :: Closure (Dict (Serializable a)) -> Closure (IO a) -> Closure (IO ())
toExecutorClosure dict cls =
  case unclosure dict of
    -- The 'Dict' match provides the instances needed to build the closure.
    Dict -> static run `cap` dict `cap` cls
  where
    -- Kept as a closed local binding with a full signature so that it can
    -- be the target of @static@.
    run :: forall a. Dict (Serializable a) -> IO a -> IO ()
    run Dict action =
      catch
        (do result <- action
            BL.putStr (encode (ExecutorSucceeded result)))
        (\(ex :: SomeException) ->
           BL.putStr
             (encode
                (ExecutorFailed @a
                   ("Exception from executor: " <> T.pack (show ex)))))
-- | Decode the bytes returned by a backend into an 'ExecutorFinalStatus'.
--
-- A decoding failure does not throw: it is reported as 'ExecutorFailed'
-- with the decoder's message. Unconsumed trailing input is ignored.
parseAnswer :: Binary a => BS.ByteString -> ExecutorFinalStatus a
parseAnswer = either undecodable decoded . decodeOrFail . BL.fromStrict
  where
    undecodable (_, _, err) =
      ExecutorFailed ("Error decoding answer: " <> T.pack err)
    decoded (_, _, status) = status
-- | Current state of an execution, as stored in a 'Handle'.
data ExecutorStatus a
  = ExecutorPending ExecutorPendingStatus -- ^ No final outcome yet.
  | ExecutorFinished (ExecutorFinalStatus a) -- ^ Final outcome is available.
-- | Intermediate states a backend can report before the execution finishes.
--
-- 'runBackend' starts every execution at @'ExecutorWaiting' 'Nothing'@.
-- NOTE(review): the meaning of the optional 'Text' is not defined in this
-- module; presumably it is a backend-supplied description. Confirm against
-- the backends.
data ExecutorPendingStatus
  = ExecutorWaiting (Maybe Text)
  | ExecutorSubmitted (Maybe Text)
  | ExecutorStarted (Maybe Text)
-- | Terminal outcome of an execution. This is the value the executor
-- encodes to its output and that 'parseAnswer' decodes.
data ExecutorFinalStatus a
  = ExecutorFailed Text -- ^ Failure, with a textual description.
  | ExecutorSucceeded a -- ^ Success, carrying the result.
  deriving (Generic)
-- | Reference to an execution started by 'runBackend'. It wraps the 'TVar'
-- holding the execution's current 'ExecutorStatus'; 'tryAwait' reads it.
newtype Handle a = Handle (TVar (ExecutorStatus a))
-- | Block until the execution behind the 'Handle' has finished, then return
-- its outcome: 'Left' with the error text for 'ExecutorFailed', 'Right'
-- with the result for 'ExecutorSucceeded'.
--
-- Waiting is done with STM 'retry', so the calling thread is suspended
-- while the status is still 'ExecutorPending'.
tryAwait :: Handle a -> IO (Either Text a)
tryAwait (Handle statusVar) = toEither <$> atomically awaitFinal
  where
    awaitFinal = do
      status <- readTVar statusVar
      case status of
        ExecutorPending _ -> retry
        ExecutorFinished final -> pure final
    toEither (ExecutorFailed err) = Left err
    toEither (ExecutorSucceeded result) = Right result