{- | Many clocks tick at nondeterministic times (such as event sources), and it is thus impossible to schedule them deterministically with most other clocks. Using concurrency, they can still be scheduled with all clocks in 'IO', by running the clocks in separate threads. -} {-# LANGUAGE Arrows #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE TypeFamilies #-} module FRP.Rhine.Schedule.Concurrently where -- base import Control.Concurrent import Control.Monad (void) import Data.IORef -- transformers import Control.Monad.Trans.Class -- dunai import Control.Monad.Trans.MSF.Except import Control.Monad.Trans.MSF.Maybe import Control.Monad.Trans.MSF.Writer -- rhine import FRP.Rhine.Clock import FRP.Rhine.Schedule -- | Runs two clocks in separate GHC threads -- and collects the results in the foreground thread. -- Caution: The data processing will still happen in the same thread -- (since data processing and scheduling are separated concerns). concurrently :: ( Clock IO cl1, Clock IO cl2 , Time cl1 ~ Time cl2 ) => Schedule IO cl1 cl2 concurrently = Schedule $ \cl1 cl2 -> do iMVar <- newEmptyMVar mvar <- newEmptyMVar _ <- launchSubthread cl1 Left iMVar mvar _ <- launchSubthread cl2 Right iMVar mvar initTime <- takeMVar iMVar -- The first clock to be initialised sets the first time stamp _ <- takeMVar iMVar -- Initialise the second clock return (constM $ takeMVar mvar, initTime) where launchSubthread cl leftright iMVar mvar = forkIO $ do (runningClock, initTime) <- initClock cl putMVar iMVar initTime reactimate $ runningClock >>> second (arr leftright) >>> arrM (putMVar mvar) -- TODO These threads can't be killed from outside easily since we've lost their ids -- => make a MaybeT or ExceptT variant -- TODO Test whether signal networks also share the writer and except effects correctly with these schedules -- | As 'concurrently', but in the @WriterT w IO@ monad. -- Both background threads share a joint variable with the foreground -- to which the writer effect writes. concurrentlyWriter :: ( Monoid w , Clock (WriterT w IO) cl1 , Clock (WriterT w IO) cl2 , Time cl1 ~ Time cl2 ) => Schedule (WriterT w IO) cl1 cl2 concurrentlyWriter = Schedule $ \cl1 cl2 -> do iMVar <- lift newEmptyMVar mvar <- lift newEmptyMVar _ <- launchSubthread cl1 Left iMVar mvar _ <- launchSubthread cl2 Right iMVar mvar -- The first clock to be initialised sets the first time stamp (initTime, w1) <- lift $ takeMVar iMVar -- Initialise the second clock (_ , w2) <- lift $ takeMVar iMVar tell w1 tell w2 return (constM (WriterT $ takeMVar mvar), initTime) where launchSubthread cl leftright iMVar mvar = lift $ forkIO $ do ((runningClock, initTime), w) <- runWriterT $ initClock cl putMVar iMVar (initTime, w) reactimate $ runWriterS runningClock >>> proc (w', (time, tag_)) -> arrM (putMVar mvar) -< ((time, leftright tag_), w') -- | Schedule in the @ExceptT e IO@ monad. -- Whenever one clock encounters an exception in 'ExceptT', -- this exception is thrown in the other clock's 'ExceptT' layer as well, -- and in the schedule's (i.e. in the main clock's) thread. concurrentlyExcept :: ( Clock (ExceptT e IO) cl1 , Clock (ExceptT e IO) cl2 , Time cl1 ~ Time cl2 ) => Schedule (ExceptT e IO) cl1 cl2 concurrentlyExcept = Schedule $ \cl1 cl2 -> do (iMVar, mvar, errorref) <- lift $ do iMVar <- newEmptyMVar -- The initialisation time is transferred over this variable. It's written to twice. mvar <- newEmptyMVar -- The ticks and exceptions are transferred over this variable. It receives two 'Left' values in total. errorref <- newIORef Nothing -- Used to broadcast the exception to both clocks _ <- launchSubThread cl1 Left iMVar mvar errorref _ <- launchSubThread cl2 Right iMVar mvar errorref return (iMVar, mvar, errorref) catchAndDrain mvar $ do initTime <- ExceptT $ takeMVar iMVar -- The first clock to be initialised sets the first time stamp _ <- ExceptT $ takeMVar iMVar -- Initialise the second clock let runningSchedule = constM $ do eTick <- lift $ takeMVar mvar case eTick of Right tick -> return tick Left e -> do lift $ writeIORef errorref $ Just e -- Broadcast the exception to both clocks throwE e return (runningSchedule, initTime) where launchSubThread cl leftright iMVar mvar errorref = forkIO $ do initialised <- runExceptT $ initClock cl case initialised of Right (runningClock, initTime) -> do putMVar iMVar $ Right initTime Left e <- runExceptT $ reactimate $ runningClock >>> proc (td, tag2) -> do arrM (lift . putMVar mvar) -< Right (td, leftright tag2) me <- constM (lift $ readIORef errorref) -< () _ <- throwMaybe -< me returnA -< () putMVar mvar $ Left e -- Either throw own exception or acknowledge the exception from the other clock Left e -> void $ putMVar iMVar $ Left e catchAndDrain mvar initScheduleAction = catchE initScheduleAction $ \e -> do _ <- reactimate $ (constM $ ExceptT $ takeMVar mvar) >>> arr (const ()) -- Drain the mvar until the other clock acknowledges the exception throwE e -- | As 'concurrentlyExcept', with a single possible exception value. concurrentlyMaybe :: ( Clock (MaybeT IO) cl1 , Clock (MaybeT IO) cl2 , Time cl1 ~ Time cl2 ) => Schedule (MaybeT IO) cl1 cl2 concurrentlyMaybe = Schedule $ \cl1 cl2 -> initSchedule (hoistSchedule exceptTIOToMaybeTIO concurrentlyExcept) (HoistClock cl1 maybeTIOToExceptTIO) (HoistClock cl2 maybeTIOToExceptTIO) where exceptTIOToMaybeTIO :: ExceptT () IO a -> MaybeT IO a exceptTIOToMaybeTIO = exceptToMaybeT maybeTIOToExceptTIO :: MaybeT IO a -> ExceptT () IO a maybeTIOToExceptTIO = maybeToExceptT ()