{- | Implemented here is a thread pool library on crack.
 -
 - 1.0 Introduction
 - Typically, a thread pool is a set of execution contexts that will execute
 - tasks from an input queue.  Typically, thread pools are used to parallelize
 - the processing of incoming work across all available CPUs without going
 - through the expense of starting a new thread for every new task.
 -
 - In 'Control.Engine' you will find a somewhat unique implementation.  The
 - 'Engine' is not only a set of threads running a common mutator on the input
 - queue, producing an output queue, but also includes hooks, task injection, and
 - state management.
 -
 - Hooks :: (st -> a -> IO (Maybe a))
 - Hooks can be added and removed during execution without creating a new
 - engine. They allow the developer to modify tasks:
 - * prior to parallelization (for sequential preprocessing)
 - * in parallel, prior to the main mutation function
 - * in parallel, after the mutation function
 - * post parallelization (for sequential post processing)
 -
 - State Management
 - The stateManager waits for any updates to the mutator state or hooks.  If any
 - modifications are made then the new set of hooks (or state) is provided
 - to the workers.  This allows the state of the entire engine to be atomically
 - modified (it is all STM) but allows the workers to use cheap and quick
 - MVars.
 -
 - The thinking here is that changing the hooks and state is a rare / low
 - contention action while the need for this information will be constant
 - and performance critical.
 -
 - Injection
 - One injection point allows injection of a 'result' that had no preceding
 - 'task'.  With another the initial hooks ('Input' hooks) can be bypassed.
 -}

module Control.Engine
	(
	-- * Main functions
	  initSimpleEngine
	, initSimpleEngineIO
	, initEngine
	, Engine(..)
	-- * Hooks
	, Hook(..)
	, HookLoc(..)
	, addInputHook
	, addOutputHook
	, addPreMutateHook
	, addPostMutateHook
	, delInputHook
	, delOutputHook
	, delPreMutateHook
	, delPostMutateHook
	-- * Injectors
	, injectPreMutator
	, injectPostMutator
	) where

import Control.Concurrent
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Control.Concurrent.Chan
import Control.Monad
import Data.List (insert)

-- |An 'Engine' represents a pool of threads ready to execute tasks.
--
-- Every hook list is stored twice: once in a 'TVar', which is what the
-- @add*Hook@ / @del*Hook@ functions modify, and once in an 'MVar', which is
-- what the input manager, the workers and the output manager read.
data Engine job result state = Eng
    { chan1 :: Chan job
      -- ^ Jobs waiting for a worker.  Written by the input manager and by
      -- 'injectPreMutator'; read by the workers.
    , chan2 :: Chan result
      -- ^ Results waiting for the output manager.  Written by the workers
      -- and by 'injectPostMutator'.
    , tvInHook :: TVar [Hook state job]
      -- ^ Input hooks, as edited by 'addInputHook' / 'delInputHook'.
    , tvPreMutateHook :: TVar [Hook state job]
      -- ^ Pre-mutate hooks, as edited by 'addPreMutateHook' /
      -- 'delPreMutateHook'.
    , tvPostMutateHook :: TVar [Hook state result]
      -- ^ Post-mutate hooks, as edited by 'addPostMutateHook' /
      -- 'delPostMutateHook'.
    , tvOutHook :: TVar [Hook state result]
      -- ^ Output hooks, as edited by 'addOutputHook' / 'delOutputHook'.
    , mvInHook :: MVar [Hook state job]
      -- ^ Input hooks as read by the input manager.
    , mvPreMutateHook :: MVar [Hook state job]
      -- ^ Pre-mutate hooks as read by the workers.
    , mvPostMutateHook :: MVar [Hook state result]
      -- ^ Post-mutate hooks as read by the workers.
    , mvOutHook :: MVar [Hook state result]
      -- ^ Output hooks as read by the output manager.
    , state :: TVar state
      -- ^ The state handed to the mutator and to every hook.
    }

-- |If all you want is a basic thread pool, this will work.
-- You should consider using Control.ThreadPool instead.
--
-- Every result is forced to weak head normal form (as with 'seq') by the
-- worker thread that computed it, before it is handed on to the output
-- channel.  The returned pair is the channel to write jobs to and the
-- channel to read results from.
initSimpleEngine :: Int -> (job -> result) -> IO (Chan job, Chan result)
initSimpleEngine nr mutator = do
    input <- newChan
    output <- newChan
    -- The result must be forced here, inside the mutator, because this is
    -- the only code that runs on the worker threads.  Forcing it in the
    -- output action instead would leave the workers passing unevaluated
    -- thunks along and make the single output thread do all of the actual
    -- computation sequentially.
    let m _ job = return $! Just $! mutator job
    _ <- initEngine nr (readChan input) (writeChan output) m ()
    return (input, output)

