module FRP.Sodium.Impl where
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.Chan
import Control.Concurrent.MVar
import Control.Exception (evaluate)
import Control.Monad
import Control.Monad.State.Strict
import Control.Monad.Trans
import Data.Int
import Data.IORef
import Data.Map (Map)
import qualified Data.Map as M
import Data.Maybe
import Data.Set (Set)
import qualified Data.Set as S
import Data.Sequence (Seq, (|>))
import qualified Data.Sequence as Seq
import Data.Typeable
import GHC.Exts
import System.Mem.Weak
import System.IO.Unsafe
import Unsafe.Coerce
-- | Numeric identifier. Used in this module as the key for listener
-- registrations ('Observer', 'Node') and as a generation counter in the
-- switching combinators.
type ID = Int64
-- | State threaded through a single run of queued 'Reactive' work.
data ReactiveState p = ReactiveState
    { asQueue1 :: Seq (Reactive p ())
      -- ^ FIFO of pending tasks; fully drained before 'asQueue2' is consulted.
    , asQueue2 :: Map Int64 (Reactive p ())
      -- ^ Pending tasks keyed by priority; the smallest key is run first.
    , asFinal  :: IO ()
      -- ^ IO action executed once both queues are empty.
    }
-- | A computation in partition @p@: a strict state monad over 'IO' that
-- carries the partition's 'ReactiveState'.
newtype Reactive p a = Reactive (StateT (ReactiveState p) IO a)
    deriving (Functor, Applicative, Monad, MonadFix)
-- | Lift an arbitrary 'IO' action into 'Reactive'.
ioReactive :: IO a -> Reactive p a
ioReactive = Reactive . liftIO
-- | Identifier of a 'Node'. The 'Enum' instance is what lets 'newNode'
-- obtain the next identifier with 'succ'.
newtype NodeID = NodeID Int deriving (Eq, Ord, Enum)
-- | Handle on the machinery that executes 'Reactive' work for partition @p@.
data Partition p = Partition
    { paRun        :: Reactive p () -> IO () -> IO ()
      -- ^ Submit a task together with an IO action to be run after the task
      -- (and everything it queued) has been processed.
    , paNextNodeID :: IORef NodeID
      -- ^ Next identifier to hand out to a newly created 'Node'.
    }
-- | Append a task to the end of the first-priority FIFO queue.
schedulePriority1 :: Reactive p () -> Reactive p ()
schedulePriority1 task = Reactive (modify enqueue)
  where
    enqueue st = st { asQueue1 = asQueue1 st |> task }
-- | Register a task in the second-priority queue under the given priority.
-- If a task is already stored at that priority, the new one is sequenced
-- after it.
schedulePriority2 :: Int64 -> Reactive p () -> Reactive p ()
schedulePriority2 priority task = Reactive (modify register)
  where
    -- 'M.insertWith' passes the new value first and the existing one second.
    register st = st
        { asQueue2 = M.insertWith (\new old -> old >> new) priority task (asQueue2 st) }
-- | Append an IO action to the actions run once both task queues are empty.
onFinal :: IO () -> Reactive p ()
onFinal task = Reactive (modify appendFinal)
  where
    appendFinal st = st { asFinal = asFinal st >> task }
