module Reactive.Bacon.EventStream.IO where
import Reactive.Bacon.Core
import Reactive.Bacon.EventStream.Monadic
import Reactive.Bacon.EventStream
import Reactive.Bacon.PushStream
import Data.IORef
import Control.Concurrent(forkIO)
import Control.Monad
fromStoppableProcess :: ((Event a -> IO ()) -> IO Bool -> IO ()) -> IO (EventStream a, IO ())
fromStoppableProcess startProcess = do
(stream, pushEvent) <- newPushStream
stopSignal <- newIORef False
let getStopState = (readIORef stopSignal)
startProcess (guardedPush pushEvent getStopState) getStopState
return (stream, (writeIORef stopSignal True))
where guardedPush pushEvent getStopState event = do stop <- getStopState
unless stop $pushEvent event
fromNonStoppableProcess :: ((Event a -> IO ()) -> IO ()) -> IO (EventStream a)
fromNonStoppableProcess startProcess = do
(stream, pushEvent) <- newPushStream
startProcess (pushEvent)
return stream
fromIO :: IO a -> IO (EventStream a)
fromIO action = fromNonStoppableProcess $ \sink -> void $ forkIO $ action >>= sink . Next >> sink End