-- | Event sinks: a handle for pushing events to a dedicated handler thread.
--
-- 'spawn' forks a worker that drains an Unagi channel and feeds every event
-- to a user-supplied handler; the returned 'Sink' is the write side of that
-- channel.
module Engine.Events.Sink
  ( Sink(..)
  , spawn
  ) where

import RIO

import Control.Concurrent.Chan.Unagi qualified as Unagi
import Control.Exception (AsyncException(ThreadKilled))
import UnliftIO.Concurrent (forkFinally, killThread)
import UnliftIO.Resource (ReleaseKey)
import UnliftIO.Resource qualified as Resource

import Engine.Types (StageRIO)

-- | A handle for submitting events of type @event@ from a stage whose state
-- type is @st@.
newtype Sink event st = Sink
  { signal :: event -> StageRIO st ()
    -- ^ Submit one event. For sinks built by 'spawn' this writes the event
    -- to the channel read by the worker thread.
  }

-- | Start an event-handling thread and return a 'Sink' that feeds it.
--
-- A fresh Unagi channel is created. The forked thread loops forever, reading
-- one event at a time from the channel and running the given handler on it.
--
-- The handler is not wrapped in any exception handling here: an exception
-- escaping it ends the loop and is reported through 'logError'. The returned
-- 'Sink' still accepts events afterwards, but nothing reads them any more.
--
-- Returns a pair of:
--
-- * a 'ReleaseKey' whose registered action is 'killThread' on the worker;
-- * the 'Sink' writing to the worker's channel.
spawn
  :: (event -> StageRIO rs ())
     -- ^ Handler run on the worker thread for every received event.
  -> StageRIO rs (ReleaseKey, Sink event rs)
spawn handleEvent = do
  (eventsIn, eventsOut) <- liftIO Unagi.newChan

  let
    -- Write side: enqueue the event for the worker.
    sink = Sink \event ->
      liftIO (Unagi.writeChan eventsIn event)

    -- Read side: take the next event and dispatch it, indefinitely.
    handler = forever $
      liftIO (Unagi.readChan eventsOut) >>= handleEvent

  -- The finaliser only logs how the worker ended; it does not restart it.
  tid <- forkFinally handler \case
    Left exc ->
      case fromException exc of
        Just ThreadKilled ->
          -- Expected way to stop: the registered release action below.
          logDebug "Event thread killed"
        _others ->
          logError $ "Event thread crashed: " <> displayShow exc
    Right () ->
      -- 'forever' is not expected to return normally.
      logWarn "Event thread exited prematurely"

  -- Tie the worker's lifetime to the resource context via the release key.
  key <- Resource.register $ killThread tid

  pure (key, sink)