-- | A small concurrent processing engine.
--
-- Jobs obtained from an input action are passed through input hooks, handed
-- to a pool of worker threads that run pre-mutate hooks, a mutator function
-- and post-mutate hooks, and the results are finally passed through output
-- hooks before being given to an output action.
module Control.Engine
    (
    -- Engine construction
    initSimpleEngine
    , initSimpleEngineIO
    , initEngine
    -- Exported types
    , Engine(..)
    , Hook(..)
    -- Adding hooks
    , addInputHook
    , addOutputHook
    , addPreMutateHook
    , addPostMutateHook
    -- Removing hooks (by description)
    , delInputHook
    , delOutputHook
    , delPreMutateHook
    , delPostMutateHook
    -- Writing directly onto the internal channels
    , injectPreMutator
    , injectPostMutator
    ) where
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Concurrent.BoundedChan
import Control.Monad
import Data.IORef
import Data.List (insert)
-- | Handle to a running engine, as returned by 'initEngine'.
--
-- The hook lists and the state live in 'TVar's; a state-manager thread copies
-- any change into the 'IORef's that the processing threads read.
data Engine job result state =
    Eng { chan1 :: BoundedChan job
          -- ^ Jobs waiting for a worker (written by the input manager and
          --   by 'injectPreMutator').
        , chan2 :: BoundedChan result
          -- ^ Results waiting for the output manager (written by the workers
          --   and by 'injectPostMutator').
        , tvInHook :: TVar [Hook state job]
          -- ^ Hooks run on each job by the input manager.
        , tvPreMutateHook :: TVar [Hook state job]
          -- ^ Hooks run on each job by a worker, before the mutator.
        , tvPostMutateHook :: TVar [Hook state result]
          -- ^ Hooks run on each result by a worker, after the mutator.
        , tvOutHook :: TVar [Hook state result]
          -- ^ Hooks run on each result by the output manager.
        , state :: TVar state
          -- ^ State value passed to every hook and to the mutator.
        }
-- Internal mirror of the hook lists and state held by 'Engine'.
-- The processing threads (input manager, workers, output manager) read these
-- 'IORef's; only the state manager writes them, copying from the 'TVar's.
data RefEngine job result state =
    RefEng
        { refInHook :: IORef [Hook state job]           -- mirror of tvInHook
        , refPreMutateHook :: IORef [Hook state job]    -- mirror of tvPreMutateHook
        , refPostMutateHook :: IORef [Hook state result] -- mirror of tvPostMutateHook
        , refOutHook :: IORef [Hook state result]       -- mirror of tvOutHook
        , refState :: IORef state                       -- mirror of state
        }
