-- | Commands: a command accepts values through 'execute', gated by a
-- 'canExecute' signal, and exposes the accepted values, its executing state
-- and any forwarded errors as signals on the main scheduler.
module Signal.Command ( Command
, CommandPolicy(..)
, newCommand
, canExecute
, executing
, execute
, values
, onExecute
, errors
-- NOTE(review): the names below are not defined in this module; presumably
-- they are re-exported from the imported Signal/Scheduler modules so callers
-- can write this module's signatures without extra imports -- confirm.
, Channel
, Signal
, Scheduler
, SchedulerIO
, BackgroundScheduler
, MainScheduler
) where
import Control.Concurrent.MVar
import Control.Exception hiding (finally)
import Control.Monad
import Control.Monad.IO.Class
import Data.Monoid
import Data.Word
import Prelude hiding (map)
import Scheduler
import Scheduler.Main
import Signal
import Signal.Channel
import Signal.Connection
import Signal.Operators
import Signal.Scheduled
import Signal.Subscriber
import System.Mem.Weak
-- | Determines whether a command may be executed while it is already
-- executing.
data CommandPolicy
    = ExecuteSerially
      -- ^ The command cannot execute while it is already executing.
    | ExecuteConcurrently
      -- ^ The executing state is ignored when deciding whether the command
      -- can execute.
    deriving (Eq, Show)
-- | A command carrying values of type @v@. Each channel is a pair whose first
-- component is used for sending events and whose second component is the
-- signal handed out to observers.
data Command v = Command
    { policy :: CommandPolicy
      -- ^ The policy the command was created with.
    , valuesChannel :: Channel MainScheduler v
      -- ^ Receives every value passed to a successful 'execute'.
    , errorsChannel :: Channel MainScheduler IOException
      -- ^ Receives errors forwarded from signals created via 'onExecute'.
    , itemsInFlight :: MVar Word32
      -- ^ Count of executions currently in progress.
    , executingChannel :: Channel MainScheduler Bool
      -- ^ Receives the executing state set by 'setExecuting'.
    , canExecuteChannel :: Channel MainScheduler Bool
      -- ^ Receives the computed \"can execute\" state.
    }
-- | Creates a command.
--
-- The command's \"can execute\" state is recomputed from the latest values of
-- the supplied signal and of the command's own executing state, according to
-- the given policy. When the command is garbage collected, a completed event
-- is sent on all four of its channels from the main scheduler.
newCommand
    :: CommandPolicy                -- ^ How to treat requests made while already executing.
    -> Signal MainScheduler Bool    -- ^ Whether the command may currently be executed.
    -> SchedulerIO MainScheduler (Command v)
newCommand p ces = do
    vChan <- liftIO newChannel
    errChan <- liftIO newChannel
    exChan <- liftIO $ newReplayChannel $ LimitedCapacity 1
    ceChan <- liftIO $ newReplayChannel $ LimitedCapacity 1
    items <- liftIO $ newMVar 0

    let -- Named differently from the exported 'canExecute' and 'executing'
        -- so that nothing at top level is shadowed.
        isAllowed :: Bool -> Bool -> CommandPolicy -> Bool
        isAllowed ce isExecuting ExecuteSerially = ce && not isExecuting
        isAllowed ce _ ExecuteConcurrently = ce

        -- Publishes the recomputed state; non-value events are ignored.
        onEvent :: Event (Bool, Bool) -> SchedulerIO MainScheduler ()
        onEvent (NextEvent (ce, ex)) = send (fst ceChan) $ NextEvent $ isAllowed ce ex p
        onEvent _ = return ()

        -- Completes every channel of the command on the main scheduler.
        complete :: IO ()
        complete = do
            sch <- getMainScheduler
            void $ schedule sch $ do
                send (fst vChan) CompletedEvent
                send (fst errChan) CompletedEvent
                send (fst exChan) CompletedEvent
                send (fst ceChan) CompletedEvent

    -- NOTE(review): @return True `mappend` ces@ presumably seeds the signal
    -- with an initial True before the caller's values -- confirm against the
    -- Monoid instance of Signal.
    combine (return True `mappend` ces) (snd exChan) >>: onEvent

    let command = Command {
            policy = p,
            valuesChannel = vChan,
            errorsChannel = errChan,
            itemsInFlight = items,
            executingChannel = exChan,
            canExecuteChannel = ceChan
        }

    -- Sent after subscribing above, so the subscription sees an initial
    -- executing state.
    setExecuting command False

    -- The finalizer is attached to the MVar rather than to the record:
    -- finalizers on ordinary boxed values may run early when GHC unboxes or
    -- rebuilds the value, which would complete the channels while the command
    -- is still in use. The MVar is only reachable through the command.
    liftIO $ void $ mkWeakMVar items complete
    return command
