{-# LANGUAGE Rank2Types #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FunctionalDependencies #-}
{-# LANGUAGE FlexibleInstances #-}

-- | A Broadcaster/SimpleBroadcaster is a variable Source/SimpleSource paired
-- with its update function
module Util.Broadcaster(
   -- instances of HasSource (and so CanAddSinks)
   GeneralBroadcaster,
   Broadcaster,
   SimpleBroadcaster,

   newBroadcaster, -- :: x -> IO (Broadcaster x d)
   newSimpleBroadcaster, -- :: x -> IO (SimpleBroadcaster x)
   newGeneralBroadcaster, -- :: x -> IO (GeneralBroadcaster x d)

   BroadcasterClass(broadcast),
      -- sends an update to a broadcaster.

   applySimpleUpdate, -- :: SimpleBroadcaster x -> (x -> x) -> IO ()
   applySimpleUpdate', -- :: SimpleBroadcaster x -> (x -> (x,y)) -> IO y


   applyUpdate, -- :: Broadcaster x d -> (x -> (x,[d])) -> IO ()

   applyGeneralUpdate, -- :: GeneralBroadcaster x d -> (x -> (x,[d],extra)) -> IO extra

   switchOffSimpleSource,
      -- :: SimpleSource a -> IO (SimpleSource a,IO (IO ()))
      -- Replace a SimpleSource by another which comes with a switch-off
      -- function, which temporarily blocks further updates.
      -- The action returned by the switch-off function switches the source
      -- again.

   mirrorSimpleSource,
      -- :: SimpleSource a -> IO (SimpleSource a,IO ())
      -- Replace a SimpleSource by another which mirrors it, but only copies
      -- from it once, hopefully saving CPU time.
      -- The IO action stops the mirroring.

   mirrorSimpleSourceWithDelayer,
      -- :: Delayer -> (a -> IO ()) -> SimpleSource a
      -- -> IO (SimpleSource a,IO ())
      -- Replace a SimpleSource by another which mirrors it, but only copies
      -- from it once, hopefully saving CPU time.  In addition, block all
      -- update while the Delayer is delaying things.

   ) where

import Data.IORef
import qualified Control.Concurrent.MVar as MVar
import System.IO.Unsafe

import Util.Sink
import Util.Sources
import Util.Delayer
import Util.Debug(debug)

-- -----------------------------------------------------------------
-- Datatypes
-- -----------------------------------------------------------------

data GeneralBroadcaster x d = GeneralBroadcaster {
   GeneralBroadcaster x d -> Source x d
source' :: Source x d,
   GeneralBroadcaster x d -> Updater x d
updater :: Updater x d
   }

data Broadcaster x d = Broadcaster {
   Broadcaster x d -> Source x d
source :: Source x d,
   Broadcaster x d -> (x -> (x, [d])) -> IO ()
updateAct :: (x -> (x,[d])) -> IO ()
   }

data SimpleBroadcaster x = SimpleBroadcaster {
   SimpleBroadcaster x -> SimpleSource x
simpleSource :: SimpleSource x,
   SimpleBroadcaster x -> forall y. (x -> (x, y)) -> IO y
updateAct3 :: (forall y . (x -> (x,y)) -> IO y)
   }

-- | old field name, preserved here for compatibility.
updateAct2 :: SimpleBroadcaster x -> (x -> x) -> IO ()
updateAct2 :: SimpleBroadcaster x -> (x -> x) -> IO ()
updateAct2 SimpleBroadcaster x
broadcaster x -> x
fn =
   SimpleBroadcaster x -> (x -> (x, ())) -> IO ()
forall x. SimpleBroadcaster x -> forall y. (x -> (x, y)) -> IO y
updateAct3 SimpleBroadcaster x
broadcaster (\ x
x -> (x -> x
fn x
x,()))

-- -----------------------------------------------------------------
-- Creation
-- -----------------------------------------------------------------

newBroadcaster :: x -> IO (Broadcaster x d)
newBroadcaster :: x -> IO (Broadcaster x d)
newBroadcaster x
x =
   do
      (Source x d
source,(x -> (x, [d])) -> IO ()
updateAct) <- x -> IO (Source x d, (x -> (x, [d])) -> IO ())
forall x d. x -> IO (Source x d, (x -> (x, [d])) -> IO ())
variableSource x
x
      Broadcaster x d -> IO (Broadcaster x d)
forall (m :: * -> *) a. Monad m => a -> m a
return (Broadcaster :: forall x d.
Source x d -> ((x -> (x, [d])) -> IO ()) -> Broadcaster x d
Broadcaster {source :: Source x d
source = Source x d
source,updateAct :: (x -> (x, [d])) -> IO ()
updateAct = (x -> (x, [d])) -> IO ()
updateAct})

newSimpleBroadcaster :: x -> IO (SimpleBroadcaster x)
newSimpleBroadcaster :: x -> IO (SimpleBroadcaster x)
newSimpleBroadcaster (x
x :: x) =
   do
      (Source x x
source,Updater x x
updater :: Updater x x) <- x -> IO (Source x x, Updater x x)
forall x d. x -> IO (Source x d, Updater x d)
variableGeneralSource x
x
      let
         updateAct3 :: (x -> (x,y)) -> IO y
         updateAct3 :: (x -> (x, y)) -> IO y
updateAct3 x -> (x, y)
fn = Updater x x -> (x -> (x, [x], y)) -> IO y
forall x d extra. Updater x d -> (x -> (x, [d], extra)) -> IO extra
applyToUpdater Updater x x
updater
            (\ x
x0 ->
               let
                  (x
x1,y
y) = x -> (x, y)
fn x
x0
               in
                  (x
x1,[x
x1],y
y)
               )
      SimpleBroadcaster x -> IO (SimpleBroadcaster x)
forall (m :: * -> *) a. Monad m => a -> m a
return (SimpleBroadcaster :: forall x.
SimpleSource x
-> (forall y. (x -> (x, y)) -> IO y) -> SimpleBroadcaster x
SimpleBroadcaster {simpleSource :: SimpleSource x
simpleSource = Source x x -> SimpleSource x
forall x. Source x x -> SimpleSource x
SimpleSource Source x x
source,
         updateAct3 :: forall y. (x -> (x, y)) -> IO y
updateAct3 = forall y. (x -> (x, y)) -> IO y
updateAct3})

newGeneralBroadcaster :: x -> IO (GeneralBroadcaster x d)
newGeneralBroadcaster :: x -> IO (GeneralBroadcaster x d)
newGeneralBroadcaster x
x =
   do
      (Source x d
source,Updater x d
updater) <- x -> IO (Source x d, Updater x d)
forall x d. x -> IO (Source x d, Updater x d)
variableGeneralSource x
x
      GeneralBroadcaster x d -> IO (GeneralBroadcaster x d)
forall (m :: * -> *) a. Monad m => a -> m a
return (GeneralBroadcaster :: forall x d. Source x d -> Updater x d -> GeneralBroadcaster x d
GeneralBroadcaster {source' :: Source x d
source' = Source x d
source,updater :: Updater x d
updater = Updater x d
updater})

-- -----------------------------------------------------------------
-- Sending values
-- -----------------------------------------------------------------

class BroadcasterClass broadcaster value | broadcaster -> value where
   broadcast :: broadcaster -> value -> IO ()

instance BroadcasterClass (Broadcaster x d) (x,[d]) where
   broadcast :: Broadcaster x d -> (x, [d]) -> IO ()
broadcast (Broadcaster {updateAct :: forall x d. Broadcaster x d -> (x -> (x, [d])) -> IO ()
updateAct = (x -> (x, [d])) -> IO ()
updateAct}) (x
x,[d]
ds) =
      (x -> (x, [d])) -> IO ()
updateAct (\ x
_ -> (x
x,[d]
ds))

instance BroadcasterClass (SimpleBroadcaster x) x where
   broadcast :: SimpleBroadcaster x -> x -> IO ()
broadcast SimpleBroadcaster x
broadcaster x
x =
      SimpleBroadcaster x -> (x -> x) -> IO ()
forall x. SimpleBroadcaster x -> (x -> x) -> IO ()
updateAct2 SimpleBroadcaster x
broadcaster (\ x
_ -> x
x)

applySimpleUpdate :: SimpleBroadcaster x -> (x -> x) -> IO ()
applySimpleUpdate :: SimpleBroadcaster x -> (x -> x) -> IO ()
applySimpleUpdate SimpleBroadcaster x
simpleBroadcaster x -> x
updateFn =
   SimpleBroadcaster x -> (x -> x) -> IO ()
forall x. SimpleBroadcaster x -> (x -> x) -> IO ()
updateAct2 SimpleBroadcaster x
simpleBroadcaster x -> x
updateFn

applySimpleUpdate' :: SimpleBroadcaster x -> (x -> (x,y)) -> IO y
applySimpleUpdate' :: SimpleBroadcaster x -> (x -> (x, y)) -> IO y
applySimpleUpdate' SimpleBroadcaster x
simpleBroadcaster x -> (x, y)
updateFn =
   SimpleBroadcaster x -> (x -> (x, y)) -> IO y
forall x. SimpleBroadcaster x -> forall y. (x -> (x, y)) -> IO y
updateAct3 SimpleBroadcaster x
simpleBroadcaster x -> (x, y)
updateFn

applyUpdate :: Broadcaster x d -> (x -> (x,[d])) -> IO ()
applyUpdate :: Broadcaster x d -> (x -> (x, [d])) -> IO ()
applyUpdate (Broadcaster {updateAct :: forall x d. Broadcaster x d -> (x -> (x, [d])) -> IO ()
updateAct = (x -> (x, [d])) -> IO ()
updateAct}) x -> (x, [d])
updateFn =
   (x -> (x, [d])) -> IO ()
updateAct x -> (x, [d])
updateFn

applyGeneralUpdate :: GeneralBroadcaster x d -> (x -> (x,[d],extra)) -> IO extra
applyGeneralUpdate :: GeneralBroadcaster x d -> (x -> (x, [d], extra)) -> IO extra
applyGeneralUpdate (GeneralBroadcaster {updater :: forall x d. GeneralBroadcaster x d -> Updater x d
updater = Updater x d
updater}) x -> (x, [d], extra)
updateAct =
   Updater x d -> (x -> (x, [d], extra)) -> IO extra
forall x d extra. Updater x d -> (x -> (x, [d], extra)) -> IO extra
applyToUpdater Updater x d
updater x -> (x, [d], extra)
updateAct

-- -----------------------------------------------------------------
-- Instances of HasSource and HasSimpleSource
-- -----------------------------------------------------------------

instance HasSource (Broadcaster x d) x d where
   toSource :: Broadcaster x d -> Source x d
toSource Broadcaster x d
broadcaster = Broadcaster x d -> Source x d
forall x d. Broadcaster x d -> Source x d
source Broadcaster x d
broadcaster

instance HasSource (SimpleBroadcaster x) x x where
   toSource :: SimpleBroadcaster x -> Source x x
toSource SimpleBroadcaster x
broadcaster = SimpleSource x -> Source x x
forall hasSource x d.
HasSource hasSource x d =>
hasSource -> Source x d
toSource (SimpleSource x -> Source x x)
-> (SimpleBroadcaster x -> SimpleSource x)
-> SimpleBroadcaster x
-> Source x x
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SimpleBroadcaster x -> SimpleSource x
forall hasSource x.
HasSimpleSource hasSource x =>
hasSource -> SimpleSource x
toSimpleSource (SimpleBroadcaster x -> Source x x)
-> SimpleBroadcaster x -> Source x x
forall a b. (a -> b) -> a -> b
$ SimpleBroadcaster x
broadcaster

instance HasSource (GeneralBroadcaster x d) x d where
   toSource :: GeneralBroadcaster x d -> Source x d
toSource GeneralBroadcaster x d
generalBroadcaster = GeneralBroadcaster x d -> Source x d
forall x d. GeneralBroadcaster x d -> Source x d
source' GeneralBroadcaster x d
generalBroadcaster

instance HasSimpleSource (SimpleBroadcaster x) x where
   toSimpleSource :: SimpleBroadcaster x -> SimpleSource x
toSimpleSource SimpleBroadcaster x
simpleBroadcaster = SimpleBroadcaster x -> SimpleSource x
forall x. SimpleBroadcaster x -> SimpleSource x
simpleSource SimpleBroadcaster x
simpleBroadcaster


-- -----------------------------------------------------------------
-- switchOffSimpleSource
-- -----------------------------------------------------------------

-- | Replace a SimpleSource by another which comes with a switch-off function,
-- which temporarily blocks further updates.
-- The action returned by the switch-off function switches the source back on
-- again.
switchOffSimpleSource :: SimpleSource a -> IO (SimpleSource a,IO (IO ()))
switchOffSimpleSource :: SimpleSource a -> IO (SimpleSource a, IO (IO ()))
switchOffSimpleSource SimpleSource a
simpleSource =
   do
      SimpleBroadcaster (SimpleSource a)
broadcaster <- SimpleSource a -> IO (SimpleBroadcaster (SimpleSource a))
forall x. x -> IO (SimpleBroadcaster x)
newSimpleBroadcaster SimpleSource a
simpleSource
      let
         switchOffSource :: SimpleSource a
switchOffSource = IO a -> SimpleSource a
forall x. IO x -> SimpleSource x
staticSimpleSourceIO (SimpleSource a -> IO a
forall source x d. HasSource source x d => source -> IO x
readContents SimpleSource a
simpleSource)

         switchOff :: IO (IO ())
switchOff =
            do
               SimpleBroadcaster (SimpleSource a) -> SimpleSource a -> IO ()
forall broadcaster value.
BroadcasterClass broadcaster value =>
broadcaster -> value -> IO ()
broadcast SimpleBroadcaster (SimpleSource a)
broadcaster SimpleSource a
switchOffSource
               IO () -> IO (IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return (SimpleBroadcaster (SimpleSource a) -> SimpleSource a -> IO ()
forall broadcaster value.
BroadcasterClass broadcaster value =>
broadcaster -> value -> IO ()
broadcast SimpleBroadcaster (SimpleSource a)
broadcaster SimpleSource a
simpleSource)

         newSource :: SimpleSource a
newSource =
            do
               SimpleSource a
source <- SimpleBroadcaster (SimpleSource a) -> SimpleSource (SimpleSource a)
forall hasSource x.
HasSimpleSource hasSource x =>
hasSource -> SimpleSource x
toSimpleSource SimpleBroadcaster (SimpleSource a)
broadcaster
               SimpleSource a
source
      (SimpleSource a, IO (IO ())) -> IO (SimpleSource a, IO (IO ()))
forall (m :: * -> *) a. Monad m => a -> m a
return (SimpleSource a
newSource,IO (IO ())
switchOff)

-- -----------------------------------------------------------------
-- mirrorSimpleSource and mirrorSimpleSourceWithDelayer
-- -----------------------------------------------------------------

-- | Replace a SimpleSource by another which mirrors it, but only copies
-- from it once, hopefully saving CPU time.
-- The IO action stops the mirroring.
mirrorSimpleSource :: SimpleSource a -> IO (SimpleSource a,IO ())
mirrorSimpleSource :: SimpleSource a -> IO (SimpleSource a, IO ())
mirrorSimpleSource (SimpleSource a
simpleSource :: SimpleSource a) =
   do
      (MVar (Maybe (SimpleSource a))
sourceMVar :: MVar.MVar (Maybe (SimpleSource a)))
         <- Maybe (SimpleSource a) -> IO (MVar (Maybe (SimpleSource a)))
forall a. a -> IO (MVar a)
MVar.newMVar Maybe (SimpleSource a)
forall a. Maybe a
Nothing
      SinkID
sinkId <- IO SinkID
newSinkID

      let
         getSource :: IO (SimpleSource a)
         getSource :: IO (SimpleSource a)
getSource = MVar (Maybe (SimpleSource a))
-> (Maybe (SimpleSource a)
    -> IO (Maybe (SimpleSource a), SimpleSource a))
-> IO (SimpleSource a)
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
MVar.modifyMVar MVar (Maybe (SimpleSource a))
sourceMVar
            (\ Maybe (SimpleSource a)
sourceOpt -> case Maybe (SimpleSource a)
sourceOpt of
               Just SimpleSource a
source -> (Maybe (SimpleSource a), SimpleSource a)
-> IO (Maybe (SimpleSource a), SimpleSource a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (SimpleSource a)
sourceOpt,SimpleSource a
source)
               Maybe (SimpleSource a)
Nothing ->
                  do
                     ParallelExec
parallelX <- IO ParallelExec
newParallelExec
                     SimpleBroadcaster a
broadcaster <- a -> IO (SimpleBroadcaster a)
forall x. x -> IO (SimpleBroadcaster x)
newSimpleBroadcaster
                        ([Char] -> a
forall a. HasCallStack => [Char] -> a
error [Char]
"mirrorSimpleSource: 1")
                     MVar ()
initialised <- IO (MVar ())
forall a. IO (MVar a)
MVar.newEmptyMVar

                     let
                        writeX :: a -> IO ()
writeX a
a =
                           do
                              SimpleBroadcaster a -> a -> IO ()
forall broadcaster value.
BroadcasterClass broadcaster value =>
broadcaster -> value -> IO ()
broadcast SimpleBroadcaster a
broadcaster a
a
                              MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
MVar.putMVar MVar ()
initialised ()
                        writeD :: a -> IO ()
writeD a
a =
                           do
                              SimpleBroadcaster a -> a -> IO ()
forall broadcaster value.
BroadcasterClass broadcaster value =>
broadcaster -> value -> IO ()
broadcast SimpleBroadcaster a
broadcaster a
a

                     Source a a
-> (a -> IO ()) -> (a -> IO ()) -> SinkID -> ParallelExec -> IO a
forall x d.
Source x d
-> (x -> IO ()) -> (d -> IO ()) -> SinkID -> ParallelExec -> IO x
addNewSourceActions (SimpleSource a -> Source a a
forall hasSource x d.
HasSource hasSource x d =>
hasSource -> Source x d
toSource SimpleSource a
simpleSource) a -> IO ()
writeX a -> IO ()
writeD
                        SinkID
sinkId ParallelExec
parallelX
                     MVar () -> IO ()
forall a. MVar a -> IO a
MVar.takeMVar MVar ()
initialised
                     let
                        source :: SimpleSource a
source = SimpleBroadcaster a -> SimpleSource a
forall hasSource x.
HasSimpleSource hasSource x =>
hasSource -> SimpleSource x
toSimpleSource SimpleBroadcaster a
broadcaster
                     (Maybe (SimpleSource a), SimpleSource a)
-> IO (Maybe (SimpleSource a), SimpleSource a)
forall (m :: * -> *) a. Monad m => a -> m a
return (SimpleSource a -> Maybe (SimpleSource a)
forall a. a -> Maybe a
Just SimpleSource a
source,SimpleSource a
source)
               )

      SimpleSource a
source <- IO (SimpleSource a)
getSource

      (SimpleSource a, IO ()) -> IO (SimpleSource a, IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return (SimpleSource a
source,SinkID -> IO ()
forall source. HasInvalidate source => source -> IO ()
invalidate SinkID
sinkId)


-- | Replace a SimpleSource by another which mirrors it, but only copies
-- from it once, hopefully saving CPU time.  In addition, block all
-- update while the Delayer is delaying things.
mirrorSimpleSourceWithDelayer :: Delayer -> SimpleSource a -> IO (SimpleSource a,IO ())
mirrorSimpleSourceWithDelayer :: Delayer -> SimpleSource a -> IO (SimpleSource a, IO ())
mirrorSimpleSourceWithDelayer Delayer
delayer (SimpleSource a
simpleSource :: SimpleSource a) =
   do
      SinkID
sinkId <- IO SinkID
newSinkID
      ParallelExec
parallelX <- IO ParallelExec
newParallelExec
      let
         -- emergencyRead should not be used too often I hope.
         emergencyRead :: IO a
emergencyRead =
            do
               [Char] -> IO ()
forall a. Show a => a -> IO ()
debug [Char]
"Broadcaster: emergency read"
               SimpleSource a -> IO a
forall source x d. HasSource source x d => source -> IO x
readContents SimpleSource a
simpleSource

      SimpleBroadcaster a
broadcaster <- a -> IO (SimpleBroadcaster a)
forall x. x -> IO (SimpleBroadcaster x)
newSimpleBroadcaster (IO a -> a
forall a. IO a -> a
unsafePerformIO IO a
emergencyRead)
      IORef a
ref <- a -> IO (IORef a)
forall a. a -> IO (IORef a)
newIORef ([Char] -> a
forall a. HasCallStack => [Char] -> a
error [Char]
"mirrorSimpleSource: 3")

      let
         writeAct :: a -> IO ()
writeAct a
val = IORef a -> a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef a
ref a
val

         bumpAct :: IO ()
bumpAct =
            do
               a
val <- IORef a -> IO a
forall a. IORef a -> IO a
readIORef IORef a
ref
               SimpleBroadcaster a -> a -> IO ()
forall broadcaster value.
BroadcasterClass broadcaster value =>
broadcaster -> value -> IO ()
broadcast SimpleBroadcaster a
broadcaster a
val

      DelayedAction
delayedBumpAct <- IO () -> IO DelayedAction
newDelayedAction IO ()
bumpAct

      let
         updateAct :: a -> IO ()
updateAct a
val =
            do
               a -> IO ()
writeAct a
val
               Delayer -> DelayedAction -> IO ()
delayedAct Delayer
delayer DelayedAction
delayedBumpAct

      Source a a
-> (a -> IO ()) -> (a -> IO ()) -> SinkID -> ParallelExec -> IO a
forall x d.
Source x d
-> (x -> IO ()) -> (d -> IO ()) -> SinkID -> ParallelExec -> IO x
addNewSourceActions (SimpleSource a -> Source a a
forall hasSource x d.
HasSource hasSource x d =>
hasSource -> Source x d
toSource SimpleSource a
simpleSource)
         (SimpleBroadcaster a -> a -> IO ()
forall broadcaster value.
BroadcasterClass broadcaster value =>
broadcaster -> value -> IO ()
broadcast SimpleBroadcaster a
broadcaster) a -> IO ()
updateAct SinkID
sinkId ParallelExec
parallelX

      (SimpleSource a, IO ()) -> IO (SimpleSource a, IO ())
forall (m :: * -> *) a. Monad m => a -> m a
return (SimpleBroadcaster a -> SimpleSource a
forall hasSource x.
HasSimpleSource hasSource x =>
hasSource -> SimpleSource x
toSimpleSource SimpleBroadcaster a
broadcaster,SinkID -> IO ()
forall source. HasInvalidate source => source -> IO ()
invalidate SinkID
sinkId)