-- | Process-wide table of the partitions created so far, keyed by the
-- rendered type of the partition tag. Values are stored as 'Any' and are
-- coerced back to @Partition p@ by 'partition'.
--
-- The NOINLINE pragma is required: without it GHC may inline the
-- 'unsafePerformIO' call at several use sites, giving each its own MVar.
partitionRegistry :: MVar (Map String Any)
partitionRegistry = unsafePerformIO $ newMVar M.empty
{-# NOINLINE partitionRegistry #-}
-- | Fetch the partition for tag type @p@, creating and registering it on
-- first use. The lookup and the insertion happen under the registry's MVar.
partition :: forall p . Typeable p => IO (Partition p)
partition = modifyMVar partitionRegistry $ \reg ->
    case M.lookup key reg of
        -- Stored as 'Any'; the key guarantees it was created for this @p@.
        Just existing -> return (reg, unsafeCoerce existing)
        Nothing -> do
            fresh <- createPartition
            return (M.insert key (unsafeCoerce fresh) reg, fresh)
  where
    key = show (typeOf (undefined :: p))
-- | Build a new partition: fork a worker thread that reads
-- @(task, onCompletion)@ pairs from a channel, runs each task to quiescence,
-- and then runs the completion action.
--
-- "To quiescence" means: drain the first-priority queue, then repeatedly take
-- the lowest-keyed entry of the second-priority queue (re-checking the first
-- queue after each one), and finally run the accumulated final action.
createPartition :: IO (Partition p)
createPartition = do
    ch <- newChan
    _ <- forkIO $ forever $ do
        (initialTask, onCompletion) <- readChan ch
        _ <- runStateT drain ReactiveState
            { asQueue1 = Seq.singleton initialTask
            , asQueue2 = M.empty
            , asFinal  = return ()
            }
        onCompletion
    nextNodeIDRef <- newIORef (NodeID 0)
    return Partition
        { paRun        = \task onCompletion -> writeChan ch (task, onCompletion)
        , paNextNodeID = nextNodeIDRef
        }
  where
    drain = do
        st <- get
        case Seq.viewl (asQueue1 st) of
            Reactive next Seq.:< rest -> do
                put st { asQueue1 = rest }
                next
                drain
            Seq.EmptyL ->
                case M.minViewWithKey (asQueue2 st) of
                    Just ((_, Reactive next), rest) -> do
                        put st { asQueue2 = rest }
                        next
                        drain
                    Nothing -> liftIO (asFinal st)
-- | Hand a task to its partition and return immediately, without waiting for
-- it to run.
asynchronously :: Typeable p => Reactive p () -> IO ()
asynchronously task = partition >>= \part -> paRun part task (return ())
-- | Hand a task to its partition and block until the partition has finished
-- processing it, then return the task's result.
synchronously :: Typeable p => Reactive p a -> IO a
synchronously task = do
    part <- partition
    resultVar <- newEmptyMVar
    doneVar <- newEmptyMVar
    paRun part (task >>= ioReactive . putMVar resultVar) (putMVar doneVar ())
    -- Wait for the completion callback first, then collect the result.
    takeMVar doneVar
    takeMVar resultVar
-- | A listener-registration function: given an optional target node and a
-- handler, it registers the handler and yields an IO action (callers in this
-- module name it @unlisten@).
data Listen p a = Listen { runListen_ :: Maybe (MVar (Node p)) -> (a -> Reactive p ()) -> Reactive p (IO ()) }
-- | Run the registration function held in a 'Listen', then 'evaluate' the
-- 'Listen' value itself before returning the registration's result.
runListen :: Listen p a -> Maybe (MVar (Node p)) -> (a -> Reactive p ()) -> Reactive p (IO ())
runListen l mv handle =
    runListen_ l mv handle <* ioReactive (evaluate l)
-- | An event stream in partition @p@.
data Event p a = Event
    { getListenRaw :: Reactive p (Listen p a)
      -- ^ Computation that produces the event's 'Listen'; 'getListen' runs it
      -- at most once per cache reference.
    , evCacheRef   :: IORef (Maybe (Listen p a))
      -- ^ Cache for the result of 'getListenRaw'.
    }
-- | An event whose 'Listen' ignores its handler and returns a no-op action.
never :: Event p a
never = Event
    (return (Listen (\_ _ -> return (return ()))))
    (unsafePerformIO (newIORef Nothing))
-- | Obtain an event's 'Listen', consulting the event's cache first and
-- filling it from the raw computation on a miss.
getListen :: Event p a -> Reactive p (Listen p a)
getListen (Event getLRaw cacheRef) =
    ioReactive (readIORef cacheRef) >>= maybe populate return
  where
    populate = do
        l <- getLRaw
        ioReactive $ writeIORef cacheRef (Just l)
        return l
-- | Register a handler on an event, optionally naming the node that the
-- handler feeds into.
linkedListen :: Event p a -> Maybe (MVar (Node p)) -> (a -> Reactive p ()) -> Reactive p (IO ())
linkedListen ev mMvTarget handle =
    getListen ev >>= \l -> runListen l mMvTarget handle
-- | Register a handler on an event without linking it to any target node.
listen :: Event p a -> (a -> Reactive p ()) -> Reactive p (IO ())
listen ev = linkedListen ev Nothing
-- | Like 'listen', but the handler is a plain 'IO' action.
listenIO :: Event p a -> (a -> IO ()) -> Reactive p (IO ())
listenIO ev handle = listen ev $ \a -> ioReactive (handle a)
-- | Listener table and pending firings of a sink.
data Observer p a = Observer
    { obNextID    :: ID
      -- ^ Identifier to give to the next registered listener.
    , obListeners :: Map ID (a -> Reactive p ())
      -- ^ Currently registered handlers.
    , obFirings   :: [a]
      -- ^ Values pushed so far, newest first; cleared by a final action.
    }
-- | A vertex in the graph of linked sinks.
data Node p = Node
    { noID        :: NodeID
      -- ^ Identifier of this node.
    , noSerial    :: Int64
      -- ^ Ordering number; 'ensureBiggerThan' raises it so that a target's
      -- serial exceeds that of the node feeding it.
    , noListeners :: Map ID (MVar (Node p))
      -- ^ Target nodes, keyed by listener identifier.
    }
-- | Allocate a new node for partition @p@ with serial 0 and no listeners.
--
-- The identifier is taken from the partition's counter with a single atomic
-- read-and-increment, so concurrent callers can never receive the same
-- 'NodeID' (a separate read followed by a modify could).
newNode :: forall p . Typeable p => IO (MVar (Node p))
newNode = do
    part <- partition :: IO (Partition p)
    nodeID <- atomicModifyIORef (paNextNodeID part) (\next -> (succ next, next))
    -- Forcing the identifier keeps the counter from accumulating 'succ' thunks.
    nodeID `seq` newMVar (Node nodeID 0 M.empty)
-- | Package a registration function as a 'Listen', in 'IO'.
wrap :: (Maybe (MVar (Node p)) -> (a -> Reactive p ()) -> Reactive p (IO ())) -> IO (Listen p a)
wrap = return . Listen
-- | 'evaluate' a 'Listen' and discard the result.
touch :: Listen p a -> IO ()
touch l = do
    _ <- evaluate l
    return ()
-- | Record @mvTarget@ as a listener of @mvNode@ under identifier @iD@, first
-- making sure the target's serial is greater than the source node's.
linkNode :: MVar (Node p) -> ID -> MVar (Node p) -> IO ()
linkNode mvNode iD mvTarget = do
    sourceSerial <- noSerial <$> readMVar mvNode
    ensureBiggerThan S.empty mvTarget sourceSerial
    modifyMVar_ mvNode $ \node ->
        return node { noListeners = M.insert iD mvTarget (noListeners node) }
-- | Make sure a node's serial is strictly greater than @limit@. When it has
-- to be raised, it becomes @succ limit@ and the same guarantee is pushed on
-- to each of the node's listeners. Nodes whose identifier is in @visited@ are
-- left alone, which stops the walk on cycles.
ensureBiggerThan :: Set NodeID -> MVar (Node p) -> Int64 -> IO ()
ensureBiggerThan visited mvNode limit = do
    node <- readMVar mvNode
    let alreadyFine = noID node `S.member` visited || noSerial node > limit
    unless alreadyFine $ do
        let bumped   = succ limit
            visited' = S.insert (noID node) visited
        modifyMVar_ mvNode $ \cur -> return cur { noSerial = bumped }
        forM_ (M.elems (noListeners node)) $ \mvTarget ->
            ensureBiggerThan visited' mvTarget bumped
-- | Remove the listener registered under @iD@ from a node.
unlinkNode :: MVar (Node p) -> ID -> IO ()
unlinkNode mvNode iD = modifyMVar_ mvNode (return . dropListener)
  where
    dropListener node = node { noListeners = M.delete iD (noListeners node) }
-- | Create a sink: a 'Listen' through which handlers can be registered, a
-- @push@ function that delivers a value to the registered handlers, and the
-- node representing the sink.
--
-- Registering a handler also replays to it (oldest first) the values recorded
-- in 'obFirings' at registration time. @push@ records each value and, on the
-- first push since the list was last empty, schedules a final action that
-- clears it.
--
-- Relative to the previous version: an IORef that was allocated but never
-- used has been removed, as has @seq a a@ (which is just @a@).
newSink :: forall p a . Typeable p => IO (Listen p a, a -> Reactive p (), MVar (Node p))
newSink = do
    mvNode <- newNode
    mvObs <- newMVar (Observer 0 M.empty [])
    rec
        let l mMvTarget handle = do
                (firings, unlisten, iD) <- ioReactive $ modifyMVar mvObs $ \ob -> return $
                    let iD = obNextID ob
                        -- After each delivery, evaluate the Listen itself.
                        handle' a = handle a >> ioReactive (touch listen)
                        ob' = ob { obNextID = succ iD
                                 , obListeners = M.insert iD handle' (obListeners ob)
                                 }
                        unlisten = do
                            modifyMVar_ mvObs $ \cur -> return $
                                cur { obListeners = M.delete iD (obListeners cur) }
                            unlinkNode mvNode iD
                    in (ob', (reverse (obFirings ob), unlisten, iD))
                case mMvTarget of
                    Just mvTarget -> ioReactive $ linkNode mvNode iD mvTarget
                    Nothing       -> return ()
                -- Replay uses the caller's handler, not the wrapped one.
                mapM_ handle firings
                return unlisten
        listen <- wrap l
    let push a = do
            -- Record the firing; keep the observer as it was before it.
            before <- ioReactive $ modifyMVar mvObs $ \ob ->
                return (ob { obFirings = a : obFirings ob }, ob)
            when (null (obFirings before)) $ onFinal $
                modifyMVar_ mvObs $ \cur -> return $ cur { obFirings = [] }
            mapM_ ($ a) (M.elems (obListeners before))
    return (listen, push, mvNode)
-- | Create an event backed by a fresh sink, returning the event, the
-- function that fires it, and the sink's node.
newEventLinked :: Typeable p => IO (Event p a, a -> Reactive p (), MVar (Node p))
newEventLinked = do
    (sinkListen, push, mvNode) <- newSink
    cacheRef <- newIORef Nothing
    return (Event (return sinkListen) cacheRef, push, mvNode)
-- | Like 'newEventLinked', but without exposing the sink's node.
newEvent :: Typeable p => IO (Event p a, a -> Reactive p ())
newEvent = newEventLinked >>= \(ev, push, _) -> return (ev, push)
-- | Mapping a function over an event applies it to every value before the
-- value reaches the handler.
--
-- The source event is reached through 'linkedListen', and therefore through
-- its cache. Previously the source's raw computation was run again for every
-- handler registered on the mapped event, so sources that build a sink when
-- first listened to would build one per registration.
instance Functor (Event p) where
    f `fmap` e = Event getListen' cacheRef
      where
        -- The mapped event gets its own cache reference.
        cacheRef = unsafePerformIO $ newIORef Nothing
        getListen' =
            return $ Listen $ \mMvNode handle ->
                linkedListen e mMvNode (handle . f)
-- | Combine two events into one that fires whenever either input fires.
-- Both inputs feed a single new sink; the two registrations are undone by a
-- finalizer attached to that sink's 'Listen'.
merge :: Typeable p => Event p a -> Event p a -> Event p a
merge ea eb = Event gl cacheRef
  where
    cacheRef = unsafePerformIO $ newIORef Nothing
    gl = do
        listenA <- getListen ea
        listenB <- getListen eb
        (out, push, mvNode) <- ioReactive newSink
        stopA <- runListen listenA (Just mvNode) push
        stopB <- runListen listenB (Just mvNode) push
        ioReactive $ finalizeListen out (stopA >> stopB)
-- | Keep only the 'Just' firings of an event, unwrapped; 'Nothing' firings
-- are dropped.
justE :: Typeable p => Event p (Maybe a) -> Event p a
justE ema = Event gl cacheRef
  where
    cacheRef = unsafePerformIO $ newIORef Nothing
    gl = do
        (out, push, mvNode) <- ioReactive newSink
        source <- getListen ema
        stop <- runListen source (Just mvNode) (maybe (return ()) push)
        ioReactive $ finalizeListen out stop
-- | Keep only the firings that satisfy the predicate.
filterE :: Typeable p => (a -> Bool) -> Event p a -> Event p a
filterE keep ea = justE (fmap select ea)
  where
    select a
        | keep a    = Just a
        | otherwise = Nothing
-- | Alternative spelling of 'Behaviour'.
type Behavior p a = Behaviour p a
-- | A pair of accessors: one producing a value, one registering a handler.
-- NOTE(review): not referenced anywhere in the part of the module visible
-- here; the intended relationship to 'Behaviour' should be confirmed.
data Accessor p a = Accessor
    { acSample        :: Reactive p a
    , acListenUpdates :: (a -> Reactive p ()) -> Reactive p (IO ())
    }
-- | A time-varying value in partition @p@.
data Behaviour p a = Behaviour
    { underlyingEvent :: Event p a
      -- ^ Event associated with the behaviour (for 'hold', the held event).
    , sample          :: Reactive p a
      -- ^ Read the behaviour's current value.
    }
-- | Mapping over a behaviour maps over both its event and its sampler.
instance Functor (Behaviour p) where
    fmap f (Behaviour updates current) =
        Behaviour (fmap f updates) (fmap f current)
-- | A behaviour that always samples to the given value and whose event is
-- 'never'.
constant :: a -> Behaviour p a
constant a = Behaviour never (return a)
-- | Storage behind a held behaviour.
data BehaviourState p a = BehaviourState
    { bsCurrent :: a
      -- ^ Value returned by sampling.
    , bsUpdate  :: Maybe a
      -- ^ Value received but not yet promoted to 'bsCurrent', if any.
    }
-- | Produce an event that shares the given event's cache reference and
-- whose raw computation attaches the cleanup action as a finalizer on the
-- source's 'Listen'.
finalizeEvent :: Event p a -> IO () -> Event p a
finalizeEvent ea unlisten = Event withFinalizer (evCacheRef ea)
  where
    withFinalizer =
        getListen ea >>= \l -> ioReactive (finalizeListen l unlisten)
-- | Attach an IO action as a finalizer to a 'Listen' value and hand the same
-- value back.
finalizeListen :: Listen p a -> IO () -> IO (Listen p a)
finalizeListen l unlisten = addFinalizer l unlisten >> return l
-- | Create a behaviour that starts at @initA@ and takes on each value fired
-- by @ea@. A received value is stored as a pending update and promoted to the
-- current value by a final action, so sampling keeps returning the previous
-- value until then.
--
-- The registration on @ea@ is deferred via 'schedulePriority1'. The MVar
-- @unlistenRef@ holds @Just action@ while the behaviour has not been
-- finalized (initially a no-op, later the real unlisten action) and
-- @Nothing@ afterwards; a deferred registration that finds @Nothing@ does
-- not register.
--
-- Relative to the previous version: the partial @fromJust@ is replaced by a
-- total match, and the finalizer updates the MVar with 'modifyMVar_' so the
-- MVar is not left empty if the unlisten action throws.
hold :: a -> Event p a -> Reactive p (Behaviour p a)
hold initA ea = do
    bsRef <- ioReactive $ newIORef (BehaviourState initA Nothing)
    unlistenRef <- ioReactive $ newMVar (Just $ return ())
    schedulePriority1 $ do
        mOldUnlisten <- ioReactive $ takeMVar unlistenRef
        case mOldUnlisten of
            Just _ -> do
                unlisten <- listen ea $ \a -> do
                    bs <- ioReactive $ readIORef bsRef
                    ioReactive $ writeIORef bsRef $ bs { bsUpdate = Just a }
                    -- Only the first update since the last promotion
                    -- schedules the promotion.
                    when (isNothing (bsUpdate bs)) $ onFinal (promote bsRef)
                ioReactive $ putMVar unlistenRef (Just unlisten)
            Nothing ->
                ioReactive $ putMVar unlistenRef Nothing
    let gl = do
            l <- getListen ea
            ioReactive $ finalizeListen l $
                modifyMVar_ unlistenRef $ \mUnlisten -> do
                    fromMaybe (return ()) mUnlisten
                    return Nothing
    return Behaviour
        { underlyingEvent = Event gl (evCacheRef ea)
        , sample = ioReactive $ bsCurrent <$> readIORef bsRef
        }
  where
    -- Move the pending update, if any, into the current slot. The new value
    -- is forced only when the stored state itself is forced.
    promote bsRef = do
        bs <- readIORef bsRef
        case bsUpdate bs of
            Just newCurrent ->
                writeIORef bsRef (newCurrent `seq` BehaviourState newCurrent Nothing)
            Nothing -> return ()
-- | On each firing of the event, sample the behaviour and fire the result of
-- combining the two values with @f@.
attachWith :: Typeable p => (a -> b -> c) -> Event p a -> Behaviour p b -> Event p c
attachWith f ea bb = Event gl cacheRef
  where
    cacheRef = unsafePerformIO $ newIORef Nothing
    gl = do
        (out, push, mvNode) <- ioReactive newSink
        stop <- linkedListen ea (Just mvNode) $ \a ->
            sample bb >>= \b -> push (f a b)
        ioReactive $ finalizeListen out stop
-- | Pair each firing of the event with the behaviour's sampled value.
attach :: Typeable p => Event p a -> Behaviour p b -> Event p (a,b)
attach = attachWith (\a b -> (a, b))
-- | On each firing of the event, fire the behaviour's sampled value instead.
tag :: Typeable p => Event p a -> Behaviour p b -> Event p b
tag = attachWith (\_ b -> b)
-- | Call the handler once with the behaviour's sampled value, then register
-- it on the behaviour's underlying event.
listenValueRaw :: Behaviour p a -> Maybe (MVar (Node p)) -> (a -> Reactive p ()) -> Reactive p (IO ())
listenValueRaw ba mMvNode handle = do
    sample ba >>= handle
    linkedListen (underlyingEvent ba) mMvNode handle
-- | Wrap a registration function so that several values arriving before the
-- deferred delivery runs are collapsed into one: only the latest value is
-- passed to the handler.
--
-- The first value after a delivery schedules the delivery via
-- 'schedulePriority2', at the target node's serial when a node is given and
-- at 'maxBound' otherwise. Later values just overwrite the stored one.
--
-- Relative to the previous version: the stored value is read with a total
-- @case@ instead of a failable @Just a <-@ pattern (which needs a
-- @MonadFail@ instance that 'Reactive' does not derive), and the node serial
-- is read only when a delivery is actually being scheduled.
tidy :: (Maybe (MVar (Node p)) -> (a -> Reactive p ()) -> Reactive p (IO ()))
     -> Maybe (MVar (Node p)) -> (a -> Reactive p ()) -> Reactive p (IO ())
tidy listenRaw mMvNode handle = do
    aRef <- ioReactive $ newIORef Nothing
    listenRaw mMvNode $ \a -> do
        previous <- ioReactive $ readIORef aRef
        ioReactive $ writeIORef aRef (Just a)
        when (isNothing previous) $ do
            priority <- case mMvNode of
                Just mvNode -> noSerial <$> ioReactive (readMVar mvNode)
                Nothing     -> return maxBound
            schedulePriority2 priority $ do
                pending <- ioReactive $ readIORef aRef
                ioReactive $ writeIORef aRef Nothing
                case pending of
                    Just latest -> handle latest
                    -- Not expected: a delivery is only scheduled right
                    -- after a value has been stored.
                    Nothing     -> return ()
-- | 'listenValueRaw' passed through 'tidy'.
linkedListenValue :: Behaviour p a -> Maybe (MVar (Node p)) -> (a -> Reactive p ()) -> Reactive p (IO ())
linkedListenValue = tidy . listenValueRaw
-- | 'linkedListenValue' with no target node.
listenValue :: Behaviour p a -> (a -> Reactive p ()) -> Reactive p (IO ())
listenValue ba handle = linkedListenValue ba Nothing handle
-- | Like 'listenValue', but the handler is a plain 'IO' action.
listenValueIO :: Behaviour p a -> (a -> IO ()) -> Reactive p (IO ())
listenValueIO ba handle = listenValue ba $ \a -> ioReactive (handle a)
-- | Turn a registration function into an 'Event' by pointing it at a fresh
-- sink and attaching its cleanup action to that sink's 'Listen'.
eventify :: Typeable p => (Maybe (MVar (Node p)) -> (a -> Reactive p ()) -> Reactive p (IO ())) -> Event p a
eventify register = Event gl cacheRef
  where
    cacheRef = unsafePerformIO $ newIORef Nothing
    gl = do
        (out, push, mvNode) <- ioReactive newSink
        stop <- register (Just mvNode) push
        ioReactive $ finalizeListen out stop
-- | The event form of 'linkedListenValue' for a behaviour.
valueEvent :: Typeable p => Behaviour p a -> Event p a
valueEvent = eventify . linkedListenValue
-- | 'pure' is 'constant'. For '<*>', the combined event fires the latest
-- function applied to the latest argument whenever either side fires, and
-- sampling applies the sampled function to the sampled argument.
instance Typeable p => Applicative (Behaviour p) where
    pure = constant
    Behaviour funEv funNow <*> Behaviour argEv argNow = Behaviour combined current
      where
        cacheRef = unsafePerformIO $ newIORef Nothing
        combined = Event gl cacheRef
        current = ($) <$> funNow <*> argNow
        gl = do
            -- Seed the latest-value cells from the current samples.
            fRef <- funNow >>= ioReactive . newIORef
            aRef <- argNow >>= ioReactive . newIORef
            listenF <- getListen funEv
            listenA <- getListen argEv
            (out, push, mvNode) <- ioReactive newSink
            stopF <- runListen listenF (Just mvNode) $ \f -> do
                ioReactive $ writeIORef fRef f
                a <- ioReactive $ readIORef aRef
                push (f a)
            stopA <- runListen listenA (Just mvNode) $ \a -> do
                f <- ioReactive $ readIORef fRef
                ioReactive $ writeIORef aRef a
                push (f a)
            ioReactive $ finalizeListen out (stopF >> stopA)
-- | Let a firing through only when the behaviour samples to 'True'.
gate :: Typeable p => Event p a -> Behaviour p Bool -> Event p a
gate ea open = justE (attachWith pass ea open)
  where
    pass a isOpen
        | isOpen    = Just a
        | otherwise = Nothing
-- | Run a state machine over an event: each firing is combined with the held
-- state by @f@, which yields an output to fire and the next state.
collectE :: Typeable p => (a -> s -> (b, s)) -> s -> Event p a -> Reactive p (Event p b)
collectE f z ea = do
    rec
        let stepped = attachWith f ea state
            outputs = fst <$> stepped
            nexts   = snd <$> stepped
        state <- hold z nexts
    return outputs
-- | Behaviour version of 'collectE': the initial output and state come from
-- applying @f@ to the behaviour's sampled value and @zs@; later ones come
-- from the behaviour's underlying event (passed through 'tidy').
collect :: Typeable p => (a -> s -> (b, s)) -> s -> Behaviour p a -> Reactive p (Behaviour p b)
collect f zs bea = do
    za <- sample bea
    let (zb, zs') = f za zs
    rec
        let stepped = attachWith f updates (snd <$> held)
        held <- hold (zb, zs') stepped
    return (fst <$> held)
  where
    updates = eventify (tidy (linkedListen (underlyingEvent bea)))
-- | Fold over an event: each firing is combined with the held accumulator by
-- @f@, and the new accumulator is both fired and held.
accumE :: Typeable p => (a -> s -> s) -> s -> Event p a -> Reactive p (Event p s)
accumE f z ea = do
    rec
        let accumulated = attachWith f ea acc
        acc <- hold z accumulated
    return accumulated
-- | Event carrying a running count of the input's firings.
countE :: Typeable p => Event p a -> Reactive p (Event p Int)
countE ea = accumE (+) 0 (const 1 <$> ea)
-- | Behaviour holding the running count of the input's firings, starting
-- at 0.
count :: Typeable p => Event p a -> Reactive p (Behaviour p Int)
count ea = countE ea >>= hold 0
-- | Split a map into the entries with keys strictly below @k@ and the
-- entries with keys at or above @k@.
splitLessThan :: Ord k => k -> Map k a -> (Map k a, Map k a)
splitLessThan k m = (below, atOrAbove)
  where
    (below, mAt, above) = M.splitLookup k m
    -- 'M.splitLookup' removes the pivot; put it back on the upper side.
    atOrAbove = maybe above (\v -> M.insert k v above) mAt
-- | Remove from the table every unlisten action whose key is below @iD@ and
-- run the removed actions in ascending key order. The table is updated
-- before any action runs.
unlistenLessThan :: IORef (Map ID (IO ())) -> ID -> IO ()
unlistenLessThan unlistensRef iD = do
    registered <- readIORef unlistensRef
    let (stale, kept) = splitLessThan iD registered
    writeIORef unlistensRef kept
    sequence_ (M.elems stale)
-- | Flatten a behaviour of events into a single event that fires the values
-- of whichever inner event the behaviour currently holds.
--
-- Each inner event is tagged with a sequence number via 'collect'. A value is
-- forwarded only if its event's number equals the number sampled from the
-- behaviour at that moment; forwarding also retires the registrations of
-- lower-numbered inner events.
switchE :: Typeable p => Behaviour p (Event p a) -> Event p a
switchE bea = Event gl cacheRef
  where
    cacheRef = unsafePerformIO $ newIORef Nothing
    unlistensRef = unsafePerformIO $ newIORef M.empty
    number ea nxtID = ((ea, nxtID), succ nxtID)
    gl = do
        numbered <- collect number (0 :: ID) bea
        (out, push, mvNode) <- ioReactive newSink
        let activeIDs = snd <$> numbered
            onlyIfActive iD a activeID
                | activeID == iD = Just a
                | otherwise      = Nothing
        stopOuter <- linkedListenValue numbered (Just mvNode) $ \(ea, iD) -> do
            let filtered = justE (attachWith (onlyIfActive iD) ea activeIDs)
            stopInner <- listen filtered $ \a -> do
                push a
                ioReactive $ unlistenLessThan unlistensRef iD
            ioReactive $ modifyIORef unlistensRef (M.insert iD stopInner)
        ioReactive $ finalizeListen out stopOuter
-- | Flatten a behaviour of behaviours. The result starts at the current
-- value of the current inner behaviour and afterwards follows the values of
-- whichever inner behaviour is active.
--
-- Every inner behaviour delivered by the outer one gets the next generation
-- number. A value is forwarded only while its generation is the active one,
-- and forwarding retires the registrations of earlier generations.
switch :: Typeable p => Behaviour p (Behaviour p a) -> Reactive p (Behaviour p a)
switch bba = do
    ba <- sample bba
    za <- sample ba
    (ev, push, mvNode) <- ioReactive newEventLinked
    activeIDRef <- ioReactive $ newIORef (0 :: ID)
    unlistensRef <- ioReactive $ newIORef M.empty
    let nextGeneration = modifyIORef activeIDRef succ >> readIORef activeIDRef
        onInner inner = do
            iD <- ioReactive nextGeneration
            stopInner <- listenValueRaw inner (Just mvNode) $ \a -> do
                activeID <- ioReactive $ readIORef activeIDRef
                when (activeID == iD) $ do
                    push a
                    ioReactive $ unlistenLessThan unlistensRef iD
            ioReactive $ modifyIORef unlistensRef (M.insert iD stopInner)
    stopOuter <- listenValueRaw bba (Just mvNode) onInner
    hold za (finalizeEvent ev stopOuter)
-- | Event that lets through only the first firing of its input.
once :: Typeable p => Event p a -> Reactive p (Event p a)
once ea = justE <$> collectE firstOnly True ea
  where
    -- State is "still armed"; it is cleared by every firing.
    firstOnly a armed = (if armed then Just a else Nothing, False)
-- | Run each 'Reactive' action fired by the input and fire its result.
execute :: Typeable p => Event p (Reactive p a) -> Event p a
execute ev = Event gl cacheRef
  where
    cacheRef = unsafePerformIO $ newIORef Nothing
    gl = do
        (out, push, mvNode) <- ioReactive newSink
        source <- getListen ev
        stop <- runListen source (Just mvNode) (>>= push)
        ioReactive $ finalizeListen out stop
-- | Carry an event from partition @p@ into partition @q@. Each firing in @p@
-- is pushed into a new @q@ event with 'synchronously', so the @p@ handler
-- waits for @q@ to process it.
crossE :: (Typeable p, Typeable q) => Event p a -> Reactive p (Event q a)
crossE epa = do
    (ev, push) <- ioReactive newEvent
    unlisten <- listenIO epa (\a -> synchronously (push a))
    return (finalizeEvent ev unlisten)
-- | Carry a behaviour from partition @p@ into partition @q@: sample it in
-- @p@, cross its underlying event with 'crossE', and hold the result in @q@
-- starting from the sampled value.
cross :: (Typeable p, Typeable q) => Behaviour p a -> Reactive p (Behaviour q a)
cross bpa = do
    initial <- sample bpa
    crossed <- crossE (underlyingEvent bpa)
    ioReactive (synchronously (hold initial crossed))