-- | The signal side of the command's executing channel.
executing :: Command v -> Signal MainScheduler Bool
executing cmd = snd (executingChannel cmd)
-- | The signal side of the command's errors channel.
errors :: Command v -> Signal MainScheduler IOException
errors cmd = snd (errorsChannel cmd)
-- | The signal side of the command's values channel.
values :: Command v -> Signal MainScheduler v
values cmd = snd (valuesChannel cmd)
-- | The signal side of the command's \"can execute\" channel.
canExecute :: Command v -> Signal MainScheduler Bool
canExecute cmd = snd (canExecuteChannel cmd)
-- | Sends the given executing state on the command's executing channel.
setExecuting :: Command v -> Bool -> SchedulerIO MainScheduler ()
setExecuting c b = send sink (NextEvent b)
  where
    sink = fst (executingChannel c)
-- | Attempts to execute the command with the given value.
--
-- Returns 'True' if the value was sent on the command's values channel, or
-- 'False' if the first event of 'canExecute' was anything other than
-- @NextEvent True@.
execute
    :: Command v
    -> v
    -> SchedulerIO MainScheduler Bool
execute c v = do
    -- The counter is taken out of the MVar for the duration of the check, so
    -- any other user of 'itemsInFlight' blocks until it is put back below.
    inFlight <- liftIO $ takeMVar $ itemsInFlight c
    ce <- first $ canExecute c
    case ce of
        NextEvent True -> run inFlight
        _ -> do
            liftIO $ putMVar (itemsInFlight c) inFlight
            return False
  where
    run :: Word32 -> SchedulerIO MainScheduler Bool
    run inFlight = do
        -- The counter must be back in the MVar before the value is sent:
        -- handlers installed by 'onExecute' modify it when a value arrives.
        liftIO $ putMVar (itemsInFlight c) $ inFlight + 1
        setExecuting c True
        fst (valuesChannel c) `send` NextEvent v

        -- Drop this execution from the count; the last one to finish resets
        -- the executing state.
        remaining <- liftIO $ modifyMVar (itemsInFlight c) $ \n ->
            let n' = n - 1 in return (n', n')
        when (remaining == 0) $ setExecuting c False
        return True
-- | Builds a signal of the work started for each value the command executes.
--
-- For every value on 'values', the in-flight count is incremented and the
-- executing state set to 'True'; the value is then mapped through the given
-- function. Errors from the resulting signal are forwarded to the command's
-- errors channel on the main scheduler, and when the signal finishes the
-- in-flight count is decremented on the main scheduler, resetting the
-- executing state to 'False' once it reaches zero.
--
-- Each resulting signal is multicast to an unlimited-capacity replay channel
-- and connected on a newly created scheduler; the multicasted signal is what
-- subscribers receive.
onExecute
    :: Command v
    -> (v -> Signal BackgroundScheduler ())
    -> SchedulerIO MainScheduler (Signal MainScheduler (Signal BackgroundScheduler ()))
onExecute c f =
    let -- Registers one more execution in progress.
        incItems :: SchedulerIO MainScheduler ()
        incItems = do
            liftIO $ modifyMVar_ (itemsInFlight c) $ \n -> return $ n + 1
            setExecuting c True

        -- Removes one execution from the count, hopping to the main scheduler
        -- because the executing channel lives there.
        decItems :: SchedulerIO BackgroundScheduler ()
        decItems = do
            mainSch <- liftIO getMainScheduler
            void $ liftIO $ schedule mainSch $ do
                remaining <- liftIO $ modifyMVar (itemsInFlight c) $ \n ->
                    let n' = n - 1 in return (n', n')
                when (remaining == 0) $ setExecuting c False

        -- Forwards an error to the command's errors channel on the main
        -- scheduler.
        sendError :: IOException -> IO ()
        sendError ex = do
            sch <- getMainScheduler
            void $ schedule sch $ send (fst $ errorsChannel c) $ NextEvent ex

        coldSignals :: Signal MainScheduler (Signal BackgroundScheduler ())
        coldSignals =
            values c
                `doNext` const incItems
                `map` \v ->
                    f v
                        `doError` (liftIO . sendError)
                        `finally` decItems

        signals :: Signal MainScheduler (Signal BackgroundScheduler ())
        signals =
            coldSignals >>= \sig ->
                signal $ \sub -> do
                    sch <- liftIO newScheduler
                    chan <- liftIO $ newReplayChannel UnlimitedCapacity
                    conn <- liftIO $ multicast sig chan
                    d <- liftIO $ schedule sch $ void $ connect conn
                    send sub $ NextEvent $ multicastedSignal conn
                    return d
    in replayLast signals