module FRP.Sodium.Impl where
import Control.Applicative
import Control.Concurrent
import Control.Concurrent.Chan
import Control.Concurrent.MVar
import Control.Exception (evaluate)
import Control.Monad
import Control.Monad.State.Strict
import Control.Monad.Trans
import Data.Int
import Data.IORef
import Data.Map (Map)
import qualified Data.Map as M
import Data.Maybe
import Data.Set (Set)
import qualified Data.Set as S
import Data.Sequence (Seq, (|>))
import qualified Data.Sequence as Seq
import Data.Typeable
import GHC.Exts
import System.Mem.Weak
import System.IO.Unsafe
import Unsafe.Coerce
type ID = Int64
data ReactiveState p = ReactiveState {
asQueue1 :: Seq (Reactive p ()),
asQueue2 :: Map Int64 (Reactive p ()),
asFinal :: IO ()
}
data Reactive p a where
Reactive :: StateT (ReactiveState p) IO a -> Reactive p a
instance Functor (Reactive p) where
fmap f rm = Reactive (fmap f (unReactive rm))
unReactive :: Reactive p a -> StateT (ReactiveState p) IO a
unReactive (Reactive m) = m
instance Applicative (Reactive p) where
pure a = Reactive $ return a
rf <*> rm = Reactive $ unReactive rf <*> unReactive rm
instance Monad (Reactive p) where
return a = Reactive $ return a
rma >>= kmb = Reactive $ do
a <- unReactive rma
unReactive (kmb a)
instance MonadFix (Reactive p) where
mfix f = Reactive $ mfix $ \a -> unReactive (f a)
ioReactive :: IO a -> Reactive p a
ioReactive io = Reactive $ liftIO io
newtype NodeID = NodeID Int deriving (Eq, Ord, Enum)
data Partition p = Partition {
paRun :: Reactive p () -> IO () -> IO (),
paNextNodeID :: IORef NodeID
}
schedulePriority1 :: Reactive p () -> Reactive p ()
schedulePriority1 task = Reactive $ modify $ \as -> as { asQueue1 = asQueue1 as |> task }
onFinal :: IO () -> Reactive p ()
onFinal task = Reactive $ modify $ \as -> as { asFinal = asFinal as >> task }
partitionRegistry :: MVar (Map String Any)
partitionRegistry = unsafePerformIO $ newMVar M.empty
partition :: forall p . Typeable p => IO (Partition p)
partition = do
let typ = show $ typeOf (undefined :: p)
modifyMVar partitionRegistry $ \reg ->
case M.lookup typ reg of
Just part -> return (reg, unsafeCoerce part)
Nothing -> do
part <- createPartition
return (M.insert typ (unsafeCoerce part) reg, part)
createPartition :: IO (Partition p)
createPartition = do
ch <- newChan
forkIO $ forever $ do
(task, onCompletion) <- readChan ch
let loop = do
queue1 <- gets asQueue1
if not $ Seq.null queue1 then do
let Reactive task = Seq.index queue1 0
modify $ \as -> as { asQueue1 = Seq.drop 1 queue1 }
task
loop
else do
queue2 <- gets asQueue2
if not $ M.null queue2 then do
let (k, Reactive task) = M.findMin queue2
modify $ \as -> as { asQueue2 = M.delete k queue2 }
task
loop
else do
final <- gets asFinal
liftIO final
return ()
runStateT loop $ ReactiveState {
asQueue1 = Seq.singleton task,
asQueue2 = M.empty,
asFinal = return ()
}
onCompletion
nextNodeIDRef <- newIORef (NodeID 0)
return $ Partition {
paRun = \task onCompletion -> writeChan ch (task, onCompletion),
paNextNodeID = nextNodeIDRef
}
asynchronously :: Typeable p => Reactive p () -> IO ()
asynchronously task = do
part <- partition
paRun part task (return ())
synchronously :: Typeable p => Reactive p a -> IO a
synchronously task = do
mvOutput <- newEmptyMVar
mvCompleted <- newEmptyMVar
part <- partition
paRun part (task >>= ioReactive . putMVar mvOutput) (putMVar mvCompleted ())
takeMVar mvCompleted
takeMVar mvOutput
data Listen p a = Listen { runListen_ :: Maybe (MVar (Node p)) -> (a -> Reactive p ()) -> Reactive p (IO ()) }
runListen :: Listen p a -> Maybe (MVar (Node p)) -> (a -> Reactive p ()) -> Reactive p (IO ())
runListen l mv handle = do
o <- runListen_ l mv handle
_ <- ioReactive $ evaluate l
return o
data Event p a = Event {
getListenRaw :: Reactive p (Listen p a),
evCacheRef :: IORef (Maybe (Listen p a))
}
never :: Event p a
never = Event {
getListenRaw = return $ Listen $ \_ _ -> return (return ()),
evCacheRef = unsafePerformIO $ newIORef Nothing
}
getListen :: Event p a -> Reactive p (Listen p a)
getListen (Event getLRaw cacheRef) = do
mL <- ioReactive $ readIORef cacheRef
case mL of
Just l -> return l
Nothing -> do
l <- getLRaw
ioReactive $ writeIORef cacheRef (Just l)
return l
linkedListen :: Event p a -> Maybe (MVar (Node p)) -> (a -> Reactive p ()) -> Reactive p (IO ())
linkedListen ev mMvTarget handle = do
l <- getListen ev
runListen l mMvTarget handle
listen :: Event p a -> (a -> Reactive p ()) -> Reactive p (IO ())
listen ev handle = linkedListen ev Nothing handle
listenIO :: Event p a -> (a -> IO ()) -> Reactive p (IO ())
listenIO ev handle = listen ev (ioReactive . handle)
data Observer p a = Observer {
obNextID :: ID,
obListeners :: Map ID (a -> Reactive p ()),
obFirings :: [a]
}
data Node p = Node {
noID :: NodeID,
noSerial :: Int64,
noListeners :: Map ID (MVar (Node p))
}
newNode :: forall p . Typeable p => IO (MVar (Node p))
newNode = do
part <- partition :: IO (Partition p)
nodeID <- readIORef (paNextNodeID part)
modifyIORef (paNextNodeID part) succ
newMVar (Node nodeID 0 M.empty)
wrap :: (Maybe (MVar (Node p)) -> (a -> Reactive p ()) -> Reactive p (IO ())) -> IO (Listen p a)
wrap l = return (Listen l)
touch :: Listen p a -> IO ()
touch l = evaluate l >> return ()
linkNode :: MVar (Node p) -> ID -> MVar (Node p) -> IO ()
linkNode mvNode iD mvTarget = do
no <- readMVar mvNode
ensureBiggerThan S.empty mvTarget (noSerial no)
modifyMVar_ mvNode $ \no -> return $
no { noListeners = M.insert iD mvTarget (noListeners no) }
ensureBiggerThan :: Set NodeID -> MVar (Node p) -> Int64 -> IO ()
ensureBiggerThan visited mvNode limit = do
no <- readMVar mvNode
if noID no `S.member` visited || noSerial no > limit then
return ()
else do
let newSerial = succ limit
modifyMVar_ mvNode $ \no -> return $ no { noSerial = newSerial }
forM_ (M.elems . noListeners $ no) $ \mvTarget -> do
ensureBiggerThan (S.insert (noID no) visited) mvTarget newSerial
unlinkNode :: MVar (Node p) -> ID -> IO ()
unlinkNode mvNode iD = do
modifyMVar_ mvNode $ \no -> return $
no { noListeners = M.delete iD (noListeners no) }
newSink :: forall p a . Typeable p => IO (Listen p a, a -> Reactive p (), MVar (Node p))
newSink = do
mvNode <- newNode
mvObs <- newMVar (Observer 0 M.empty [])
cacheRef <- newIORef Nothing
rec
let l mMvTarget handle = do
(firings, unlisten, iD) <- ioReactive $ modifyMVar mvObs $ \ob -> return $
let iD = obNextID ob
handle' a = handle a >> ioReactive (touch listen)
ob' = ob { obNextID = succ iD,
obListeners = M.insert iD handle' (obListeners ob) }
unlisten = do
modifyMVar_ mvObs $ \ob -> return $ ob {
obListeners = M.delete iD (obListeners ob)
}
unlinkNode mvNode iD
return ()
in (ob', (reverse . obFirings $ ob, unlisten, iD))
case mMvTarget of
Just mvTarget -> ioReactive $ linkNode mvNode iD mvTarget
Nothing -> return ()
mapM_ handle firings
return unlisten
listen <- wrap l
let push a = do
ob <- ioReactive $ modifyMVar mvObs $ \ob -> return $
(ob { obFirings = a : obFirings ob }, ob)
when (null (obFirings ob)) $ onFinal $ do
modifyMVar_ mvObs $ \ob -> return $ ob { obFirings = [] }
let seqa = seq a a
mapM_ ($ seqa) (M.elems . obListeners $ ob)
return (listen, push, mvNode)
newEventLinked :: Typeable p => IO (Event p a, a -> Reactive p (), MVar (Node p))
newEventLinked = do
(listen, push, mvNode) <- newSink
cacheRef <- newIORef Nothing
let ev = Event {
getListenRaw = return listen,
evCacheRef = cacheRef
}
return (ev, push, mvNode)
newEvent :: Typeable p => IO (Event p a, a -> Reactive p ())
newEvent = do
(ev, push, _) <- newEventLinked
return (ev, push)
instance Functor (Event p) where
f `fmap` Event getListen cacheRef = Event getListen' cacheRef
where
cacheRef = unsafePerformIO $ newIORef Nothing
getListen' = do
return $ Listen $ \mMvNode handle -> do
l <- getListen
runListen l mMvNode (handle . f)
merge :: Typeable p => Event p a -> Event p a -> Event p a
merge ea eb = Event gl cacheRef
where
cacheRef = unsafePerformIO $ newIORef Nothing
gl = do
l1 <- getListen ea
l2 <- getListen eb
(l, push, mvNode) <- ioReactive newSink
unlistener1 <- unlistenize $ runListen l1 (Just mvNode) push
unlistener2 <- unlistenize $ runListen l2 (Just mvNode) push
(finalerize unlistener1 <=< finalerize unlistener2) l
mergeWith :: Typeable p => (a -> a -> a) -> Event p a -> Event p a -> Event p a
mergeWith f ea eb = Event gl cacheRef
where
cacheRef = unsafePerformIO $ newIORef Nothing
gl = do
l1 <- getListen ea
l2 <- getListen eb
(l, push, mvNode) <- ioReactive newSink
outRef <- ioReactive $ newIORef Nothing
let process a = do
mOut <- ioReactive $ readIORef outRef
ioReactive $ modifyIORef outRef $ \mOut -> Just $ case mOut of
Just out -> f out a
Nothing -> a
when (isNothing mOut) $ schedulePriority2 (Just mvNode) $ do
Just out <- ioReactive $ readIORef outRef
ioReactive $ writeIORef outRef Nothing
push out
unlistener1 <- unlistenize $ runListen l1 (Just mvNode) process
unlistener2 <- unlistenize $ runListen l2 (Just mvNode) process
(finalerize unlistener1 <=< finalerize unlistener2) l
justE :: Typeable p => Event p (Maybe a) -> Event p a
justE ema = Event gl cacheRef
where
cacheRef = unsafePerformIO $ newIORef Nothing
gl = do
(l', push, mvNode) <- ioReactive newSink
l <- getListen ema
unlistener <- unlistenize $ runListen l (Just mvNode) $ \ma -> case ma of
Just a -> push a
Nothing -> return ()
finalerize unlistener l'
filterE :: Typeable p => (a -> Bool) -> Event p a -> Event p a
filterE pred = justE . ((\a -> if pred a then Just a else Nothing) <$>)
type Behavior p a = Behaviour p a
data Behaviour p a = Behaviour {
underlyingEvent :: Event p a,
sample :: Reactive p a
}
instance Functor (Behaviour p) where
f `fmap` Behaviour underlyingEvent sample =
Behaviour (f `fmap` underlyingEvent) (f `fmap` sample)
constant :: a -> Behaviour p a
constant a = Behaviour {
underlyingEvent = never,
sample = return a
}
data BehaviourState p a = BehaviourState {
bsCurrent :: a,
bsUpdate :: Maybe a
}
finalizeEvent :: Event p a -> IO () -> Event p a
finalizeEvent ea unlisten = Event gl (evCacheRef ea)
where
gl = do
l <- getListen ea
ioReactive $ finalizeListen l unlisten
finalizeListen :: Listen p a -> IO () -> IO (Listen p a)
finalizeListen l unlisten = do
addFinalizer l unlisten
return l
newtype Unlistener = Unlistener (MVar (Maybe (IO ())))
unlistenize :: Reactive p (IO ()) -> Reactive p Unlistener
unlistenize doListen = do
unlistener@(Unlistener ref) <- newUnlistener
schedulePriority1 $ do
mOldUnlisten <- ioReactive $ takeMVar ref
case mOldUnlisten of
Just _ -> do
unlisten <- doListen
ioReactive $ putMVar ref (Just unlisten)
Nothing -> ioReactive $ putMVar ref mOldUnlisten
return unlistener
where
newUnlistener :: Reactive p Unlistener
newUnlistener = Unlistener <$> ioReactive (newMVar (Just $ return ()))
finalerize :: Unlistener -> Listen p a -> Reactive p (Listen p a)
finalerize (Unlistener ref) l = ioReactive $ finalizeListen l $ do
mUnlisten <- takeMVar ref
fromMaybe (return ()) mUnlisten
putMVar ref Nothing
hold :: a -> Event p a -> Reactive p (Behaviour p a)
hold initA ea = do
bsRef <- ioReactive $ newIORef (BehaviourState initA Nothing)
unlistener <- unlistenize $ listen ea $ \a -> do
bs <- ioReactive $ readIORef bsRef
ioReactive $ writeIORef bsRef $ bs { bsUpdate = Just a }
when (isNothing (bsUpdate bs)) $ onFinal $ do
bs <- readIORef bsRef
let newCurrent = fromJust (bsUpdate bs)
bs' = newCurrent `seq` BehaviourState newCurrent Nothing
evaluate bs'
writeIORef bsRef bs'
let gl = do
l <- getListen ea
finalerize unlistener l
beh = Behaviour {
underlyingEvent = Event gl (evCacheRef ea),
sample = ioReactive $ bsCurrent <$> readIORef bsRef
}
return beh
attachWith :: Typeable p => (a -> b -> c) -> Event p a -> Behaviour p b -> Event p c
attachWith f ea bb = Event gl cacheRef
where
cacheRef = unsafePerformIO $ newIORef Nothing
gl = do
(l, push, mvNode) <- ioReactive newSink
unlistener <- unlistenize $ linkedListen ea (Just mvNode) $ \a -> do
b <- sample bb
push (f a b)
finalerize unlistener l
attach :: Typeable p => Event p a -> Behaviour p b -> Event p (a,b)
attach = attachWith (,)
tag :: Typeable p => Event p a -> Behaviour p b -> Event p b
tag = attachWith (flip const)
listenValueRaw :: Behaviour p a -> Maybe (MVar (Node p)) -> (a -> Reactive p ()) -> Reactive p (IO ())
listenValueRaw ba mMvNode handle = do
a <- sample ba
handle a
linkedListen (underlyingEvent ba) mMvNode handle
schedulePriority2 :: Maybe (MVar (Node p))
-> Reactive p ()
-> Reactive p ()
schedulePriority2 mMvNode task = do
mNode <- case mMvNode of
Just mvNode -> Just <$> ioReactive (readMVar mvNode)
Nothing -> pure Nothing
let priority = maybe maxBound noSerial mNode
Reactive $ modify $ \as -> as {
asQueue2 = M.alter (\mOldTask -> Just $ case mOldTask of
Just oldTask -> oldTask >> task
Nothing -> task) priority (asQueue2 as)
}
tidy :: (Maybe (MVar (Node p)) -> (a -> Reactive p ()) -> Reactive p (IO ()))
-> Maybe (MVar (Node p)) -> (a -> Reactive p ()) -> Reactive p (IO ())
tidy listen mMvNode handle = do
aRef <- ioReactive $ newIORef Nothing
listen mMvNode $ \a -> do
ma <- ioReactive $ readIORef aRef
ioReactive $ writeIORef aRef (Just a)
when (isNothing ma) $ schedulePriority2 mMvNode $ do
Just a <- ioReactive $ readIORef aRef
ioReactive $ writeIORef aRef Nothing
handle a
linkedListenValue :: Behaviour p a -> Maybe (MVar (Node p)) -> (a -> Reactive p ()) -> Reactive p (IO ())
linkedListenValue ba = tidy (listenValueRaw ba)
listenValue :: Behaviour p a -> (a -> Reactive p ()) -> Reactive p (IO ())
listenValue ba = linkedListenValue ba Nothing
listenValueIO :: Behaviour p a -> (a -> IO ()) -> Reactive p (IO ())
listenValueIO ba handle = listenValue ba (ioReactive . handle)
eventify :: Typeable p => (Maybe (MVar (Node p)) -> (a -> Reactive p ()) -> Reactive p (IO ())) -> Event p a
eventify listen = Event gl cacheRef
where
cacheRef = unsafePerformIO $ newIORef Nothing
gl = do
(l, push, mvNode) <- ioReactive newSink
unlistener <- unlistenize $ listen (Just mvNode) push
finalerize unlistener l
valueEvent :: Typeable p => Behaviour p a -> Event p a
valueEvent ba = eventify (linkedListenValue ba)
instance Typeable p => Applicative (Behaviour p) where
pure = constant
Behaviour u1 s1 <*> Behaviour u2 s2 = Behaviour u s
where
cacheRef = unsafePerformIO $ newIORef Nothing
u = Event gl cacheRef
gl = do
fRef <- ioReactive . newIORef =<< s1
aRef <- ioReactive . newIORef =<< s2
l1 <- getListen u1
l2 <- getListen u2
(l, push, mvNode) <- ioReactive newSink
unlistener1 <- unlistenize $ runListen l1 (Just mvNode) $ \f -> do
ioReactive $ writeIORef fRef f
a <- ioReactive $ readIORef aRef
push (f a)
unlistener2 <- unlistenize $ runListen l2 (Just mvNode) $ \a -> do
f <- ioReactive $ readIORef fRef
ioReactive $ writeIORef aRef a
push (f a)
(finalerize unlistener1 <=< finalerize unlistener2) l
s = ($) <$> s1 <*> s2
gate :: Typeable p => Event p a -> Behaviour p Bool -> Event p a
gate ea = justE . attachWith (\a b -> if b then Just a else Nothing) ea
collectE :: Typeable p => (a -> s -> (b, s)) -> s -> Event p a -> Reactive p (Event p b)
collectE f z ea = do
rec
s <- hold z es
let ebs = attachWith f ea s
eb = fst <$> ebs
es = snd <$> ebs
return eb
collect :: Typeable p => (a -> s -> (b, s)) -> s -> Behaviour p a -> Reactive p (Behaviour p b)
collect f zs bea = do
let ea = eventify . tidy . linkedListen $ underlyingEvent bea
za <- sample bea
let (zb, zs') = f za zs
rec
bs <- hold (zb, zs') ebs
let ebs = attachWith f ea (snd <$> bs)
return (fst <$> bs)
accumE :: Typeable p => (a -> s -> s) -> s -> Event p a -> Reactive p (Event p s)
accumE f z ea = do
rec
let es = attachWith f ea s
s <- hold z es
return es
accum :: Typeable p => (a -> s -> s) -> s -> Event p a -> Reactive p (Behaviour p s)
accum f z ea = do
rec
s <- hold z (attachWith f ea s)
return s
countE :: Typeable p => Event p a -> Reactive p (Event p Int)
countE = accumE (+) 0 . (const 1 <$>)
count :: Typeable p => Event p a -> Reactive p (Behaviour p Int)
count = hold 0 <=< countE
splitLessThan :: Ord k => k -> Map k a -> (Map k a, Map k a)
splitLessThan k m =
let (lt, mEq, gt) = M.splitLookup k m
in (lt, case mEq of
Just eq -> M.insert k eq gt
Nothing -> gt)
unlistenLessThan :: IORef (Map ID (IO ())) -> ID -> IO ()
unlistenLessThan unlistensRef iD = do
uls <- readIORef unlistensRef
let (toDelete, uls') = splitLessThan iD uls
do
writeIORef unlistensRef uls'
forM_ (M.elems toDelete) $ \unl -> unl
switchE :: Typeable p => Behaviour p (Event p a) -> Event p a
switchE bea = Event gl cacheRef
where
cacheRef = unsafePerformIO $ newIORef Nothing
unlistensRef = unsafePerformIO $ newIORef M.empty
gl = do
beaId <- collect (\ea nxtID -> ((ea, nxtID), succ nxtID)) (0 :: ID) bea
(l, push, mvNode) <- ioReactive newSink
unlistener1 <- unlistenize $ linkedListenValue beaId (Just mvNode) $ \(ea, iD) -> do
let filtered = justE $ attachWith (\a activeID ->
if activeID == iD
then Just a
else Nothing
) ea (snd <$> beaId)
unlisten2 <- listen filtered $ \a -> do
push a
ioReactive $ unlistenLessThan unlistensRef iD
ioReactive $ modifyIORef unlistensRef (M.insert iD unlisten2)
finalerize unlistener1 l
switch :: Typeable p => Behaviour p (Behaviour p a) -> Reactive p (Behaviour p a)
switch bba = do
ba <- sample bba
za <- sample ba
(ev, push, mvNode) <- ioReactive newEventLinked
activeIDRef <- ioReactive $ newIORef (0 :: ID)
unlistensRef <- ioReactive $ newIORef M.empty
unlisten1 <- listenValueRaw bba (Just mvNode) $ \ba -> do
iD <- ioReactive $ do
modifyIORef activeIDRef succ
readIORef activeIDRef
unlisten2 <- listenValueRaw ba (Just mvNode) $ \a -> do
activeID <- ioReactive $ readIORef activeIDRef
when (activeID == iD) $ do
push a
ioReactive $ unlistenLessThan unlistensRef iD
ioReactive $ modifyIORef unlistensRef (M.insert iD unlisten2)
hold za (finalizeEvent ev unlisten1)
once :: Typeable p => Event p a -> Reactive p (Event p a)
once ea = justE <$> collectE (\a active -> (if active then Just a else Nothing, False)) True ea
execute :: Typeable p => Event p (Reactive p a) -> Event p a
execute ev = Event gl cacheRef
where
cacheRef = unsafePerformIO $ newIORef Nothing
gl = do
(l', push, mvNode) <- ioReactive newSink
unlistener <- unlistenize $ do
l <- getListen ev
runListen l (Just mvNode) $ \action -> action >>= push
finalerize unlistener l'
crossE :: (Typeable p, Typeable q) => Event p a -> Reactive p (Event q a)
crossE epa = do
(ev, push) <- ioReactive newEvent
unlisten <- listenIO epa $ asynchronously . push
return $ finalizeEvent ev unlisten
cross :: (Typeable p, Typeable q) => Behaviour p a -> Reactive p (Behaviour q a)
cross bpa = do
a <- sample bpa
ea <- crossE (underlyingEvent bpa)
ioReactive $ synchronously $ hold a ea