{-| Implemented here is a thread pool library on crack.
 
  1.0 Introduction
  Typically, a thread pool is a set of execution contexts that will execute
  tasks from an input queue.  Typically, thread pools are used to parallize
  the processing of incoming work across all available CPUs without going
  through the expense of starting a new thread for every new task.
 
  In 'Control.Engine' you will find a somewhat unique implementation.  The
  'Engine' is not only a set of threads running a common mutator on the input
  queue, producing an output queue, but also include hooks, task injection, and
  state management.

  Queues :: (Chan a)
  From "Control.Concurrent.Chan". 

  Hooks :: (a -> IO Maybe a)
  Hooks can be added and removed during execution without creating a new
  engine. They allow the developer to modify tasks:

   * prior to parallization (for sequential preprocessing)

   * in parallel, prior to main mutation funciton

   * in parallel, after mutation function

   * post parallization (for sequential post processing)
 
  State Management
  The stateManager waits for any updates to the mutator state or hooks.  If any
  modifications are made then the new set of hooks (or state) is provided
  to the workers.  Correctness is handled by keeping the master copies as TVars
  ("Control.Concurrent.STM").  While the mutators and hooks read from an 'MVar'
  ("Control.Concurrent.MVar") to avoid contention.

  The thinking here is that changing the hooks and state is a rare / low
  contention action while the need for this information will be constant
  and performance critical.  How successful this stratagy is has yet to
  be shown.
 
  Injection
  One injection point allows injection of a result that had no preceding
  task.  The second injector allows the initial hooks ('Input' hooks) to be
  bypassed.
 -}

module Control.Engine
	(
	-- * Main functions
	  initSimpleEngine
	, initSimpleEngineIO
	, initEngine
	, Engine(..)
	-- * Hooks
	, Hook(..)
	, HookLoc(..)
	, addInputHook
	, addOutputHook
	, addPreMutateHook
	, addPostMutateHook
	, delInputHook
	, delOutputHook
	, delPreMutateHook
	, delPostMutateHook
	-- * Injectors
	, injectPreMutator
	, injectPostMutator
	) where

import Control.Concurrent
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Control.Concurrent.Chan
import Control.Monad
import Data.List (insert)

-- |An 'Engine' represents a pool of threads ready to execute tasks.
data Engine job result state =
    Eng { chan1			:: Chan job
	, chan2			:: Chan result
	, tvInHook		:: TVar [Hook state job]
	, tvPreMutateHook	:: TVar [Hook state job]
	, tvPostMutateHook	:: TVar [Hook state result]
	, tvOutHook		:: TVar [Hook state result]
	, mvInHook		:: MVar [Hook state job]
	, mvPreMutateHook	:: MVar [Hook state job]
	, mvPostMutateHook	:: MVar [Hook state result]
	, mvOutHook		:: MVar [Hook state result]
	, state			:: TVar state
    }

-- |If all you want is a basic thread pool, this will work.
-- You should consider using "Control.ThreadPool" instead.
--
-- Evaluation of the result is forced using seq.
initSimpleEngine :: Int -> (job -> result) -> IO (Chan job, Chan result)
initSimpleEngine nr mutator = do
	input <- newChan
	output <- newChan
	let m = const (return . Just . mutator)
	initEngine nr (readChan input) (\o -> o `seq` writeChan output o) m ()
	return (input, output)

-- |Simpler than calling 'initEngine', but it allows no state or interaction
-- with the hooks and injectors. No strictness is forced.
initSimpleEngineIO :: Int -> (job -> IO result) -> IO (Chan job, Chan result)
initSimpleEngineIO nr mutator = do
	input <- newChan
	output <- newChan
	let m = (\_ j -> mutator j >>= return . Just)
	initEngine nr (readChan input) (writeChan output) m ()
	return (input, output)

