{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE ImplicitParams #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeFamilies #-}

{- | Description: Thread utilities. -}
module OM.Fork (
  -- * Actor Communication.
  Actor(..),
  Responder,
  Responded,
  respond,
  call,
  cast,

  -- * Forking Background Processes.
  logUnexpectedTermination,
  ProcessName(..),
  Race,
  runRace,
  race,
  wait,
) where


import Control.Concurrent (Chan, myThreadId, newEmptyMVar, putMVar,
  takeMVar, writeChan)
import Control.Monad (void)
import Control.Monad.Catch (MonadThrow(throwM), MonadCatch, SomeException,
  try)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Logger.CallStack (MonadLogger, logInfo, logWarn)
import Data.Aeson (ToJSON, toJSON)
import Data.String (IsString)
import Data.Text (Text)
import GHC.Conc (atomically)
import OM.Show (showt)
import UnliftIO (MonadUnliftIO, askRunInIO, throwString)
import qualified Ki


{- |
  How to respond to an asynchronous message.

  A 'Responder' wraps the callback that delivers a reply of type @a@ back
  to whoever sent the request. Values are created by 'call' and consumed
  by 'respond'.
-}
newtype Responder a = Responder {
    unResponder :: a -> IO ()
  }
{- |
  A 'Responder' wraps a function and therefore cannot be serialized; it is
  always rendered as the fixed placeholder string @"<Responder>"@.
-}
instance ToJSON (Responder a) where
  toJSON _ = toJSON ("<Responder>" :: Text)
{- | Every 'Responder' shows as the constant string @"Responder"@. -}
instance Show (Responder a) where
  show _ = "Responder"


{- |
  The class of types that can act as the handle for an asynchronous actor.

  An instance says what kind of messages the actor accepts ('Msg') and how
  to hand one message to it ('actorChan').
-}
class Actor a where
  {- | The type of messages associated with the actor. -}
  type Msg a
  {- |
    Deliver one message to the actor behind the given handle. Despite the
    name, this is a send function rather than a channel value; 'call'
    and 'cast' are both built on it.
  -}
  actorChan :: a -> Msg a -> IO ()
{- |
  A plain 'Chan' is an actor handle: its messages are the channel's
  elements, and sending a message writes it to the channel.
-}
instance Actor (Chan m) where
  type Msg (Chan m) = m
  actorChan = writeChan


{- |
  Respond to an asynchronous message.

  Runs the callback stored in the 'Responder' with the given value, then
  returns 'Responded' as evidence that the reply was sent.
-}
respond :: (MonadIO m) => Responder a -> a -> m Responded
respond responder val = do
  liftIO (unResponder responder val)
  return Responded


{- |
  Send a message to an actor, and wait for a response.

  The response travels through a fresh 'Control.Concurrent.MVar.MVar':
  the 'Responder' handed to the message constructor fills it, and 'call'
  blocks until it has been filled. If the actor never invokes the
  responder, 'call' never returns.
-}
call
  :: ( Actor actor
     , MonadIO m
     )
  => actor {- ^ The actor to which we are sending a call request. -}
  -> (Responder a -> Msg actor)
     {- ^
       Given a way for the actor to respond to the message, construct
       a message that should be sent to the actor.

       Typically, your 'Msg' type will look something like this:

       > data MyMsg
       >   = MsgWithResponse SomeData (Responder ResponseType)
       >     -- In this example, this type of message requires a
       >     -- response. We package the responder up as part of the
       >     -- message itself. Idiomatically it is best to put the
       >     -- responder as the last argument so that it is easy to pass
       >     -- 'MsgWithResponse someData' to 'call'.
       >   | MsgWithoutResponse SomeData
       >     -- In this example, this type of message requires no response. It
       >     -- is a "fire and forget" message.

       You will call 'call' like this:

       > do
       >   response <- call actor (MsgWithResponse someData)
       >   -- response :: ResponseType

     -}
  -> m a
call actor mkMessage = liftIO $ do
  -- One-shot mailbox for the reply.
  mVar <- newEmptyMVar
  actorChan actor (mkMessage (Responder (putMVar mVar)))
  takeMVar mVar


{- |
  Send a message to an actor, but do not wait for a response.

  This is 'actorChan' lifted into any 'MonadIO'.
-}
cast :: (Actor actor, MonadIO m) => actor -> Msg actor -> m ()
cast actor = liftIO . actorChan actor


{- |
  Proof that 'respond' was called. Clients can use this type in their
  type signatures when they require that 'respond' be called at least
  once. The module exports the type without its constructor, so outside
  this module calling 'respond' is the only way to obtain a value of
  this type.
-}
data Responded = Responded


{- |
  Log (at WARN) when the action terminates for any reason.

  * If the action throws, the exception is logged together with the
    process name and then rethrown with 'throwM'.
  * If the action returns, a "finished normally" warning is logged and
    the result is passed through unchanged.
-}
logUnexpectedTermination :: (MonadLogger m, MonadCatch m)
  => ProcessName
  -> m a
  -> m a
logUnexpectedTermination (ProcessName name) action =
  try action >>= \case
    Left err -> do
      logWarn
        $ "Thread " <> name <> " finished with an error: " <> showt err
      -- The annotation fixes the exception type that 'try' catches.
      throwM (err :: SomeException)
    Right v -> do
      logWarn $ "Thread " <> name <> " finished normally."
      return v


{- |
  Run a thread race.

  Within the provided action, you can call 'race' to fork new background
  threads. When the action terminates, all background threads forked
  with 'race' are also terminated. Likewise, if any one of the racing
  threads terminates, then all other racing threads are terminated _and_
  'runRace' will throw an exception.

  In any event, when 'runRace' returns, all background threads forked
  by the @action@ using 'race' will have been terminated.
-}
runRace
  :: (MonadUnliftIO m)
  => (Race => m a) {- ^ - @action@: The provided "race" action. -}
  -> m a
runRace action = do
  runInIO <- askRunInIO
  liftIO . Ki.scoped $ \scope ->
    -- Bind the fresh scope as the implicit parameter behind 'Race'.
    runInIO (let ?scope = scope in action)


{- |
  This constraint indicates that we are in the context of a thread race. If any
  threads in the race terminate, then all threads in the race terminate.
  Threads are "in the race" if they were forked using 'race'.

  Concretely, it is an implicit parameter carrying the 'Ki.Scope' of the
  race: 'runRace' binds it, and 'race' and 'wait' read it.
-}
type Race = (?scope :: Ki.Scope)


{- |
  Fork a new thread within the context of a race. This thread will be
  terminated when any other racing thread terminates, or else if this
  thread terminates first it will cause all other racing threads to
  be terminated.

  Generally, we normally expect that the thread is a "background thread"
  and will never terminate under "normal" conditions.

  The forked thread logs its start at INFO and runs the action under
  'logUnexpectedTermination'. If the action returns, the thread then
  throws via 'throwString', so it never finishes without an exception.
-}
race
  :: ( MonadCatch m
     , MonadLogger m
     , MonadUnliftIO m
     , Race
     )
  => ProcessName
  -> m a
  -> m ()
race name action = do
  runInIO <- askRunInIO
  liftIO
    . Ki.fork_ ?scope
    $ do
      tid <- myThreadId
      runInIO . logUnexpectedTermination name $ do
        logInfo $ "Starting thread (tid, name): " <> showt (tid, name)
        void action
      -- Reached only when the action returned: turn that into a failure.
      throwString $ "Thread Finished (tid, name): " <> show (tid, name)


{- |
  The name of a process.

  It is used to label the log messages written by 'race' and
  'logUnexpectedTermination'. All instances are derived with the
  @newtype@ strategy, so they behave exactly like those of the
  underlying 'Text'.
-}
newtype ProcessName = ProcessName
  { unProcessName :: Text
  }
  deriving newtype (IsString, Semigroup, Monoid, Show)


{- |
  Wait for all racing threads to terminate.

  Runs 'Ki.awaitAll' on the scope of the current race.
-}
wait :: (MonadIO m, Race) => m ()
wait = liftIO . atomically $ Ki.awaitAll ?scope