{-|
  /1.0 Introduction/

  Typically, a thread pool is a set of execution contexts that will execute
  tasks from an input queue.  Thread pools are used to parallize
  the processing of incoming work across all available CPUs without going
  through the expense of starting a new thread for every new task.
 
  In 'Control.Engine' you will find a somewhat unique implementation.  The
  'Engine' is not only a set of threads running a common mutator on the input
  queue, placing results on an output queue, but also include hooks, task
  injection, and state management.

 /1.1 System Figure/

 @
   One input            Configurable number            One output
   thread               of worker threads              thread
  +--------+  chan1 +------------------------+ chan2  +---------+
  | In Hks +  --->  | PreMH, Mutator, PostMH | -----> | Out Hks |
  +--------+        +------------------------+        +---------+
       ^                 ^                               ^
       |                 |                               |
       |                 |    Comms via an IO Ref        |
       +-----------------+-------------------------------+
                         |
                   +------------+
                   | State TVar |
                   +------------+
                   One thread monitoring
                   the TVar and updating
                   the IORef
 @

  /2.0 Queues :: (BoundedChan a)/ - from "Control.Concurrent.BoundedChan".

  The system uses two primary queues.  One for transporting data from
  Input hooks to the mutator (chan1), one for data from the mutator to the
  output hooks (chan2).  These channels are size-bounded - which is needed
  mostly due to the inflexibility of the GHC scheduler.

  /3.0 Hooks :: (a -> IO Maybe a)/

  Hooks can be added and removed during execution without creating a new
  engine. They allow the developer to modify tasks:

   * Input hooks - prior to parallization (for sequential preprocessing)

   * Pre-Mutator hooks - in parallel, prior to main mutation funciton

   * Post-Mutator hooks - in parallel, after mutation function

   * Output hooks - post parallization (for sequential post processing)

  A hook returning "Nothing" causes the job or result to be dropped
  (it does not propogate any further).

  /4.0 Injection/

  One injection point allows injection of a result that had no preceding task -
  thus the result is only seen by the output hooks; this uses chan2. Another 
  injector allows the input hooks to be bypassed; this uses chan1.  See the
  above figure for channels wrt the hooks and mutator.
 
  /5.0 State Management/

  Control-Engine manages state for you.  Semantically, all workers and hooks
  will see a correct state but it won't always be the most recent or consistent
  between threads.

  The stateManager waits for any updates to the mutator state or hooks.  If any
  modifications are made then the new set of hooks or state is provided
  to the workers.  Correctness is handled by keeping the master copies as
  'TVars'.  While the mutators and hooks read state from an 'IORef' to avoid
  contention.

  The thinking here is that changing the hooks and state is a rare / low
  contention action while the need for this information will be constant
  and performance critical.  How successful this stratagy is has yet to
  be shown.
 -}

module Control.Engine
	(
	-- * Main functions
	  initSimpleEngine
	, initSimpleEngineIO
	, initEngine
	, Engine(..)
	-- * Hooks
	, Hook(..)
	, addInputHook
	, addOutputHook
	, addPreMutateHook
	, addPostMutateHook
	, delInputHook
	, delOutputHook
	, delPreMutateHook
	, delPostMutateHook
	-- * Injectors
	, injectPreMutator
	, injectPostMutator
	) where

import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Concurrent.BoundedChan
import Control.Monad
import Data.IORef
import Data.List (insert)

-- |An 'Engine' represents a pool of threads ready to execute tasks.
data Engine job result state =
    Eng { chan1			:: BoundedChan job
	, chan2			:: BoundedChan result
	, tvInHook		:: TVar [Hook state job]
	, tvPreMutateHook	:: TVar [Hook state job]
	, tvPostMutateHook	:: TVar [Hook state result]
	, tvOutHook		:: TVar [Hook state result]
	, state			:: TVar state
    }

data RefEngine job result state =
	RefEng
	{ refInHook		:: IORef [Hook state job]
	, refPreMutateHook	:: IORef [Hook state job]
	, refPostMutateHook	:: IORef [Hook state result]
	, refOutHook		:: IORef [Hook state result]
	, refState		:: IORef state
    }

-- |If all you want is a basic thread pool, this will work.
-- You should consider using "Control.ThreadPool" instead.
--
-- Evaluation of the result is forced using seq.
-- Input, output, and intermediate channels are length bounded to a multiple
-- of the number of workers.
initSimpleEngine :: Int -> (job -> result) -> IO (BoundedChan job, BoundedChan result)
initSimpleEngine nr mutator = do
	input <- newBoundedChan chanBound
	output <- newBoundedChan chanBound
	let m = const (return . Just . mutator)
	initEngine nr chanBound (readChan input) (\o -> o `seq` writeChan output o) m ()
	return (input, output)
  where
  chanBound = nr * 8

