{-# LANGUAGE GeneralizedNewtypeDeriving, ScopedTypeVariables, DoRec, GADTs, TypeFamilies, EmptyDataDecls, FlexibleInstances #-} {-# OPTIONS_GHC -fno-cse -fno-full-laziness #-} module FRP.Sodium.Plain where import qualified FRP.Sodium.Context as R -- Note: the 'full-laziness' optimization messes up finalizers, so we're -- disabling it. It'd be nice to find a really robust solution to this. -- -fno-cse just in case, since we're using unsafePerformIO. import Control.Applicative import Control.Concurrent.Chan import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar, newMVar, readMVar) import qualified Control.Concurrent.MVar as MV import Control.Exception (evaluate) import Control.Monad import Control.Monad.State.Strict import Control.Monad.Trans import Data.Int import Data.IORef import Data.Map (Map) import qualified Data.Map as M import Data.Maybe import Data.Set (Set) import qualified Data.Set as S import Data.Sequence (Seq, (|>), (><)) import qualified Data.Sequence as Seq import GHC.Exts import System.Mem.Weak import System.IO.Unsafe import Unsafe.Coerce modifyMVar :: MVar a -> (a -> IO (a, b)) -> IO b modifyMVar mv f = MV.modifyMVar mv $ \a -> do (a', b') <- f a evaluate a' return (a', b') modifyMVar_ :: MVar a -> (a -> IO a) -> IO () modifyMVar_ mv f = MV.modifyMVar_ mv $ \a -> do a' <- f a evaluate a' return a' putMVar :: MVar a -> a -> IO () putMVar mv a = do evaluate a MV.putMVar mv a -- Note: the second parameter is a dummy, make it depend -- on the last function parameter of the caller to make -- sure that the IORef is freshly created every time unsafeNewIORef :: a -> b -> IORef a {-# NOINLINE unsafeNewIORef #-} unsafeNewIORef v dummy = unsafePerformIO (newIORef v) -- | Phantom type for use with 'R.Context' type class. data Plain partition :: Partition {-# NOINLINE partition #-} partition = unsafePerformIO createPartition where createPartition :: IO Partition createPartition = do lock <- newEmptyMVar nextNodeIDRef <- newIORef (NodeID 0) return $ Partition { paLock = lock, paNextNodeID = nextNodeIDRef } -- | A monad for transactional reactive operations. Execute it from 'IO' using 'sync'. type Reactive = R.Reactive Plain -- | A stream of events. The individual firings of events are called \'event occurrences\'. type Event = R.Event Plain -- | A time-varying value, American spelling. type Behavior = R.Behavior Plain -- | A time-varying value, British spelling. type Behaviour = R.Behavior Plain -- Must be data not newtype, because we need to attach finalizers to it data Sample a = Sample { unSample :: IO a, sDep :: Dep, sampleKeepAlive :: IORef () } instance R.Context Plain where newtype Reactive Plain a = Reactive (StateT ReactiveState IO a) data Event Plain a = Event { -- Must be data not newtype, because we need to attach finalizers to it -- | Listen for event occurrences on this event, to be handled by the specified -- handler. The returned action is used to unregister the listener. getListenRaw :: Reactive (Listen a), evCacheRef :: IORef (Maybe (Listen a)), eDep :: Dep } data Behavior Plain a = Behavior { updates_ :: Event a, -- ^ An event that gives the updates for the behavior. It doesn't do any equality -- comparison as the name might imply. -- | Obtain the current value of a behavior. sampleImpl :: Sample a } sync = sync newEvent = newEvent listen = listen never = never merge = merge filterJust = filterJust hold = hold updates = updates value = value snapshot = snapshot switchE = switchE switch = switch execute = execute sample = sample coalesce = coalesce once = once split = split -- | An event that gives the updates for the behavior. If the behavior was created -- with 'hold', then 'updates' gives you an event equivalent to the one that was held. updates :: Behavior a -> Event a updates = updates_ -- | Execute the specified 'Reactive' within a new transaction, blocking the caller -- until all resulting processing is complete and all callbacks have been called. -- This operation is thread-safe, so it may be called from any thread. -- -- State changes to 'hold' values occur after processing of the transaction is complete. sync :: Reactive a -> IO a sync task = do let loop :: StateT ReactiveState IO () = do queue1 <- gets asQueue1 if not $ Seq.null queue1 then do let Reactive task = Seq.index queue1 0 modify $ \as -> as { asQueue1 = Seq.drop 1 queue1 } task loop else do queue2 <- gets asQueue2 mTask <- lift $ popPriorityQueue queue2 case mTask of Just (Reactive task) -> do task loop Nothing -> do final <- gets asFinal if not $ Seq.null final then do let Reactive task = Seq.index final 0 modify $ \as -> as { asFinal = Seq.drop 1 final } task loop else return () post <- gets asPost unless (Seq.null post) $ do let Reactive task = post `Seq.index` 0 modify $ \as -> as { asPost = Seq.drop 1 post } task loop outVar <- newIORef undefined let lock = paLock partition putMVar lock () q <- newPriorityQueue evalStateT loop $ ReactiveState { asQueue1 = Seq.singleton (task >>= ioReactive . writeIORef outVar), asQueue2 = q, asFinal = Seq.empty, asPost = Seq.empty } takeMVar lock readIORef outVar -- | Returns an event, and a push action for pushing a value into the event. newEvent :: Reactive (Event a, a -> Reactive ()) newEvent = do (ev, push, _) <- ioReactive $ newEventLinked undefined return (ev, push) -- | Listen for firings of this event. The returned @IO ()@ is an IO action -- that unregisters the listener. This is the observer pattern. -- -- To listen to a 'Behavior' use @listen (value b) handler@ or -- @listen (updates b) handler@ -- -- NOTE: The callback is called with the transaction held, so you cannot -- use 'sync' inside a listener. You can delegate to another thread and have -- that start the new transaction. If you want to do more processing in -- the same transction, then you can use 'FRP.Sodium.Internal.listenTrans' -- but this is discouraged unless you really need to write a new primitive. listen :: Event a -> (a -> IO ()) -> Reactive (IO ()) listen ev handle = listenTrans ev $ \a -> ioReactive (handle a >> touch ev) -- | An event that never fires. never :: Event a never = Event { getListenRaw = return $ Listen (\_ _ _ -> return (return ())) undefined, evCacheRef = unsafeNewIORef Nothing undefined, eDep = undefined } -- | Merge two streams of events of the same type. -- -- In the case where two event occurrences are simultaneous (i.e. both -- within the same transaction), both will be delivered in the same -- transaction. If the event firings are ordered for some reason, then -- their ordering is retained. In many common cases the ordering will -- be undefined. merge :: Event a -> Event a -> Event a merge ea eb = Event gl cacheRef (dep (ea, eb)) where cacheRef = unsafeNewIORef Nothing eb gl = do (l, push, nodeRef) <- ioReactive newEventImpl unlistener <- later $ do u1 <- linkedListen ea (Just nodeRef) False push u2 <- linkedListen eb (Just nodeRef) False push return (u1 >> u2) addCleanup_Listen unlistener l -- | Unwrap Just values, and discard event occurrences with Nothing values. filterJust :: Event (Maybe a) -> Event a filterJust ema = Event gl cacheRef (dep ema) where cacheRef = unsafeNewIORef Nothing ema gl = do (l, push, nodeRef) <- ioReactive newEventImpl unlistener <- later $ linkedListen ema (Just nodeRef) False $ \ma -> case ma of Just a -> push a Nothing -> return () addCleanup_Listen unlistener l -- | Create a behavior with the specified initial value, that gets updated -- by the values coming through the event. The \'current value\' of the behavior -- is notionally the value as it was 'at the start of the transaction'. -- That is, state updates caused by event firings get processed at the end of -- the transaction. hold :: a -> Event a -> Reactive (Behavior a) hold initA ea = do bsRef <- ioReactive $ newIORef $ initA `seq` BehaviorState initA Nothing unlistener <- later $ linkedListen ea Nothing False $ \a -> do bs <- ioReactive $ readIORef bsRef ioReactive $ writeIORef bsRef $ a `seq` bs { bsUpdate = Just a } when (isNothing (bsUpdate bs)) $ scheduleLast $ ioReactive $ do bs <- readIORef bsRef let newCurrent = fromJust (bsUpdate bs) writeIORef bsRef $ newCurrent `seq` BehaviorState newCurrent Nothing keepAliveRef <- ioReactive $ newIORef () sample <- ioReactive $ addCleanup_Sample unlistener (Sample (bsCurrent <$> readIORef bsRef) (dep ea) keepAliveRef) let beh = sample `seq` Behavior { updates_ = ea, sampleImpl = sample } return beh -- | An event that is guaranteed to fire once when you listen to it, giving -- the current value of the behavior, and thereafter behaves like 'updates', -- firing for each update to the behavior's value. value :: Behavior a -> Event a value ba = sa `seq` ea `seq` eventify (listenValueRaw ba) (dep (sa, ea)) where sa = sampleImpl ba ea = updates ba -- | Sample the behavior at the time of the event firing. Note that the 'current value' -- of the behavior that's sampled is the value as at the start of the transaction -- before any state changes of the current transaction are applied through 'hold's. snapshot :: (a -> b -> c) -> Event a -> Behavior b -> Event c snapshot f ea bb = sample' `seq` Event gl cacheRef (dep (ea, sample)) where cacheRef = unsafeNewIORef Nothing bb sample = sampleImpl bb sample' = unSample sample gl = do (l, push, nodeRef) <- ioReactive newEventImpl unlistener <- later $ linkedListen ea (Just nodeRef) False $ \a -> do b <- ioReactive sample' push (f a b) addCleanup_Listen unlistener l -- | Unwrap an event inside a behavior to give a time-varying event implementation. switchE :: Behavior (Event a) -> Event a switchE bea = eea `seq` Event gl cacheRef (dep (eea, depRef)) where eea = updates bea cacheRef = unsafeNewIORef Nothing bea depRef = unsafeNewIORef undefined bea gl = do (l, push, nodeRef) <- ioReactive newEventImpl unlisten2Ref <- ioReactive $ newIORef Nothing let doUnlisten2 = do mUnlisten2 <- readIORef unlisten2Ref fromMaybe (return ()) mUnlisten2 unlistener1 <- later $ do initEa <- sample bea (ioReactive . writeIORef unlisten2Ref) =<< (Just <$> linkedListen initEa (Just nodeRef) False push) unlisten1 <- linkedListen eea (Just nodeRef) False $ \ea -> scheduleLast $ do ioReactive $ do doUnlisten2 writeIORef depRef ea (ioReactive . writeIORef unlisten2Ref) =<< (Just <$> linkedListen ea (Just nodeRef) True push) return $ unlisten1 >> doUnlisten2 addCleanup_Listen unlistener1 l -- | Unwrap a behavior inside another behavior to give a time-varying behavior implementation. switch :: Behavior (Behavior a) -> Reactive (Behavior a) switch bba = do ba <- sample bba depRef <- ioReactive $ newIORef ba za <- sample ba let eba = updates bba ioReactive $ evaluate eba (ev, push, nodeRef) <- ioReactive $ newEventLinked (dep (bba, depRef)) unlisten2Ref <- ioReactive $ newIORef Nothing let doUnlisten2 = do mUnlisten2 <- readIORef unlisten2Ref fromMaybe (return ()) mUnlisten2 unlisten1 <- listenValueRaw bba (Just nodeRef) False $ \ba -> do ioReactive $ do doUnlisten2 writeIORef depRef ba (ioReactive . writeIORef unlisten2Ref . Just) =<< listenValueRaw ba (Just nodeRef) False push hold za $ finalizeEvent ev (unlisten1 >> doUnlisten2) -- | Execute the specified 'Reactive' action inside an event. execute :: Event (Reactive a) -> Event a execute ev = Event gl cacheRef (dep ev) where cacheRef = unsafeNewIORef Nothing ev gl = do (l, push, nodeRef) <- ioReactive newEventImpl unlistener <- later $ linkedListen ev (Just nodeRef) False $ \action -> action >>= push addCleanup_Listen unlistener l -- | Obtain the current value of a behavior. sample :: Behavior a -> Reactive a {-# NOINLINE sample #-} sample = ioReactive . unSample . sampleImpl -- | If there's more than one firing in a single transaction, combine them into -- one using the specified combining function. -- -- If the event firings are ordered, then the first will appear at the left -- input of the combining function. In most common cases it's best not to -- make any assumptions about the ordering, and the combining function would -- ideally be commutative. coalesce :: (a -> a -> a) -> Event a -> Event a coalesce combine e = Event gl cacheRef (dep e) where cacheRef = unsafeNewIORef Nothing e gl = do (l, push, nodeRef) <- ioReactive newEventImpl outRef <- ioReactive $ newIORef Nothing unlistener <- later $ linkedListen e (Just nodeRef) False $ \a -> do first <- isNothing <$> ioReactive (readIORef outRef) ioReactive $ modifyIORef outRef $ \ma -> Just $ case ma of Just a0 -> a0 `combine` a Nothing -> a when first $ schedulePrioritized (Just nodeRef) $ do Just out <- ioReactive $ readIORef outRef ioReactive $ writeIORef outRef Nothing push out addCleanup_Listen unlistener l -- | Throw away all event occurrences except for the first one. once :: Event a -> Event a once e = Event gl cacheRef (dep e) where cacheRef = unsafeNewIORef Nothing e gl = do (l, push, nodeRef) <- ioReactive newEventImpl aliveRef <- ioReactive $ newIORef True unlistener <- later $ do rec unlisten <- linkedListen e (Just nodeRef) False $ \a -> do alive <- ioReactive $ readIORef aliveRef when alive $ do ioReactive $ writeIORef aliveRef False scheduleLast $ ioReactive unlisten push a return unlisten addCleanup_Listen unlistener l -- | Take each list item and put it into a new transaction of its own. -- -- An example use case of this might be a situation where we are splitting -- a block of input data into frames. We obviously want each frame to have -- its own transaction so that state is updated separately each frame. split :: Event [a] -> Event a split esa = Event gl cacheRef (dep esa) where cacheRef = unsafeNewIORef Nothing esa gl = do (l, push, nodeRef) <- ioReactive newEventImpl unlistener <- later $ linkedListen esa (Just nodeRef) False $ \as -> schedulePost $ map push as addCleanup_Listen unlistener l -- | Create a new 'Behavior' along with an action to push changes into it. -- American spelling. newBehavior :: a -- ^ Initial behavior value -> Reactive (Behavior a, a -> Reactive ()) newBehavior = R.newBehavior -- | Create a new 'Behavior' along with an action to push changes into it. -- British spelling. newBehaviour :: a -- ^ Initial behavior value -> Reactive (Behavior a, a -> Reactive ()) newBehaviour = R.newBehaviour -- | Merge two streams of events of the same type, combining simultaneous -- event occurrences. -- -- In the case where multiple event occurrences are simultaneous (i.e. all -- within the same transaction), they are combined using the supplied -- function. The output event is guaranteed not to have more than one -- event occurrence per transaction. -- -- The combine function should be commutative, because simultaneous events -- should be considered to be order-agnostic. mergeWith :: (a -> a -> a) -> Event a -> Event a -> Event a mergeWith = R.mergeWith -- | Only keep event occurrences for which the predicate is true. filterE :: (a -> Bool) -> Event a -> Event a filterE = R.filterE -- | Let event occurrences through only when the behavior's value is True. -- Note that the behavior's value is as it was at the start of the transaction, -- that is, no state changes from the current transaction are taken into account. gate :: Event a -> Behavior Bool -> Event a gate = R.gate -- | Transform an event with a generalized state loop (a mealy machine). The function -- is passed the input and the old state and returns the new state and output value. collectE :: (a -> s -> (b, s)) -> s -> Event a -> Reactive (Event b) collectE = R.collectE -- | Transform a behavior with a generalized state loop (a mealy machine). The function -- is passed the input and the old state and returns the new state and output value. collect :: (a -> s -> (b, s)) -> s -> Behavior a -> Reactive (Behavior b) collect = R.collect -- | Accumulate state changes given in the input event. accum :: a -> Event (a -> a) -> Reactive (Behavior a) accum = R.accum class PriorityQueueable k where priorityOf :: k -> IO Int64 newtype Sequence = Sequence Int64 deriving (Eq, Ord, Enum) data PriorityQueue k v = PriorityQueue { pqNextSeq :: IORef Sequence, pqDirty :: IORef Bool, pqQueue :: IORef (Map (Int64, Sequence) v), pqData :: IORef (Map Sequence (k, v)) } newPriorityQueue :: IO (PriorityQueue k v) newPriorityQueue = PriorityQueue <$> newIORef (Sequence 0) <*> newIORef False <*> newIORef M.empty <*> newIORef M.empty pushPriorityQueue :: PriorityQueueable k => PriorityQueue k v -> k -> v -> IO () pushPriorityQueue pq k v = do prio <- priorityOf k seq <- readIORef (pqNextSeq pq) modifyIORef (pqNextSeq pq) succ modifyIORef (pqQueue pq) (M.insert (prio, seq) v) modifyIORef (pqData pq) (M.insert seq (k, v)) dirtyPriorityQueue :: PriorityQueue k v -> IO () dirtyPriorityQueue pq = writeIORef (pqDirty pq) True popPriorityQueue :: PriorityQueueable k => PriorityQueue k v -> IO (Maybe v) popPriorityQueue pq = do maybeRegen q <- readIORef (pqQueue pq) if M.null q then return Nothing else do let (pseq@(prio, seq), v) = M.findMin q modifyIORef (pqQueue pq) (M.delete pseq) modifyIORef (pqData pq) (M.delete seq) return $ Just v where maybeRegen = do dirty <- readIORef (pqDirty pq) when dirty $ do writeIORef (pqDirty pq) False dat <- readIORef (pqData pq) writeIORef (pqQueue pq) M.empty forM_ (M.assocs dat) $ \(seq,(k,v)) -> do prio <- priorityOf k modifyIORef (pqQueue pq) (M.insert (prio, seq) v) type ID = Int64 instance PriorityQueueable (Maybe (MVar Node)) where priorityOf (Just nodeRef) = noRank <$> readMVar nodeRef priorityOf Nothing = return maxBound data ReactiveState = ReactiveState { asQueue1 :: Seq (Reactive ()), asQueue2 :: PriorityQueue (Maybe (MVar Node)) (Reactive ()), asFinal :: Seq (Reactive ()), asPost :: Seq (Reactive ()) } instance Functor (R.Reactive Plain) where fmap f rm = Reactive (fmap f (unReactive rm)) unReactive :: Reactive a -> StateT ReactiveState IO a unReactive (Reactive m) = m instance Applicative (R.Reactive Plain) where pure a = Reactive $ return a rf <*> rm = Reactive $ unReactive rf <*> unReactive rm instance Monad (R.Reactive Plain) where return a = Reactive $ return a rma >>= kmb = Reactive $ do a <- unReactive rma unReactive (kmb a) instance MonadFix (R.Reactive Plain) where mfix f = Reactive $ mfix $ \a -> unReactive (f a) ioReactive :: IO a -> Reactive a ioReactive io = Reactive $ liftIO io newtype NodeID = NodeID Int deriving (Eq, Ord, Enum) data Partition = Partition { paLock :: MVar (), paNextNodeID :: IORef NodeID } -- | Queue the specified atomic to run at the end of the priority 1 queue scheduleEarly :: Reactive () -> Reactive () scheduleEarly task = Reactive $ modify $ \as -> as { asQueue1 = asQueue1 as |> task } scheduleLast :: Reactive () -> Reactive () scheduleLast task = Reactive $ modify $ \as -> as { asFinal = asFinal as |> task } schedulePost :: [Reactive ()] -> Reactive () schedulePost tasks = Reactive $ modify $ \as -> as { asPost = Seq.fromList tasks >< asPost as } data Listen a = Listen { runListen_ :: Maybe (MVar Node) -> Bool -> (a -> Reactive ()) -> Reactive (IO ()) , listenerKeepAlive :: IORef () } -- | Unwrap an event's listener machinery. getListen :: Event a -> Reactive (Listen a) getListen (Event getLRaw cacheRef _) = do mL <- ioReactive $ readIORef cacheRef case mL of Just l -> return l Nothing -> do l <- getLRaw ioReactive $ writeIORef cacheRef (Just l) return l -- | Listen for firings of this event. The returned @IO ()@ is an IO action -- that unregisters the listener. This is the observer pattern. linkedListen :: Event a -> Maybe (MVar Node) -> Bool -> (a -> Reactive ()) -> Reactive (IO ()) {-# NOINLINE linkedListen #-} linkedListen ev mv suppressEarlierFirings handle = do ioReactive $ evaluate ev l <- getListen ev unlisten <- runListen_ l mv suppressEarlierFirings handle -- ensure l doesn't get cleaned up while runListen_ is running _ <- ioReactive $ touch l return unlisten -- | Variant of 'listen' that allows you to initiate more activity in the current -- transaction. Useful for implementing new primitives. listenTrans :: Event a -> (a -> Reactive ()) -> Reactive (IO ()) listenTrans ev handle = linkedListen ev Nothing False handle data Observer p a = Observer { obNextID :: ID, obListeners :: Map ID (a -> Reactive ()), obFirings :: [a] } data Node = Node { noID :: NodeID, noRank :: Int64, noListeners :: Map ID (MVar Node) } newNode :: IO (MVar Node) newNode = do nodeID <- readIORef (paNextNodeID partition) modifyIORef (paNextNodeID partition) succ newMVar (Node nodeID 0 M.empty) wrap :: (Maybe (MVar Node) -> Bool -> (a -> Reactive ()) -> Reactive (IO ())) -> IO (Listen a) {-# NOINLINE wrap #-} wrap l = Listen l <$> newIORef () touch :: a -> IO () {-# NOINLINE touch #-} touch a = evaluate a >> return () linkNode :: MVar Node -> ID -> MVar Node -> IO Bool linkNode nodeRef iD mvTarget = do no <- readMVar nodeRef modified <- ensureBiggerThan S.empty mvTarget (noRank no) modifyMVar_ nodeRef $ \no -> return $ let listeners' = M.insert iD mvTarget (noListeners no) in listeners' `seq` no { noListeners = listeners' } return modified ensureBiggerThan :: Set NodeID -> MVar Node -> Int64 -> IO Bool ensureBiggerThan visited nodeRef limit = do no <- takeMVar nodeRef if noRank no > limit || noID no `S.member` visited then do putMVar nodeRef no return False else do let newSerial = succ limit --putStrLn $ show (noRank no) ++ " -> " ++ show newSerial putMVar nodeRef $ newSerial `seq` no { noRank = newSerial } forM_ (M.elems . noListeners $ no) $ \mvTarget -> do ensureBiggerThan (S.insert (noID no) visited) mvTarget newSerial return True unlinkNode :: MVar Node -> ID -> IO () unlinkNode nodeRef iD = do modifyMVar_ nodeRef $ \no -> do let listeners' = M.delete iD (noListeners no) return $ listeners' `seq` no { noListeners = listeners' } newtype Dep = Dep Any dep :: a -> Dep dep = Dep . unsafeCoerce -- | Returns a 'Listen' for registering listeners, and a push action for pushing -- a value into the event. newEventImpl :: forall p a . IO (Listen a, a -> Reactive (), MVar Node) newEventImpl = do nodeRef <- newNode mvObs <- newMVar (Observer 0 M.empty []) cacheRef <- newIORef Nothing rec let l mMvTarget suppressEarlierFirings handle = do (firings, unlisten, iD) <- ioReactive $ modifyMVar mvObs $ \ob -> do let iD = obNextID ob nextID' = succ iD listeners' = M.insert iD handle (obListeners ob) ob' = nextID' `seq` listeners' `seq` ob { obNextID = nextID', obListeners = listeners' } unlisten = do modifyMVar_ mvObs $ \ob -> do let listeners' = M.delete iD (obListeners ob) return $ listeners' `seq` ob { obListeners = listeners' } unlinkNode nodeRef iD --touch listen return () return (ob', (reverse . obFirings $ ob, unlisten, iD)) modified <- case mMvTarget of Just mvTarget -> ioReactive $ linkNode nodeRef iD mvTarget Nothing -> return False -- If any of our ranks are changed, dirty the priority queue so -- all the priorities will be re-evaluated when modified $ dirtyPrioritized unless suppressEarlierFirings $ mapM_ handle firings return unlisten listen <- wrap l -- defeat optimizer on ghc-7.0.4 let push a = do ioReactive $ evaluate a ob <- ioReactive $ modifyMVar mvObs $ \ob -> return $ (ob { obFirings = a : obFirings ob }, ob) -- If this is the first firing... when (null (obFirings ob)) $ scheduleLast $ ioReactive $ do modifyMVar_ mvObs $ \ob -> return $ ob { obFirings = [] } mapM_ ($ a) (M.elems . obListeners $ ob) return (listen, push, nodeRef) -- | Returns an event, and a push action for pushing a value into the event. newEventLinked :: Dep -> IO (Event a, a -> Reactive (), MVar Node) newEventLinked d = do (listen, push, nodeRef) <- newEventImpl cacheRef <- newIORef Nothing let ev = Event { getListenRaw = return listen, evCacheRef = cacheRef, eDep = d } return (ev, push, nodeRef) instance Functor (R.Event Plain) where f `fmap` e = Event getListen' cacheRef (dep e) where cacheRef = unsafeNewIORef Nothing e getListen' = return $ Listen (\mNodeRef suppressEarlierFirings handle -> do linkedListen e mNodeRef suppressEarlierFirings (handle . f)) undefined instance Functor (R.Behavior Plain) where f `fmap` Behavior e s = fs `seq` fe `seq` Behavior fe fs where fe = f `fmap` e s' = unSample s fs = s' `seq` Sample (f `fmap` s') (dep s) undefined constant :: a -> Behavior a constant a = Behavior { updates_ = never, sampleImpl = Sample (return a) undefined undefined } data BehaviorState a = BehaviorState { bsCurrent :: a, bsUpdate :: Maybe a } -- | Add a finalizer to an event. finalizeEvent :: Event a -> IO () -> Event a {-# NOINLINE finalizeEvent #-} finalizeEvent ea unlisten = ea { getListenRaw = gl } where gl = do l <- getListen ea ioReactive $ finalizeListen l unlisten -- | Add a finalizer to a listener. finalizeListen :: Listen a -> IO () -> IO (Listen a) {-# NOINLINE finalizeListen #-} finalizeListen l unlisten = do mkWeakIORef (listenerKeepAlive l) unlisten return l -- | Add a finalizer to a Reactive. finalizeSample :: Sample a -> IO () -> IO (Sample a) {-# NOINLINE finalizeSample #-} finalizeSample s unlisten = do mkWeakIORef (sampleKeepAlive s) unlisten return s newtype Unlistener = Unlistener (MVar (Maybe (IO ()))) -- | Perform a listen later so we can tolerate lazy loops. -- Returns an 'Unlistener' that can be attached to an event with 'addCleanup_Listen'. later :: Reactive (IO ()) -> Reactive Unlistener later doListen = do unlistener@(Unlistener ref) <- newUnlistener -- We schedule the actual listen rather than doing it now, so event values get -- evaluated lazily. Otherwise we get deadlocks when events are used in value loops. scheduleEarly $ do mOldUnlisten <- ioReactive $ takeMVar ref case mOldUnlisten of Just _ -> do unlisten <- doListen ioReactive $ putMVar ref (Just unlisten) Nothing -> ioReactive $ putMVar ref mOldUnlisten return unlistener where newUnlistener :: Reactive Unlistener newUnlistener = Unlistener <$> ioReactive (newMVar (Just $ return ())) -- | Cause the things listened to with 'later' to be unlistened when the -- specified listener is not referenced any more. addCleanup_Listen :: Unlistener -> Listen a -> Reactive (Listen a) addCleanup_Listen (Unlistener ref) l = ioReactive $ finalizeListen l $ do mUnlisten <- takeMVar ref fromMaybe (return ()) mUnlisten putMVar ref Nothing -- | Cause the things listened to with 'later' to be unlistened when the -- specified sample is not referenced any more. addCleanup_Sample :: Unlistener -> Sample a -> IO (Sample a) addCleanup_Sample (Unlistener ref) s = finalizeSample s $ do mUnlisten <- takeMVar ref fromMaybe (return ()) mUnlisten putMVar ref Nothing -- | Listen to the value of this behavior with an initial callback giving -- the current value. Can get multiple values per transaction, the last of -- which is considered valid. You would normally want to use 'listenValue', -- which removes the extra unwanted values. listenValueRaw :: Behavior a -> Maybe (MVar Node) -> Bool -> (a -> Reactive ()) -> Reactive (IO ()) listenValueRaw ba = lastFiringOnly $ \mNodeRef suppressEarlierFirings handle -> do a <- sample ba handle a linkedListen (updates ba) mNodeRef suppressEarlierFirings handle -- | Queue the specified atomic to run at the end of the priority 2 queue schedulePrioritized :: Maybe (MVar Node) -> Reactive () -> Reactive () schedulePrioritized mNodeRef task = Reactive $ do q <- gets asQueue2 lift $ pushPriorityQueue q mNodeRef task dirtyPrioritized :: Reactive () dirtyPrioritized = Reactive $ do q <- gets asQueue2 lift $ dirtyPriorityQueue q -- Clean up the listener so it gives only one value per transaction, specifically -- the last one. lastFiringOnly :: (Maybe (MVar Node) -> Bool -> (a -> Reactive ()) -> Reactive (IO ())) -> Maybe (MVar Node) -> Bool -> (a -> Reactive ()) -> Reactive (IO ()) lastFiringOnly listen mNodeRef suppressEarlierFirings handle = do aRef <- ioReactive $ newIORef Nothing listen mNodeRef suppressEarlierFirings $ \a -> do ma <- ioReactive $ readIORef aRef ioReactive $ writeIORef aRef (Just a) when (isNothing ma) $ schedulePrioritized mNodeRef $ do Just a <- ioReactive $ readIORef aRef ioReactive $ writeIORef aRef Nothing handle a eventify :: (Maybe (MVar Node) -> Bool -> (a -> Reactive ()) -> Reactive (IO ())) -> Dep -> Event a eventify listen d = Event gl cacheRef d where cacheRef = unsafeNewIORef Nothing listen gl = do (l, push, nodeRef) <- ioReactive newEventImpl unlistener <- later $ listen (Just nodeRef) False push addCleanup_Listen unlistener l instance Applicative (R.Behavior Plain) where pure = constant b1@(Behavior e1 s1) <*> b2@(Behavior e2 s2) = Behavior u s where cacheRef = unsafeNewIORef Nothing s2 keepaliveRef = unsafeNewIORef () s2 u = Event gl cacheRef (dep (e1,e2)) s1' = unSample s1 s2' = unSample s2 gl = do fRef <- ioReactive $ newIORef =<< unSample s1 aRef <- ioReactive $ newIORef =<< unSample s2 (l, push, nodeRef) <- ioReactive newEventImpl unlistener <- later $ do un1 <- linkedListen e1 (Just nodeRef) False $ \f -> do ioReactive $ writeIORef fRef f a <- ioReactive $ readIORef aRef push (f a) un2 <- linkedListen e2 (Just nodeRef) False $ \a -> do f <- ioReactive $ readIORef fRef ioReactive $ writeIORef aRef a push (f a) return (un1 >> un2) addCleanup_Listen unlistener l s = s1' `seq` s2' `seq` Sample (($) <$> s1' <*> s2') (dep (s1, s2)) keepaliveRef {- -- | Cross the specified event over to a different partition. crossE :: (Typeable p, Typeable q) => Event a -> Reactive (Event q a) crossE epa = do (ev, push) <- ioReactive newEvent unlisten <- listen epa $ async . push return $ finalizeEvent ev unlisten -- | Cross the specified behavior over to a different partition. cross :: (Typeable p, Typeable q) => Behavior a -> Reactive (Behavior q a) cross bpa = do a <- sample bpa ea <- crossE (updates bpa) ioReactive $ sync $ 3 a ea -} -- | An event that gives the updates for the behavior. If the behavior was created -- with 'hold', then 'changes' gives you an event equivalent to the one that was held. changes :: Behavior a -> Event a {-# DEPRECATED changes "renamed to 'updates'" #-} changes = updates -- | An event that is guaranteed to fire once when you listen to it, giving -- the current value of the behavior, and thereafter behaves like 'changes', -- firing for each update to the behavior's value. values :: Behavior a -> Event a {-# DEPRECATED values "renamed to 'value'" #-} values = value -- | Sample the behavior at the time of the event firing. Note that the 'current value' -- of the behavior that's sampled is the value as at the start of the transaction -- before any state changes of the current transaction are applied through 'hold's. snapshotWith :: (a -> b -> c) -> Event a -> Behavior b -> Event c {-# DEPRECATED snapshotWith "renamed to 'snapshot'" #-} snapshotWith = snapshot -- | Count event occurrences, giving a behavior that starts with 0 before the first occurrence. count :: Event a -> Reactive (Behavior Int) {-# DEPRECATED count "removing it in the pursuit of minimalism, replace with: accum 0 (const (1+) <$> e)" #-} count = accum 0 . (const (1+) <$>)