-- |To initilize an engine you must provide:
--
--    * the number of threads
--
--    * an action that will get the input
--
--    * an action that will consume output
--
--    * a mutator function to perform on all inputs
--
--    * an initial state for the mutator function
--
-- No strictness is forced - be sure you force evaluation if wanted.
-- All hooks start out empty.
initEngine :: (Eq st) => Int -> (IO job) -> (result -> IO ()) -> (st -> job -> IO (Maybe result)) -> st -> IO (Engine job result st)
initEngine nrWorkers input output mutator initialState = do
	c1 <- newChan
	c2 <- newChan

	inputHooks <- newMVar []
	outputHooks <- newMVar []
	preMutatorHooks <- newMVar []
	postMutatorHooks <- newMVar []

	ms <- newMVar initialState
	tv <- newTVarIO initialState

	ch1tv <- newTVarIO []
	ch2tv <- newTVarIO []
	ch3tv <- newTVarIO []
	ch4tv <- newTVarIO []

	let engine = Eng c1 c2 ch1tv ch2tv ch3tv ch4tv inputHooks preMutatorHooks postMutatorHooks outputHooks tv

	forkIO $ inputManager input c1 inputHooks ms
	forkIO $ outputManager output c2 outputHooks ms
	forkIO $ stateManager engine ms
	forM_ [1..nrWorkers] $ \_ -> forkIO $ worker c1 preMutatorHooks ms mutator postMutatorHooks c2
	return engine

worker :: Chan job -> MVar [Hook st job] -> MVar st -> (st -> job -> IO (Maybe result)) -> MVar [Hook st result] -> Chan result -> IO ()
worker c1 preMutatorHooks ms mutator postMutatorHooks c2 = forever $ worker'
  where
  worker' = do
	-- Get next message, newest hooks, and state
	msg <- readChan c1
	preMH <- readMVar preMutatorHooks
	postMH <- readMVar postMutatorHooks
	st <- readMVar ms

	-- run hook1, mutator, hook2
	msg' <- runHooks preMH st msg
	out  <- runStage (mutator st) msg'
	out' <- runStage (runHooks postMH st) out

	case out' of
		Nothing -> return ()
		Just o  -> writeChan c2 o
  runStage :: (a -> IO (Maybe b)) -> Maybe a -> IO (Maybe b)
  runStage _ Nothing = return Nothing
  runStage stage (Just a) = stage a

