module Fx
(
Fx,
provideAndUse,
handleEnv,
start,
wait,
concurrently,
runTotalIO,
runPartialIO,
runExceptionalIO,
runSTM,
Provider,
acquireAndRelease,
pool,
Future,
Conc,
FxRunning(..),
ErrHandling(..),
exposeErr,
absorbErr,
EnvMapping(..),
FxException(..),
FxExceptionReason(..),
)
where
import Fx.Prelude hiding (app)
import qualified Data.Text as Text
import qualified Data.HashSet as HashSet
import qualified Fx.Strings as Strings
-- | Run a top-level effect to completion in 'IO'.
--
-- The whole procedure executes under 'uninterruptibleMask'; the restoring
-- function is handed over to the effect through its 'FxEnv'. The effect
-- itself runs on a freshly forked thread, while the calling thread blocks
-- until either a result or a fatal error gets reported. A fatal error is
-- rethrown in the calling thread as an 'FxException'.
runFxInIO :: Fx () Void res -> IO res
runFxInIO (Fx m) = uninterruptibleMask $ \ unmask -> do
  fatalErrChan <- newTQueueIO
  resVar <- newEmptyTMVarIO

  forkIO $ do
    tid <- myThreadId
    let
      -- Every report passes through this thread, so its id heads the path.
      crash tids reason =
        atomically (writeTQueue fatalErrChan (FxException (tid : tids) reason))
      -- Follow-up action for an effect which returned normally.
      deliver = \ case
        Right res -> atomically (putTMVar resVar res)
        -- A 'Void' value has no constructors, so it gets forced:
        -- an 'ErrorCall' thrown by that is reported as such,
        -- and successful evaluation is reported as a bug.
        Left a ->
          catch
            (evaluate a >> crash [] (BugFxExceptionReason "Unexpected void"))
            (crash [] . ErrorCallFxExceptionReason)
      -- Follow-up action for an exception which escaped the effect.
      reportEscaped exc = case fromException exc of
        Just errorCall -> crash [] (ErrorCallFxExceptionReason errorCall)
        _ -> crash [] (BugFxExceptionReason (Strings.unexpectedException exc))
    -- The follow-up action is only selected inside of 'catch'
    -- and gets executed outside of it.
    finalize <-
      catch
        (fmap deliver (runExceptT (runReaderT m (FxEnv unmask crash ()))))
        (return . reportEscaped)
    finalize

  -- Wait for whichever comes first: a fatal error or the result.
  join $
    catch
      (unmask $ atomically $ asum
        [ fmap throwIO (readTQueue fatalErrChan)
        , fmap return (readTMVar resVar)
        ])
      (\ (exc :: SomeException) -> case fromException exc of
        -- Asynchronous exceptions addressed to the caller pass through as is.
        Just (asyncExc :: AsyncException) -> throwIO asyncExc
        _ -> throwIO (FxException [] (BugFxExceptionReason (Strings.failedWaitingForFinalResult exc))))
-- | Effectful computation, which has access to an environment of type @env@,
-- can fail with an error of type @err@ and produces a result of type @res@.
--
-- Internally it is a reader of 'FxEnv' on top of 'ExceptT' on top of 'IO'.
newtype Fx env err res = Fx (ReaderT (FxEnv env) (ExceptT err IO) res)
-- The instances below are derived from the underlying transformer stack.
deriving instance Functor (Fx env err)
deriving instance Applicative (Fx env err)
deriving instance Selective (Fx env err)
-- Alternation is only available when the error type is a 'Monoid'.
deriving instance Monoid err => Alternative (Fx env err)
deriving instance Monad (Fx env err)
deriving instance Monoid err => MonadPlus (Fx env err)
-- | 'fail' is treated as fatal: the message is reported to the crash handler
-- of the environment as an 'ErrorCall', after which the running thread
-- is aborted by failing in 'IO'.
instance MonadFail (Fx env err) where
  fail msg = Fx $ ReaderT $ \ (FxEnv _ crash _) ->
    liftIO (crash [] (ErrorCallFxExceptionReason (ErrorCall msg)) >> fail "Crashed")
-- | Lifts an 'IO' action, running it with the unmasking function
-- of the environment and exposing any exception it throws
-- as the error of the effect.
instance MonadIO (Fx env SomeException) where
  liftIO io = Fx $ ReaderT $ \ (FxEnv unmask _ _) -> ExceptT $ try $ unmask io
-- | Maps over the error (first) and over the result (second).
instance Bifunctor (Fx env) where
  bimap lf rf (Fx m) = Fx (mapReaderT (mapExceptT (fmap (bimap lf rf))) m)