-- |A thin wrapper around 'initEngine' for an effectful mutator.  The engine
-- itself is not returned, so there is no access to its state, hooks or
-- injectors.  No strictness is forced.
--
-- The returned pair is the channel to write jobs to and the channel to read
-- results from.
initSimpleEngineIO :: Int -> (job -> IO result) -> IO (Chan job, Chan result)
initSimpleEngineIO nr mutator = do
    input <- newChan
    output <- newChan
    _ <- initEngine nr (readChan input) (writeChan output) wrapped ()
    return (input, output)
  where
    -- Ignore the (unit) state and never drop a result.
    wrapped _ job = liftM Just (mutator job)

-- |To initialize an engine you must provide:
--
--    * the number of worker threads
--
--    * an action that will get the input
--
--    * an action that will consume output
--
--    * a mutator function to perform on all inputs
--
--    * an initial state for the mutator function
--
-- One input manager thread, one output manager thread, one state manager
-- thread and the requested number of worker threads are forked before the
-- engine is returned.
--
-- No strictness is forced - be sure you force evaluation if wanted.
-- All hooks start out empty.
initEngine
    :: (Eq st)
    => Int                              -- ^ number of worker threads
    -> (IO job)                         -- ^ action that fetches the next job
    -> (result -> IO ())                -- ^ action that consumes a result
    -> (st -> job -> IO (Maybe result)) -- ^ mutator; 'Nothing' drops the job
    -> st                               -- ^ initial state
    -> IO (Engine job result st)
initEngine nrWorkers input output mutator initialState = do
    jobChan <- newChan
    resultChan <- newChan

    -- The copies of the hooks and state that the running threads read.
    inputHooks <- newMVar []
    outputHooks <- newMVar []
    preMutatorHooks <- newMVar []
    postMutatorHooks <- newMVar []
    ms <- newMVar initialState

    -- The copies that the public API modifies.
    tv <- newTVarIO initialState
    inHookTV <- newTVarIO []
    preHookTV <- newTVarIO []
    postHookTV <- newTVarIO []
    outHookTV <- newTVarIO []

    let engine = Eng
            { chan1 = jobChan
            , chan2 = resultChan
            , tvInHook = inHookTV
            , tvPreMutateHook = preHookTV
            , tvPostMutateHook = postHookTV
            , tvOutHook = outHookTV
            , mvInHook = inputHooks
            , mvPreMutateHook = preMutatorHooks
            , mvPostMutateHook = postMutatorHooks
            , mvOutHook = outputHooks
            , state = tv
            }

    _ <- forkIO (inputManager input jobChan inputHooks ms)
    _ <- forkIO (outputManager output resultChan outputHooks ms)
    _ <- forkIO (stateManager engine ms)
    replicateM_ nrWorkers $
        forkIO (worker jobChan preMutatorHooks ms mutator postMutatorHooks resultChan)
    return engine

-- The body of one worker thread.  Forever: take a job from the first channel,
-- run the pre-mutate hooks, the mutator and the post-mutate hooks over it,
-- and write whatever survives to the second channel.  Any stage may return
-- 'Nothing', in which case the later stages are skipped and nothing is
-- written.
worker :: Chan job -> MVar [Hook st job] -> MVar st -> (st -> job -> IO (Maybe result)) -> MVar [Hook st result] -> Chan result -> IO ()
worker c1 preMutatorHooks ms mutator postMutatorHooks c2 = forever step
  where
    step = do
        -- The hooks and state are re-read for every job so that updates
        -- take effect without restarting the worker.
        job    <- readChan c1
        before <- readMVar preMutatorHooks
        after  <- readMVar postMutatorHooks
        st     <- readMVar ms

        result <- runHooks before st job
                    >>= andThen (mutator st)
                    >>= andThen (runHooks after st)

        maybe (return ()) (writeChan c2) result

    -- Run the next stage only if the previous one produced a value.
    andThen :: (a -> IO (Maybe b)) -> Maybe a -> IO (Maybe b)
    andThen = maybe (return Nothing)

