#ifndef INCLUDEMETHOD
module Intel.CncPure(
Step, TagCol, ItemCol,
StepCode(..), GraphCode,
newItemCol, newTagCol, prescribe,
putt, put, get,
initialize, finalize,
runGraph,
stepPutStr, cncPutStr, cncVariant,
tests,
)
where
#endif
import Control.Applicative (Applicative(..))
import Control.Concurrent
import Control.Monad
import Data.Array as Array
import Data.Complex
import Data.IORef
import qualified Data.IntMap as IntMap
import Data.List as List
import Data.Map as Map
import Data.Maybe
import Data.Set as Set
import Data.Word
import Debug.Trace
import GHC.Conc
import System.IO.Unsafe
import Unsafe.Coerce

import System.Random
import Test.HUnit

import Intel.CncUtil hiding (tests)
-- | Compile-time switch: 'True' when the module is built with the MEMOIZE
-- macro defined, 'False' otherwise. 'mergeUpdates' consults it to decide
-- whether tag collections are recorded and already-seen tags are filtered.
#ifdef MEMOIZE
#warning "Memoization enabled"
memoize = True
#else
memoize = False
#endif
-- The scheduler implementation is chosen at compile time through a macro:
-- 1 = simple, 2 = blocking-aware, 3 = parallel, 4 = work-stealing.
-- When the macro is not set, scheduler 2 is used.
#ifndef CNC_SCHEDULER
#warning "CncPure.hs -- CNC_SCHEDULER unset, defaulting to scheduler 2 "
#define CNC_SCHEDULER 2
#endif

-- | The scheduler used by 'finalize' (and by the tests) to run a graph to
-- completion, given the initial tags and the initial collections.
scheduler :: Graph -> [NewTag] -> Collections -> Collections
#if CNC_SCHEDULER == 1
scheduler = simpleScheduler
#elif CNC_SCHEDULER == 2
scheduler = betterBlockingScheduler
#elif CNC_SCHEDULER == 3
#warning "Enabling parallel scheduler..."
scheduler = parallelScheduler
#elif CNC_SCHEDULER == 4
scheduler = parSched2
#else
#error "CncPure.hs -- CNC_SCHEDULER is not set to one of {1,2,3,4}"
#endif

-- | Human-readable name of this implementation variant: @"pure/"@ followed
-- by the number of the selected scheduler.
cncVariant :: String
cncVariant = "pure/" ++ show CNC_SCHEDULER
-- | The whole mutable-looking state of a graph: a counter (used by
-- '_newTagCol' / '_newItemCol' as the id of the next collection), all tag
-- collections, and all item collections.
type Collections = (Int, MatchedTagMap, MatchedItemMap)
-- | All item collections, keyed by collection id. The key/value types are
-- hidden existentially; accessors recover them with 'unsafeCoerce'.
data MatchedItemMap = forall a b. MI !(IntMap.IntMap (ItemColInternal a b))
-- | All tag collections, keyed by collection id, with the tag type hidden.
data MatchedTagMap = forall a. MT !(IntMap.IntMap (TagColInternal a))
-- | Typed handle to a tag collection; the 'Int' is its id, @a@ is phantom.
data TagCol a = TCID Int deriving (Ord, Eq, Show)
-- | Typed handle to an item collection; the 'Int' is its id, @a@ and @b@
-- (key and value types) are phantom.
data ItemCol a b = ICID Int deriving (Ord, Eq, Show)
-- | Storage for one tag collection.
type TagColInternal a = Set a
-- | Storage for one item collection.
type ItemColInternal a b = Map a b
-- | A step: given a tag and a snapshot of the collections, produce a result.
type Step a = a -> Collections -> StepResult
-- | Outcome of running a step against a snapshot of the collections.
--
-- * 'Done' carries the tags and items the step wants to publish.
-- * 'Block' names the item collection and key whose lookup failed.
data StepResult
  = Done [NewTag] [NewItem]
  | forall a b. (Ord a, Show a) => Block (ItemCol a b) a
-- | A tag to be added to a tag collection (tag type hidden).
data NewTag = forall a. Ord a => NT (TagCol a) a
-- | A key/value pair to be added to an item collection (types hidden).
data NewItem = forall a b. (Ord a,Show a) => NI (ItemCol a b) a b
-- | The prescription graph: for each tag collection id, the steps that
-- '_prescribe' has attached to it. The tag type is hidden.
data Graph = forall a. G (IntMap.IntMap [Step a])
-- Type signatures of the low-level, non-monadic interface. The definitions
-- follow further down.

-- | Empty world whose collection-id counter starts at the given value.
_newWorld :: Int -> Collections
-- | Allocate a new empty tag collection.
_newTagCol :: Collections -> (TagCol ma, Collections)
-- | Allocate a new empty item collection.
_newItemCol :: Collections -> (ItemCol a b, Collections)
-- | Describe an item insertion (the world is not touched here).
_put :: (Show a,Ord a) => ItemCol a b -> a -> b -> NewItem
-- | Describe a tag insertion (the world is not touched here).
_putt :: Ord a => TagCol a -> a -> NewTag
-- | Look an item up in the world.
_get :: Ord a => Collections -> ItemCol a b -> a -> Maybe b
-- | Attach a step to a tag collection in the graph.
_prescribe :: Ord a => TagCol a -> Step a -> Graph -> Graph
-- | Build a world with no collections at all; @firstId@ becomes the id that
-- the next allocated collection receives.
_newWorld firstId = (firstId, noTags, noItems)
  where
    noTags  = MT IntMap.empty
    noItems = MI IntMap.empty
