-- | Push-driven event streams: a stream that fans each pushed event out to
-- its current subscribers, optionally fed by a single shared subscription to
-- an underlying source.
module Reactive.Bacon.PushStream
  ( newPushStream
  , newDispatcher
  , wrap
  ) where

import Reactive.Bacon.Core
import Data.IORef
import Control.Monad

-- | A registered sink paired with the integer id it was given when it
-- subscribed.
data Subscription a = Subscription (EventSink a) Int

-- | Subscriptions are compared by id only; the sink itself is ignored.
instance Eq (Subscription q) where
  (==) (Subscription _ a) (Subscription _ b) = a == b

-- | Mutable state behind a push stream, in field order:
--
-- * the current subscriptions together with the next subscription id,
-- * the underlying source stream,
-- * the disposer of the active subscription to that source, if any,
-- * a flag that is set once an 'End' event has been pushed.
data PushCollection a =
  PushCollection
    (IORef ([Subscription a], Int))
    (EventStream a)
    (IORef (Maybe Disposable))
    (IORef Bool)

instance EventSource PushCollection where
  toEventStream collection = EventStream (subscribePushCollection collection)
    where
      -- Registers the sink under a fresh id. If it is the first subscriber
      -- and the stream has not ended, also subscribes to the underlying
      -- source, forwarding each source event through 'pushEvent'. The result
      -- is the action that removes this subscription again.
      subscribePushCollection pc@(PushCollection ref src dref eref) sink = do
        (subscriptions, nextId) <- readIORef ref
        let subscription = Subscription sink nextId
        writeIORef ref (subscription : subscriptions, nextId + 1)
        ended <- readIORef eref
        when (not ended && null subscriptions) $ do
          dispose <- subscribe src $ \event -> do
            pushEvent pc event
            -- Tell the source to stop only once it has delivered End.
            return $ case event of
              Next _ -> More
              End    -> NoMore
          writeIORef dref (Just dispose)
        return (removeSubscription pc subscription)

instance Observable PushCollection where
  (==>) = (==>) . obs

-- | Removes the given subscription from the collection. When that leaves the
-- collection without subscribers, the stored upstream disposer (if any) is
-- run.
--
-- The stored disposer is cleared before it is run, so a later removal that
-- again finds the list empty (for example the same disposer being invoked
-- twice) does not run the same upstream disposer a second time.
removeSubscription :: PushCollection a -> Subscription a -> IO ()
removeSubscription (PushCollection ref _ disposeRef _) s = do
  (subscriptions, counter) <- readIORef ref
  let updated = filter (/= s) subscriptions
  writeIORef ref (updated, counter)
  when (null updated) $ do
    upstream <- readIORef disposeRef
    writeIORef disposeRef Nothing
    case upstream of
      Just dispose -> dispose
      Nothing      -> return ()

-- | Makes an observable with a single connection to the underlying EventSource.
-- Automatically subscribes/unsubscribes from EventSource based on whether there
-- are any EventSinks.
wrap :: EventSource s => s a -> IO (EventStream a)
wrap src = fmap fst (newPushStream' src)

-- | Builds a wrapped stream from a registration function. The function is
-- handed a callback that delivers each value to the stream as a 'Next' event
-- (the sink's result is discarded) and returns the disposer for that
-- registration.
newDispatcher :: ((a -> IO ()) -> IO Disposable) -> IO (EventStream a)
newDispatcher pusher = wrap $ EventStream $ \sink -> pusher (void . sink . Next)

-- | Creates a push stream on top of the given source and returns the stream
-- together with a function that pushes an event to its subscribers.
newPushStream' :: EventSource s => s a -> IO (EventStream a, (Event a -> IO ()))
newPushStream' src = do
  stateRef   <- newIORef ([], 1)   -- no subscribers yet; ids start at 1
  disposeRef <- newIORef Nothing   -- not subscribed to the source yet
  endRef     <- newIORef False     -- End has not been pushed
  let pc = PushCollection stateRef (obs src) disposeRef endRef
  return (obs pc, pushEvent pc)

-- | Creates a push stream whose underlying source is 'neverE', returning the
-- stream and the function used to push events into it.
newPushStream :: IO (EventStream a, (Event a -> IO ()))
newPushStream = newPushStream' neverE

-- | Delivers an event to every current subscriber. Nothing is delivered once
-- an 'End' has been pushed; the 'End' itself is still delivered, because the
-- ended flag is checked before it is set.
pushEvent :: PushCollection a -> Event a -> IO ()
pushEvent pc@(PushCollection listRef _ _ endRef) event = do
  ended <- readIORef endRef
  unless ended $ do
    markEnded event
    (sinks, _) <- readIORef listRef
    mapM_ deliver sinks
  where
    -- A sink answering NoMore is unsubscribed straight away.
    deliver s@(Subscription sink _) = do
      result <- sink event
      case result of
        More   -> return ()
        NoMore -> removeSubscription pc s
    -- Only End flips the ended flag.
    markEnded End = writeIORef endRef True
    markEnded _   = return ()