-- | Scheduler-aware helpers for signals: running an action on a scheduler
--   and exposing what it sends as a signal ('start'), moving the act of
--   subscribing onto a scheduler ('subscribeOn'), moving event delivery onto
--   a scheduler ('deliverOn'), and blocking for a signal's first event
--   ('first'). The 'Scheduler', 'SchedulerIO' and 'Signal' names are
--   re-exported alongside these functions.
module Signal.Scheduled ( start
, subscribeOn
, deliverOn
, first
, Scheduler
, SchedulerIO
, Signal
) where
import Control.Concurrent.MVar
import Control.Monad
import Control.Monad.IO.Class
import Disposable
import Prelude hiding (take)
import Scheduler
import Scheduler.Internal
import Signal
import Signal.Channel
import Signal.Operators
import Signal.Subscriber
-- | Schedules @action@ on @s@ and returns a signal fed by it.
--
--   A channel is created with 'newReplayChannel' 'UnlimitedCapacity'; its
--   subscriber half is handed to @action@, and its signal half is returned
--   to the caller. The value returned by 'schedule' is discarded, so the
--   scheduled action cannot be cancelled through this function's result.
start :: Scheduler s => s -> (Subscriber s v -> SchedulerIO s ()) -> IO (Signal s v)
start s action = do
    (subscriber, output) <- newReplayChannel UnlimitedCapacity
    -- Explicitly ignore the result of scheduling; only the signal is returned.
    _ <- schedule s (action subscriber)
    return output
-- | Builds a signal on scheduler type @t@ whose subscriptions to @sig@ are
--   performed from an action scheduled on @sch@.
--
--   For every downstream subscriber a fresh disposable set is created. It
--   collects the disposable returned by scheduling the subscription and,
--   once the scheduled action has run, the disposable returned by attaching
--   to @sig@. The set, converted with 'toDisposable', is the result of the
--   subscription.
subscribeOn :: forall s t v. (Scheduler s, Scheduler t) => Signal s v -> t -> Signal t v
subscribeOn sig sch = signal attach
  where
    attach :: Subscriber t v -> SchedulerIO t Disposable
    attach subscriber = liftIO $ do
        disposables <- newDisposableSet

        let -- Passes each upstream event to the downstream subscriber,
            -- re-wrapping the action so it is typed for scheduler @s@.
            relay :: Event v -> SchedulerIO s ()
            relay ev = SchedulerIO (unsafeRunSchedulerIO (send subscriber ev))

            -- The work that is scheduled on @sch@: attach 'relay' to @sig@
            -- and remember the resulting disposable.
            connect :: SchedulerIO t ()
            connect = do
                upstream <- SchedulerIO (unsafeRunSchedulerIO (sig >>: relay))
                liftIO (addDisposable disposables upstream)

        scheduled <- schedule sch connect
        addDisposable disposables scheduled
        toDisposable disposables
-- | Builds a signal on scheduler type @t@ that attaches to @sig@ directly
--   and, for each event @sig@ produces, schedules an action on @sch@ that
--   sends the event to the downstream subscriber.
--
--   The disposable returned by each per-event 'schedule' call is discarded;
--   the disposable of the resulting subscription is the one returned by
--   attaching to @sig@.
deliverOn :: forall s t v. (Scheduler s, Scheduler t) => Signal s v -> t -> Signal t v
deliverOn sig sch = signal attach
  where
    attach :: Subscriber t v -> SchedulerIO t Disposable
    attach subscriber = SchedulerIO (unsafeRunSchedulerIO (sig >>: relay))
      where
        -- Handler attached to @sig@: enqueue the event and drop the
        -- disposable that scheduling returns.
        relay :: Event v -> SchedulerIO s ()
        relay ev = void (enqueue ev)

        -- Schedules the hand-off of a single event on @sch@.
        enqueue :: Event v -> SchedulerIO s Disposable
        enqueue ev = liftIO (schedule sch (handOff ev))

        -- The action run by @sch@: send the event to the subscriber.
        handOff :: Event v -> SchedulerIO t ()
        handOff ev = SchedulerIO (unsafeRunSchedulerIO (send subscriber ev))
-- | Attaches to @take s 1@ and blocks the calling thread until an event has
--   been received, then returns that event.
--
--   Events are stored with 'tryPutMVar', so only the first one to arrive is
--   kept and any later ones are silently ignored. The disposable returned by
--   the subscription is discarded.
first :: forall s v. Scheduler s => Signal s v -> IO (Event v)
first s = do
    slot <- newEmptyMVar
    _ <- unsafeRunSchedulerIO (attachTo slot)
    -- Blocks until the handler below has filled the slot.
    takeMVar slot
  where
    -- Subscription whose handler tries to store each incoming event in the
    -- given 'MVar' without blocking.
    attachTo :: MVar (Event v) -> SchedulerIO s Disposable
    attachTo slot = take s 1 >>: (void . liftIO . tryPutMVar slot)