module Engine.ReactiveBanana ( eventHandler , timer , observe , allocateActuated , allocatePaused , pushWorkerInput , pushWorkerInputJust , pushWorkerOutput , pushWorkerOutputJust , reactimateDebugShow , debounce ) where import RIO import Engine.Worker qualified as Worker import GHC.Stack (withFrozenCallStack) import Reactive.Banana qualified as RB import Reactive.Banana.Frameworks qualified as RBF import Resource.Region qualified as Region import UnliftIO.Resource (ResourceT) import UnliftIO.Resource qualified as Resource eventHandler :: (Resource.MonadResource m, MonadIO io) => ((a -> io ()) -> m Resource.ReleaseKey) -> ResourceT m (RBF.MomentIO (RB.Event a)) eventHandler action = do (addHandler, fire) <- liftIO RBF.newAddHandler Region.local_ $ action (liftIO . fire) pure $ RBF.fromAddHandler addHandler timer :: (MonadUnliftIO m) => Int -> ResourceT m (RBF.MomentIO (RB.Event Double)) timer delayMS = do (addHandler, fire) <- liftIO RBF.newAddHandler ticker <- async do begin <- getMonotonicTime threadDelay delayMS forever do before <- getMonotonicTime liftIO $ fire before after <- getMonotonicTime let tickNum = (after - begin) * 1e6 / fromIntegral delayMS :: Double intTick = truncate tickNum :: Integer driftTicks = tickNum - fromInteger intTick :: Double driftMS = driftTicks * fromIntegral delayMS :: Double adjustedDelay = max 0 $ delayMS - ceiling driftMS :: Int -- when (driftTicks > 0.01) $ -- -- traceShowM driftTicks -- traceShowM (delayMS, (tickNum, intTick, driftTicks), driftMS, adjustedDelay) threadDelay adjustedDelay Region.attachAsync ticker pure $ RBF.fromAddHandler addHandler observe :: (MonadUnliftIO m) => Worker.Var a -> ResourceT m (RBF.MomentIO (RB.Event a)) observe var = do (addHandler, fire) <- liftIO RBF.newAddHandler initial <- readTVarIO var tracker <- async $ go fire (Worker.vVersion initial) Region.attachAsync tracker liftIO $ fire (Worker.vData initial) -- XXX: the network isn't compiled yet! pure $ RBF.fromAddHandler addHandler where go fire oldVersion = do Worker.Versioned{..} <- atomically do next <- readTVar var if Worker.vVersion next > oldVersion then pure next else retrySTM liftIO $ fire vData go fire vVersion allocateActuated :: MonadUnliftIO m => (UnliftIO m -> RB.Event () -> RBF.MomentIO ()) -> ResourceT m RBF.EventNetwork allocateActuated builder = do (ah, fire) <- liftIO RBF.newAddHandler network <- allocatePaused \unlift -> do started <- RBF.fromAddHandler ah builder unlift started liftIO do RBF.actuate network fire () pure network allocatePaused :: MonadUnliftIO m => (UnliftIO m -> RBF.MomentIO ()) -> ResourceT m RBF.EventNetwork allocatePaused builder = do unlift <- lift askUnliftIO fmap snd $ Resource.allocate (RBF.compile $ builder unlift) RBF.pause pushWorkerInput :: Worker.HasInput var => var -> RB.Event (Worker.GetInput var) -> RBF.MomentIO () pushWorkerInput p = RBF.reactimate . fmap (Worker.pushInput p . const) pushWorkerInputJust :: Worker.HasInput var => var -> RB.Event (Maybe (Worker.GetInput var)) -> RBF.MomentIO () pushWorkerInputJust p = RBF.reactimate . fmap (traverse_ $ Worker.pushInput p . const) pushWorkerOutput :: Worker.HasOutput var => var -> RB.Event (Worker.GetOutput var) -> RBF.MomentIO () pushWorkerOutput p = RBF.reactimate . fmap (Worker.pushOutput p . const) pushWorkerOutputJust :: Worker.HasOutput var => var -> RB.Event (Maybe (Worker.GetOutput var)) -> RBF.MomentIO () pushWorkerOutputJust p = RBF.reactimate . fmap (traverse_ $ Worker.pushOutput p . const) reactimateDebugShow :: (Show a, MonadIO m, MonadReader env m, HasLogFunc env, HasCallStack) => (m () -> IO ()) -> RB.Event a -> RBF.MomentIO () reactimateDebugShow unlift = RBF.reactimate . fmap (unlift . withFrozenCallStack logDebug . displayShow) debounce :: Eq a => a -> RB.Event a -> RBF.MomentIO (RB.Event a) debounce initial spamUpdates = do (e, fire) <- RBF.newEvent oldVar <- newIORef initial RBF.reactimate $ spamUpdates <&> \new -> do changed <- atomicModifyIORef' oldVar \old -> (new, old /= new) when changed $ fire new pure e