-- | Register an empty tag collection under the current counter value and
-- return its handle together with the world whose counter was bumped.
_newTagCol (next, MT tagCols, itemCols) =
  let tagCols' = IntMap.insert next Set.empty tagCols
  in (TCID next, (next + 1, MT tagCols', itemCols))
-- | Register an empty item collection under the current counter value and
-- return its handle together with the world whose counter was bumped.
_newItemCol (next, tagCols, MI itemCols) =
  let itemCols' = IntMap.insert next Map.empty itemCols
  in (ICID next, (next + 1, tagCols, MI itemCols'))
-- | Reinterpret an untyped item collection at the key/value types named by
-- the handle. The handle is used only to fix the result type; the cast is
-- an unchecked 'unsafeCoerce'.
magic :: ItemCol a b -> ItemColInternal c d -> ItemColInternal a b
magic _ = unsafeCoerce
-- | Look up @tag@ in the item collection identified by the handle.
-- Returns 'Nothing' when the key is absent. The collection itself must
-- exist in the world: the 'IntMap' lookup is the partial @(!)@.
_get (_, _, MI itemCols) col@(ICID n) tag =
  Map.lookup tag (magic col (itemCols IntMap.! n))
-- | Remove the entry stored under @tag@ from the given item collection,
-- leaving the counter and the tag collections untouched.
_rem :: Ord a => Collections -> ItemCol a b -> a -> Collections
_rem (cnt, tmap, MI itemCols) col tag =
  (cnt, tmap, MI (IntMap.adjust dropTag n itemCols))
  where
    ICID n = col
    -- Cast to the typed view, delete, and cast back to the stored type.
    dropTag stored = moremagic itemCols (Map.delete tag (magic col stored))
-- | Package an item insertion request; nothing is stored until the request
-- goes through 'mergeUpdates'.
_put = NI
-- | Package a tag insertion request; nothing is stored until the request
-- goes through 'mergeUpdates'.
_putt = NT
-- | Cast a typed item collection back to the element type of the given
-- collection map. The map argument only fixes the result type; the cast is
-- an unchecked 'unsafeCoerce'.
moremagic :: IntMap.IntMap (ItemColInternal a b) -> ItemColInternal c d -> ItemColInternal a b
moremagic _ = unsafeCoerce
-- | Reinterpret an untyped tag collection at the tag type named by the
-- handle. The handle only fixes the result type; the cast is unchecked.
tmagic :: TagCol a -> TagColInternal c -> TagColInternal a
tmagic _ = unsafeCoerce
-- | Cast a typed tag collection back to the element type of the given
-- collection map. The map argument only fixes the result type; the cast is
-- unchecked.
mostmagic :: IntMap.IntMap (TagColInternal a) -> TagColInternal c -> TagColInternal a
mostmagic _ = unsafeCoerce
-- | Apply a batch of pending insertions to the world.
--
-- Items are always inserted. Tags are treated according to 'memoize':
--
-- * when it is 'True', tags are recorded in their collections and only the
--   tags that were not present before are returned;
-- * when it is 'False', tag collections are left as they were and every
--   requested tag is returned unchanged.
--
-- The returned tag list is what the caller should schedule next.
mergeUpdates :: [NewTag] -> [NewItem] -> Collections -> (Collections, [NewTag])
mergeUpdates newtags newitems (n, MT tags, MI items)
  | memoize   = ((n, MT tags', MI items'), fresh)
  | otherwise = ((n, MT tags, MI items'), newtags)
  where
    items' = List.foldl addItem items newitems

    -- Insert one item into its (existing) collection.
    addItem acc (NI col k x) =
      let ICID i  = col
          current = magic col (acc IntMap.! i)
      in IntMap.insert i (moremagic acc (Map.insert k x current)) acc

    (tags', fresh) = List.foldl addTag (tags, []) newtags

    -- Insert one tag and remember it if it had not been seen before.
    addTag (acc, unseen) nt@(NT col k) =
      let TCID i  = col
          current = tmagic col (acc IntMap.! i)
          grown   = mostmagic acc (Set.insert k current)
      in ( IntMap.insert i grown acc
         , if Set.member k current then unseen else nt : unseen )
-- | Reinterpret the step table of a graph at the tag type named by the
-- handle. The handle only fixes the result type; the cast is unchecked.
megamagic :: TagCol a -> IntMap.IntMap [Step b] -> IntMap.IntMap [Step a]
megamagic _ = unsafeCoerce
-- | A graph in which no tag collection prescribes any step.
emptyGraph :: Graph
emptyGraph = G noSteps
  where noSteps = IntMap.empty
-- | Attach @step@ to the tag collection: it is added in front of any steps
-- already registered under the same collection id.
_prescribe tc@(TCID n) step (G gmap) =
  G (IntMap.insertWith (++) n [step] typed)
  where
    typed = megamagic tc gmap
-- | All steps prescribed by the given tag collection (empty when none).
getSteps :: Graph -> TagCol a -> [Step a]
getSteps (G gmap) tc@(TCID n) =
  IntMap.findWithDefault [] n (megamagic tc gmap)
-- | A step that has already been applied to its tag and only needs a
-- snapshot of the collections to run.
type PrimedStep = Collections -> StepResult
-- | Apply every step prescribed by the tag collection to @tag@, yielding
-- the resulting primed steps in registration-table order.
callSteps :: Graph -> TagCol a -> a -> [PrimedStep]
callSteps (G gmap) tc@(TCID n) tag =
  [ step tag | step <- IntMap.findWithDefault [] n (megamagic tc gmap) ]
-- | Sequential scheduler with naive retry of blocked steps.
--
-- The loop keeps three work lists: steps that blocked, tags not yet
-- expanded, and primed steps ready to run. Ready steps are run first; when
-- none remain the next tag is expanded; when no tags remain either, all
-- blocked steps are retried.
--
-- NOTE(review): blocked steps are retried unconditionally, so a step that
-- waits for an item nobody ever produces appears to loop forever.
simpleScheduler :: Graph -> [NewTag] -> Collections -> Collections
simpleScheduler graph inittags cols = go cols [] inittags []
  where
    -- Expand a tag into the primed steps it triggers.
    fire (NT tc tag) = callSteps graph tc tag

    go c [] [] [] = c
    go c waiting [] [] = go c [] [] waiting
    go c waiting (hd : rest) [] = go c waiting rest (fire hd)
    go c waiting tags (step : rest) =
      case step c of
        Block _ _ -> go c (step : waiting) tags rest
        Done newtags newitems ->
          let (c', fresh) = mergeUpdates newtags newitems c
          in go c' waiting (fresh ++ tags) rest
-- | Turn a handle on a real item collection into a handle on the
-- same-numbered collection of the mirror world, whose values are lists of
-- blocked steps. Unchecked cast.
magic_to_alternate :: ItemCol a b -> ItemCol a [PrimedStep]
magic_to_alternate = unsafeCoerce
-- | Sequential scheduler that parks blocked steps instead of polling them.
--
-- Alongside the real world it maintains a mirror world (see 'mirrorWorld')
-- in which each item key maps to the steps waiting for that item. A step
-- that blocks is stored there; when an item is produced, the steps parked
-- under its key are moved to the front of the ready list.
betterBlockingScheduler :: Graph -> [NewTag] -> Collections -> Collections
betterBlockingScheduler graph inittags world =
    schedloop world (mirrorWorld world) inittags []
  where
    -- Expand a tag into the primed steps it triggers.
    fire (NT tc tag) = callSteps graph tc tag

    -- Release the steps parked on one freshly produced item.
    wake (ready, mirror) (NI col tag _) =
      let (mirror', steps) = mirrorGet mirror col tag
      in (steps ++ ready, mirror')

    schedloop :: Collections -> Collections -> [NewTag] -> [PrimedStep] -> Collections
    schedloop w _ [] [] = w
    schedloop w mirror (hd : rest) [] =
      schedloop w mirror rest (fire hd)
    schedloop w mirror tags (pstep : more) =
      case pstep w of
        Block d_id tag ->
#ifdef VERBOSEBLOCKING
          trace (" ... Blocked ... " ++ show (d_id,tag)) $
#endif
          schedloop w (updateMirror mirror d_id tag pstep) tags more
        Done newtags newitems ->
          let (w2, fresh)      = mergeUpdates newtags newitems w
              (ready, mirror') = List.foldl wake (more, mirror) newitems
          in schedloop w2 mirror' (fresh ++ tags) ready
-- | Build an empty "mirror" of a world: a fresh world (counter starting at
-- 0) holding one empty item collection for every id from 0 up to the
-- largest item-collection id of the original, so that item-collection
-- handles of the original are valid in the mirror too.
mirrorWorld :: Collections -> Collections
mirrorWorld (_, _, MI imap) =
  List.foldl addCol (_newWorld 0) [0 .. highest]
  where
    highest    = List.foldl max 0 (IntMap.keys imap)
    addCol w _ = snd (_newItemCol w)
-- | Park a blocked step in the mirror world under the item key it is
-- waiting for, in front of any steps already parked there.
updateMirror :: (Show a, Ord a) => Collections -> ItemCol a b -> a -> PrimedStep -> Collections
updateMirror mirror d_id tag val =
  -- No tags are submitted, so the list of fresh tags comes back empty.
  case mergeUpdates [] [entry] mirror of
    (mirror', []) -> mirror'
  where
    alt_id  = magic_to_alternate d_id
    waiting = fromMaybe [] (_get mirror alt_id tag)
    entry   = _put alt_id tag (val : waiting)
-- | Park a batch of blocked steps in the mirror world.
--
-- @bls@ holds the 'Block' results and @sls@ the primed steps that produced
-- them, position by position. Both lists are walked in lock-step; the
-- function stops when the list of results is exhausted.
--
-- Raises an error (instead of failing inside 'head' or on an unmatched
-- pattern) if a result is not a 'Block' or if there are fewer steps than
-- results.
updateMirrorList :: Collections -> [StepResult] -> [PrimedStep] -> Collections
updateMirrorList mirror [] _ = mirror
updateMirrorList mirror (Block d_id tag : bls) (step : sls) =
  updateMirrorList (updateMirror mirror d_id tag step) bls sls
updateMirrorList _ (Block _ _ : _) [] =
  error "updateMirrorList: fewer steps than blocked results"
updateMirrorList _ (Done _ _ : _) _ =
  error "updateMirrorList: expected only Block results"
-- | Take the steps parked under an item key out of the mirror world.
-- Returns the mirror with that entry removed and the parked steps, or the
-- unchanged mirror and an empty list when nothing was parked there.
mirrorGet :: Ord a => Collections -> ItemCol a b -> a -> (Collections, [PrimedStep])
mirrorGet mirror col tag =
  maybe (mirror, []) release (_get mirror alt_id tag)
  where
    alt_id        = magic_to_alternate col
    release steps = (_rem mirror alt_id tag, steps)
-- | Match freshly produced items against the mirror world of blocked
-- steps. Every step parked under the key of one of the items is removed
-- from the mirror and returned so that it can be run again.
--
-- The reactivation trace is emitted only when the module is compiled with
-- VERBOSEBLOCKING, matching the blocking trace in
-- 'betterBlockingScheduler'; it was previously printed unconditionally.
newItemsAgainstBlocked :: [NewItem] -> Collections -> (Collections, [PrimedStep])
newItemsAgainstBlocked newitems mirror =
  List.foldl wake (mirror, []) newitems
  where
    wake (m, acc) (NI col tag _) =
      let alt_id = magic_to_alternate col in
      case _get m alt_id tag of
        Nothing -> (m, acc)
        Just steps ->
#ifdef VERBOSEBLOCKING
          trace (" ... REACTIVATED ... " ++ show (alt_id,tag)) $
#endif
          (_rem m alt_id tag, steps ++ acc)
-- | Working state of one scheduling quantum (see 'runSomeSteps').
-- 'blocked' and 'bsteps' are filled in lock-step: the n-th blocked result
-- was produced by the n-th step. The type of 'intags' is a parameter so
-- that schedulers reading tags from elsewhere can use @()@.
data Bundle a = B
  { blocked :: [StepResult]  -- ^ 'Block' results seen so far
  , bsteps  :: [PrimedStep]  -- ^ the steps that produced them
  , intags  :: a             -- ^ tags still to be expanded
  , outtags :: [NewTag]      -- ^ tags produced by finished steps
  , items   :: [NewItem]     -- ^ items produced by finished steps
  }
-- | Step budget handed to 'runSomeSteps' / 'runSomeSteps2' per quantum.
_GRAIN :: Int
_GRAIN = 5

-- | Number of worker threads used by the parallel schedulers.
_NUMTHREADS :: Int
_NUMTHREADS = numCapabilities
-- | Parallel scheduler: the initial tags are split between worker threads
-- that share the world and the mirror of blocked steps through two
-- 'IORef's.
--
-- Each worker repeatedly takes a snapshot of the world, runs a quantum of
-- steps against it, atomically merges the produced tags/items into the
-- shared world, then atomically parks its newly blocked steps and collects
-- the steps reactivated by its new items. A worker stops when it has no
-- tags, no fresh tags and no reactivated steps left.
--
-- NOTE(review): steps run against a snapshot that may be stale by the time
-- a block is registered — confirm that no wake-up can be lost.
parallelScheduler :: Graph -> [NewTag] -> Collections -> Collections
parallelScheduler graph inittags world =
  unsafePerformIO $ do
    global_world   <- newIORef world
    global_blocked <- newIORef (mirrorWorld world)
    forkJoin [ threadloop global_world global_blocked [] chunk
             | chunk <- splitN _NUMTHREADS inittags ]
    readIORef global_world
  where
    threadloop worldref blockedref primed mytags = do
      snapshot <- readIORef worldref
      let start = B { blocked = [], bsteps = [], intags = mytags
                    , outtags = [], items = [] }
          B { blocked = blk, bsteps = stalled, intags = leftover
            , outtags = produced, items = newitems } =
            runSomeSteps graph snapshot _GRAIN start primed
      fresh <- atomicModifyIORef worldref (mergeUpdates produced newitems)
      newprimed <- atomicModifyIORef blockedref $ \oldblck ->
        newItemsAgainstBlocked newitems (updateMirrorList oldblck blk stalled)
      unless (Prelude.null leftover && Prelude.null fresh && Prelude.null newprimed) $
        threadloop worldref blockedref newprimed (fresh ++ leftover)
-- | Parallel scheduler with per-thread work queues and random stealing.
--
-- Every worker owns a channel of tags. It runs quanta of steps fed from its
-- own channel, merges the results into the shared world, parks blocked
-- steps in the shared mirror, and pushes fresh tags back onto its channel.
-- When it has nothing left it tries a bounded number of times to steal one
-- tag from a randomly chosen queue, then gives up.
--
-- One queue is created per worker thread ('_NUMTHREADS'). The victim index
-- is taken modulo '_NUMTHREADS', so a fixed number of queues could be
-- indexed out of bounds on machines with more capabilities.
--
-- NOTE(review): assumes @splitN _NUMTHREADS@ yields at most '_NUMTHREADS'
-- chunks; extra chunks would be dropped by 'zip3' — confirm in CncUtil.
-- NOTE(review): a queue may be emptied by another thread between
-- 'isEmptyChan' and 'readChan', in which case 'readChan' waits.
parSched2 :: Graph -> [NewTag] -> Collections -> Collections
parSched2 graph inittags world =
  unsafePerformIO $
    do worldref <- newIORef world
       blockedref <- newIORef (mirrorWorld world)
       work_queues <- mapM (\_ -> newChan) [1 .. _NUMTHREADS]
       let queue_arr = listArray (0, length work_queues - 1) work_queues
       let workerthread primed (myid, chan, mytags) =
             do putStrLn $ "=== Starting thread "++ show (myid) ++" with "++ show (length mytags) ++" initial tags."
                writeList2Chan chan mytags
                threadloop primed
             where
               -- Try up to n times to move one tag from a random queue
               -- onto our own queue, then resume the main loop.
               trysteal 0 = putStr "Thread giving up and dying...\n"
               trysteal n =
                 do _i <- randomIO :: IO Int
                    let i = _i `mod` _NUMTHREADS
                    if myid == i then return () else putStrLn $ " "++ show myid ++" Stealing from " ++ show i
                    let q = (Array.!) queue_arr i
                    b <- isEmptyChan q
                    if b
                      then trysteal (n - 1)
                      else do x <- readChan q
                              putStrLn " <STOLEN>"
                              writeChan chan x
                              threadloop []

               -- One quantum: run steps, publish results, park and
               -- reactivate blocked steps, then decide how to continue.
               threadloop ready =
                 do snapshot <- readIORef worldref
                    B {blocked = blk, bsteps = stalled, outtags = produced, items = newitems} <-
                      runSomeSteps2 graph snapshot _GRAIN chan
                        (B {blocked=[], bsteps=[], intags=(), outtags=[], items=[]})
                        ready
                    fresh <- atomicModifyIORef worldref (mergeUpdates produced newitems)
                    newprimed <- atomicModifyIORef blockedref
                                   (\ oldblck ->
                                      let newb = updateMirrorList oldblck blk stalled in
                                      newItemsAgainstBlocked newitems newb)
                    writeList2Chan chan fresh
                    if List.null newprimed
                      then do b <- isEmptyChan chan
                              if b
                                then trysteal (_NUMTHREADS * 2)
                                else threadloop newprimed
                      else threadloop newprimed
       forkJoin $ Prelude.map (workerthread [])
                $ zip3 [0 ..] work_queues
                $ splitN _NUMTHREADS inittags
       readIORef worldref
-- | Run one quantum of steps for 'parSched2'.
--
-- Primed steps are run first, each one decrementing the budget @n@. When
-- no primed step is left and the budget is not yet used up, the next tag is
-- read from the channel @c@ and expanded; the quantum ends when the budget
-- is exhausted or the channel is empty. Blocked results (with the steps
-- that produced them) and produced tags/items are accumulated in the
-- bundle, which is returned.
--
-- All steps run against the same snapshot @w@; nothing is merged here.
runSomeSteps2 :: Graph -> Collections -> Int -> Chan NewTag -> Bundle () -> [PrimedStep] -> IO (Bundle ())
runSomeSteps2 g w n c bundle primed =
  case primed of
    [] | n <= 0 -> return bundle
       | otherwise ->
           do drained <- isEmptyChan c
              if drained
                then return bundle
                else do hd <- readChan c
                        case hd of
                          NT tc tag ->
                            runSomeSteps2 g w n c bundle (callSteps g tc tag)
    pstep : tl ->
      case pstep w of
        newb@(Block _ _) ->
          runSomeSteps2 g w (n - 1) c
            (bundle { blocked = newb : blocked bundle
                    , bsteps = pstep : bsteps bundle })
            tl
        Done newtags newitems ->
          runSomeSteps2 g w (n - 1) c
            (bundle { outtags = newtags ++ outtags bundle
                    , items = newitems ++ items bundle })
            tl
-- | Unfinished sketch of a distributed-style scheduler in which every
-- thread owns a private world and threads talk over point-to-point
-- channels. It is not selectable through the scheduler macro, and its
-- thread loop ends in an 'error' call.
--
-- One channel is created for every ordered pair of distinct thread numbers
-- @(i,j)@; a thread is handed the channels whose first index is its own
-- number as outbound and those whose second index is its number as inbound.
distScheduler graph inittags world =
  unsafePerformIO $
    do chans <- sequence
                  [ sequence [ do c <- newChan; return (i,j,c)
                             | j <- [1.. _NUMTHREADS], not(i == j) ]
                  | i <- [1.. _NUMTHREADS] ]
       forkJoin $ Prelude.map
                    (\ (ch,tags) ->
                       -- The thread's own number is read off its first
                       -- outbound channel.
                       let (my_i,_,_):_ = ch
                           myinbound = List.filter (\ (_,j,_) -> j == my_i)
                                         $ concat chans
                           third (_,_,x) = x
                           thirds = List.map third
                       in threadloop world (mirrorWorld world)
                            (thirds ch) (thirds myinbound) [] tags)
                  $ zip chans
                  $ splitN _NUMTHREADS inittags
       return chans
  where
    threadloop world bworld outchans inchans primed mytags =
      do
        -- Placeholder: inbound channels are polled but nothing is read or
        -- merged yet (both branches return the world unchanged).
        world2 <-
          foldM (\ w c -> do b <- isEmptyChan c
                             if b then return w
                                  else return w
                )
                world inchans
        -- The results below are computed but not used yet.
        let B {blocked, bsteps, intags, outtags, items} =
              runSomeSteps graph world _GRAIN
                (B {blocked=[], bsteps=[], intags=mytags, outtags=[], items=[]})
                primed
            world2 = mergeUpdates outtags items
            newb = updateMirrorList bworld blocked bsteps
            bworld2 = newItemsAgainstBlocked items newb
        -- Placeholder: nothing is sent on the outbound channels yet.
        mapM_ (\_ -> return () ) outchans
        return (error "CncPure distScheduler not complete yet")
-- | Run one quantum of steps for 'parallelScheduler'.
--
-- Primed steps are run first, each one decrementing the budget @n@. When
-- no primed step is left, the quantum ends if there are no input tags or
-- the budget is used up; otherwise the next input tag is expanded into
-- primed steps. Blocked results (with the steps that produced them) and
-- produced tags/items are accumulated in the returned bundle, whose
-- 'intags' field holds the tags that were not reached.
--
-- All steps run against the same snapshot @w@; nothing is merged here.
runSomeSteps :: Graph -> Collections -> Int -> Bundle [NewTag] -> [PrimedStep] -> Bundle [NewTag]
runSomeSteps _ _ _ bundle@B{intags = []} [] = bundle
runSomeSteps _ _ n bundle [] | n <= 0 = bundle
runSomeSteps graph w n bundle@B{intags = hd : tl} [] =
  case hd of
    NT tc tag ->
      runSomeSteps graph w n (bundle { intags = tl }) (callSteps graph tc tag)
runSomeSteps g w n bundle (pstep : tl) =
  case pstep w of
    newb@(Block _ _) ->
      runSomeSteps g w (n - 1)
        (bundle { blocked = newb : blocked bundle
                , bsteps = pstep : bsteps bundle })
        tl
    Done newtags newitems ->
      runSomeSteps g w (n - 1)
        (bundle { outtags = newtags ++ outtags bundle
                , items = newitems ++ items bundle })
        tl
-- | Monad in which step bodies are written. The wrapped function receives a
-- snapshot of the collections plus the tags and items accumulated so far,
-- and returns an optional value together with a 'StepResult' ('Done' with
-- the extended accumulators, or 'Block').
data StepCode a = CC (Collections -> [NewTag] -> [NewItem] -> (Maybe a, StepResult))
-- | Monad in which graphs are built. The wrapped function threads the
-- collections, the graph and the list of initial tags, and returns a value.
data GraphCode a = GC (Collections -> Graph -> [NewTag] -> (Collections, Graph, [NewTag], a))
-- Type signatures of the monadic interface. The definitions follow below.

-- | Allocate a new tag collection.
newTagCol :: GraphCode (TagCol a)
-- | Allocate a new item collection.
newItemCol :: GraphCode (ItemCol a b)
-- | Publish an item from a step.
put :: (Show a, Ord a) => ItemCol a b -> a -> b -> StepCode ()
-- | Read an item from a step; the step blocks when the item is absent.
get :: (Show a, Ord a) => ItemCol a b -> a -> StepCode b
-- | Publish a tag from a step.
putt :: Ord a => TagCol a -> a -> StepCode ()
-- 'Functor' and 'Applicative' are superclasses of 'Monad' in current GHC
-- releases, so they are provided here, defined through the monad operations.
instance Functor StepCode where
  fmap = liftM

instance Applicative StepCode where
  -- Return a value without touching the accumulated tags and items.
  pure x = CC $ \_ nt ni -> (Just x, Done nt ni)
  (<*>) = ap

-- | Sequencing of step code. The snapshot of the collections is shared by
-- both computations; the accumulated tags and items flow from the first to
-- the second. A 'Block' in the first computation short-circuits the rest.
instance Monad StepCode where
  return = pure
  CC ma >>= f = CC $ \w nt ni ->
    case ma w nt ni of
      -- The result is rebuilt because its value type changes.
      (_, Block ic t) -> (Nothing, Block ic t)
      (Just a, Done nt' ni') ->
        let CC mb = f a
        in mb w nt' ni'
      -- None of the primitives in this module produce this combination.
      (Nothing, Done _ _) ->
        error "StepCode: computation finished without producing a value"
-- | Read the item stored under @tag@. When it is not in the snapshot the
-- step reports 'Block' on that collection and key; otherwise the value is
-- returned and the accumulated tags/items are passed through unchanged.
get col tag = CC $ \w tags items ->
  maybe (Nothing, Block col tag)
        (\x -> (Just x, Done tags items))
        (_get w col tag)
-- | Queue an item insertion: the request is added in front of the items
-- accumulated so far. The snapshot itself is not modified.
put col tag val = CC $ \_ tags items ->
  let request = _put col tag val
  in (Just (), Done tags (request : items))
-- | Queue a tag insertion: the request is added in front of the tags
-- accumulated so far. The snapshot itself is not modified.
putt col tag = CC $ \_ tags items ->
  let request = _putt col tag
  in (Just (), Done (request : tags) items)
-- 'Functor' and 'Applicative' are superclasses of 'Monad' in current GHC
-- releases, so they are provided here, defined through the monad operations.
instance Functor GraphCode where
  fmap = liftM

instance Applicative GraphCode where
  -- Return a value, leaving collections, graph and initial tags as they are.
  pure x = GC $ \w g it -> (w, g, it, x)
  (<*>) = ap

-- | Sequencing of graph-construction code: collections, graph and initial
-- tags are threaded from the first computation into the second.
instance Monad GraphCode where
  return = pure
  GC ma >>= f =
    GC $ \w g itags ->
      let (w', g', it', a) = ma w g itags
          GC mb = f a
      in mb w' g' it'
-- | Allocate an empty tag collection under the current counter value,
-- bump the counter, and return the new handle. Graph and initial tags are
-- passed through.
newTagCol = GC alloc
  where
    alloc (cnt, MT tags, items) graph inittags =
      ( (cnt + 1, MT (IntMap.insert cnt Set.empty tags), items)
      , graph
      , inittags
      , TCID cnt )
-- | Allocate an empty item collection under the current counter value,
-- bump the counter, and return the new handle. Graph and initial tags are
-- passed through.
newItemCol = GC alloc
  where
    alloc (cnt, tags, MI items) graph inittags =
      ( (cnt + 1, tags, MI (IntMap.insert cnt Map.empty items))
      , graph
      , inittags
      , ICID cnt )
-- | Attach a monadic step to a tag collection. The step code is wrapped
-- into a plain 'Step' that runs it with empty tag/item accumulators and
-- keeps only the resulting 'StepResult'.
prescribe :: Ord a => TagCol a -> (a -> StepCode ()) -> GraphCode ()
prescribe tc stepcode =
  GC $ \cols graph inittags ->
    (cols, _prescribe tc asStep graph, inittags, ())
  where
    asStep tag w =
      case stepcode tag of
        CC fn -> snd (fn w [] [])
-- | Run step code once while the graph is being built. Items it puts are
-- merged into the collections immediately; tags it puts are added to the
-- list of initial tags handed to the scheduler later. It is an error for
-- the code to block.
initialize :: StepCode a -> GraphCode a
initialize (CC fn) = GC run
  where
    run w graph inittags =
      case fn w inittags [] of
        (Just x, Done nt ni) ->
          -- No tags are submitted here, so none come back as fresh.
          let (w2, []) = mergeUpdates [] ni w
          in (w2, graph, nt, x)
        (Nothing, Block itemcol tag) ->
          error ("Tried to run initialization StepCode within the GraphCode monad but it blocked!: "
                 ++ show (itemcol, tag))
-- | Run the graph to completion with the configured 'scheduler', then run
-- the given step code against the final collections and return its value.
-- The finalization code may only read: it is an error for it to block or to
-- produce tags or items.
finalize :: StepCode a -> GraphCode a
finalize (CC fn) = GC run
  where
    run w@(_, MT _, _) graph inittags =
      let finalworld = scheduler graph inittags w
      in case fn finalworld [] [] of
           (Just x, Done [] []) -> (finalworld, graph, [], x)
           (Just _, Done _ _) ->
             error "It isn't proper for a finalize action to produce new tags/items!"
           (Nothing, Block itemcol tag) ->
             error ("Tried to run finalization StepCode but it blocked!: "
                    ++ show (itemcol, tag))
-- | Execute graph-construction code starting from an empty world, an empty
-- graph and no initial tags, and return only its result value.
runGraph :: GraphCode a -> a
runGraph (GC fn) =
  case fn (_newWorld 0) emptyGraph [] of
    (_, _, _, x) -> x
-- | Debugging aid: print the given label, the number of tag and item
-- collections, and the size of every collection. The printing is performed
-- through 'unsafePerformIO' and forced with 'seq' before the (unchanged)
-- state is returned.
gcPrintWorld :: String -> GraphCode ()
gcPrintWorld str =
  GC $ \w g it ->
    case w of
      (_, MT tmap, MI imap) ->
        let report = do
              putStr "GraphCode - Printing world: "
              putStrLn str
              putStrLn (" "++ show (IntMap.size tmap) ++" tag collections "++
                        show (IntMap.size imap) ++" item collections")
              forM_ (IntMap.keys tmap) $ \key ->
                let m = IntMap.findWithDefault (error "shouldn't happen") key tmap in
                putStrLn (" Tag col "++ show key ++" size "++ show (Set.size m))
              forM_ (IntMap.keys imap) $ \key ->
                let m = IntMap.findWithDefault (error "shouldn't happen") key imap in
                putStrLn (" Item col "++ show key ++" size "++ show (Map.size m))
        in unsafePerformIO report `seq` (w, g, it, ())
-- | Lift an IO action into 'GraphCode'. The action is run with
-- 'unsafePerformIO' and forced with 'seq' before the unchanged state is
-- returned.
cncUnsafeIO :: IO () -> GraphCode ()
cncUnsafeIO action =
  GC $ \w g it -> unsafePerformIO action `seq` (w, g, it, ())
-- | Lift an IO action into 'StepCode'. The action is run with
-- 'unsafePerformIO' and forced with 'seq' before the accumulated tags and
-- items are passed through unchanged.
stepUnsafeIO :: IO () -> StepCode ()
stepUnsafeIO action =
  CC $ \_ nt ni -> unsafePerformIO action `seq` (Just (), Done nt ni)
-- | Print a string from within step code (via 'stepUnsafeIO').
stepPutStr :: String -> StepCode ()
stepPutStr = stepUnsafeIO . putStr
-- | Print a string from within graph code (via 'cncUnsafeIO').
cncPutStr :: String -> GraphCode ()
cncPutStr = cncUnsafeIO . putStr
-- | Reinterpret an association list at the key/value types named by the
-- handle. The handle only fixes the result type; the cast is unchecked.
finalmagic :: ItemCol a b -> [(c,d)] -> [(a,b)]
finalmagic _ = unsafeCoerce
-- | Return the whole contents of an item collection as a key/value list
-- (in the order of 'Map.toList'). Never blocks; the accumulated tags and
-- items are passed through unchanged. The collection must exist.
itemsToList :: ItemCol a b -> StepCode [(a,b)]
itemsToList col =
  CC $ \(_, _, MI imap) tags items ->
    (Just (finalmagic col (Map.toList (imap IntMap.! num))), Done tags items)
  where
    ICID num = col
-- | Loop in 'StepCode' built on @for_@, which is called with @end + 1@ as
-- its second bound.
-- NOTE(review): presumably this makes the range inclusive of @end@ —
-- confirm against the definition of @for_@ in CncUtil.
cncFor :: Int -> Int -> (Int -> StepCode ()) -> StepCode ()
cncFor start end = for_ start (end + 1)
-- | Two nested 'cncFor' loops: the outer one runs over the first
-- components of the two bounds, the inner one over the second components.
cncFor2D :: (Int,Int) -> (Int,Int) -> (Int -> Int -> StepCode ()) -> StepCode ()
cncFor2D (s1, s2) (e1, e2) body = cncFor s1 e1 row
  where
    row i = cncFor s2 e2 (body i)
-- Shorthands used by the tests below.

-- | Tag collection of characters.
type TI = TagCol Char
-- | Item collection mapping characters to integers.
type II = ItemCol Char Int
-- | Test step written against the low-level interface: read the item under
-- @tag@ from @d1@, write that value plus one under the same key into @d2@,
-- and emit @tag@ into @t2@. Blocks on @d1@ when the input is missing.
incrStep :: II -> (TI, II) -> Step Char
incrStep d1 (t2, d2) tag c =
  maybe (Block d1 tag) bump (_get c d1 tag)
  where
    bump n = Done [_putt t2 tag] [_put d2 tag (n + 1)]
-- | Exercise the low-level interface: three tag collections and three item
-- collections are chained with two 'incrStep' prescriptions, two items are
-- seeded into the first item collection, and the configured scheduler is
-- run. The contents of the three item collections are printed.
test1 = TestCase $ do
    putStrLn $ ""
    putStrLn $ showcol w9
    putStrLn $ " d1: " ++ show (_get w9 d1 'a', _get w9 d1 'b')
    putStrLn $ " d2: " ++ show (_get w9 d2 'a', _get w9 d2 'b')
    putStrLn $ " d3: " ++ show (_get w9 d3 'a', _get w9 d3 'b')
    return ()
  where
    w0 = _newWorld 0
    (t1, w2) = _newTagCol w0
    (t2, w3) = _newTagCol w2
    (t3, w4) = _newTagCol w3
    (d1, w5) = _newItemCol w4
    (d2, w6) = _newItemCol w5
    (d3, w7) = _newItemCol w6
    -- Seed the inputs; no tags are merged so none come back.
    (w8, []) = mergeUpdates [] [_put d1 'a' 33, _put d1 'b' 100] w7
    graph =
      _prescribe t1 (incrStep d1 (t2, d2)) $
      _prescribe t2 (incrStep d2 (t3, d3)) $
      emptyGraph
    inittags = [_putt t1 'b', _putt t1 'a']
    w9 = scheduler graph inittags w8
-- | The same pipeline as 'test1', written with the monadic interface:
-- collections are allocated in 'GraphCode', inputs are seeded with
-- 'initialize', two increment steps are prescribed, and 'finalize' collects
-- the contents of the three item collections, which are printed.
test2 = TestCase $ putStrLn ("Final: "++ show result)
  where
    result = runGraph $ do
      t1 <- newTagCol
      t2 <- newTagCol
      t3 <- newTagCol
      d1 <- newItemCol
      d2 <- newItemCol
      d3 <- newItemCol
      initialize $ do
        stepPutStr "\n"
        put d1 'a' 33
        put d1 'b' 100
        putt t1 'b'
        putt t1 'a'
      prescribe t1 (bump d1 (t2, d2))
      prescribe t2 (bump d2 (t3, d3))
      gcPrintWorld "Initialization finished"
      finalize $ do
        a <- itemsToList d1
        b <- itemsToList d2
        c <- itemsToList d3
        return (a, b, c)

    -- Read from src, store the incremented value in dst, emit the tag.
    bump src (tcol, dst) tag = do
      n <- get src tag
      put dst tag (n + 1)
      putt tcol tag
-- | Test helper: render the counter, the number of tag collections, the
-- item-collection ids, and the keys and values of item collection number 3
-- viewed as a map from 'Char' to 'Int'. The world must contain an item
-- collection with id 3 (the lookup is the partial @(!)@).
showcol (n, MT tmap, MI imap) =
  show (n, IntMap.size tmap, IntMap.keys imap, Map.keys col3, Map.elems col3)
  where
    col3 :: ItemColInternal Char Int
    col3 = unsafeCoerce (imap IntMap.! 3)
-- | All test cases of this module, for use with an HUnit runner.
tests :: Test
tests = TestList cases
  where
    cases = [test1, test2]