-- Copies changes from the engine's TVars (which the public API modifies)
-- into the MVars that the managers and workers read.  Runs forever.
--
-- The first argument is the engine whose TVars are watched and whose hook
-- MVars are updated; the second is the MVar holding the state seen by the
-- managers and workers.
stateManager :: (Eq st) => Engine job result st -> MVar st -> IO ()
stateManager eng ms = do
    -- The baseline is what the other threads currently see (the MVars), not
    -- the current TVar contents.  If it were taken from the TVars, any
    -- modification made before this thread first ran would become part of
    -- the baseline and would never be copied into the MVars.
    s  <- readMVar ms
    ih <- readMVar (mvInHook eng)
    eh <- readMVar (mvPreMutateHook eng)
    th <- readMVar (mvPostMutateHook eng)
    oh <- readMVar (mvOutHook eng)
    updateState (s, ih, eh, th, oh)
  where
    updateState old@(s, ih, eh, th, oh) = do
        new@(s', ih', eh', th', oh') <- atomically $ do
            snapshot <- readAll
            -- Block while nothing has changed; 'retry' re-runs the
            -- transaction once one of the TVars read above is written.
            when (snapshot == old) retry
            return snapshot
        propagate s  s'  ms
        propagate ih ih' (mvInHook eng)
        propagate eh eh' (mvPreMutateHook eng)
        propagate th th' (mvPostMutateHook eng)
        propagate oh oh' (mvOutHook eng)
        updateState new

    -- One consistent view of the state and all four hook lists.
    readAll = do
        s'  <- readTVar (state eng)
        ih' <- readTVar (tvInHook eng)
        eh' <- readTVar (tvPreMutateHook eng)
        th' <- readTVar (tvPostMutateHook eng)
        oh' <- readTVar (tvOutHook eng)
        return (s', ih', eh', th', oh')

    -- Replace the contents of an MVar only if the value actually changed.
    propagate :: Eq a => a -> a -> MVar a -> IO ()
    propagate before after mv =
        when (after /= before) (swapMVar mv after >> return ())

-- Input.hs
-- The body of the input thread.  Forever: obtain a message from the supplied
-- action, run the current input hooks over it, and forward the result to the
-- channel unless a hook dropped it by returning 'Nothing'.
inputManager :: (IO msg) -> Chan msg -> MVar [Hook st msg] -> MVar st -> IO ()
inputManager input outChan hookMV stMV = forever $ do
    msg <- input
    -- The hooks and state are read after the message arrives, so the most
    -- recent values are the ones applied.
    hooks <- readMVar hookMV
    st <- readMVar stMV
    runHooks hooks st msg >>= maybe (return ()) (writeChan outChan)

-- Output.hs
-- The body of the output thread.  Forever: take a result from the channel,
-- run the current output hooks over it, and pass it to the supplied consumer
-- unless a hook dropped it by returning 'Nothing'.
outputManager :: (result -> IO ()) -> Chan result -> MVar [Hook st result] -> MVar st -> IO ()
outputManager output msgChan hookMV stMV = forever relay
  where
    relay = do
        res <- readChan msgChan
        hooks <- readMVar hookMV
        st <- readMVar stMV
        maybe (return ()) output =<< runHooks hooks st res

-- Hooks.hs
-- | A hook is simply a mutation on the task.  To order the hooks they all
-- have priorities (lower value priorities happen first).  For accounting and
-- to remove old hooks there is a description field.
data Hook st msg = Hk
    { hkFunc :: st -> msg -> IO (Maybe msg)
      -- ^ The mutation itself: given the engine state and a message, yield
      -- the replacement message, or 'Nothing' to drop it.
    , hkPriority :: Int
      -- ^ Ordering key; hooks with a lower value run first.
    , hkDescription :: String
      -- ^ Label by which the hook is identified when it is removed.
    }

-- Two hooks are considered equal when both their priority and their
-- description match; the functions themselves cannot be compared.
instance Eq (Hook st msg) where
    a == b =
        hkPriority a == hkPriority b && hkDescription a == hkDescription b

-- Hooks are ordered by priority alone.
-- NOTE(review): unlike '==', the description is ignored here, so two hooks
-- can compare as 'EQ' without being equal.
instance Ord (Hook st msg) where
    compare a b = compare (hkPriority a) (hkPriority b)

-- 'show' gives a short human-readable line; 'showsPrec' gives a record-like
-- rendering with the (unshowable) function replaced by a placeholder.
-- NOTE(review): the two renderings differ, so anything that goes through
-- 'showsPrec' (e.g. 'shows') will not match the output of 'show'.
instance Show (Hook st msg) where
    show hook = hkDescription hook ++ " Priority = " ++ show (hkPriority hook)
    showsPrec _ (Hk _ prio desc) = showString $ concat
        [ "Hk { hkFunc = undefined, p = "
        , show prio
        , " , hkDescription = "
        , desc
        , " } "
        ]

-- | The four points in the pipeline at which hooks can be attached, listed
-- in the order a task passes through them.
data HookLoc
    = InputHook
    | PreMutateHook
    | PostMutateHook
    | OutputHook
    deriving (Eq, Ord, Show)

-- Thread a message through a list of hooks, in list order, giving each hook
-- the same state.  As soon as one hook returns 'Nothing' the functions of
-- the remaining hooks are not run and the overall result is 'Nothing'.
runHooks :: [Hook st msg] -> st -> msg -> IO (Maybe msg)
runHooks hooks st m = foldM step (Just m) hooks
  where
    step acc hook = maybe (return Nothing) (hkFunc hook st) acc

-- | Atomically add a hook to the engine's list of input hooks ('tvInHook').
-- The hook is placed with 'insert', i.e. according to the 'Ord' instance of
-- 'Hook'.
addInputHook :: Engine job result state -> Hook state job -> IO ()
addInputHook e h = atomically $ do
    let var = tvInHook e
    hooks <- readTVar var
    writeTVar var (insert h hooks)

-- | Atomically add a hook to the engine's list of output hooks
-- ('tvOutHook').  The hook is placed with 'insert', i.e. according to the
-- 'Ord' instance of 'Hook'.
addOutputHook :: Engine job result state -> Hook state result -> IO ()
addOutputHook e h = atomically $ do
    let var = tvOutHook e
    hooks <- readTVar var
    writeTVar var (insert h hooks)

-- | Atomically add a hook to the engine's list of pre-mutate hooks
-- ('tvPreMutateHook').  The hook is placed with 'insert', i.e. according to
-- the 'Ord' instance of 'Hook'.
addPreMutateHook :: Engine job result state -> Hook state job -> IO ()
addPreMutateHook e h = atomically $ do
    let var = tvPreMutateHook e
    hooks <- readTVar var
    writeTVar var (insert h hooks)

-- | Atomically add a hook to the engine's list of post-mutate hooks
-- ('tvPostMutateHook').  The hook is placed with 'insert', i.e. according to
-- the 'Ord' instance of 'Hook'.
addPostMutateHook :: Engine job result state -> Hook state result -> IO ()
addPostMutateHook e h = atomically $ do
    let var = tvPostMutateHook e
    hooks <- readTVar var
    writeTVar var (insert h hooks)

-- | Atomically remove every input hook whose description equals the given
-- string.  Does nothing if no hook matches.
delInputHook :: Engine j r s -> String -> IO ()
delInputHook e s = atomically $ do
    let var = tvInHook e
    hooks <- readTVar var
    writeTVar var [h | h <- hooks, hkDescription h /= s]

-- | Atomically remove every pre-mutate hook whose description equals the
-- given string.  Does nothing if no hook matches.
delPreMutateHook :: Engine j r s -> String -> IO ()
delPreMutateHook e s = atomically $ do
    let var = tvPreMutateHook e
    hooks <- readTVar var
    writeTVar var [h | h <- hooks, hkDescription h /= s]

-- | Atomically remove every post-mutate hook whose description equals the
-- given string.  Does nothing if no hook matches.
delPostMutateHook :: Engine j r s -> String -> IO ()
delPostMutateHook e s = atomically $ do
    let var = tvPostMutateHook e
    hooks <- readTVar var
    writeTVar var [h | h <- hooks, hkDescription h /= s]

-- | Atomically remove every output hook whose description equals the given
-- string.  Does nothing if no hook matches.
delOutputHook :: Engine j r s -> String -> IO ()
delOutputHook e s = atomically $ do
    let var = tvOutHook e
    hooks <- readTVar var
    writeTVar var [h | h <- hooks, hkDescription h /= s]

-- Inject.hs
-- | Hand a task straight to the workers by writing it to the engine's job
-- channel ('chan1'), so the input hooks are not run on it.
injectPreMutator :: Engine j r s -> j -> IO ()
injectPreMutator = writeChan . chan1

-- | Write a 'result' directly to the engine's result channel ('chan2'),
-- bypassing the mutator, so a result can be produced without a task.
-- The result still passes through the output hooks.
injectPostMutator :: Engine j r s -> r -> IO ()
injectPostMutator = writeChan . chan2