-- |Simpler than calling 'initEngine', but it allows no state or interaction
-- with the hooks and injectors. No strictness is forced.
--
-- Input, output, and intermediate channels are length bounded to a multiple
-- of the number of workers.
initSimpleEngineIO :: Int -> (job -> IO result) -> IO (BoundedChan job, BoundedChan result)
initSimpleEngineIO nr mutator = do
	input <- newBoundedChan chanBound
	output <- newBoundedChan chanBound
	let m = (\_ j -> mutator j >>= return . Just)
	initEngine nr chanBound (readChan input) (writeChan output) m ()
	return (input, output)
  where
  chanBound = nr * 8

-- |To initilize an engine you must provide:
--
--    * the number of threads
--
--    * the maxiumum channel size for intermediate channels
--
--    * an action that will get the input
--
--    * an action that will consume output
--
--    * a mutator function to perform on all inputs
--
--    * an initial state for the mutator function
--
-- No strictness is forced - be sure you force evaluation if wanted.
-- All hooks start out empty.
initEngine :: (Eq st) => 
        Int ->
        Int ->
        (IO job) ->
        (result -> IO ()) ->
        (st -> job -> IO (Maybe result)) ->
        st ->
        IO (Engine job result st)
initEngine nrWorkers chanBound input output mutator initialState = do
	c1 <- newBoundedChan chanBound
	c2 <- newBoundedChan chanBound

	inputHooks       <- newIORef []
	outputHooks      <- newIORef []
	preMutatorHooks  <- newIORef []
	postMutatorHooks <- newIORef []
	refState         <- newIORef initialState

	ch1tv <- newTVarIO []
	ch2tv <- newTVarIO []
	ch3tv <- newTVarIO []
	ch4tv <- newTVarIO []
	stv    <- newTVarIO initialState

	let engine = Eng c1 c2 ch1tv ch2tv ch3tv ch4tv stv
	    eref = RefEng inputHooks outputHooks preMutatorHooks postMutatorHooks refState

	forkIO $ inputManager input c1 eref
	forkIO $ outputManager output c2 eref
	forkIO $ stateManager engine eref
	forM_ [1..nrWorkers] $ \_ -> forkIO (worker c1 eref mutator c2)
	return engine

worker :: BoundedChan job -> RefEngine job result st -> (st -> job -> IO (Maybe result)) -> BoundedChan result -> IO ()
worker c1 eref mutator c2 = forever $ readChan c1 >>= worker'
  where
  worker' msg = do
	preMH  <- readIORef (refPreMutateHook eref)
	postMH <- readIORef (refPostMutateHook eref)
	st     <- readIORef (refState eref)

	-- run hook1, mutator, hook2
	msg' <- runHooks preMH st msg
	out  <- runStage (mutator st) msg'
	out' <- runStage (runHooks postMH st) out

	case out' of
		Nothing -> return ()
		Just o  -> writeChan c2 o
  runStage :: (a -> IO (Maybe b)) -> Maybe a -> IO (Maybe b)
  runStage _ Nothing = return Nothing
  runStage stage (Just a) = stage a

stateManager :: (Eq st) => Engine job result st -> RefEngine job result st -> IO ()
stateManager eng eref = atomically readState >>= updateState
  where
  readState = do
		s  <- readTVar (state eng)
		ih <- readTVar (tvInHook eng)
		eh <- readTVar (tvPreMutateHook eng)
		th <- readTVar (tvPostMutateHook eng)
		oh <- readTVar (tvOutHook eng)
		return (s,ih, eh, th, oh)
  updateState curr@(s, ih, eh, th, oh) = do
	writeIORef (refState eref) s
	writeIORef (refInHook eref) ih
	writeIORef (refPreMutateHook eref) eh
	writeIORef (refPostMutateHook eref) th
	writeIORef (refOutHook eref) oh
	new <- atomically $ do
		x <- readState
		when (x == curr) retry
		return x
	updateState new

-- Input.hs
inputManager :: (IO job) -> BoundedChan job -> RefEngine job result st -> IO ()
inputManager input outChan eref = forever $ input >>= handleMsg
  where
  handleMsg msg = do
	hook <- readIORef (refInHook eref)
	s <- readIORef (refState eref)
	new <- runHooks hook s msg
	case new of
		Just m -> writeChan outChan m
		Nothing -> return ()

