-- | Signals: values of type @Signal s v@ wrap a subscription function that
-- delivers 'Event's carrying @v@ to a 'Subscriber' inside 'SchedulerIO' @s@.
--
-- Besides the names listed below, the module provides 'Monad', 'Functor',
-- 'Applicative', 'Monoid', 'MonadPlus' and 'MonadZip' instances for signals.
-- The zipping function @szip@ is not exported by name; it is reachable
-- through the 'MonadZip' instance.
--
-- 'Signal.empty' is written qualified in the export list.
module Signal ( Signal
, signal
, subscribe
, (>>:)
, never
, Signal.empty
, Event(..)
, Disposable
, Scheduler
, SchedulerIO
) where
import Control.Applicative
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Zip
import Data.IORef
import Data.Monoid
import Data.Sequence as Seq
import Data.Word
import Disposable
import Prelude hiding (length, drop, zip)
import Scheduler.Internal
import Signal.Event
import Signal.Subscriber
import Signal.Subscriber.Internal
-- | A signal of values of type @v@ whose events are delivered in the
-- scheduler context @s@.
--
-- A signal is nothing more than a subscription function: given a
-- 'Subscriber', it performs whatever work is needed to start delivering
-- events to it and returns a 'Disposable' for that subscription.
data Signal s v where
    Signal
        :: Scheduler s
        => (Subscriber s v -> SchedulerIO s Disposable)
        -> Signal s v
-- | Builds a signal from a subscription function.
--
-- The function is run once for every subscriber that is attached with
-- 'subscribe'; the 'Disposable' it returns is the one handed back to the
-- caller of 'subscribe'.
signal
    :: Scheduler s
    => (Subscriber s v -> SchedulerIO s Disposable)
    -> Signal s v
signal onSubscribe = Signal onSubscribe
-- | Attaches a subscriber to a signal.
--
-- Runs the signal's subscription function with the subscriber, registers the
-- resulting 'Disposable' on the subscriber via 'addSubscriptionDisposable',
-- and returns that same 'Disposable'.
subscribe
    :: Scheduler s
    => Signal s v
    -> Subscriber s v
    -> SchedulerIO s Disposable
subscribe (Signal attach) target = do
    subscription <- attach target
    liftIO (addSubscriptionDisposable target subscription)
    return subscription
-- | A signal that never sends any event to its subscribers.
--
-- Subscribing does nothing and yields 'EmptyDisposable'.
never :: Scheduler s => Signal s v
never = Signal (\_ -> return EmptyDisposable)
-- | A signal with no values: an alias for the 'Monoid' identity 'mempty',
-- which sends 'CompletedEvent' to each subscriber as soon as it subscribes.
--
-- The export list refers to this binding as @Signal.empty@.
empty :: Scheduler s => Signal s v
empty = mempty
-- | Subscribes to a signal with a plain event handler.
--
-- A fresh 'Subscriber' is created from the handler with 'subscriber' and
-- then attached to the signal with 'subscribe'. The result is the
-- 'Disposable' returned by 'subscribe'.
(>>:)
    :: Scheduler s
    => Signal s v
    -> (Event v -> SchedulerIO s ())
    -> SchedulerIO s Disposable
sig >>: handler = liftIO (subscriber handler) >>= subscribe sig

infixl 1 >>:
-- | Signals form a monad.
--
-- 'return' yields a signal that sends a single value and then completes.
--
-- @s >>= f@ subscribes to @s@ and, for every value it sends, subscribes to the
-- signal produced by @f@. Values and errors of the inner signals are forwarded
-- to the subscriber; an error of the outer signal is forwarded as well.
-- 'CompletedEvent' is sent only once the outer signal and every inner signal
-- started so far have completed.
instance Scheduler s => Monad (Signal s) where
    return v = Signal $ \sub -> do
        send sub (NextEvent v)
        send sub CompletedEvent
        return EmptyDisposable

    s >>= f = Signal $ \sub -> do
        -- Number of subscriptions that have not completed yet. It starts at 1
        -- to account for the subscription to the outer signal.
        sc <- liftIO $ newIORef (1 :: Word32)
        ds <- liftIO newDisposableSet

        let -- Marks one subscription as completed and forwards completion
            -- once none remain.
            decSubscribers = do
                remaining <- liftIO $ atomicModifyIORef' sc $ \n ->
                    let n' = n - 1
                    in (n', n')
                when (remaining == 0) $ send sub CompletedEvent

            -- Inner completion only counts down; everything else is passed on.
            onInner CompletedEvent = decSubscribers
            onInner ev = send sub ev

            onOuter CompletedEvent = decSubscribers
            onOuter (ErrorEvent e) = send sub $ ErrorEvent e
            onOuter (NextEvent v) = do
                -- Count the inner subscription before it is started, so that
                -- its completion can never be seen ahead of the increment.
                liftIO $ atomicModifyIORef' sc $ \n -> (n + 1, ())
                innerD <- f v >>: onInner
                liftIO $ ds `addDisposable` innerD

        outerD <- s >>: onOuter
        liftIO $ ds `addDisposable` outerD
        liftIO $ toDisposable ds
