{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FunctionalDependencies #-}

-- | Very primitive concurrency, this implements a sink, which passes messages
-- along until the receiver is no longer interested.
module Util.Sink(
   HasInvalidate(..),

   SinkID,
   newSinkID,

   Sink,
   newSink,
   newSinkGeneral,
   newParallelSink,
   newParallelDelayedSink,

   putSink,
   putSinkMultiple,
   coMapSink,
   coMapSink',
   coMapIOSink',

   CanAddSinks(..),
   addNewAction,

   ParallelExec,
   newParallelExec,
   parallelExec,
   parallelExecVSem,
   ) where

import Control.Concurrent
import Control.Exception (try)
import System.IO.Unsafe
import Data.IORef

import Util.Object
import Util.ExtendedPrelude
import Util.Thread
import Util.Computation (done)
import Util.VSem

-- -------------------------------------------------------------------------
-- The HasInvalidate
-- -------------------------------------------------------------------------

-- | The HasInvalidate class represents information sources which can be told
-- \"No more, I\'m not interested.\"
class HasInvalidate source where
   invalidate :: source -> IO ()

-- -------------------------------------------------------------------------
-- SinkID
-- -------------------------------------------------------------------------

--
-- A SinkID identifies the consumer and whether the consumer is still
-- interested.
data SinkID = SinkID {
   oID :: ObjectID,
   interested :: IORef Bool
   }

newSinkID :: IO SinkID
newSinkID =
   do
      oID <- newObject
      interested <- newIORef True
      return (SinkID {
         oID = oID,
         interested = interested
         })

-- | Returns True if sink is still interested
isInterested :: SinkID -> IO Bool
isInterested sinkID = readIORef (interested sinkID)

instance HasInvalidate SinkID where
   invalidate sinkID = writeIORef (interested sinkID) False

instance Eq SinkID where
   (==) sinkID1 sinkID2 = (oID sinkID1) == (oID sinkID2)

instance Ord SinkID where
   compare sinkID1 sinkID2 = compare (oID sinkID1) (oID sinkID2)


-- -------------------------------------------------------------------------
-- Sinks
-- -------------------------------------------------------------------------

data Sink x = Sink {
   sinkID :: SinkID,
   action :: x -> IO ()
   }

-- -------------------------------------------------------------------------
-- The consumer's interface
-- -------------------------------------------------------------------------

-- | Creates a new sink with its own SinkID
newSink :: (x -> IO ()) -> IO (Sink x)
newSink action =
   do
      sinkID <- newSinkID
      newSinkGeneral sinkID action

-- | Creates a new sink with a given SinkID.  This allows us to
-- invalidate lots of sinks just by invalidating one sinkID.
newSinkGeneral :: SinkID -> (x -> IO ()) -> IO (Sink x)
newSinkGeneral sinkID action = return (Sink {sinkID = sinkID,action = action})

-- | Or we can do so with HasInvalidate
instance HasInvalidate (Sink x) where
   invalidate sink = invalidate (sinkID sink)

-- -------------------------------------------------------------------------
-- The provider's interface
-- -------------------------------------------------------------------------

-- | Put a value into the sink, returning False if the sink id has been
-- invalidated.
putSink :: Sink x -> x -> IO Bool
putSink sink x =
   do
      interested <- isInterested (sinkID sink)
      if interested then (action sink x) else done
      return interested
-- | Put a list of values into the sink, returning False if the sink id has been
-- invalidated
putSinkMultiple :: Sink x -> [x] -> IO Bool
putSinkMultiple sink [] = return True
putSinkMultiple sink (x:xs) =
   do
      interested <- putSink sink x
      if interested
         then
            putSinkMultiple sink xs
         else
            return interested

-- | Convert a sink from one type to another
coMapSink :: (y -> x) -> Sink x -> Sink y
coMapSink fn (Sink {sinkID = sinkID,action = action}) =
   Sink {sinkID = sinkID,action = action . fn}

-- | Another version which allows a transformation function to filter
-- certain elements
coMapSink' :: (y -> Maybe x) -> Sink x -> Sink y
coMapSink' fn (Sink {sinkID = sinkID,action = action}) =
   let
      action' y = case fn y of
         Nothing -> done
         Just x -> action x
   in
      Sink {sinkID = sinkID,action = action'}

-- | A version which allows an IO action, which had better not take too long.
coMapIOSink' :: (y -> IO (Maybe x)) -> Sink x -> Sink y
coMapIOSink' actFn (Sink {sinkID = sinkID,action = action}) =
    let
       action' y =
          do
             xOpt <- actFn y
             case xOpt of
                Nothing -> done
                Just x -> action x
    in
       Sink {sinkID = sinkID,action = action'}

-- -------------------------------------------------------------------------
-- The CanAddSinks class.
-- -------------------------------------------------------------------------

-- | A class for things (in particular Source and SimpleSource) that can
-- output via sinks.  Each sink source is supposed to have a unique
-- x, containing a representation of the current value, and delta,
-- containing the (incremental) updates which are put in the sink.
-- Only the addOrdSink function must be defined by instances.
class CanAddSinks sinkSource x delta | sinkSource -> x,sinkSource -> delta
      where
   ---
   -- Create and add a new sink containing the given action.
   addNewSink :: sinkSource -> (delta -> IO ()) -> IO (x,Sink delta)
   addNewSink sinkSource action =
      do
         parallelX <- newParallelExec
         addNewQuickSink sinkSource
            (\ delta -> parallelExec parallelX (action delta))

   ---
   -- Like addNewSink, but use the supplied SinkID
   addNewSinkGeneral :: sinkSource -> (delta -> IO ()) -> SinkID
      -> IO (x,Sink delta)
   addNewSinkGeneral sinkSource action sinkID =
      do
         parallelX <- newParallelExec
         addNewSinkVeryGeneral sinkSource action sinkID parallelX

   ---
   -- Like addNewQuickSink, but use the supplied ParallelExec as well
   addNewSinkVeryGeneral :: sinkSource -> (delta -> IO ()) -> SinkID
      -> ParallelExec -> IO (x,Sink delta)
   addNewSinkVeryGeneral sinkSource action sinkID parallelX =
      addNewQuickSinkGeneral
         sinkSource
         (\ delta -> parallelExec parallelX (
            do
               -- add an extra check here to prevent surplus queued actions
               -- being performed after the sink has been invalidated.
               interested <- isInterested sinkID
               if interested then action delta else done
            ))
         sinkID

   ---
   -- Like addNewSinkVeryGeneral, but compute an action from the x value which
   -- is performed in the parallelExec thread first of all.
   addNewSinkWithInitial :: sinkSource -> (x -> IO ()) -> (delta -> IO ())
      -> SinkID -> ParallelExec -> IO (x,Sink delta)
   addNewSinkWithInitial sinkSource xAction deltaAction sinkID parallelX =
      do
         mVar <- newEmptyMVar
         let
            firstAct =
               do
                  x <- takeMVar mVar
                  xAction x
         parallelExec parallelX firstAct
         (returnValue @ (x,sink))
            <- addNewSinkVeryGeneral sinkSource deltaAction sinkID parallelX
         putMVar mVar x
         return returnValue

   ---
   -- Like addNewSink, but the action is guaranteed to terminate quickly
   -- and normally.
   addNewQuickSink :: sinkSource -> (delta -> IO ()) -> IO (x,Sink delta)
   addNewQuickSink sinkSource action =
      do
         sink <- newSink action
         x <- addOldSink sinkSource sink
         return (x,sink)

   ---
   -- Like addNewQuickSink, but use the supplied SinkID
   addNewQuickSinkGeneral :: sinkSource -> (delta -> IO ()) -> SinkID
      -> IO (x,Sink delta)
   addNewQuickSinkGeneral sinkSource action sinkID =
      do
         sink <- newSinkGeneral sinkID action
         x <- addOldSink sinkSource sink
         return (x,sink)

   ---
   -- Adds a pre-existing sink.
   addOldSink :: sinkSource -> Sink delta -> IO x

-- | Add an action to a sinkSource which is performed until the action returns
-- False.
addNewAction :: CanAddSinks sinkSource x delta
   => sinkSource -> (delta -> IO Bool) -> IO x
addNewAction sinkSource action =
   do
      sinkMVar <- newEmptyMVar
      let
         deltaAct delta =
            do
               continue <- action delta
               if continue
                  then
                     done
                  else
                     do
                        sink <- takeMVar sinkMVar
                        invalidate sink
                        simpleFallOut ""

      (x,sink) <- addNewSink sinkSource deltaAct
      putMVar sinkMVar sink
      return x

-- -------------------------------------------------------------------------
-- A ParallelExec executes actions concurrently in a separate thread
--
-- Apart from (probably) being cheaper than forking off a new thread
-- each time, it also guarantees the order of the actions.
--
-- The Thread can be stopped with simpleFallOut.
--
-- We also provide a VSem which is locked locally when a parallelExec action
-- is pending.
-- -------------------------------------------------------------------------

newtype ParallelExec = ParallelExec (Chan (IO ()))

parallelExecVSem :: VSem
parallelExecVSem = unsafePerformIO newVSem
{-# NOINLINE parallelExecVSem #-}

newParallelExec :: IO ParallelExec
newParallelExec =
   do
      chan <- newChan
      let
         parallelExecThread0 =
            do
               act <- readChan chan
               result <- try act
               case result of
                  Left excep -> putStrLn ("Exception detected: "
                     ++ showException2 excep)
                  Right () -> done
               releaseLocal parallelExecVSem

               parallelExecThread0

         parallelExecThread =
            do
               addSimpleFallOut parallelExecThread0
               done

      forkIODebug parallelExecThread
      return (ParallelExec chan)

parallelExec :: ParallelExec -> IO () -> IO ()
parallelExec (ParallelExec chan) act =
   do
      acquireLocal parallelExecVSem
      writeChan chan act

-- | Creates a new sink which executes actions in a parallelExec thread.
newParallelSink :: (x -> IO ()) -> IO (Sink x)
newParallelSink action =
   do
      parallelX <- newParallelExec
      sinkID <- newSinkID
      newSinkGeneral sinkID (\ delta -> parallelExec parallelX (
         do
            interested <- isInterested sinkID
            if interested then action delta else done
         ))

-- | Creates a new sink which executes actions in a parallelExec thread,
-- but allow the function generating these actions to be specified later,
-- via the returned command.
newParallelDelayedSink :: IO (Sink x,(x -> IO ()) -> IO ())
newParallelDelayedSink =
   do
      actionMVar <- newEmptyMVar
      parallelX <- newParallelExec
      sinkID <- newSinkID

      sink <- newSinkGeneral sinkID (\ delta -> parallelExec parallelX (
         do
            interested <- isInterested sinkID
            if interested
               then
                  do
                     action <- readMVar actionMVar
                     action delta
               else
                  done
         ))

      return (sink,putMVar actionMVar)