#ifndef MODNAME
#define MODNAME Intel.Cnc8
#endif
#define CNC_SCHEDULER 8
#define STEPLIFT S.lift$
#define GRAPHLIFT id$
#define SUPPRESS_put
#define SUPPRESS_newItemCol
#define SUPPRESS_initialize
#define SUPPRESS_itemsToList
#include "Cnc.Header.hs"
type TagCol a = (IORef (Set a), IORef [Step a])
type StepCode a = (S.StateT (HiddenState8) IO a)
type GraphCode = IO
newtype HiddenState8 = HiddenState8 (StepCode (), [()])
newtype ItemCol a b = ItemCol (IORef (Map a ((Maybe b), WaitingSteps)))
type WaitingSteps = [StepCode ()]
data EscapeStep = EscapeStep deriving (Show, Typeable)
instance Exception EscapeStep
liftHidden fn = (\ (HiddenState8 (self,ls)) -> HiddenState8 (self, fn ls))
atomicModifyIORef_ ref fn = atomicModifyIORef ref (\x -> (fn x, ()))
stepStats :: StepCode ()
stepStats =
do
tid <- S.lift myThreadId
HiddenState8 (_, ls) <- S.get
S.lift$ putStrLn (">>> Step state: list len: "++ show (length ls))
launch_steps :: [StepCode ()] -> StepCode ()
launch_steps mls =
foldM (\ () m -> spawn (do try_stepcode m m; return ()))
() mls
try_stepcode :: StepCode () -> StepCode a -> IO (Maybe a)
try_stepcode retry m = wrapped
where
wrapped = do x <- try sync_action
case x of Left EscapeStep -> return Nothing
Right v -> return (Just v)
sync_action =
do
(v, HiddenState8 (_, ls)) <- S.runStateT m (HiddenState8 (retry,[]))
tid <- myThreadId
#ifdef DEBUG_HASKELL_CNC
putStrLn (">>> "++show tid ++": Syncing children")
#endif
foldr pseq (return v) ls
spawn :: IO () -> StepCode ()
spawn ioaction =
do
let thunk = unsafePerformIO ioaction
#ifdef DEBUG_HASKELL_CNC
let wrapped = unsafePerformIO$
do { tid <- myThreadId; putStrLn ("\n>>> "++show tid++": STOLE WORK!\n"); pseq thunk (return ()) }
let parthunk = wrapped
#else
let parthunk = thunk
#endif
S.modify $ liftHidden (thunk:)
id <- S.lift$ myThreadId
#ifdef DEBUG_HASKELL_CNC
S.lift$ putStrLn $ ">>> "++ show id ++ ": Spawning..."
stepStats
#endif
parthunk `par` (do
#ifdef DEBUG_HASKELL_CNC
mid <- S.lift$ myThreadId
S.lift$putStrLn (">>> "++show mid++" (spawned parallel)")
#endif
return ())
newItemCol = do ref <- newIORef Map.empty
return (ItemCol ref)
putt = proto_putt
(\ steps tag ->
launch_steps (Prelude.map (\step -> step tag) steps))
get (ItemCol icol) tag =
do map <- S.lift$ readIORef icol
case Map.lookup tag map of
Nothing -> addquit []
Just (Nothing, waiting) -> addquit waiting
Just (Just v, []) -> return v
Just (Just v, a:b) -> error "CnC: internal invariant violated"
where
addquit ls =
do (HiddenState8 (retry ,_)) <- S.get
S.lift$ atomicModifyIORef_ icol (Map.insert tag (Nothing, retry:ls))
throw EscapeStep
initfin :: String -> StepCode a -> GraphCode a
initfin str m = do let err = error str
x <- try_stepcode err m
case x of Nothing -> err
Just v -> return v
initialize = initfin "Get failed within initialize action!"
finalize = initfin "Get failed within finalize action!"
put (ItemCol icol) tag (!item) =
do waiting <- S.lift$ atomicModifyIORef icol mod
launch_steps waiting
return ()
where
mod map =
let new = (Just item, [])
f key _ (Nothing, _) = new
#ifdef REPEAT_PUT_ALLOWED
f key _ old@(Just v, ls) = old
#else
f key _ (Just v, _) = error ("Single assignment violated at tag: "++ show tag)
#endif
(old, map') = Map.insertLookupWithKey f tag new map
in case old of
Nothing -> (map', [])
Just (Nothing, waiting) -> (map', waiting)
#ifdef REPEAT_PUT_ALLOWED
Just (Just _, waiting) -> (map , waiting)
#else
Just (Just _, _) -> error ("Single assignment violated at tag: "++ show tag)
#endif
itemsToList (ItemCol icol) =
do if not quiescence_support
then error "need to use a scheduler with quiescence support for itemsToList"
else return ()
map <- S.lift$ readIORef icol
return $ Prelude.map (\ (key, (Just v, _)) -> (key,v))
$ Prelude.filter fil
$ (Map.toList map)
where
fil (key, (Nothing, _)) = False
fil _ = True
quiescence_support=True ;