-- | Runtime context of an 'Fx'. The fields in order:
--
-- 1. Function which the runners apply to user-supplied 'IO' actions.
--    'runFxInIO' passes the restoring function of 'uninterruptibleMask' here.
-- 2. Reporter of fatal errors. Takes a path of thread ids and a reason.
-- 3. The user environment.
data FxEnv env = FxEnv (forall a. IO a -> IO a) ([ThreadId] -> FxExceptionReason -> IO ()) env
-- | Transform the transformer stack wrapped by 'Fx'.
mapFx ::
  (ReaderT (FxEnv env) (ExceptT err IO) res -> ReaderT (FxEnv env') (ExceptT err' IO) res') ->
  Fx env err res ->
  Fx env' err' res'
mapFx fn (Fx m) = Fx (fn m)
-- | Lift an 'IO' action which is not expected to throw.
--
-- The action is run with the unmasking function of the environment.
-- Any exception it throws is reported to the crash handler as uncaught,
-- after which the running thread is aborted by failing in 'IO'.
runTotalIO :: IO res -> Fx env err res
runTotalIO io = Fx $ ReaderT $ \ (FxEnv unmask crash _) ->
  let
    escalate (exc :: SomeException) = do
      crash [] (UncaughtExceptionFxExceptionReason exc)
      fail "Unhandled exception in runTotalIO. Got propagated to top."
    in lift (catch (unmask io) escalate)
-- | Lift an 'IO' action which reports its failure explicitly as 'Left'.
--
-- Exceptions thrown by the action are treated the same way
-- as in 'runTotalIO'.
runPartialIO :: IO (Either err res) -> Fx env err res
runPartialIO io = do
  outcome <- runTotalIO io
  case outcome of
    Left err -> throwErr err
    Right res -> return res
-- | Lift an 'IO' action, turning the exceptions of type @exc@ that it throws
-- into the error of the effect.
--
-- An exception of any other type is reported to the crash handler
-- as uncaught, after which the running thread is aborted by failing in 'IO'.
runExceptionalIO :: Exception exc => IO res -> Fx env exc res
runExceptionalIO io =
  Fx $ ReaderT $ \ (FxEnv unmask crash _) ->
    ExceptT $ catch (fmap Right (unmask io)) $ \ exc ->
      maybe
        (do
          crash [] (UncaughtExceptionFxExceptionReason exc)
          fail "Unhandled exception in runExceptionalIO. Got propagated to top.")
        (return . Left)
        (fromException exc)
-- | Run an 'STM' transaction atomically as a total 'IO' action.
runSTM :: STM res -> Fx env err res
runSTM stm = runTotalIO (atomically stm)
-- | Fork a thread running the provided effect and
-- immediately return a 'Future' of its outcome.
--
-- The future resolves to the result or to the error of the effect.
-- If an exception escapes the effect, it is reported to the crash handler
-- and the future resolves to @Left Nothing@.
start :: Fx env err res -> Fx env err' (Future err res)
start (Fx m) =
  Fx $ ReaderT $ \ (FxEnv unmask crash env) -> lift $ do
    futureVar <- newEmptyTMVarIO

    forkIO $ do
      tid <- myThreadId

      -- Reports originating in the child extend the thread path with its id.
      let childCrash tids dls = crash (tid : tids) dls

      -- The follow-up action is only selected inside of 'catch'
      -- and gets executed outside of it.
      finalize <-
        catch
          (do
            res <- runExceptT (runReaderT m (FxEnv unmask childCrash env))
            return (atomically (putTMVar futureVar (first Just res))))
          (\ exc -> return $ do
            -- The exception escaped in the child thread,
            -- so it is reported through 'childCrash' to keep
            -- the child id in the reported thread path.
            case fromException exc of
              Just errorCall -> childCrash [] (ErrorCallFxExceptionReason errorCall)
              _ -> childCrash [] (BugFxExceptionReason (Strings.unexpectedException exc))
            atomically (putTMVar futureVar (Left Nothing)))

      finalize

    return $ Future $ Compose $ readTMVar futureVar
-- | Block until the future resolves and produce its result or its error.
--
-- A future resolved to @Left Nothing@ aborts the running thread
-- by failing in 'IO'. An exception thrown while waiting is reported
-- to the crash handler and aborts the thread the same way.
wait :: Future err res -> Fx env err res
wait (Future m) = Fx $ ReaderT $ \ (FxEnv unmask crash _) -> ExceptT $ join $
  catch
    (fmap interpret (unmask (atomically (getCompose m))))
    (\ (exc :: SomeException) -> return $ do
      crash [] (BugFxExceptionReason (Strings.failedWaitingForResult exc))
      fail "Thread crashed with uncaught exception waiting for result.")
  where
    -- Choose the follow-up action for the observed state of the future.
    interpret futureStatus = case futureStatus of
      Right res -> return (Right res)
      Left (Just err) -> return (Left err)
      Left Nothing -> fail "Waiting for a future that crashed"
-- | Unwrap a concurrent composition into a plain effect.
concurrently :: Conc env err res -> Fx env err res
concurrently conc = case conc of
  Conc fx -> fx
-- | Acquire an environment from the provider, run the effect in it
-- and release the environment afterwards.
--
-- When acquisition fails, its error is returned and nothing else is run.
-- Otherwise the release is run whether the effect succeeded or failed,
-- and the error of the effect takes precedence over the error of the release.
--
-- The release is also attempted when the effect aborts with an exception
-- (which is how crashes propagate), after which the exception is rethrown.
provideAndUse :: Provider err env -> Fx env err res -> Fx env' err res
provideAndUse (Provider (Fx acquire)) (Fx fx) =
  Fx $ ReaderT $ \ (FxEnv unmask crash _) -> ExceptT $ do
    let providerFxEnv = FxEnv unmask crash ()
    acquisition <- runExceptT (runReaderT acquire providerFxEnv)
    case acquisition of
      Left err -> return (Left err)
      Right (env, Fx release) -> do
        let runRelease = runExceptT (runReaderT release providerFxEnv)
        resOrErr <-
          catch
            (runExceptT (runReaderT fx (FxEnv unmask crash env)))
            (\ (exc :: SomeException) -> do
              -- Best-effort cleanup: a failure of the release must not
              -- replace the exception which is already propagating.
              catch (runRelease >> return ()) (\ (_ :: SomeException) -> return ())
              throwIO exc)
        releasing <- runRelease
        return (resOrErr <* releasing)
-- | Build an effect from a function on the current environment
-- and run it in that same environment.
handleEnv :: (env -> Fx env err res) -> Fx env err res
handleEnv handler =
  Fx $ ReaderT $ \ fxEnv@(FxEnv _ _ env) ->
    case handler env of
      Fx rdr -> runReaderT rdr fxEnv
-- | Handle to the outcome of a computation started by 'start'.
--
-- It is an 'STM' action producing either the result or a failure.
-- The failure is @Just@ the error of the computation
-- or @Nothing@ when the computation crashed.
newtype Future err res = Future (Compose STM (Either (Maybe err)) res)
deriving instance Functor (Future err)
deriving instance Applicative (Future err)
deriving instance Selective (Future err)
-- | Maps over the error (first) and over the result (second).
-- The crashed state (@Left Nothing@) is left as it is.
instance Bifunctor Future where
  bimap lf rf (Future m) = Future (mapCompose (fmap (bimap (fmap lf) rf)) m)
-- | Transform the composed 'STM' action wrapped by 'Future'.
mapFuture ::
  (Compose STM (Either (Maybe err)) res -> Compose STM (Either (Maybe err')) res') ->
  Future err res ->
  Future err' res'
mapFuture fn (Future m) = Future (fn m)
-- | Wrapper over 'Fx', whose 'Applicative' and 'Selective' instances
-- run one of the operands on a separate thread using 'start' and 'wait'.
newtype Conc env err res = Conc (Fx env err res)
-- Both instances are derived from the wrapped 'Fx'.
deriving instance Functor (Conc env err)
deriving instance Bifunctor (Conc env)
-- | Application starts the left operand (the function) on a separate thread,
-- runs the right operand (the argument) on the current thread
-- and then waits for the function to apply it.
instance Applicative (Conc env err) where
  pure res = Conc (pure res)
  Conc fnFx <*> Conc argFx = Conc $
    start fnFx >>= \ fnFuture ->
      argFx >>= \ arg ->
        fmap ($ arg) (wait fnFuture)
-- | The handler gets started on a separate thread up front,
-- while the choice is computed on the current thread.
-- The handler is only waited for when the choice is 'Left'.
instance Selective (Conc env err) where
  select (Conc choose) (Conc act) = Conc $ do
    pendingHandler <- start act
    choose >>= either (\ a -> fmap ($ a) (wait pendingHandler)) return
-- | Transform the 'Fx' wrapped by 'Conc'.
mapConc ::
  (Fx env err res -> Fx env' err' res') ->
  Conc env err res ->
  Conc env' err' res'
mapConc fn (Conc m) = Conc (fn m)
-- | Effect in the unit environment which produces an environment
-- paired with the effect to run after that environment has been used
-- ('provideAndUse' runs it as the release step).
newtype Provider err env = Provider (Fx () err (env, Fx () err ()))
-- | Maps over the provided environment, keeping the release as it is.
instance Functor (Provider err) where
  fmap f (Provider m) =
    Provider (fmap (\ (env, release) -> (f env, release)) m)
-- | Acquisitions are run left to right and releases in the reverse order.
--
-- When the second acquisition fails, the environment acquired by the first
-- one gets released before the error is propagated. An error of that
-- release is discarded in favour of the acquisition error.
instance Applicative (Provider err) where
  pure env = Provider (pure (env, pure ()))
  Provider m1 <*> Provider m2 = Provider $ do
    (env1, release1) <- m1
    (env2, release2) <-
      -- Without this the first resource would stay acquired forever.
      handleErr (\ err -> absorbErr (const ()) release1 *> throwErr err) m2
    return (env1 env2, release2 *> release1)
-- | The dependent provider is acquired after the first one
-- and released before it.
--
-- When the dependent acquisition fails, the environment acquired by the
-- first provider gets released before the error is propagated. An error
-- of that release is discarded in favour of the acquisition error.
instance Monad (Provider err) where
  return = pure
  (>>=) (Provider m1) k2 = Provider $ do
    (env1, release1) <- m1
    (env2, release2) <- case k2 env1 of
      Provider m2 ->
        -- Without this the first resource would stay acquired forever.
        handleErr (\ err -> absorbErr (const ()) release1 *> throwErr err) m2
    return (env2, release2 >> release1)
-- | Lifts an 'IO' action as a provider of its result
-- with a release which does nothing.
instance MonadIO (Provider SomeException) where
  liftIO io = Provider $ do
    env <- liftIO io
    return (env, pure ())
-- | Maps over the error of both the acquisition and the release (first)
-- and over the provided environment (second).
instance Bifunctor Provider where
  bimap lf rf (Provider m) = Provider (bimap lf mapAcquired m)
    where
      mapAcquired ~(env, release) = (rf env, first lf release)
  second = fmap
-- | Construct a provider from an acquiring effect and
-- a function producing the releasing effect for the acquired environment.
acquireAndRelease :: Fx () err env -> (env -> Fx () err ()) -> Provider err env
acquireAndRelease acquire release =
  Provider (fmap (\ env -> (env, release env)) acquire)
-- | Provider of a pool of the given size of environments
-- acquired from the given provider.
--
-- The pool itself is a provider: acquiring from it takes an environment
-- out of a queue (blocking while the queue is empty) and releasing puts
-- it back. Releasing the pool blocks until all the environments have
-- been returned and then runs their releases.
--
-- If an acquisition fails while the pool is being filled, the environments
-- acquired so far get released before the error is propagated. Errors of
-- those releases are discarded in favour of the acquisition error.
pool :: Int -> Provider err env -> Provider err (Provider err' env)
pool poolSize (Provider acquire) = Provider $ do
  queue <- runSTM newTQueue
  let
    -- Undo a partially completed fill and rethrow the acquisition error.
    rollback err = do
      acquired <- runSTM (flushTQueue queue)
      sequence_ (fmap (absorbErr (const ()) . snd) acquired)
      throwErr err
  replicateM_ poolSize $ do
    handle <- handleErr rollback acquire
    runSTM $ writeTQueue queue handle
  let
    resourceProvider = Provider $ do
      (env, releaseResource) <- runSTM $ readTQueue queue
      -- Releasing a pooled environment only returns it to the queue.
      return (env, runSTM (writeTQueue queue (env, releaseResource)))
    release = do
      releasers <- runSTM $ do
        list <- flushTQueue queue
        -- Retry the transaction until every environment is back in the queue.
        guard (length list == poolSize)
        return (fmap snd list)
      sequence_ releasers
    in return (resourceProvider, release)
-- | Support for running 'Fx' in a monad @m@.
-- The monad determines the types of the environment and of the error.
class FxRunning env err m | m -> env, m -> err where
  -- | Run the effect in the monad @m@.
  runFx :: Fx env err res -> m res
-- | Runs an effect which has no environment and cannot fail in 'IO'.
instance FxRunning () Void IO where
  runFx fx = runFxInIO fx
-- | Runs in 'IO', exposing the error of the effect via 'ExceptT'.
instance FxRunning () err (ExceptT err IO) where
  runFx = ExceptT . runFx . exposeErr
-- | Runs in 'IO', taking the environment from 'ReaderT'
-- and exposing the error of the effect via 'ExceptT'.
instance FxRunning env err (ReaderT env (ExceptT err IO)) where
  runFx fx = ReaderT $ \ env ->
    ExceptT $ runFx $ mapEnv (const env) $ exposeErr fx
-- | Embeds an effect which has no environment and cannot fail
-- into an effect with any environment and any error.
instance FxRunning () Void (Fx env err) where
  runFx fx = mapEnv (const ()) (first absurd fx)
-- | Wraps the effect for concurrent composition.
instance FxRunning env err (Conc env err) where
  runFx fx = Conc fx
-- | Turns the effect into a provider of its result
-- with a release which does nothing.
instance FxRunning () err (Provider err) where
  runFx fx = Provider $ do
    env <- fx
    return (env, pure ())
-- | Support for raising and handling of typed errors
-- in a type constructor which is indexed by the error type.
class ErrHandling m where
  -- | Fail with the provided error.
  throwErr :: err -> m err res
  -- | Handle the error with a continuation,
  -- which may itself fail with an error of another type.
  handleErr :: (a -> m b res) -> m a res -> m b res
-- | Move the error into the result as 'Left',
-- leaving the error type of the outcome unconstrained.
exposeErr :: (ErrHandling m, Functor (m a), Applicative (m b)) => m a res -> m b (Either a res)
exposeErr fx = absorbErr Left (fmap Right fx)
-- | Recover from the error by converting it into a result,
-- leaving the error type of the outcome unconstrained.
absorbErr :: (ErrHandling m, Applicative (m b)) => (a -> res) -> m a res -> m b res
absorbErr fn fx = handleErr (\ err -> pure (fn err)) fx
-- | Errors live in the 'ExceptT' layer. The handler is run
-- in the same 'FxEnv' as the effect which failed.
instance ErrHandling (Fx env) where
  throwErr err = Fx (lift (throwE err))
  handleErr handler (Fx m) = Fx $ ReaderT $ \ fxEnv ->
    ExceptT $
      runExceptT (runReaderT m fxEnv) >>= \ case
        Right res -> return (Right res)
        Left err -> case handler err of
          Fx recovery -> runExceptT (runReaderT recovery fxEnv)
-- | Only proper errors (@Left (Just err)@) reach the handler.
-- The crashed state (@Left Nothing@) passes through untouched.
instance ErrHandling Future where
  throwErr err = Future (Compose (return (Left (Just err))))
  handleErr handler (Future m) = Future $ Compose $
    getCompose m >>= \ case
      Left (Just err) -> case handler err of
        Future recovery -> getCompose recovery
      Left Nothing -> return (Left Nothing)
      Right res -> return (Right res)
-- Error handling of 'Conc' is derived from the 'Fx' it wraps
-- (presumably via newtype deriving; confirm against the enabled extensions).
deriving instance ErrHandling (Conc env)
-- | Support for changing the environment type
-- of a type constructor which is indexed by it.
class EnvMapping m where
  -- | Adapt a computation to another environment by providing a function
  -- from the new environment to the one the computation expects.
  mapEnv :: (b -> a) -> m a err res -> m b err res
-- | Only the user environment is converted.
-- The other components of 'FxEnv' are passed on unchanged.
instance EnvMapping Fx where
  mapEnv fn (Fx m) =
    Fx $ ReaderT $ \ (FxEnv unmask crash outerEnv) ->
      let innerEnv = fn outerEnv
        in runReaderT m (FxEnv unmask crash innerEnv)
-- Environment mapping of 'Conc' is derived from the 'Fx' it wraps
-- (presumably via newtype deriving; confirm against the enabled extensions).
deriving instance EnvMapping Conc
-- | Fatal failure of an 'Fx' application: a path of thread ids
-- together with the reason of the failure.
-- 'runFxInIO' throws it in the calling thread.
data FxException = FxException [ThreadId] FxExceptionReason

-- | Renders the thread path followed by the rendered reason.
instance Show FxException where
  show = \ case
    FxException tids reason -> Strings.fatalErrorAtThreadPath tids (show reason)

instance Exception FxException
-- | Reason of a fatal failure.
data FxExceptionReason
  = -- | An exception was thrown by a lifted 'IO' action
    -- which was not expected to throw it.
    UncaughtExceptionFxExceptionReason SomeException
  | -- | An 'ErrorCall' was thrown, or 'fail' was called in 'Fx'.
    ErrorCallFxExceptionReason ErrorCall
  | -- | A situation which the library itself treats as a bug.
    BugFxExceptionReason String

instance Show FxExceptionReason where
  show (UncaughtExceptionFxExceptionReason exc) = Strings.uncaughtException exc
  show (ErrorCallFxExceptionReason errorCall) = show errorCall
  show (BugFxExceptionReason details) = Strings.bug details