-- Output.hs
outputManager :: (result -> IO ()) -> BoundedChan result -> RefEngine job result state -> IO ()
outputManager output msgChan eref = forever $ do
	m <- readChan msgChan
	hook <- readIORef (refOutHook eref)
	s <- readIORef (refState eref)
	new <- runHooks hook s m
	case new of
		Just n -> output n
		Nothing -> return ()

-- Hooks.hs
-- |A hook is simply a mutation on the task.  The priority is used to order
-- hook execution (lower value priorites happen first).  For accounting and to
-- remove old hooks the description field is used.
data Hook st msg = Hk
		{ hkFunc	:: st -> msg -> IO (Maybe msg)
		, hkPriority	:: Int
		, hkDescription :: String
	}

instance Eq (Hook m s) where
	(Hk _ p d) == (Hk _ p' d') = p == p' && d == d'

instance Ord (Hook a s) where
	(Hk _ p d) `compare` (Hk _ p' d') = (p,d) `compare` (p', d')

instance Show (Hook a s) where
	show (Hk _ p d) = d ++ " Priority = " ++ (show p)
	showsPrec _ (Hk _ p d) = (++) ("Hk { hkFunc = undefined, p = " ++ (show p) ++ " , hkDescription = " ++ d ++ " } ")

runHooks :: [Hook st msg] -> st -> msg -> IO (Maybe msg)
runHooks hooks st m = foldM apply (Just m) hooks
  where
  apply Nothing  f = return Nothing
  apply (Just a) f = hkFunc f st a

-- |Adds a hook that will be performed in serial on all jobs added to
-- the input queue.
addInputHook :: Engine job result state -> Hook state job -> IO ()
addInputHook e h = atomically $ do
	readTVar (tvInHook e) >>= writeTVar (tvInHook e) . insert h

-- |Adds a hook that will be performed in serial on all results
-- before they are added to the output queue.
addOutputHook :: Engine job result state -> Hook state result -> IO ()
addOutputHook e h = atomically $ do
	readTVar (tvOutHook e) >>= writeTVar (tvOutHook e) . insert h

-- |Adds a hook that will be performed in parallel before the main mutator
-- function.
addPreMutateHook :: Engine job result state -> Hook state job -> IO ()
addPreMutateHook e h = atomically $ do
	readTVar (tvPreMutateHook e) >>= writeTVar (tvPreMutateHook e) . insert h

-- |Adds a hook that will be performed in parallel after the main mutator
-- function.
addPostMutateHook :: Engine job result state -> Hook state result -> IO ()
addPostMutateHook e h = atomically $ do
	readTVar (tvPostMutateHook e) >>= writeTVar (tvPostMutateHook e) . insert h

-- |Deletes all input hooks matching the provided desciption
delInputHook :: Engine j r s -> String -> IO ()
delInputHook e s = atomically $ do
	readTVar (tvInHook e) >>= writeTVar (tvInHook e) . filter ( (/= s) . hkDescription)

-- |Deletes all pre-mutate hooks matching the provided desciption
delPreMutateHook :: Engine j r s -> String -> IO ()
delPreMutateHook e s = atomically $ do
	readTVar (tvPreMutateHook e) >>= writeTVar (tvPreMutateHook e) . filter ( (/= s) . hkDescription)

-- |Deletes all post-mutate hooks matching the provided desciption
delPostMutateHook :: Engine j r s -> String -> IO ()
delPostMutateHook e s = atomically $ do
	readTVar (tvPostMutateHook e) >>= writeTVar (tvPostMutateHook e) . filter ( (/= s) . hkDescription)

-- |Deletes all output hooks matching the provided desciption
delOutputHook :: Engine j r s -> String -> IO ()
delOutputHook e s = atomically $ do
	readTVar (tvOutHook e) >>= writeTVar (tvOutHook e) . filter ( (/= s) . hkDescription)

-- Inject.hs
-- | Allows adding tasks that bypass the input hooks.
injectPreMutator :: Engine j r s -> j -> IO ()
injectPreMutator eng i = writeChan (chan1 eng) i

-- | Allows bypassing the mutator, meaning a 'result' can be produced without a task.
-- This still hits the output hooks.
injectPostMutator :: Engine j r s -> r -> IO ()
injectPostMutator eng o = writeChan (chan2 eng) o