module Util.Broadcaster(
GeneralBroadcaster,
Broadcaster,
SimpleBroadcaster,
newBroadcaster,
newSimpleBroadcaster,
newGeneralBroadcaster,
BroadcasterClass(broadcast),
applySimpleUpdate,
applySimpleUpdate',
applyUpdate,
applyGeneralUpdate,
switchOffSimpleSource,
mirrorSimpleSource,
mirrorSimpleSourceWithDelayer,
) where
import Data.IORef
import qualified Control.Concurrent.MVar as MVar
import System.IO.Unsafe
import Util.Sink
import Util.Sources
import Util.Delayer
import Util.Debug(debug)
data GeneralBroadcaster x d = GeneralBroadcaster {
source' :: Source x d,
updater :: Updater x d
}
data Broadcaster x d = Broadcaster {
source :: Source x d,
updateAct :: (x -> (x,[d])) -> IO ()
}
data SimpleBroadcaster x = SimpleBroadcaster {
simpleSource :: SimpleSource x,
updateAct3 :: (forall y . (x -> (x,y)) -> IO y)
}
updateAct2 :: SimpleBroadcaster x -> (x -> x) -> IO ()
updateAct2 broadcaster fn =
updateAct3 broadcaster (\ x -> (fn x,()))
newBroadcaster :: x -> IO (Broadcaster x d)
newBroadcaster x =
do
(source,updateAct) <- variableSource x
return (Broadcaster {source = source,updateAct = updateAct})
newSimpleBroadcaster :: x -> IO (SimpleBroadcaster x)
newSimpleBroadcaster (x :: x) =
do
(source,updater :: Updater x x) <- variableGeneralSource x
let
updateAct3 :: (x -> (x,y)) -> IO y
updateAct3 fn = applyToUpdater updater
(\ x0 ->
let
(x1,y) = fn x0
in
(x1,[x1],y)
)
return (SimpleBroadcaster {simpleSource = SimpleSource source,
updateAct3 = updateAct3})
newGeneralBroadcaster :: x -> IO (GeneralBroadcaster x d)
newGeneralBroadcaster x =
do
(source,updater) <- variableGeneralSource x
return (GeneralBroadcaster {source' = source,updater = updater})
class BroadcasterClass broadcaster value | broadcaster -> value where
broadcast :: broadcaster -> value -> IO ()
instance BroadcasterClass (Broadcaster x d) (x,[d]) where
broadcast (Broadcaster {updateAct = updateAct}) (x,ds) =
updateAct (\ _ -> (x,ds))
instance BroadcasterClass (SimpleBroadcaster x) x where
broadcast broadcaster x =
updateAct2 broadcaster (\ _ -> x)
applySimpleUpdate :: SimpleBroadcaster x -> (x -> x) -> IO ()
applySimpleUpdate simpleBroadcaster updateFn =
updateAct2 simpleBroadcaster updateFn
applySimpleUpdate' :: SimpleBroadcaster x -> (x -> (x,y)) -> IO y
applySimpleUpdate' simpleBroadcaster updateFn =
updateAct3 simpleBroadcaster updateFn
applyUpdate :: Broadcaster x d -> (x -> (x,[d])) -> IO ()
applyUpdate (Broadcaster {updateAct = updateAct}) updateFn =
updateAct updateFn
applyGeneralUpdate :: GeneralBroadcaster x d -> (x -> (x,[d],extra)) -> IO extra
applyGeneralUpdate (GeneralBroadcaster {updater = updater}) updateAct =
applyToUpdater updater updateAct
instance HasSource (Broadcaster x d) x d where
toSource broadcaster = source broadcaster
instance HasSource (SimpleBroadcaster x) x x where
toSource broadcaster = toSource . toSimpleSource $ broadcaster
instance HasSource (GeneralBroadcaster x d) x d where
toSource generalBroadcaster = source' generalBroadcaster
instance HasSimpleSource (SimpleBroadcaster x) x where
toSimpleSource simpleBroadcaster = simpleSource simpleBroadcaster
switchOffSimpleSource :: SimpleSource a -> IO (SimpleSource a,IO (IO ()))
switchOffSimpleSource simpleSource =
do
broadcaster <- newSimpleBroadcaster simpleSource
let
switchOffSource = staticSimpleSourceIO (readContents simpleSource)
switchOff =
do
broadcast broadcaster switchOffSource
return (broadcast broadcaster simpleSource)
newSource =
do
source <- toSimpleSource broadcaster
source
return (newSource,switchOff)
mirrorSimpleSource :: SimpleSource a -> IO (SimpleSource a,IO ())
mirrorSimpleSource (simpleSource :: SimpleSource a) =
do
(sourceMVar :: MVar.MVar (Maybe (SimpleSource a)))
<- MVar.newMVar Nothing
sinkId <- newSinkID
let
getSource :: IO (SimpleSource a)
getSource = MVar.modifyMVar sourceMVar
(\ sourceOpt -> case sourceOpt of
Just source -> return (sourceOpt,source)
Nothing ->
do
parallelX <- newParallelExec
broadcaster <- newSimpleBroadcaster
(error "mirrorSimpleSource: 1")
initialised <- MVar.newEmptyMVar
let
writeX a =
do
broadcast broadcaster a
MVar.putMVar initialised ()
writeD a =
do
broadcast broadcaster a
addNewSourceActions (toSource simpleSource) writeX writeD
sinkId parallelX
MVar.takeMVar initialised
let
source = toSimpleSource broadcaster
return (Just source,source)
)
source <- getSource
return (source,invalidate sinkId)
mirrorSimpleSourceWithDelayer :: Delayer -> SimpleSource a -> IO (SimpleSource a,IO ())
mirrorSimpleSourceWithDelayer delayer (simpleSource :: SimpleSource a) =
do
sinkId <- newSinkID
parallelX <- newParallelExec
let
emergencyRead =
do
debug "Broadcaster: emergency read"
readContents simpleSource
broadcaster <- newSimpleBroadcaster (unsafePerformIO emergencyRead)
ref <- newIORef (error "mirrorSimpleSource: 3")
let
writeAct val = writeIORef ref val
bumpAct =
do
val <- readIORef ref
broadcast broadcaster val
delayedBumpAct <- newDelayedAction bumpAct
let
updateAct val =
do
writeAct val
delayedAct delayer delayedBumpAct
addNewSourceActions (toSource simpleSource)
(broadcast broadcaster) updateAct sinkId parallelX
return (toSimpleSource broadcaster,invalidate sinkId)