-- | Small experimental library for interactive functional programs. {-# LANGUAGE GADTs #-} {-# LANGUAGE RankNTypes #-} module Control.Broccoli ( X, E, never, snapshot, snapshot_, accumulate, edge, justE, maybeE, filterE, Setup, runProgram, newX, newE, input, output, debugX, debugE ) where import Control.Applicative import Data.Functor import Data.Monoid import Control.Monad import Data.Unamb import Data.IORef import Control.Concurrent import Control.Concurrent.STM import Data.Function import System.IO.Unsafe -- | A value of type a that varies. data X a where PureX :: a -> X a FmapX :: forall a b . (b -> a) -> X b -> X a ApplX :: forall a b . X (b -> a) -> X b -> X a PortX :: MVar [ThreadId] -> TVar a -> X a -- | An event that carries values of type a when it occurs. data E a where NeverE :: E a FmapE :: forall a b . (b -> a) -> E b -> E a MappendE :: E a -> E a -> E a ProductE :: (b -> c -> a) -> E b -> E c -> E a SnapshotE :: E b -> X a -> E a JustE :: E (Maybe a) -> E a PortE :: MVar [ThreadId] -> TChan a -> E a instance Functor X where fmap f x = FmapX f x instance Applicative X where pure x = PureX x f <*> x = ApplX f x instance Functor E where fmap f e = FmapE f e instance Monoid (E a) where mempty = NeverE mappend e1 e2 = MappendE e1 e2 -- | A monad for hooking up inputs and outputs to a program. data Setup a = Setup (MVar [ThreadId] -> IO a) instance Monad Setup where return x = Setup (\_ -> return x) (Setup r) >>= f = Setup r' where r' mv = do x <- r mv let Setup r'' = f x r'' mv instance Applicative Setup where pure = return (<*>) = ap instance Functor Setup where fmap f (Setup io) = Setup (\mv -> f <$> io mv) setupIO :: IO a -> Setup a setupIO io = Setup (\_ -> io) getThreads :: Setup (MVar [ThreadId]) getThreads = Setup (\mv -> return mv) getThreadsE :: E a -> Maybe (MVar [ThreadId]) getThreadsE e = case e of NeverE -> Nothing FmapE _ e' -> getThreadsE e' MappendE e1 e2 -> getFirst $ First (getThreadsE e1) <> First (getThreadsE e2) ProductE _ e1 e2 -> getFirst $ First (getThreadsE e1) <> First (getThreadsE e2) SnapshotE e' x -> getFirst $ First (getThreadsE e') <> First (getThreadsX x) JustE e' -> getThreadsE e' PortE mv _ -> Just mv getThreadsX :: X a -> Maybe (MVar [ThreadId]) getThreadsX x = case x of PureX _ -> Nothing FmapX _ x' -> getThreadsX x' ApplX x1 x2 -> getFirst $ First (getThreadsX x1) <> First (getThreadsX x2) PortX mv _ -> Just mv dupE :: E a -> IO (E a) dupE e = case e of NeverE -> return NeverE FmapE f e' -> do e'' <- dupE e' return (FmapE f e'') MappendE e1 e2 -> do e1' <- dupE e1 e2' <- dupE e2 return (MappendE e1' e2') ProductE f e1 e2 -> do e1' <- dupE e1 e2' <- dupE e2 return (ProductE f e1' e2') SnapshotE e' x -> do e'' <- dupE e' return (SnapshotE e'' x) JustE e' -> do e'' <- dupE e' return (JustE e'') PortE mv ch -> do ch' <- atomically (dupTChan ch) return (PortE mv ch') readE :: E a -> IO a readE e = case e of NeverE -> hang MappendE e1 e2 -> race (readE e1) (readE e2) FmapE f e' -> f <$> readE e' ProductE f e1 e2 -> do x <- readE e1 y <- readE e2 return (f x y) SnapshotE e' x -> do readE e' atomically (readX x) JustE e' -> fix $ \loop -> do m <- readE e' case m of Nothing -> loop Just x -> return x PortE _ ch -> atomically (readTChan ch) readX :: X a -> STM a readX x = case x of PureX v -> return v FmapX f xx -> f <$> readX xx ApplX ff xx -> do f <- readX ff x <- readX xx return (f x) PortX _ tv -> readTVar tv hang :: IO a hang = do threadDelay (100 * 10^(6::Int)) hang waitE :: E a -> IO a waitE e0 = do e <- dupE e0 readE e --- -- | An event which gets the value of a signal when another event occurs. snapshot :: E a -> X b -> E (a,b) snapshot e x = ProductE (,) e (SnapshotE e x) -- | Like snapshot but ignores the original event's payload. snapshot_ :: E a -> X b -> E b snapshot_ e x = SnapshotE e x -- | Filter out events with the value of Nothing. justE :: E (Maybe a) -> E a justE = JustE -- | Filter out events using a Maybe function. maybeE :: (a -> Maybe b) -> E a -> E b maybeE f e = justE (f <$> e) -- | Filter out events using a Bool function. filterE :: (a -> Bool) -> E a -> E a filterE p e = maybeE (\x -> if p x then Just x else Nothing) e -- | An event that never happens. never :: E a never = mempty -- | Sum over events using an initial state and a state transition function. accumulate :: E a -> s -> (a -> s -> s) -> X s accumulate e0 s0 trans = case getThreadsE e0 of Nothing -> pure s0 Just mv -> PortX mv tv where tv = unsafePerformIO $ do state <- newTVarIO s0 threadId <- forkIO $ do e <- dupE e0 forever $ do x <- readE e atomically $ do s <- readTVar state let s' = trans x s writeTVar state s' modifyMVar_ mv (return . (threadId:)) return state -- | An event that occurs when an edge is detected in a signal. The edge test -- is applied to values before and after a discrete transition in the signal. -- The test should return Nothing when the two values are the same. edge :: X a -> (a -> a -> Maybe b) -> E b edge x diff = case getThreadsX x of Nothing -> never Just mv -> PortE mv ch where ch = unsafePerformIO $ do out <- newBroadcastTChanIO threadId <- forkIO $ do v0 <- atomically (readX x) ref <- newIORef v0 forever $ do v <- readIORef ref (d, v') <- atomically $ do v' <- readX x case diff v v' of Just d -> return (d, v') Nothing -> retry writeIORef ref v' atomically (writeTChan out d) modifyMVar_ mv (return . (threadId:)) return out -- | Creates a new event and an IO action to trigger it. newE :: Setup (E a, a -> IO ()) newE = do mv <- getThreads bch <- setupIO newBroadcastTChanIO return (PortE mv bch, atomically . writeTChan bch) -- | Creates a new signal and an IO action to update it. The argument is -- the initial value of the signal. newX :: a -> Setup (X a, a -> IO ()) newX v = do mv <- getThreads tv <- setupIO (newTVarIO v) return (PortX mv tv, atomically . writeTVar tv) -- | Spawn a thread to execute an action for each event occurrence. output :: E a -> (a -> IO ()) -> Setup () output e0 act = do mv <- getThreads setupIO $ do e <- dupE e0 tid <- (forkIO . forever) (readE e >>= act) modifyMVar_ mv (return . (tid:)) return () -- | Spawn an input thread to generate source signals and events. input :: IO () -> Setup () input handler = do mv <- getThreads setupIO $ do tid <- forkIO handler modifyMVar_ mv (return . (tid:)) return () -- | Run the setup action to create input and output threads. The returned IO -- action will be executed when setup is complete. runProgram blocks until -- the returned event occurs, at which time it kills all the threads and -- returns. runProgram :: Setup (IO (), E ()) -> IO () runProgram (Setup setup) = do mv <- newMVar [] (boot, exit) <- setup mv --threadDelay 5000 boot waitE exit withMVar mv (mapM killThread) return () -- | Print out events as they occur. Only for debugging purposes. debugE :: Show a => E a -> E a debugE e = unsafePerformIO $ do e' <- dupE e (forkIO . forever) (readE e' >>= print) return e -- | Print out transitions in a signal. Only for debugging purposes. debugX :: (Eq a, Show a) => X a -> X a debugX x = let diff a b = if a == b then Nothing else Just (a,b) in let e = edge x diff in unsafePerformIO $ do forkIO $ do e' <- dupE e forever (readE e' >>= print) return x