-- FIXME, clean up this redundant code - its ugly!
stateManager :: (Eq st) => Engine job result st -> MVar st -> IO ()
stateManager eng ms = do
	curr@(s,ih,eh,th,oh) <- atomically $ do
			s  <- readTVar (state eng)
			ih <- readTVar (tvInHook eng)
			eh <- readTVar (tvPreMutateHook eng)
			th <- readTVar (tvPostMutateHook eng)
			oh <- readTVar (tvOutHook eng)
			return (s,ih, eh, th, oh)
	swapMVar' ms s
	swapMVar' (mvInHook eng) ih
	swapMVar' (mvPreMutateHook eng) eh
	swapMVar' (mvPostMutateHook eng) th
	swapMVar' (mvOutHook eng) oh
	updateState curr
  where
  swapMVar' m v = swapMVar m v >> return ()
  updateState (s, ih, eh, th, oh) = do
	new@(s', ih', eh', th', oh') <- atomically $ do
		s'  <- readTVar (state eng)
		ih' <- readTVar (tvInHook eng)
		eh' <- readTVar (tvPreMutateHook eng)
		th' <- readTVar (tvPostMutateHook eng)
		oh' <- readTVar (tvOutHook eng)
		when (s' == s && ih' == ih && eh' == eh && th' == th && oh' == oh) retry
		return (s', ih', eh', th', oh')
	when (s' /= s)   (swapMVar ms s' >> return ())
	when (ih' /= ih) (swapMVar (mvInHook eng) ih' >> return ())
	when (eh' /= eh) (swapMVar (mvPreMutateHook eng) eh' >> return ())
	when (th' /= th) (swapMVar (mvPostMutateHook eng) th' >> return ())
	when (oh' /= oh) (swapMVar (mvOutHook eng) oh' >> return ())
	updateState new

-- Input.hs
inputManager :: (IO msg) -> Chan msg -> MVar [Hook st msg] -> MVar st -> IO ()
inputManager input outChan hookMV stMV= forever $ input >>= handleMsg
  where
  handleMsg msg = do
	hook <- readMVar hookMV
	s <- readMVar stMV
	new <- runHooks hook s msg
	case new of
		Just m -> writeChan outChan m
		Nothing -> return ()

-- Output.hs
outputManager :: (result -> IO ()) -> Chan result -> MVar [Hook st result] -> MVar st -> IO ()
outputManager output msgChan hookMV stMV = forever $ do
	m <- readChan msgChan
	hook <- readMVar hookMV
	s <- readMVar stMV
	new <- runHooks hook s m
	case new of
		Just n -> output n
		Nothing -> return ()

-- Hooks.hs
-- |A hook is simply a mutation on the task.  The priority is used to order
-- hook execution (lower value priorites happen first).  For accounting and to
-- remove old hooks the description field is used.
data Hook st msg = Hk
		{ hkFunc	:: st -> msg -> IO (Maybe msg)
		, hkPriority	:: Int
		, hkDescription :: String
	}

instance Eq (Hook m s) where
	(Hk _ p d) == (Hk _ p' d') = p == p' && d == d'

instance Ord (Hook a s) where
	(Hk _ p _) `compare` (Hk _ p' _) = p `compare` p'

instance Show (Hook a s) where
	show (Hk _ p d) = d ++ " Priority = " ++ (show p)
	showsPrec _ (Hk _ p d) = (++) ("Hk { hkFunc = undefined, p = " ++ (show p) ++ " , hkDescription = " ++ d ++ " } ")

data HookLoc = InputHook | PreMutateHook | PostMutateHook | OutputHook deriving (Eq, Ord, Show)

runHooks :: [Hook st msg] -> st -> msg -> IO (Maybe msg)
runHooks hooks st m = foldM apply (Just m) hooks
  where
  apply Nothing  f = return Nothing
  apply (Just a) f = hkFunc f st a

-- |Adds a hook that will be performed in serial on all jobs added to
-- the input queue.
addInputHook :: Engine job result state -> Hook state job -> IO ()
addInputHook e h = atomically $ do
	readTVar (tvInHook e) >>= writeTVar (tvInHook e) . insert h

-- |Adds a hook that will be performed in serial on all results
-- before they are added to the output queue.
addOutputHook :: Engine job result state -> Hook state result -> IO ()
addOutputHook e h = atomically $ do
	readTVar (tvOutHook e) >>= writeTVar (tvOutHook e) . insert h

-- |Adds a hook that will be performed in parallel before the main mutator
-- function.
addPreMutateHook :: Engine job result state -> Hook state job -> IO ()
addPreMutateHook e h = atomically $ do
	readTVar (tvPreMutateHook e) >>= writeTVar (tvPreMutateHook e) . insert h

-- |Adds a hook that will be performed in parallel after the main mutator
-- function.
addPostMutateHook :: Engine job result state -> Hook state result -> IO ()
addPostMutateHook e h = atomically $ do
	readTVar (tvPostMutateHook e) >>= writeTVar (tvPostMutateHook e) . insert h

-- |Deletes all input hooks matching the provided desciption
delInputHook :: Engine j r s -> String -> IO ()
delInputHook e s = atomically $ do
	readTVar (tvInHook e) >>= writeTVar (tvInHook e) . filter ( (/= s) . hkDescription)

-- |Deletes all pre-mutate hooks matching the provided desciption
delPreMutateHook :: Engine j r s -> String -> IO ()
delPreMutateHook e s = atomically $ do
	readTVar (tvPreMutateHook e) >>= writeTVar (tvPreMutateHook e) . filter ( (/= s) . hkDescription)

-- |Deletes all post-mutate hooks matching the provided desciption
delPostMutateHook :: Engine j r s -> String -> IO ()
delPostMutateHook e s = atomically $ do
	readTVar (tvPostMutateHook e) >>= writeTVar (tvPostMutateHook e) . filter ( (/= s) . hkDescription)

-- |Deletes all output hooks matching the provided desciption
delOutputHook :: Engine j r s -> String -> IO ()
delOutputHook e s = atomically $ do
	readTVar (tvOutHook e) >>= writeTVar (tvOutHook e) . filter ( (/= s) . hkDescription)

-- Inject.hs
-- | Allows adding tasks that bypass the input hooks.
injectPreMutator :: Engine j r s -> j -> IO ()
injectPreMutator eng i = writeChan (chan1 eng) i

-- | Allows bypassing the mutator, meaning a 'result' can be produced without a task.
-- This still hits the output hooks.
injectPostMutator :: Engine j r s -> r -> IO ()
injectPostMutator eng o = writeChan (chan2 eng) o