-- | Signal operations that involve schedulers: 'start', 'subscribeOn',
-- 'deliverOn' and 'first'.
--
-- The export list also names 'Scheduler', 'SchedulerIO' and 'Signal', which
-- are used here but not defined in the visible part of this module
-- (presumably re-exported from the imported modules; confirm).
module Signal.Scheduled ( start
, subscribeOn
, deliverOn
, first
, Scheduler
, SchedulerIO
, Signal
) where
import Control.Concurrent.MVar
import Control.Monad
import Control.Monad.IO.Class
import Disposable
import Prelude hiding (take)
import Scheduler
import Signal
import Signal.Channel
import Signal.Operators
import Signal.Subscriber
-- | Creates a replay channel with 'UnlimitedCapacity', schedules @action@ on
-- scheduler @s@ (passing it the channel's subscriber), and returns the
-- channel's signal.
--
-- The value returned by 'schedule' is discarded, so the caller gets no handle
-- to the scheduled job itself.
start :: Scheduler s => s -> (Subscriber s v -> SchedulerIO s ()) -> IO (Signal s v)
start s action =
    newReplayChannel UnlimitedCapacity >>= \(sub, sig) ->
        schedule s (action sub) >> return sig
-- | Builds a signal whose subscription to @sig@ is performed inside a job
-- scheduled on @sch@.
--
-- The disposable returned to each subscriber is backed by a disposable set
-- that receives the scheduled job's disposable and, when the job runs, the
-- disposable produced by subscribing to @sig@.
subscribeOn :: forall s v. Scheduler s => Signal s v -> s -> Signal s v
subscribeOn sig sch = signal onSubscribe
  where
    onSubscribe :: Subscriber s v -> SchedulerIO s Disposable
    onSubscribe sub = liftIO $ do
        disposables <- newDisposableSet
        -- The actual subscription happens later, inside the scheduled job;
        -- its disposable is added to the set at that point.
        jobD <- schedule sch $
            subscribe sig sub >>= liftIO . addDisposable disposables
        addDisposable disposables jobD
        toDisposable disposables
-- | Builds a signal on scheduler type @t@ that subscribes to @sig@ inside a
-- job scheduled on @schA@ and forwards every received event to the
-- subscriber by scheduling a 'send' on @schB@.
--
-- The disposable returned to each subscriber is backed by a disposable set
-- holding the @schA@ job's disposable and, once that job runs, the disposable
-- of the subscription to @sig@. The results of the individual forwarding jobs
-- scheduled on @schB@ are discarded.
deliverOn :: forall s t v. (Scheduler s, Scheduler t) => Signal s v -> s -> t -> Signal t v
deliverOn sig schA schB = signal onSubscribe
  where
    onSubscribe :: Subscriber t v -> SchedulerIO t Disposable
    onSubscribe sub = liftIO $ do
        disposables <- newDisposableSet
        jobD <- schedule schA $
            (sig >>: relay sub) >>= liftIO . addDisposable disposables
        addDisposable disposables jobD
        toDisposable disposables

    -- Re-dispatches one event from the source scheduler onto @schB@.
    relay :: Subscriber t v -> Event v -> SchedulerIO s ()
    relay sub = liftIO . void . schedule schB . send sub
-- | Subscribes to @take sig 1@ and blocks until an event has been stored in
-- an internal 'MVar', then returns that event.
--
-- The handler uses 'tryPutMVar', so only the first event delivered to it is
-- kept; any later event is silently ignored. The result of the subscription
-- (@>>:@) is discarded.
first :: forall s v. Scheduler s => Signal s v -> SchedulerIO s (Event v)
first sig = do
    slot <- liftIO newEmptyMVar

    let capture :: Event v -> SchedulerIO s ()
        capture = void . liftIO . tryPutMVar slot

    _ <- take sig 1 >>: capture
    liftIO (takeMVar slot)