module Reactive.Bacon.EventStream.Combinators(mergeE, takeUntilE, eitherE, combineLatestE) where
import Reactive.Bacon.Core
import Reactive.Bacon.EventStream
import Reactive.Bacon.PushStream
import Reactive.Bacon.Property
import Data.IORef
import Control.Monad
import Control.Applicative
instance Show a => Show (EventStream a) where
show = const "EventStream"
instance Eq a => Eq (EventStream a) where
(==) = \x y -> False
mergeE :: EventSource s1 => EventSource s2 => s1 a -> s2 a -> IO (EventStream a)
mergeE xs ys = eitherE xs ys >>= mapE simplify
where simplify (Right x) = x
simplify (Left x) = x
takeUntilE :: EventSource s1 => EventSource s2 => s1 a -> s2 b -> IO (EventStream b)
takeUntilE stopper src = wrap $ sinkMap takeUntil' $ mergeRawE src stopper
where takeUntil' sink (Next (Left (Next x))) = sink (Next x)
takeUntil' sink (Next (Left End)) = sink End >> return NoMore
takeUntil' sink (Next (Right (Next x))) = sink End >> return NoMore
takeUntil' sink (Next (Right End)) = return More
eitherE :: EventSource s1 => EventSource s2 => s1 a -> s2 b -> IO (EventStream (Either a b))
eitherE left right = do endFlag <- newIORef False
return $ sinkMap (skipFirstEnd endFlag) (mergeRawE left right)
where skipFirstEnd flag sink event | isEnd event = do done <- readIORef flag
writeIORef flag True
handleEnd done sink
| otherwise = send sink skipFirstEnd event
handleEnd True sink = sink End >> return NoMore
handleEnd False sink = return More
send sink mapper (Next (Right (Next x))) = sink (Next (Right x))
send sink mapper (Next (Left (Next x))) = sink (Next (Left x))
combineLatestE :: EventSource s1 => EventSource s2 => s1 a -> s2 b -> IO (EventStream (a, b))
combineLatestE left right = do leftP <- fromEventSource left
rightP <- fromEventSource right
changesP $ combineP leftP rightP
mergeRawE :: EventSource s1 => EventSource s2 => s1 a -> s2 b -> EventStream (Either (Event a) (Event b))
mergeRawE left right = EventStream $\sink -> do
disposeRightHolder <- newIORef Nothing
disposeLeft <- subscribe (toEventStream left) (barrier Left sink (disposeIfPossible disposeRightHolder))
disposeRight <- subscribe (toEventStream right) (barrier Right sink disposeLeft)
writeIORef disposeRightHolder (Just disposeRight)
return $ disposeLeft >> disposeRight
where barrier mapping sink disposeOther event = do
result <- sink $ Next (mapping event)
case result of
More -> do
return More
NoMore -> do
disposeOther
return NoMore
disposeIfPossible ref = do
dispose <- readIORef ref
case dispose of Nothing -> return ()
Just f -> f
isEnd :: Event (Either (Event a) (Event b)) -> Bool
isEnd (Next (Right End)) = True
isEnd (Next (Left End)) = True
isEnd _ = False