module Control.Broccoli (
X,
E,
never,
snapshot,
snapshot_,
accumulate,
edge,
justE,
maybeE,
filterE,
Setup,
runProgram,
newX,
newE,
input,
output,
debugX,
debugE
) where
import Control.Applicative
import Data.Functor
import Data.Monoid
import Control.Monad
import Data.Unamb
import Data.IORef
import Control.Concurrent
import Control.Concurrent.STM
import Data.Function
import System.IO.Unsafe
data X a where
PureX :: a -> X a
FmapX :: forall a b . (b -> a) -> X b -> X a
ApplX :: forall a b . X (b -> a) -> X b -> X a
PortX :: MVar [ThreadId] -> TVar a -> X a
data E a where
NeverE :: E a
FmapE :: forall a b . (b -> a) -> E b -> E a
MappendE :: E a -> E a -> E a
ProductE :: (b -> c -> a) -> E b -> E c -> E a
SnapshotE :: E b -> X a -> E a
JustE :: E (Maybe a) -> E a
PortE :: MVar [ThreadId] -> TChan a -> E a
instance Functor X where
fmap f x = FmapX f x
instance Applicative X where
pure x = PureX x
f <*> x = ApplX f x
instance Functor E where
fmap f e = FmapE f e
instance Monoid (E a) where
mempty = NeverE
mappend e1 e2 = MappendE e1 e2
data Setup a = Setup (MVar [ThreadId] -> IO a)
instance Monad Setup where
return x = Setup (\_ -> return x)
(Setup r) >>= f = Setup r' where
r' mv = do
x <- r mv
let Setup r'' = f x
r'' mv
instance Applicative Setup where
pure = return
(<*>) = ap
instance Functor Setup where
fmap f (Setup io) = Setup (\mv -> f <$> io mv)
setupIO :: IO a -> Setup a
setupIO io = Setup (\_ -> io)
getThreads :: Setup (MVar [ThreadId])
getThreads = Setup (\mv -> return mv)
getThreadsE :: E a -> Maybe (MVar [ThreadId])
getThreadsE e = case e of
NeverE -> Nothing
FmapE _ e' -> getThreadsE e'
MappendE e1 e2 -> getFirst $ First (getThreadsE e1) <> First (getThreadsE e2)
ProductE _ e1 e2 -> getFirst $ First (getThreadsE e1) <> First (getThreadsE e2)
SnapshotE e' x -> getFirst $ First (getThreadsE e') <> First (getThreadsX x)
JustE e' -> getThreadsE e'
PortE mv _ -> Just mv
getThreadsX :: X a -> Maybe (MVar [ThreadId])
getThreadsX x = case x of
PureX _ -> Nothing
FmapX _ x' -> getThreadsX x'
ApplX x1 x2 -> getFirst $ First (getThreadsX x1) <> First (getThreadsX x2)
PortX mv _ -> Just mv
dupE :: E a -> IO (E a)
dupE e = case e of
NeverE -> return NeverE
FmapE f e' -> do
e'' <- dupE e'
return (FmapE f e'')
MappendE e1 e2 -> do
e1' <- dupE e1
e2' <- dupE e2
return (MappendE e1' e2')
ProductE f e1 e2 -> do
e1' <- dupE e1
e2' <- dupE e2
return (ProductE f e1' e2')
SnapshotE e' x -> do
e'' <- dupE e'
return (SnapshotE e'' x)
JustE e' -> do
e'' <- dupE e'
return (JustE e'')
PortE mv ch -> do
ch' <- atomically (dupTChan ch)
return (PortE mv ch')
readE :: E a -> IO a
readE e = case e of
NeverE -> hang
MappendE e1 e2 -> race (readE e1) (readE e2)
FmapE f e' -> f <$> readE e'
ProductE f e1 e2 -> do
x <- readE e1
y <- readE e2
return (f x y)
SnapshotE e' x -> do
readE e'
atomically (readX x)
JustE e' -> fix $ \loop -> do
m <- readE e'
case m of
Nothing -> loop
Just x -> return x
PortE _ ch -> atomically (readTChan ch)
readX :: X a -> STM a
readX x = case x of
PureX v -> return v
FmapX f xx -> f <$> readX xx
ApplX ff xx -> do
f <- readX ff
x <- readX xx
return (f x)
PortX _ tv -> readTVar tv
hang :: IO a
hang = do
threadDelay (100 * 10^(6::Int))
hang
waitE :: E a -> IO a
waitE e0 = do
e <- dupE e0
readE e
snapshot :: E a -> X b -> E (a,b)
snapshot e x = ProductE (,) e (SnapshotE e x)
snapshot_ :: E a -> X b -> E b
snapshot_ e x = SnapshotE e x
justE :: E (Maybe a) -> E a
justE = JustE
maybeE :: (a -> Maybe b) -> E a -> E b
maybeE f e = justE (f <$> e)
filterE :: (a -> Bool) -> E a -> E a
filterE p e = maybeE (\x -> if p x then Just x else Nothing) e
never :: E a
never = mempty
accumulate :: E a -> s -> (a -> s -> s) -> X s
accumulate e0 s0 trans = case getThreadsE e0 of
Nothing -> pure s0
Just mv -> PortX mv tv where
tv = unsafePerformIO $ do
state <- newTVarIO s0
threadId <- forkIO $ do
putStrLn "accum forked"
e <- dupE e0
forever $ do
x <- readE e
atomically $ do
s <- readTVar state
let s' = trans x s
writeTVar state s'
modifyMVar_ mv (return . (threadId:))
return state
edge :: X a -> (a -> a -> Maybe b) -> E b
edge x diff = case getThreadsX x of
Nothing -> never
Just mv -> PortE mv ch where
ch = unsafePerformIO $ do
out <- newBroadcastTChanIO
threadId <- forkIO $ do
putStrLn "edge forked"
v0 <- atomically (readX x)
ref <- newIORef v0
forever $ do
v <- readIORef ref
(d, v') <- atomically $ do
v' <- readX x
case diff v v' of
Just d -> return (d, v')
Nothing -> retry
writeIORef ref v'
atomically (writeTChan out d)
modifyMVar_ mv (return . (threadId:))
return out
newE :: Setup (E a, a -> IO ())
newE = do
mv <- getThreads
bch <- setupIO newBroadcastTChanIO
return (PortE mv bch, atomically . writeTChan bch)
newX :: a -> Setup (X a, a -> IO ())
newX v = do
mv <- getThreads
tv <- setupIO (newTVarIO v)
return (PortX mv tv, atomically . writeTVar tv)
output :: E a -> (a -> IO ()) -> Setup ()
output e0 act = do
mv <- getThreads
setupIO $ do
e <- dupE e0
tid <- (forkIO . forever) (readE e >>= act)
modifyMVar_ mv (return . (tid:))
return ()
input :: IO () -> Setup ()
input handler = do
mv <- getThreads
setupIO $ do
tid <- forkIO handler
modifyMVar_ mv (return . (tid:))
return ()
runProgram :: Setup (IO (), E ()) -> IO ()
runProgram (Setup setup) = do
mv <- newMVar []
(boot, exit) <- setup mv
boot
waitE exit
withMVar mv (mapM killThread)
return ()
debugE :: Show a => E a -> E a
debugE e = unsafePerformIO $ do
e' <- dupE e
(forkIO . forever) (readE e' >>= print)
return e
debugX :: (Eq a, Show a) => X a -> X a
debugX x =
let diff a b = if a == b then Nothing else Just (a,b) in
let e = edge x diff in
unsafePerformIO $ do
forkIO $ do
putStrLn "edge forked"
e' <- dupE e
forever (readE e' >>= print)
return x