{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FunctionalDependencies #-}

-- | Very primitive concurrency, this implements a sink, which passes messages
-- along until the receiver is no longer interested.
module Util.Sink(
   HasInvalidate(..),

   SinkID,
   newSinkID,

   Sink,
   newSink,
   newSinkGeneral,
   newParallelSink,
   newParallelDelayedSink,

   putSink,
   putSinkMultiple,
   coMapSink,
   coMapSink',
   coMapIOSink',

   CanAddSinks(..),
   addNewAction,

   ParallelExec,
   newParallelExec,
   parallelExec,
   parallelExecVSem,
   ) where

import Control.Concurrent
import Control.Exception (try)
import System.IO.Unsafe
import Data.IORef

import Util.Object
import Util.ExtendedPrelude
import Util.Thread
import Util.Computation (done)
import Util.VSem

-- -------------------------------------------------------------------------
-- The HasInvalidate
-- -------------------------------------------------------------------------

-- | The HasInvalidate class represents information sources which can be told
-- \"No more, I\'m not interested.\"
class HasInvalidate source where
   -- | Tell the source that its output is no longer wanted.
   invalidate :: source -> IO ()

-- -------------------------------------------------------------------------
-- SinkID
-- -------------------------------------------------------------------------

-- | A SinkID identifies the consumer and whether the consumer is still
-- interested.
data SinkID = SinkID {
   oID :: ObjectID,
      -- ^ identity of the consumer; the Eq and Ord instances compare
      -- this field only.
   interested :: IORef Bool
      -- ^ interest flag: created as True by newSinkID, and set to False
      -- when the SinkID is invalidated.
   }

-- | Creates a fresh SinkID: a newly allocated object identifier paired with
-- an interest flag which starts out as True.
newSinkID :: IO SinkID
newSinkID =
   do
      -- The locals are deliberately not called oID / interested, so that they
      -- do not shadow the record selectors of the same names.
      objectID <- newObject
      interestedRef <- newIORef True
      return (SinkID {
         oID = objectID,
         interested = interestedRef
         })

-- | Returns True if sink is still interested.  This simply reads the current
-- value of the SinkID's interest flag.
isInterested :: SinkID -> IO Bool
isInterested sid = readIORef (interested sid)

-- | Invalidating a SinkID writes False to its interest flag.
instance HasInvalidate SinkID where
   invalidate sid = writeIORef (interested sid) False

-- | Two SinkIDs are equal exactly when their object identifiers are equal;
-- the interest flag plays no part in the comparison.
instance Eq SinkID where
   (==) sinkID1 sinkID2 = oID sinkID1 == oID sinkID2

-- | SinkIDs are ordered by their object identifiers, consistently with the
-- Eq instance.
instance Ord SinkID where
   compare sinkID1 sinkID2 = compare (oID sinkID1) (oID sinkID2)


-- -------------------------------------------------------------------------
-- Sinks
-- -------------------------------------------------------------------------

-- | A sink couples a SinkID with the action to be performed on each value
-- put into the sink.
data Sink x = Sink {
   sinkID :: SinkID,
      -- ^ identifies the consumer and carries its interest flag.
   action :: x -> IO ()
      -- ^ what putSink runs on a value while the SinkID is still interested.
   }

-- -------------------------------------------------------------------------
-- The consumer's interface
-- -------------------------------------------------------------------------

-- | Creates a new sink with its own SinkID.  The supplied function becomes
-- the sink's action.
newSink :: (x -> IO ()) -> IO (Sink x)
newSink act =
   do
      sid <- newSinkID
      newSinkGeneral sid act

-- | Creates a new sink with a given SinkID.  This allows us to
-- invalidate lots of sinks just by invalidating one sinkID.
-- The result is in IO only for uniformity with newSink; no effect is
-- performed here.
newSinkGeneral :: SinkID -> (x -> IO ()) -> IO (Sink x)
newSinkGeneral sid act = return (Sink {sinkID = sid,action = act})

-- | Or we can do so with HasInvalidate: invalidating a sink invalidates its
-- SinkID, and hence every sink built on that same SinkID.
instance HasInvalidate (Sink x) where
   invalidate sink = invalidate (sinkID sink)

-- -------------------------------------------------------------------------
-- The provider's interface
-- -------------------------------------------------------------------------

-- | Put a value into the sink, returning False if the sink id has been
-- invalidated.  The sink's action is only run when the sink is still
-- interested at the moment of the check.
putSink :: Sink x -> x -> IO Bool
putSink sink x =
   do
      stillWanted <- isInterested (sinkID sink)
      if stillWanted then action sink x else done
      return stillWanted

-- | Put a list of values into the sink, returning False if the sink id has been
-- invalidated.  Values are put in order; as soon as one putSink reports that
-- the sink is no longer interested, the remaining values are dropped.
-- An empty list yields True without consulting the sink at all.
putSinkMultiple :: Sink x -> [x] -> IO Bool
putSinkMultiple _ [] = return True
putSinkMultiple sink (x:xs) =
   do
      stillWanted <- putSink sink x
      if stillWanted
         then
            putSinkMultiple sink xs
         else
            return False

-- | Convert a sink from one type to another.  The new sink shares the
-- original's SinkID, so invalidating either one invalidates both.
coMapSink :: (y -> x) -> Sink x -> Sink y
coMapSink fn (Sink {sinkID = sid,action = act}) =
   Sink {sinkID = sid,action = act . fn}

-- | Another version which allows a transformation function to filter
-- certain elements: values mapped to Nothing are silently discarded, values
-- mapped to Just x are passed on to the original sink's action.
-- The new sink shares the original's SinkID.
coMapSink' :: (y -> Maybe x) -> Sink x -> Sink y
coMapSink' fn (Sink {sinkID = sid,action = act}) =
   Sink {sinkID = sid,action = maybe done act . fn}

-- | A version which allows an IO action, which had better not take too long.
-- The IO action is run as part of the new sink's action; a Nothing result
-- discards the value, a Just result is passed to the original sink's action.
-- The new sink shares the original's SinkID.
coMapIOSink' :: (y -> IO (Maybe x)) -> Sink x -> Sink y
coMapIOSink' actFn (Sink {sinkID = sid,action = act}) =
   let
      act' y =
         do
            xOpt <- actFn y
            maybe done act xOpt
   in
      Sink {sinkID = sid,action = act'}

-- -------------------------------------------------------------------------
-- The CanAddSinks class.
-- -------------------------------------------------------------------------

-- | A class for things (in particular Source and SimpleSource) that can
-- output via sinks.  Each sink source is supposed to have a unique
-- x, containing a representation of the current value, and delta,
-- containing the (incremental) updates which are put in the sink.
-- Only the addOldSink function must be defined by instances; every other
-- method has a default built on top of it.
class CanAddSinks sinkSource x delta | sinkSource -> x,sinkSource -> delta
      where
   -- | Create and add a new sink containing the given action.  The action
   -- is not run by the caller of putSink; it is queued on a ParallelExec
   -- created for this sink.
   addNewSink :: sinkSource -> (delta -> IO ()) -> IO (x,Sink delta)
   addNewSink sinkSource deltaAction =
      do
         parallelX <- newParallelExec
         addNewQuickSink sinkSource
            (\ delta -> parallelExec parallelX (deltaAction delta))

   -- | Like addNewSink, but use the supplied SinkID
   addNewSinkGeneral :: sinkSource -> (delta -> IO ()) -> SinkID
      -> IO (x,Sink delta)
   addNewSinkGeneral sinkSource deltaAction sid =
      do
         parallelX <- newParallelExec
         addNewSinkVeryGeneral sinkSource deltaAction sid parallelX

   -- | Like addNewSinkGeneral, but use the supplied ParallelExec as well
   addNewSinkVeryGeneral :: sinkSource -> (delta -> IO ()) -> SinkID
      -> ParallelExec -> IO (x,Sink delta)
   addNewSinkVeryGeneral sinkSource deltaAction sid parallelX =
      addNewQuickSinkGeneral
         sinkSource
         (\ delta -> parallelExec parallelX (
            do
               -- add an extra check here to prevent surplus queued actions
               -- being performed after the sink has been invalidated.
               stillWanted <- isInterested sid
               if stillWanted then deltaAction delta else done
            ))
         sid

   -- | Like addNewSinkVeryGeneral, but compute an action from the x value
   -- which is performed in the parallelExec thread first of all.
   addNewSinkWithInitial :: sinkSource -> (x -> IO ()) -> (delta -> IO ())
      -> SinkID -> ParallelExec -> IO (x,Sink delta)
   addNewSinkWithInitial sinkSource xAction deltaAction sid parallelX =
      do
         mVar <- newEmptyMVar
         -- Queue the initial action before the sink is registered, so that it
         -- is ahead of any delta action this sink can queue.  It blocks on
         -- the MVar until the x value is known.
         parallelExec parallelX (takeMVar mVar >>= xAction)
         -- The as-pattern is written without spaces around '@' so that it is
         -- accepted by compilers which treat a spaced '@' as an operator.
         returnValue@(x,_)
            <- addNewSinkVeryGeneral sinkSource deltaAction sid parallelX
         putMVar mVar x
         return returnValue

   -- | Like addNewSink, but the action is guaranteed to terminate quickly
   -- and normally.  It is run directly as the sink's action rather than
   -- being queued on a ParallelExec.
   addNewQuickSink :: sinkSource -> (delta -> IO ()) -> IO (x,Sink delta)
   addNewQuickSink sinkSource deltaAction =
      do
         sink <- newSink deltaAction
         x <- addOldSink sinkSource sink
         return (x,sink)

   -- | Like addNewQuickSink, but use the supplied SinkID
   addNewQuickSinkGeneral :: sinkSource -> (delta -> IO ()) -> SinkID
      -> IO (x,Sink delta)
   addNewQuickSinkGeneral sinkSource deltaAction sid =
      do
         sink <- newSinkGeneral sid deltaAction
         x <- addOldSink sinkSource sink
         return (x,sink)

   -- | Adds a pre-existing sink.
   addOldSink :: sinkSource -> Sink delta -> IO x

-- | Add an action to a sinkSource which is performed until the action returns
-- False.  When that happens the sink created here is invalidated and the
-- thread running the action is stopped with simpleFallOut.
addNewAction :: CanAddSinks sinkSource x delta
   => sinkSource -> (delta -> IO Bool) -> IO x
addNewAction sinkSource deltaAction =
   do
      -- The sink is only known after addNewSink returns, but the action given
      -- to addNewSink needs it; the MVar hands it over afterwards.
      sinkMVar <- newEmptyMVar
      let
         deltaAct delta =
            do
               continue <- deltaAction delta
               if continue
                  then
                     done
                  else
                     do
                        -- Blocks until the putMVar below has happened.
                        sink <- takeMVar sinkMVar
                        invalidate sink
                        simpleFallOut ""

      (x,sink) <- addNewSink sinkSource deltaAct
      putMVar sinkMVar sink
      return x

-- -------------------------------------------------------------------------
-- A ParallelExec executes actions concurrently in a separate thread
--
-- Apart from (probably) being cheaper than forking off a new thread
-- each time, it also guarantees the order of the actions.
--
-- The Thread can be stopped with simpleFallOut.
--
-- We also provide a VSem which is locked locally when a parallelExec action
-- is pending.
-- -------------------------------------------------------------------------

-- | Handle on a worker thread: wraps the channel on which actions for that
-- thread are queued.
newtype ParallelExec = ParallelExec (Chan (IO ()))

-- | The single VSem shared by every ParallelExec.  It is acquired locally by
-- parallelExec when an action is queued, and released locally by the worker
-- thread once that action has been run.
--
-- This is a process-wide value created with unsafePerformIO; the NOINLINE
-- pragma is what keeps it from being duplicated by inlining, so it must
-- stay attached to this definition.
parallelExecVSem :: VSem
parallelExecVSem = unsafePerformIO newVSem
{-# NOINLINE parallelExecVSem #-}

-- | Creates a new ParallelExec: a channel of actions together with a freshly
-- forked thread which runs the queued actions one at a time, in the order in
-- which they were written to the channel.
newParallelExec :: IO ParallelExec
newParallelExec =
   do
      chan <- newChan
      let
         -- Endless worker loop: take the next action, run it, report any
         -- exception caught by try, release the VSem acquired by
         -- parallelExec for this action, and go round again.
         workerLoop =
            do
               act <- readChan chan
               result <- try act
               case result of
                  Left excep -> putStrLn ("Exception detected: "
                     ++ showException2 excep)
                  Right () -> done
               releaseLocal parallelExecVSem

               workerLoop

         -- The loop only ever ends by falling out (see simpleFallOut); the
         -- result of addSimpleFallOut is of no further interest.
         workerThread =
            do
               _ <- addSimpleFallOut workerLoop
               done

      _ <- forkIODebug workerThread
      return (ParallelExec chan)

-- | Queues an action for execution by the ParallelExec's worker thread.
-- parallelExecVSem is acquired locally before the action is queued; the
-- matching release is done by the worker thread after it has run the action.
parallelExec :: ParallelExec -> IO () -> IO ()
parallelExec (ParallelExec chan) act =
   do
      acquireLocal parallelExecVSem
      writeChan chan act

-- | Creates a new sink which executes actions in a parallelExec thread.
-- The sink gets its own ParallelExec and its own SinkID.  Interest is
-- checked again when a queued action is about to run, so that actions
-- still queued when the sink is invalidated are not performed.
newParallelSink :: (x -> IO ()) -> IO (Sink x)
newParallelSink deltaAction =
   do
      parallelX <- newParallelExec
      sid <- newSinkID
      newSinkGeneral sid (\ delta -> parallelExec parallelX (
         do
            stillWanted <- isInterested sid
            if stillWanted then deltaAction delta else done
         ))

-- | Creates a new sink which executes actions in a parallelExec thread,
-- but allow the function generating these actions to be specified later,
-- via the returned command.
--
-- The function is held in an MVar which starts out empty.  Values put into
-- the sink before the returned command has been run are queued, and the
-- worker thread blocks in readMVar until the function is supplied.  The
-- returned command is a putMVar, so running it a second time will block
-- (nothing here ever empties the MVar again).
newParallelDelayedSink :: IO (Sink x,(x -> IO ()) -> IO ())
newParallelDelayedSink =
   do
      actionMVar <- newEmptyMVar
      parallelX <- newParallelExec
      sid <- newSinkID

      sink <- newSinkGeneral sid (\ delta -> parallelExec parallelX (
         do
            -- Check interest when the queued action actually runs, not when
            -- it was queued.
            stillWanted <- isInterested sid
            if stillWanted
               then
                  do
                     deltaAction <- readMVar actionMVar
                     deltaAction delta
               else
                  done
         ))

      return (sink,putMVar actionMVar)