-- | Mapping over a signal applies the function to every value it sends.
-- Defined through the 'Monad' instance.
instance Scheduler s => Functor (Signal s) where
    fmap f sig = sig >>= return . f
-- | Applicative structure derived from the 'Monad' instance: the function
-- signal is bound first, then the argument signal.
instance Scheduler s => Applicative (Signal s) where
    pure = return
    fs <*> vs = fs >>= \f -> vs >>= \v -> return (f v)
-- | Signals form a monoid under sequential concatenation.
--
-- 'mempty' completes immediately without sending any value.
--
-- @a \`mappend\` b@ forwards the events of @a@; when @a@ sends
-- 'CompletedEvent' that event is not forwarded, and the subscriber is attached
-- to @b@ instead. Any other event of @a@, including an error, is passed
-- through unchanged.
instance Scheduler s => Monoid (Signal s v) where
    mempty = Signal $ \sub ->
        send sub CompletedEvent >> return EmptyDisposable

    mappend a b = Signal $ \sub -> do
        ds <- liftIO newDisposableSet
        let -- Completion of the first signal hands the subscriber to the second.
            forward CompletedEvent = subscribe b sub >>= liftIO . addDisposable ds
            forward ev = send sub ev
        (a >>: forward) >>= liftIO . addDisposable ds
        liftIO (toDisposable ds)
-- | 'mzero' is the empty signal ('mempty').
--
-- @a \`mplus\` b@ is built by flattening, with 'join', a signal that sends
-- @a@, then @b@, then completes. Both signals are therefore subscribed to
-- through the monadic bind rather than concatenated with 'mappend'.
instance Scheduler s => MonadPlus (Signal s) where
    mzero = mempty
    mplus a b = join $ Signal $ \sub -> do
        mapM_ (send sub . NextEvent) [a, b]
        send sub CompletedEvent
        return EmptyDisposable
-- | Zips two signals positionally: the n-th value of the first signal is
-- paired with the n-th value of the second.
--
-- Values that have not found a partner yet are buffered per side. The result
-- sends 'CompletedEvent' when either side has completed and has no buffered
-- values left. An 'ErrorEvent' from either side is forwarded as is.
szip :: forall a b s. Scheduler s => Signal s a -> Signal s b -> Signal s (a, b)
szip a b = Signal $ \sub -> do
    aVals <- liftIO $ newTVarIO (Seq.empty :: Seq a)
    aDone <- liftIO $ newTVarIO False
    bVals <- liftIO $ newTVarIO (Seq.empty :: Seq b)
    bDone <- liftIO $ newTVarIO False
    ds <- liftIO newDisposableSet

    let -- Yields a completion event when one side is finished and drained,
        -- and nothing otherwise.
        completion :: STM [Event (a, b)]
        completion = do
            aFinished <- readTVar aDone
            aPending <- readTVar aVals
            bFinished <- readTVar bDone
            bPending <- readTVar bVals
            let aExhausted = aFinished && Seq.null aPending
                bExhausted = bFinished && Seq.null bPending
            return [CompletedEvent | aExhausted || bExhausted]

        -- Transactional state update for one incoming event. The first pair
        -- of variables belongs to the side the event came from, the second to
        -- the opposite side. The result is the list of events to emit.
        step :: (TVar (Seq x), TVar Bool) -> (TVar (Seq y), TVar Bool) -> (Seq x -> Seq y -> Seq (a, b)) -> Event x -> STM [Event (a, b)]
        step (own, _) (other, _) pairUp (NextEvent v) = do
            modifyTVar' own (|> v)
            ownBuf <- readTVar own
            otherBuf <- readTVar other
            case viewl (pairUp ownBuf otherBuf) of
                EmptyL -> return []
                firstPair :< _ -> do
                    -- A pair was formed: consume one buffered value per side.
                    modifyTVar' own (drop 1)
                    modifyTVar' other (drop 1)
                    (NextEvent firstPair :) <$> completion
        step (_, ownDone) _ _ CompletedEvent = writeTVar ownDone True >> completion
        step _ _ _ (ErrorEvent e) = return [ErrorEvent e]

        -- Runs the state update atomically, then sends the resulting events.
        deliver :: (TVar (Seq x), TVar Bool) -> (TVar (Seq y), TVar Bool) -> (Seq x -> Seq y -> Seq (a, b)) -> Event x -> SchedulerIO s ()
        deliver own other pairUp ev =
            liftIO (atomically (step own other pairUp ev)) >>= mapM_ (send sub)

    let aSide = (aVals, aDone)
        bSide = (bVals, bDone)

    aSub <- a >>: deliver aSide bSide zip
    liftIO $ ds `addDisposable` aSub

    -- For the second signal the buffers arrive swapped, hence the flip.
    bSub <- b >>: deliver bSide aSide (flip zip)
    liftIO $ ds `addDisposable` bSub

    liftIO $ toDisposable ds
-- | Zipping of signals is positional pairing, as implemented by 'szip'.
instance Scheduler s => MonadZip (Signal s) where
    mzip left right = szip left right