-- | Reactive event streams.
--
-- An 'Event' wraps a list of 'Future's, and a 'Future' pairs a 'Time' with a
-- value. This module also re-exports "SimpleH.Reactive.Time" and
-- "SimpleH.Reactive.TimeVal".
module SimpleH.Reactive (
-- * Re-exported modules
module SimpleH.Reactive.Time,
module SimpleH.Reactive.TimeVal,
-- * Events
Event,_event,
atTimes,
withTime,times,
mapFutures,
(//),(<|*>),
groupE,mask,
sink,event,
-- * Futures
Future,_future,_time,_value,futureIO,
) where
import SimpleH
import Control.Concurrent
import SimpleH.Reactive.TimeVal
import System.IO.Unsafe (unsafeInterleaveIO)
import Data.List (group)
import SimpleH.Reactive.Time
-- | A stream of occurrences, represented as a list of 'Future's wrapped in
-- the functor composition @Compose [] (Future t)@.
-- 'getEvent' exposes that composed representation; use '_event' to reach the
-- plain list of 'Future's.
newtype Event t a = Event { getEvent :: Compose [] (Future t) a }
deriving (Unit,Functor,Foldable,Traversable)
-- | An event is shown exactly like its underlying list of 'Future's.
instance (Ord t, Show t, Show a) => Show (Event t a) where
  show ev = show (ev ^.. _event)
-- | Combining two events lifts '+' through the isomorphism
-- @from _event._OrdList@ using 'warp2'.
-- NOTE(review): presumably '_OrdList' makes '+' an order-preserving merge, so
-- the occurrences of both events end up interleaved by time -- confirm against
-- the definition of '_OrdList'.
instance Ord t => Semigroup (Event t a) where
(+) = warp2 (from _event._OrdList) (+)
-- | The neutral event is the one built from the empty list of occurrences.
instance Ord t => Monoid (Event t a) where
  zero = at _event []
-- | Applies an event of functions to an event of arguments.
--
-- When both events have a first occurrence (@f@ and @x@), the two streams are
-- tagged with 'Left' (functions) and 'Right' (arguments), combined with '+',
-- and folded with 'mapAccum_' starting from the pair @(f,x)@. When either
-- event has no first occurrence, the result is 'zero'.
instance (Bounded t,Ord t) => Applicative (Event t) where
-- The view patterns bind the first occurrence of each event while keeping the
-- whole events available as @fe@ and @xe@.
fe@(at' _event -> f:_) <*> xe@(at' _event -> x:_) = mapAccum_ fun (e^.._event) (f,x) ^. _event
-- One step of the fold, expressed as a state action over the current
-- (function, argument) pair: update the pair with 'modify', then return the
-- application of the current function 'Future' to the current argument
-- 'Future'.
-- NOTE(review): @const +++ const@ presumably overwrites the first component on
-- 'Left' and the second on 'Right' -- confirm against the definition of '+++'.
where fun mod = at' _state $ modify ((const +++ const) (sequenceEither mod))
>> uncurry (<*>)<$>get
-- The merged, tagged stream. Every function occurrence is sequenced after @x@
-- and every argument occurrence after @f@ using the 'Future' '>>'.
-- NOTE(review): presumably this ensures no occurrence is earlier than both
-- first occurrences -- confirm against the 'Time' instances.
e = (Left<$>mapFutures (x>>) fe) + (Right<$>mapFutures (f>>) xe)
-- At least one side has no occurrence at all.
_ <*> _ = zero
-- | Flattens an event of events.
--
-- The inner events and then the outer event are unwrapped to lists. Each outer
-- occurrence is expanded with 'sequence' and @map join@ into a list of inner
-- occurrences. Runs of adjacent occurrences that compare equal are collapsed
-- to their last element ('group' uses the 'Eq' instance of 'Future', which
-- compares times only). The resulting lists are combined by @merge@ and
-- rewrapped as an 'Event'.
instance (Bounded t,Ord t) => Monad (Event t) where
join = map (at' _event) >>> at' _event >>> map (sequence >>> map join >>> group >>> map last)
>>> merge >>> at _event
-- @merge@ combines a list of occurrence lists into a single list.
where merge [] = []
-- Exhausted lists are dropped.
merge ([]:t) = merge t
-- Emit the head of the first list, then put its remainder back among the
-- other lists with 'insertOrd'.
-- NOTE(review): presumably 'insertOrd' keeps the lists ordered so that the
-- earliest head is always first -- confirm against its definition.
merge ((x:xs):t) = x:merge (insertOrd xs t)
-- | Wraps the payload of whichever branch is present with the same
-- polymorphic injection, keeping the 'Left' / 'Right' tag unchanged.
pureEither :: (forall a. a -> f a) -> Either a b -> Either (f a) (f b)
pureEither inject (Left l) = Left (inject l)
pureEither inject (Right r) = Right (inject r)
-- | Moves the 'Either' carried by a 'Future' to the outside: the payload of
-- the branch that is present is rewrapped in a 'Future' that reuses the time
-- of the original one.
sequenceEither fut =
  pureEither (\v -> at _future (fut ^. _time, v)) (fut ^. _value)
-- | The representation wrapped by the 'Event' constructor: a list of
-- 'Future's expressed as a single functor composition.
type EventRep t a = Compose [] (Future t) a
-- | Isomorphism between an 'Event' and its composed representation, built
-- from the 'Event' constructor and the 'getEvent' accessor.
_Event :: Iso (Event t a) (Event t' b) (EventRep t a) (EventRep t' b)
_Event = iso Event getEvent
-- | Isomorphism between an 'Event' and its plain list of 'Future's, obtained
-- by composing '_Compose' with '_Event'.
_event :: Iso (Event t a) (Event t' b) [Future t a] [Future t' b]
_event = _Compose._Event
-- | Builds an event carrying @()@ with one occurrence per element of the given
-- list, in the order given. Each element is lifted into a 'Time' with two
-- applications of 'pure'.
atTimes ts = at _event [at _future (pure (pure t), ()) | t <- ts]
-- | Pairs every occurrence of the first event with a sub-event of the second.
--
-- For each occurrence @b@ of the first event, the sub-event holds the leading
-- occurrences of the still-pending second event that are strictly earlier
-- than @b@ (those for which 'cmpFut' yields 'GT'). The remaining occurrences
-- stay pending for the following steps (assuming 'mapAccum_' threads the first
-- component of the step result as the accumulator).
(//) :: Ord t => Event t a -> Event t b -> Event t (a, Event t b)
bs // es = at _event (mapAccum_ attach (at' _event bs) (at' _event es))
  where
    attach b pending = (later, (_value %~ withEarlier) b)
      where
        (earlier, later) = span (\e -> cmpFut b e == GT) pending
        withEarlier v = (v, at _event earlier)
infixl 1 //
-- | Applies each function occurrence to the latest stored argument value.
--
-- The traversal runs as a state computation seeded with the initial value.
-- For every function occurrence, the argument occurrences attached to it by
-- '//' are stored one after the other with 'put', then the function is mapped
-- over the stored value read back with 'get'. Only the resulting event is
-- kept; the final state is discarded.
(<|*>) :: Ord t => Event t (a -> b) -> (a,Event t a) -> Event t b
fs <|*> (a, as) = snd (runFrom a)
  where
    runFrom = at' _state (traverse step (fs // as))
    step (f, updates) = traverse_ put updates >> map f get
infixl 2 <|*>
-- | Groups runs of adjacent occurrences that carry equal values.
--
-- Before grouping, the underlying list is combined (with '+') with an infinite
-- list of padding occurrences at time 'maxBound' whose value is 'undefined'.
-- NOTE(review): this assumes '+' on lists appends, so that @head fs@ below
-- never sees an empty list -- confirm. The padding value is 'undefined';
-- confirm that the value comparison in 'span' never forces it.
groupE = from _event %~ groupE . (+repeat (Future (maxBound,undefined)))
-- The local @groupE@ shadows the top-level name inside this where clause.
-- Each run @xs@ is emitted as the first occurrence @f@ with its value replaced
-- by the run itself; the head of the remaining output then has the sum of the
-- times of @xs@ added to its own time.
where groupE fs = (f & _value %- xs) : (groupE ys & _head._time %~ (sum (at _time<$>xs)+))
-- @xs@ is the longest prefix whose values equal the value of the first
-- occurrence @f@ (so it always contains @f@ itself); @ys@ is the rest.
where (xs,ys) = span ((==f^._value) . at _value) fs ; f = head fs
-- | Applies a function to every 'Future' of an event: the event is unwrapped
-- to its list of occurrences, the function is mapped over that list, and the
-- result is wrapped back into an 'Event'.
mapFutures f ev = at _event (map f (ev ^.. _event))
-- | Pairs the value of every occurrence with the 'timeVal' of a time obtained
-- through 'listen' on the underlying pair.
-- NOTE(review): presumably 'listen' exposes the occurrence's own time
-- alongside its value -- confirm against the instance used for pairs.
withTime = mapFutures stamp
  where
    stamp (Future timed) = Future ((_1 %~ timeVal) <$> listen timed)
-- | The occurrence times of an event: 'withTime' followed by keeping the first
-- component ('fst') of each annotated value.
-- NOTE(review): assumes 'map2' maps under two functor layers (the function and
-- the resulting 'Event') -- confirm against its definition.
times = map2 fst withTime
-- | Filters the event @e@ using the boolean event @m@.
--
-- @m // e@ attaches a sub-event of @e@ to each occurrence of @m@. The result
-- is combined with the default @(True,zero)@ by 'withNext', and each resulting
-- pair is matched as @((b,_),(_,e))@: the boolean from the first component
-- gates, via 'guard', the sub-event taken from the second component. Note that
-- the @e@ bound by the lambda shadows the @e@ parameter.
-- NOTE(review): presumably 'withNext' pairs each occurrence with its
-- successor, using @(True,zero)@ where there is none -- confirm.
mask m e = (m // e) `withNext` (True,zero) >>= \((b,_),(_,e)) -> guard b >> e
-- | Runs the actions carried by an event: for each occurrence, in order, wait
-- for its time with 'waitTill' and then execute the carried action.
-- NOTE(review): the lambda only matches times of the form @Since t@. If
-- 'TimeVal' has other constructors, confirm they cannot reach this point,
-- since they would cause a pattern-match failure.
sink l = for_ (withTime l) $ \(Since t,v) -> waitTill t >> v
-- | Builds an event from a repeatable IO action, with one occurrence per run
-- of the action. The returned event wraps (through '_event') a lazily produced
-- list of occurrences.
event m = at _event<$>event' zero
-- @event' t@ produces the occurrences that follow the time @t@, starting from
-- 'zero'.
where event' t = do
-- The action handed to 'futureIO' forces @timeVal t@ before running @m@.
-- NOTE(review): presumably this delays each run until the previous occurrence
-- has resolved -- confirm against 'timeVal'.
-- The lazy pattern keeps the binding from forcing the underlying pair.
Future ~(t',a) <- futureIO (timeVal t `seq` m)
-- 'unsafeInterleaveIO' defers producing the rest of the list until it is
-- demanded, which lets this unbounded recursion return.
fs <- unsafeInterleaveIO $ event' t'
return (Future (t',a):fs)
-- | A value paired with the 'Time' at which it occurs.
-- All the listed instances come from the deriving clause; 'Eq' and 'Ord' are
-- defined separately and compare times only.
newtype Future t a = Future (Time t,a)
deriving (Show,Functor,Unit,Applicative,Traversable,Foldable,Monad,Semigroup,Monoid)
-- | Two futures are equal exactly when 'compare' reports 'EQ' for them; the
-- carried values play no part.
instance Ord t => Eq (Future t a) where
  x == y = case compare x y of
    EQ -> True
    _  -> False
-- | Futures are ordered by their time component alone.
instance Ord t => Ord (Future t a) where
  compare x y = compare (x ^. _time) (y ^. _time)
-- | Isomorphism between a 'Future' and its underlying (time, value) pair.
-- Unwrapping uses a lazy pattern, so it does not force the pair itself.
_future :: Iso (Future t a) (Future t' b) (Time t,a) (Time t',b)
_future = iso Future unwrap
  where
    unwrap (Future ~(t, a)) = (t, a)
-- | Lens onto the time of a 'Future': the first component ('_1') of the
-- underlying pair reached through @from _future@.
_time :: Lens (Time t) (Time t') (Future t a) (Future t' a)
_time = from _future._1
-- | Lens onto the value of a 'Future': the second component ('_2') of the
-- underlying pair reached through @from _future@.
_value :: Lens a b (Future t a) (Future t b)
_value = from _future._2
-- | Compares two futures by time only; the two value types may differ.
cmpFut :: Ord t => Future t a -> Future t b -> Ordering
cmpFut x y = at _time x `compare` at _time y
-- | Starts an IO action in a new thread and returns a 'Future' for its result.
--
-- The result is handed over through an 'MVar'. The time of the future is what
-- 'timeIO' produces for the action that waits on that 'MVar', and the value is
-- the same waiting action viewed through '_thunk'.
-- NOTE(review): presumably 'timeIO' resolves to the moment the wait completes
-- and '_thunk' defers the read until the value is demanded -- confirm against
-- their definitions.
futureIO :: IO a -> IO (Future Seconds a)
futureIO m = do
  box <- newEmptyMVar
  -- The worker thread fills the box once the action has produced its result.
  _ <- forkIO (m >>= putMVar box)
  let await = readMVar box
  occurred <- timeIO await
  return (Future (occurred, at _thunk await))