-- | Start an engine of @nr@ worker threads around a pure function and return
-- the channel to write jobs to and the channel to read results from.
--
-- Both channels (and the engine's internal channels) are bounded at
-- @nr * 8@ elements. The engine handle is discarded, so no hooks can be
-- added to an engine created this way.
--
-- Each result is evaluated to weak head normal form by the worker thread
-- that computed it, so the cost of @mutator@ is spread over the @nr@ workers
-- instead of being paid by the single output thread.
initSimpleEngine :: Int -> (job -> result) -> IO (BoundedChan job, BoundedChan result)
initSimpleEngine nr mutator = do
    input  <- newBoundedChan chanBound
    output <- newBoundedChan chanBound
    _ <- initEngine nr chanBound (readChan input) (forcedWrite output) forcedMutator ()
    return (input, output)
  where
    chanBound = nr * 8

    -- Force the result before wrapping it: a bare @return (Just (mutator job))@
    -- would only hand an unevaluated thunk down the pipeline.
    forcedMutator _ job =
        let r = mutator job
        in r `seq` return (Just r)

    -- Kept from the original interface: the value placed on the output
    -- channel is always in WHNF (a no-op for already-forced results).
    forcedWrite out o = o `seq` writeChan out o
-- | Start an engine of @nr@ worker threads around an 'IO' function and return
-- the channel to write jobs to and the channel to read results from.
--
-- All channels are bounded at @nr * 8@ elements. The engine handle is
-- discarded, so no hooks can be added to an engine created this way.
initSimpleEngineIO :: Int -> (job -> IO result) -> IO (BoundedChan job, BoundedChan result)
initSimpleEngineIO nr mutator = do
    jobsIn     <- newBoundedChan capacity
    resultsOut <- newBoundedChan capacity
    _ <- initEngine nr capacity (readChan jobsIn) (writeChan resultsOut) runJob ()
    return (jobsIn, resultsOut)
  where
    capacity = nr * 8

    -- The unit state is ignored; every job yields a result.
    runJob _ job = fmap Just (mutator job)
-- | Build and start a full engine.
--
-- Arguments, in order: the number of worker threads, the bound of the two
-- internal channels, the action producing jobs, the action consuming results,
-- the mutator run by the workers, and the initial state.
--
-- Forks one input manager, one output manager, one state manager and
-- @nrWorkers@ workers, then returns the 'Engine' handle through which hooks
-- and state can be changed.
initEngine :: (Eq st) =>
       Int
    -> Int
    -> (IO job)
    -> (result -> IO ())
    -> (st -> job -> IO (Maybe result))
    -> st
    -> IO (Engine job result st)
initEngine nrWorkers chanBound input output mutator initialState = do
    jobChan    <- newBoundedChan chanBound
    resultChan <- newBoundedChan chanBound

    -- IORef side: what the processing threads actually read.
    inRef    <- newIORef []
    preRef   <- newIORef []
    postRef  <- newIORef []
    outRef   <- newIORef []
    stateRef <- newIORef initialState

    -- TVar side: what the user modifies through the Engine handle.
    inTV    <- newTVarIO []
    preTV   <- newTVarIO []
    postTV  <- newTVarIO []
    outTV   <- newTVarIO []
    stateTV <- newTVarIO initialState

    let engine = Eng
            { chan1            = jobChan
            , chan2            = resultChan
            , tvInHook         = inTV
            , tvPreMutateHook  = preTV
            , tvPostMutateHook = postTV
            , tvOutHook        = outTV
            , state            = stateTV
            }
        mirror = RefEng
            { refInHook         = inRef
            , refPreMutateHook  = preRef
            , refPostMutateHook = postRef
            , refOutHook        = outRef
            , refState          = stateRef
            }

    _ <- forkIO (inputManager input jobChan mirror)
    _ <- forkIO (outputManager output resultChan mirror)
    _ <- forkIO (stateManager engine mirror)
    replicateM_ nrWorkers (forkIO (worker jobChan mirror mutator resultChan))
    return engine
-- Worker loop: take a job from the first channel, run the pre-mutate hooks,
-- the mutator and the post-mutate hooks on it, and write the outcome (if any)
-- to the second channel. Hooks and state are re-read for every job.
worker :: BoundedChan job -> RefEngine job result st -> (st -> job -> IO (Maybe result)) -> BoundedChan result -> IO ()
worker c1 eref mutator c2 = forever (readChan c1 >>= process)
  where
    process job = do
        preHooks  <- readIORef (refPreMutateHook eref)
        postHooks <- readIORef (refPostMutateHook eref)
        st        <- readIORef (refState eref)
        final <- runHooks preHooks st job
                     >>= runStage (mutator st)
                     >>= runStage (runHooks postHooks st)
        -- A Nothing anywhere in the pipeline means the job was dropped.
        maybe (return ()) (writeChan c2) final

    -- Run a stage only if the previous one produced a value.
    runStage :: (a -> IO (Maybe b)) -> Maybe a -> IO (Maybe b)
    runStage stage = maybe (return Nothing) stage
-- Keep the IORef mirror in step with the TVars of the 'Engine'.
--
-- Takes a snapshot of the state and the four hook lists, copies it into the
-- IORefs, then blocks (via 'retry') until a snapshot compares unequal to the
-- one just published, and repeats. Hook lists are compared with the 'Eq'
-- instance of 'Hook'.
stateManager :: (Eq st) => Engine job result st -> RefEngine job result st -> IO ()
stateManager eng eref = atomically snapshot >>= publish
  where
    -- One consistent view of everything the processing threads need.
    snapshot =
        liftM5 (,,,,)
            (readTVar (state eng))
            (readTVar (tvInHook eng))
            (readTVar (tvPreMutateHook eng))
            (readTVar (tvPostMutateHook eng))
            (readTVar (tvOutHook eng))

    publish current@(st, inHooks, preHooks, postHooks, outHooks) = do
        writeIORef (refState eref) st
        writeIORef (refInHook eref) inHooks
        writeIORef (refPreMutateHook eref) preHooks
        writeIORef (refPostMutateHook eref) postHooks
        writeIORef (refOutHook eref) outHooks
        next <- atomically $ do
            latest <- snapshot
            if latest == current
                then retry
                else return latest
        publish next
-- Input loop: obtain a job from the input action, run the current input
-- hooks on it and, unless a hook dropped it, write it to the given channel.
inputManager :: (IO job) -> BoundedChan job -> RefEngine job result st -> IO ()
inputManager input outChan eref = forever $ do
    msg   <- input
    hooks <- readIORef (refInHook eref)
    st    <- readIORef (refState eref)
    runHooks hooks st msg >>= maybe (return ()) (writeChan outChan)
-- Output loop: read a result from the given channel, run the current output
-- hooks on it and, unless a hook dropped it, pass it to the output action.
outputManager :: (result -> IO ()) -> BoundedChan result -> RefEngine job result state -> IO ()
outputManager output msgChan eref = forever (readChan msgChan >>= deliver)
  where
    deliver msg = do
        hooks <- readIORef (refOutHook eref)
        st    <- readIORef (refState eref)
        runHooks hooks st msg >>= maybe (return ()) output
-- | A hook that can inspect, replace or drop a message at one stage of the
-- engine.
data Hook st msg = Hk
    { hkFunc :: st -> msg -> IO (Maybe msg)
      -- ^ Given the current state and a message, return the message to pass
      --   on, or 'Nothing' to drop it (later hooks are then not applied).
    , hkPriority :: Int
      -- ^ Sort key used by the 'Ord' instance; the @add*Hook@ functions
      --   insert hooks in this order and hooks run from the front of the list.
    , hkDescription :: String
      -- ^ Label used by the @del*Hook@ functions to select hooks for removal,
      --   and (with the priority) by the 'Eq' instance.
    }
-- | Two hooks are equal when both priority and description match; the hook
-- function itself is not (and cannot be) compared.
instance Eq (Hook m s) where
    Hk _ prioA descA == Hk _ prioB descB = (prioA, descA) == (prioB, descB)
-- | Hooks are ordered by priority alone; the description and the function
-- play no part in the comparison.
instance Ord (Hook a s) where
    compare (Hk _ prioA _) (Hk _ prioB _) = compare prioA prioB
-- | Textual form of a hook. The function cannot be shown, so only the
-- description and priority appear. Note that 'show' and 'showsPrec' are
-- defined separately and produce different formats.
instance Show (Hook a s) where
    show (Hk _ prio desc) = concat [desc, " Priority = ", show prio]
    showsPrec _ (Hk _ prio desc) =
        showString "Hk { hkFunc = undefined, p = "
            . shows prio
            . showString " , hkDescription = "
            . showString desc
            . showString " } "
-- Apply the hooks to a message from the front of the list to the back, each
-- hook receiving the previous hook's output. Once a hook returns 'Nothing'
-- no further hook functions are run and the overall result is 'Nothing'.
runHooks :: [Hook st msg] -> st -> msg -> IO (Maybe msg)
runHooks hooks st m = foldM step (Just m) hooks
  where
    step current hook = maybe (return Nothing) (hkFunc hook st) current
-- | Add a hook to the engine's input hook list, placed according to the
-- hook's 'Ord' instance (its priority) by 'Data.List.insert'.
addInputHook :: Engine job result state -> Hook state job -> IO ()
addInputHook e h = atomically $ do
    current <- readTVar hooksVar
    writeTVar hooksVar (insert h current)
  where
    hooksVar = tvInHook e
-- | Add a hook to the engine's output hook list, placed according to the
-- hook's 'Ord' instance (its priority) by 'Data.List.insert'.
addOutputHook :: Engine job result state -> Hook state result -> IO ()
addOutputHook e h = atomically $ do
    current <- readTVar hooksVar
    writeTVar hooksVar (insert h current)
  where
    hooksVar = tvOutHook e
-- | Add a hook to the engine's pre-mutate hook list, placed according to the
-- hook's 'Ord' instance (its priority) by 'Data.List.insert'.
addPreMutateHook :: Engine job result state -> Hook state job -> IO ()
addPreMutateHook e h = atomically $ do
    current <- readTVar hooksVar
    writeTVar hooksVar (insert h current)
  where
    hooksVar = tvPreMutateHook e
-- | Add a hook to the engine's post-mutate hook list, placed according to the
-- hook's 'Ord' instance (its priority) by 'Data.List.insert'.
addPostMutateHook :: Engine job result state -> Hook state result -> IO ()
addPostMutateHook e h = atomically $ do
    current <- readTVar hooksVar
    writeTVar hooksVar (insert h current)
  where
    hooksVar = tvPostMutateHook e
-- | Remove every input hook whose description equals the given string.
delInputHook :: Engine j r s -> String -> IO ()
delInputHook e s = atomically $ do
    current <- readTVar hooksVar
    writeTVar hooksVar [hk | hk <- current, hkDescription hk /= s]
  where
    hooksVar = tvInHook e
-- | Remove every pre-mutate hook whose description equals the given string.
delPreMutateHook :: Engine j r s -> String -> IO ()
delPreMutateHook e s = atomically $ do
    current <- readTVar hooksVar
    writeTVar hooksVar [hk | hk <- current, hkDescription hk /= s]
  where
    hooksVar = tvPreMutateHook e
-- | Remove every post-mutate hook whose description equals the given string.
delPostMutateHook :: Engine j r s -> String -> IO ()
delPostMutateHook e s = atomically $ do
    current <- readTVar hooksVar
    writeTVar hooksVar [hk | hk <- current, hkDescription hk /= s]
  where
    hooksVar = tvPostMutateHook e
-- | Remove every output hook whose description equals the given string.
delOutputHook :: Engine j r s -> String -> IO ()
delOutputHook e s = atomically $ do
    current <- readTVar hooksVar
    writeTVar hooksVar [hk | hk <- current, hkDescription hk /= s]
  where
    hooksVar = tvOutHook e
-- | Write a job directly onto the channel the workers read from ('chan1'),
-- without going through the input action or the input hooks.
injectPreMutator :: Engine j r s -> j -> IO ()
injectPreMutator eng job = writeChan jobChan job
  where
    jobChan = chan1 eng
-- | Write a result directly onto the channel the output manager reads from
-- ('chan2'), without passing through the workers.
injectPostMutator :: Engine j r s -> r -> IO ()
injectPostMutator eng res = writeChan resultChan res
  where
    resultChan = chan2 eng