-- | Concurrency built-ins exposed to interpreted programs: threads,
-- channels and mutable references, all backed by STM primitives.
module Interpreter.Lib.Concurrency where

import Control.Concurrent
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TSem
import Control.Concurrent.STM.TMVar
import Control.Exception (AsyncException(..), SomeException, catch, toException)
import Control.Monad.IO.Class
import Control.Monad.State.Strict
import Data.Coerce
import Data.Text (pack)

import Interpreter.Common
import Interpreter.Interpreter

-- | Fork a new thread that evaluates the callback, passing the optional
-- argument as its single parameter when one is supplied.
--
-- The worker runs on a copy of the caller's interpreter state whose
-- 'isThreadName' is set to the textual form of the worker's 'ThreadId'; the
-- worker's final state is discarded.
--
-- The outcome is published through the 'TMVar' stored in the returned
-- 'ThreadInfo': @Right value@ on success, or @Left exception@ when the
-- callback returns no value ('MissingProcedureReturn') or the worker is
-- terminated by an exception.
builtInLaunchThread :: BuiltInFnWithDoc '[ '("callback_thread", Callback), '("callback_arg", Maybe Value) ]
builtInLaunchThread ((coerce -> (processCb :: Callback)) :> (coerce -> (mthreadArg)) :> EmptyArgs) = do
  istate <- get
  liftIO $ do
    resultRef <- newEmptyTMVarIO
    threadId <- forkIO $ flip catch (exHandler resultRef) $ do
      ownId <- myThreadId
      (r, _) <-
        flip runStateT (istate { isThreadName = pack (show ownId) })
          (evaluateCallback processCb $ maybe [] (\x -> [x]) mthreadArg)
      atomically $ putTMVar resultRef $ case r of
        Just r' -> Right r'
        Nothing -> Left (toException MissingProcedureReturn)
    pure $ Just $ ThreadRef $ ThreadInfo threadId resultRef
  where
    -- Every exception is recorded, not only asynchronous ones: if the result
    -- variable were left empty, anything awaiting the thread would block
    -- forever.  'tryPutTMVar' is used so the handler can never block should
    -- a result already have been published when the exception arrives.
    exHandler :: TMVar (Either SomeException Value) -> SomeException -> IO ()
    exHandler ref e = void $ atomically $ tryPutTMVar ref (Left e)

-- | Call 'killThread' on the thread identified by the given 'ThreadInfo'.
-- Produces no value.
builtInKillThread :: BuiltInFnWithDoc '[ '("thread_result", ThreadInfo) ]
builtInKillThread ((coerce -> (ThreadInfo threadId _)) :> EmptyArgs) = do
  liftIO $ killThread threadId
  pure Nothing

-- | Block until the thread has published an outcome, then discard it.
-- A failure outcome is ignored as well; produces no value.
builtInAwait :: BuiltInFnWithDoc '[ '("thread_result", ThreadInfo) ]
builtInAwait ((coerce -> (ThreadInfo _ pmvar)) :> EmptyArgs) = do
  void $ liftIO $ atomically $ readTMVar pmvar
  pure Nothing

-- | Block until the thread has published an outcome.  Returns the value on
-- success and re-raises the stored exception through 'throwErr' on failure.
-- The outcome is read without being removed, so it can be awaited again.
builtInAwaitResult :: BuiltInFnWithDoc '[ '("thread_result", ThreadInfo) ]
builtInAwaitResult ((coerce -> (ThreadInfo _ pmvar)) :> EmptyArgs) =
  liftIO (atomically (readTMVar pmvar)) >>= \case
    Right x -> pure $ Just x
    Left e -> throwErr e

-- | Create a new, empty channel and return it wrapped as a 'Channel' value.
builtInNewChannel :: BuiltInFnWithDoc '[]
builtInNewChannel _ =
  (Just . Channel . ChannelRef) <$> liftIO newTChanIO

-- | Write a value to the channel.  Produces no value.
builtInWriteChannel :: BuiltInFnWithDoc '[ '("channel_ref", ChannelRef), '("value", Value)]
builtInWriteChannel ((coerce -> (ChannelRef chan)) :> (coerce -> val) :> _) = do
  liftIO $ atomically $ writeTChan chan val
  pure Nothing

-- | Take the next value from the channel, blocking while it is empty.
builtInReadChannel :: BuiltInFnWithDoc '[ '("channel_ref", ChannelRef)]
builtInReadChannel ((coerce -> (ChannelRef chan)) :> _) =
  Just <$> liftIO (atomically (readTChan chan))

-- | Create a mutable reference holding the initial value, paired with a
-- semaphore of one unit that 'builtInModifyRef' uses to serialise updates.
builtInNewRef :: BuiltInFnWithDoc '[ '("init_value", Value) ]
builtInNewRef ((coerce -> (v :: Value)) :> EmptyArgs) = do
  (ref, sem) <- liftIO $ do
    r <- newTMVarIO v
    s <- atomically (newTSem 1)
    pure (r, s)
  pure $ Just $ Ref $ MutableRef ref sem

-- | Replace the contents of the reference.  Produces no value.
--
-- The reference's semaphore is not consulted here, so a write that lands
-- while 'builtInModifyRef' is running its callback is overwritten when that
-- modification stores its result.
builtInWriteRef :: BuiltInFnWithDoc '[ '("ref", MutableRef), '("new_value", Value) ]
builtInWriteRef ((coerce -> (MutableRef ref _)) :> (coerce -> v) :> EmptyArgs) = do
  void $ liftIO $ atomically $ swapTMVar ref v
  pure Nothing

-- | Update the reference with the result of applying the callback to its
-- current value.  The reference's semaphore is held from the read until the
-- new value is stored, so concurrent modifications run one after another.
--
-- Raises 'MissingProcedureReturn' when the callback returns no value; the
-- reference is left unchanged in that case.  Produces no value.
builtInModifyRef :: BuiltInFnWithDoc '[ '("ref", MutableRef), '("callback", Callback) ]
builtInModifyRef ((coerce -> (MutableRef ref sem)) :> (coerce -> (callback :: Callback)) :> EmptyArgs) = do
  v <- liftIO $ atomically $ do
    waitTSem sem
    readTMVar ref
  -- NOTE(review): if the callback itself raises, the semaphore is still left
  -- held; releasing it on that path needs exception-safe bracketing in the
  -- interpreter monad.
  evaluateCallback callback [v] >>= \case
    Nothing -> do
      -- Release the semaphore before failing; otherwise every later
      -- modification of this reference would wait forever.
      liftIO $ atomically $ signalTSem sem
      throwErr MissingProcedureReturn
    Just r -> liftIO $ atomically $ do
      _ <- swapTMVar ref r
      signalTSem sem
      pure Nothing

-- | Return the current contents of the reference without modifying it.
builtInReadRef :: BuiltInFnWithDoc '[ '("ref", MutableRef) ]
builtInReadRef ((coerce -> (MutableRef ref _)) :> EmptyArgs) =
  Just <$> liftIO (atomically (readTMVar ref))