{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE ImplicitParams #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeFamilies #-}
{- |
  Helpers for message passing between threads ('Actor', 'call', 'cast',
  'respond') and for forking named background threads inside a shared
  "Ki" scope ('runRace', 'race', 'wait').
-}
module OM.Fork
  ( -- * Actor communication
    Actor(..)
  , Responder
  , Responded
  , respond
  , call
  , cast
    -- * Forking background processes
  , logUnexpectedTermination
  , ProcessName(..)
  , Race
  , runRace
  , race
  , wait
  ) where
import Control.Concurrent (Chan, myThreadId, newEmptyMVar, putMVar,
  takeMVar, writeChan)
import Control.Monad (void)
import Control.Monad.Catch (MonadThrow(throwM), MonadCatch, SomeException,
  try)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Logger.CallStack (MonadLogger, logInfo, logWarn)
import Data.Aeson (ToJSON, toJSON)
import Data.String (IsString)
import Data.Text (Text)
import GHC.Conc (atomically)
import OM.Show (showt)
import UnliftIO (MonadUnliftIO, askRunInIO, throwString)
import qualified Ki
{- |
  A one-argument callback used to deliver a reply of type @a@.

  'call' builds one of these around an 'Control.Concurrent.MVar.MVar'
  and embeds it in the outgoing message; the receiver answers through
  'respond'.
-}
newtype Responder a = Responder
  { unResponder :: a -> IO ()
    -- ^ The underlying action that hands the reply back to the requester.
  }
{- |
  A 'Responder' wraps a function and therefore cannot be serialized; it
  is always rendered as the fixed JSON string @"<Responder>"@.
-}
instance ToJSON (Responder a) where
  toJSON _ = toJSON ("<Responder>" :: Text)
{- |
  A 'Responder' wraps a function, so there is nothing meaningful to
  display; it is always shown as the fixed string @Responder@.
-}
instance Show (Responder a) where
  show _ = "Responder"
{- | The class of things that can be sent messages. -}
class Actor a where
  {- | The type of message accepted by the actor @a@. -}
  type Msg a

  {- | Deliver a message to the actor. Used by both 'call' and 'cast'. -}
  actorChan :: a -> Msg a -> IO ()
{- |
  A plain 'Chan' is an actor whose messages are the channel's elements;
  sending a message is just 'writeChan'.
-}
instance Actor (Chan m) where
  type Msg (Chan m) = m
  actorChan = writeChan
{- |
  Send a reply through a 'Responder'.

  Runs the responder's callback with @val@ and then returns the
  'Responded' token as evidence that a reply was sent.
-}
respond :: (MonadIO m) => Responder a -> a -> m Responded
respond responder val = do
  liftIO (unResponder responder val)
  pure Responded
{- |
  Send a message to an actor and block until it replies.

  A fresh, empty 'Control.Concurrent.MVar.MVar' is created and a
  'Responder' that fills it is passed to @mkMessage@ to build the
  outgoing message. After the message is delivered with 'actorChan',
  the calling thread waits on the @MVar@ and returns whatever value the
  responder was invoked with. If the responder is never invoked, the
  caller stays blocked.
-}
call
  :: ( Actor actor
     , MonadIO m
     )
  => actor
     -- ^ The actor to send the request to.
  -> (Responder a -> Msg actor)
     -- ^ Builds the request message around the reply callback.
  -> m a
call actor mkMessage = liftIO $ do
  mVar <- newEmptyMVar
  actorChan actor (mkMessage (Responder (putMVar mVar)))
  takeMVar mVar
{- |
  Send a message to an actor without waiting for any reply: this is
  'actorChan' lifted into any 'MonadIO'.
-}
cast :: (Actor actor, MonadIO m) => actor -> Msg actor -> m ()
cast actor = liftIO . actorChan actor
{- |
  Token returned by 'respond' once a reply has been sent. The module's
  export list exposes the type but not its constructor, so outside this
  module a value can only be obtained by calling 'respond'.
-}
data Responded = Responded
{- |
  Run an action and log how it ended, labelled with the process name.

  * If the action throws, the exception is logged at warning level and
    then rethrown unchanged.
  * If the action returns, that is also logged at warning level (any
    termination is treated as noteworthy here) and the result is
    returned.
-}
logUnexpectedTermination :: (MonadLogger m, MonadCatch m)
  => ProcessName
  -> m a
  -> m a
logUnexpectedTermination (ProcessName name) action =
  try action >>= \case
    Left err -> do
      logWarn
        $ "Thread " <> name <> " finished with an error: " <> showt err
      -- The annotation pins the exception type that 'try' catches.
      throwM (err :: SomeException)
    Right v -> do
      logWarn $ "Thread " <> name <> " finished normally."
      pure v
{- |
  Run an action with a fresh 'Ki.Scope' available as the implicit
  'Race' context, so that 'race' and 'wait' can be used inside it.

  The scope is opened with 'Ki.scoped' and exists only for the duration
  of @action@; see the "Ki" documentation for what happens to threads
  still running in a scope when it closes.
-}
runRace
  :: (MonadUnliftIO m)
  => (Race => m a)
     -- ^ The action to run with the scope in its implicit context.
  -> m a
runRace action = do
  runInIO <- askRunInIO
  liftIO . Ki.scoped $ \scope ->
    -- Bind the implicit parameter that discharges the 'Race' constraint.
    runInIO (let ?scope = scope in action)
{- |
  Constraint synonym for an implicit parameter @?scope@ holding a
  'Ki.Scope'. 'runRace' binds it; 'race' and 'wait' read it.
-}
type Race = (?scope :: Ki.Scope)
{- |
  Fork a named background thread in the current 'Race' scope.

  The forked thread logs its start at info level, then runs @action@
  under 'logUnexpectedTermination' and discards its result. The thread
  never finishes normally: if @action@ returns, a "Thread Finished"
  exception is thrown with 'throwString', and if @action@ throws, that
  exception is rethrown by 'logUnexpectedTermination'. This is what
  lets the forked action have the never-returning type that 'Ki.fork_'
  requires. See the "Ki" documentation for how an exception in a forked
  thread affects the rest of the scope.
-}
race
  :: ( MonadCatch m
     , MonadLogger m
     , MonadUnliftIO m
     , Race
     )
  => ProcessName
     -- ^ Name used in the log messages and in the final exception.
  -> m a
     -- ^ The action to run in the new thread; its result is discarded.
  -> m ()
race name action = do
  runInIO <- askRunInIO
  liftIO
    . Ki.fork_ ?scope
    $ do
      tid <- myThreadId
      runInIO . logUnexpectedTermination name $ do
        logInfo $ "Starting thread (tid, name): " <> showt (tid, name)
        void action
      -- Only reached when the action returned without throwing.
      throwString $ "Thread Finished (tid, name): " <> show (tid, name)
{- |
  The name of a background process, used to label log messages.

  All instances are borrowed from the underlying 'Text' via newtype
  deriving, so string literals can be used directly as names (given
  @OverloadedStrings@) and names can be combined with '<>'.
-}
newtype ProcessName = ProcessName
  { unProcessName :: Text
    -- ^ The name as plain 'Text'.
  }
  deriving newtype (IsString, Semigroup, Monoid, Show)
{- |
  Block by running 'Ki.awaitAll' on the current 'Race' scope in a single
  STM transaction. See the "Ki" documentation for the exact condition
  'Ki.awaitAll' waits for.
-}
wait :: (MonadIO m, Race) => m ()
wait = liftIO . atomically $ Ki.awaitAll ?scope