Ticket #7606: 0001-Heap-draft-3-rebased.patch

File 0001-Heap-draft-3-rebased.patch, 21.4 KB (added by ezyang, 4 months ago)

array heap implementation draft 3 (second epoch)

  • includes/rts/storage/TSO.h

    From 7f9d8febd15842f4a351b0d8bc01bf830be69d80 Mon Sep 17 00:00:00 2001
    From: "Edward Z. Yang" <ezyang@mit.edu>
    Date: Sat, 26 Jan 2013 00:00:20 -0800
    Subject: [PATCH] Heap draft 3 - rebased
    
    Signed-off-by: Edward Z. Yang <ezyang@mit.edu>
    ---
     includes/rts/storage/TSO.h |    2 -
     rts/Capability.c           |   11 +++--
     rts/Capability.h           |    7 ++--
     rts/PQueue.c               |   67 +++++++++++++++++++++++++++++++
     rts/PQueue.h               |   95 ++++++++++++++++++++++++++++++++++++++++++++
     rts/RtsUtils.c             |   16 ++++++++
     rts/RtsUtils.h             |    2 +
     rts/Schedule.c             |   89 ++++++++++++++++-------------------------
     rts/Schedule.h             |   93 ++++++++++++++++++++++++++-----------------
     rts/Threads.c              |    6 +--
     rts/WSDeque.c              |   18 ---------
     rts/sm/Sanity.c            |    9 +----
     rts/sm/Storage.c           |   10 -----
     13 files changed, 284 insertions(+), 141 deletions(-)
     create mode 100644 rts/PQueue.c
     create mode 100644 rts/PQueue.h
    
    diff --git a/includes/rts/storage/TSO.h b/includes/rts/storage/TSO.h
    index 20a67a2..5458d64 100644
    a b  
    4646/* Reason for thread being blocked. See comment above struct StgTso_. */ 
    4747typedef union { 
    4848  StgClosure *closure; 
    49   StgTSO *prev; // a back-link when the TSO is on the run queue (NotBlocked) 
    5049  struct MessageBlackHole_ *bh; 
    5150  struct MessageThrowTo_ *throwto; 
    5251  struct MessageWakeup_  *wakeup; 
     
    8786    struct StgTSO_*         _link; 
    8887    /* 
    8988      Currently used for linking TSOs on: 
    90       * cap->run_queue_{hd,tl} 
    9189      * (non-THREADED_RTS); the blocked_queue 
    9290      * and pointing to the next chunk for a ThreadOldStack 
    9391 
  • rts/Capability.c

    diff --git a/rts/Capability.c b/rts/Capability.c
    index 811df58..1891599 100644
    a b  
    230230    cap->idle              = 0; 
    231231    cap->disabled          = rtsFalse; 
    232232 
    233     cap->run_queue_hd      = END_TSO_QUEUE; 
    234     cap->run_queue_tl      = END_TSO_QUEUE; 
     233    cap->run_pqueue        = newPQueue(4); // tune this... 
     234    cap->promoted_run_queue_hd = END_TSO_QUEUE; 
    235235 
    236236#if defined(THREADED_RTS) 
    237237    initMutex(&cap->lock); 
     
    10071007    // or fewer Capabilities as GC threads, but just in case there 
    10081008    // are more, we mark every Capability whose number is the GC 
    10091009    // thread's index plus a multiple of the number of GC threads. 
    1010     evac(user, (StgClosure **)(void *)&cap->run_queue_hd); 
    1011     evac(user, (StgClosure **)(void *)&cap->run_queue_tl); 
     1010    nat i; 
     1011    if (cap->promoted_run_queue_hd != END_TSO_QUEUE) { 
     1012        evac(user, (StgClosure **)&cap->promoted_run_queue_hd); 
     1013    } 
     1014    markPQueue(cap->run_pqueue, evac, user); 
    10121015#if defined(THREADED_RTS) 
    10131016    evac(user, (StgClosure **)(void *)&cap->inbox); 
    10141017#endif 
  • rts/Capability.h

    diff --git a/rts/Capability.h b/rts/Capability.h
    index 81322c8..7d2ae6e 100644
    a b  
    2424#include "sm/GC.h" // for evac_fn 
    2525#include "Task.h" 
    2626#include "Sparks.h" 
     27#include "PQueue.h" 
    2728 
    2829#include "BeginPrivate.h" 
    2930 
     
    5556    // access to its run queue, so can wake up threads without 
    5657    // taking a lock, and the common path through the scheduler is 
    5758    // also lock-free. 
    58     StgTSO *run_queue_hd; 
    59     StgTSO *run_queue_tl; 
     59    StgTSO *promoted_run_queue_hd; 
     60    PQueue *run_pqueue; 
    6061 
    6162    // [SSS] Stride scheduling extensions.  The Task with this 
    6263    // Capability has exclusive access to this variable. 
     
    162163// Task is bound, its thread has just blocked, and it may have been 
    163164// moved to another Capability. 
    164165#define ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task)  \ 
    165   ASSERT(cap->run_queue_hd == END_TSO_QUEUE ?           \ 
    166             cap->run_queue_tl == END_TSO_QUEUE : 1);    \ 
    167166  ASSERT(myTask() == task);                             \ 
    168167  ASSERT_TASK_ID(task); 
    169168 
  • (a) /dev/null vs. (b) b/rts/PQueue.c

    diff --git a/rts/PQueue.c b/rts/PQueue.c
    new file mode 100644
    index 0000000..f83b2ff
    a b  
     1/* 
     2 * Priority queue data structure (not concurrent). 
     3 * 
     4 * Classic undergraduate implementation of a Heap. 
     5 */ 
     6 
     7#include "PosixSource.h" 
     8#include "Rts.h" 
     9 
     10#include "RtsUtils.h" 
     11#include "PQueue.h" 
     12#include "sm/GC.h" 
     13 
     14PQueue * 
     15newPQueue (nat size) 
     16{ 
     17    StgWord realsize; 
     18    PQueue *q; 
     19    realsize = roundUp2(size); 
     20    q = (PQueue*) stgMallocBytes(sizeof(PQueue), "newPQueue"); 
     21    int sz = realsize * sizeof(StgClosurePtr); 
     22    q->elements = stgMallocBytes(sz, "newPQueue: elements"); 
     23    q->capacity = realsize; 
     24    q->size = 0; 
     25    q->deferred = 0; 
     26    return q; 
     27} 
     28 
     29void 
     30freePQueue (PQueue *q) 
     31{ 
     32    stgFree(q->elements); 
     33    stgFree(q); 
     34} 
     35 
     36void 
     37iteratePQueue(PQueue *q, void (*fp)(StgTSO *e)) { 
     38    nat i; 
     39    // 0 1 2 (size = 3, deferred = 0) 
     40    //   1 2 (size = 2, deferred = 1) 
     41    for (i = q->deferred; i < q->size + q->deferred; i++) { 
     42        fp(q->elements[i]); 
     43    } 
     44} 
     45 
     46void 
     47markPQueue(PQueue *q, evac_fn evac, void *user) { 
     48    nat i; 
     49    for (i = q->deferred; i < q->size + q->deferred; i++) { 
     50        evac(user, (StgClosure **)&q->elements[i]); 
     51    } 
     52} 
     53 
     54void 
     55truncatePQueue(PQueue *q) { 
     56    q->size = 0; 
     57} 
     58 
     59void 
     60growPQueue(PQueue *q) { 
     61    int esz = q->capacity * sizeof(StgClosurePtr); 
     62    void ** nelements = stgMallocBytes(2 * esz, "insertPQueue: elements"); 
     63    memcpy(/*dst*/nelements, q->elements, esz); 
     64    stgFree(q->elements); 
     65    q->elements = nelements; 
     66    q->capacity *= 2; 
     67} 
  • (a) /dev/null vs. (b) b/rts/PQueue.h

    diff --git a/rts/PQueue.h b/rts/PQueue.h
    new file mode 100644
    index 0000000..68694aa
    a b  
     1/* 
     2 * Priority queue data structure (not concurrent) 
     3 */ 
     4 
     5#ifndef PQUEUE_H 
     6#define PQUEUE_H 
     7 
     8#include "sm/GC.h" 
     9 
     10#define PARENT(i) (((i)-1)/2) 
     11#define LEFT(i)   (2*(i)+1) 
     12#define RIGHT(i)  (2*(i)+2) 
     13 
     14typedef struct PQueue_ { 
     15    // size of the heap array 
     16    StgWord size; 
     17    // capacity of the heap array 
     18    StgWord capacity; 
     19    // the elements array 
     20    StgTSO **elements; 
     21    // deferred? 
     22    int deferred; 
     23} PQueue; 
     24 
     25PQueue * newPQueue(nat size); 
     26void freePQueue(PQueue *q); 
     27 
     28EXTERN_INLINE void upHeap(PQueue*, StgTSO*); 
     29EXTERN_INLINE void 
     30upHeap(PQueue *q, StgTSO *last) 
     31{ 
     32    StgWord64 lastk = last->ss_pass; 
     33    nat c, i; 
     34    for (i = 0; LEFT(i) < q->size; i = c) { 
     35        if (LEFT(i) != q->size && q->elements[LEFT(i)]->ss_pass > q->elements[RIGHT(i)]->ss_pass) { 
     36            c = RIGHT(i); // right exists and wins 
     37        } else { 
     38            c = LEFT(i); // only had left node, or left wins 
     39        } 
     40        if (lastk > q->elements[c]->ss_pass) { 
     41            q->elements[i] = q->elements[c]; 
     42        } else { 
     43            break; 
     44        } 
     45    } 
     46    q->elements[i] = last; 
     47} 
     48 
     49EXTERN_INLINE void * deleteMinPQueue(PQueue *q); 
     50EXTERN_INLINE void * peekMinPQueue(PQueue *q); 
     51EXTERN_INLINE void insertPQueue(PQueue *q, StgTSO *elem); 
     52void truncatePQueue(PQueue *q); 
     53void iteratePQueue(PQueue *q, void (*fp)(StgTSO *e)); 
     54void markPQueue(PQueue *q, evac_fn evac, void *user); 
     55void growPQueue(PQueue *q); 
     56 
     57EXTERN_INLINE void 
     58insertPQueue(PQueue *q, StgTSO *elem) 
     59{ 
     60    StgWord64 key = elem->ss_pass; 
     61    if (q->size == q->capacity) { growPQueue(q); } 
     62    nat i; 
     63    for (i = q->size; i > 0 && q->elements[PARENT(i)]->ss_pass > key; i = PARENT(i)) { 
     64        q->elements[i] = q->elements[PARENT(i)]; 
     65    } 
     66    q->elements[i] = elem; 
     67    q->size++; 
     68} 
     69 
     70EXTERN_INLINE void * 
     71peekMinPQueue(PQueue *q) { 
     72    return q->elements[0]; 
     73} 
     74 
     75EXTERN_INLINE void * 
     76deleteMinPQueue(PQueue *q) { 
     77    if (q->size == 0) { 
     78        return NULL; 
     79    } 
     80    StgTSO *min = q->elements[0]; 
     81    q->size--; 
     82    // XXX implementing properly amortized downsizing 
     83    if (q->size == 0) { 
     84        q->elements[0] = NULL; // make peekMinPQueue work 
     85    } else { 
     86        upHeap(q, q->elements[q->size]); 
     87    } 
     88    return min; 
     89} 
     90 
     91#undef PARENT 
     92#undef LEFT 
     93#undef RIGHT 
     94 
     95#endif // PQUEUE_H 
  • rts/RtsUtils.c

    diff --git a/rts/RtsUtils.c b/rts/RtsUtils.c
    index fcbb757..d1ef397 100644
    a b  
    5151#include <windows.h> 
    5252#endif 
    5353 
     54StgWord 
     55roundUp2(StgWord val) 
     56{ 
     57    StgWord rounded = 1; 
     58     
     59    /* StgWord is unsigned anyway, only catch 0 */ 
     60    if (val == 0) { 
     61        barf("roundUp2: invalid size 0 requested"); 
     62    } 
     63    /* at least 1 bit set, shift up to its place */ 
     64    do { 
     65        rounded = rounded << 1; 
     66    } while (0 != (val = val>>1)); 
     67    return rounded; 
     68} 
     69 
    5470/* ----------------------------------------------------------------------------- 
    5571   Result-checking malloc wrappers. 
    5672   -------------------------------------------------------------------------- */ 
  • rts/RtsUtils.h

    diff --git a/rts/RtsUtils.h b/rts/RtsUtils.h
    index 5d825a2..6a9c1d8 100644
    a b  
    1111 
    1212#include "BeginPrivate.h" 
    1313 
     14StgWord roundUp2(StgWord); 
     15 
    1416/* ----------------------------------------------------------------------------- 
    1517 * (Checked) dynamic allocation 
    1618 * -------------------------------------------------------------------------- */ 
  • rts/Schedule.c

    diff --git a/rts/Schedule.c b/rts/Schedule.c
    index 3de00c5..347b282 100644
    a b  
    1212 
    1313#include "sm/Storage.h" 
    1414#include "RtsUtils.h" 
     15#include "PQueue.h" 
    1516#include "StgRun.h" 
    1617#include "Schedule.h" 
    1718#include "Interpreter.h" 
     
    560561 * -------------------------------------------------------------------------- */ 
    561562 
    562563void 
    563 removeFromRunQueue (Capability *cap, StgTSO *tso) 
     564promoteInRunQueue (Capability *cap STG_UNUSED, StgTSO *tso STG_UNUSED) 
    564565{ 
    565     if (tso->block_info.prev == END_TSO_QUEUE) { 
    566         ASSERT(cap->run_queue_hd == tso); 
    567         cap->run_queue_hd = tso->_link; 
    568     } else { 
    569         setTSOLink(cap, tso->block_info.prev, tso->_link); 
    570     } 
    571     if (tso->_link == END_TSO_QUEUE) { 
    572         ASSERT(cap->run_queue_tl == tso); 
    573         cap->run_queue_tl = tso->block_info.prev; 
    574     } else { 
    575         setTSOPrev(cap, tso->_link, tso->block_info.prev); 
    576     } 
    577     tso->_link = tso->block_info.prev = END_TSO_QUEUE; 
    578  
    579     IF_DEBUG(sanity, checkRunQueue(cap)); 
    580 } 
    581  
    582 void 
    583 promoteInRunQueue (Capability *cap, StgTSO *tso) 
    584 { 
    585     removeFromRunQueue(cap, tso); 
    586     pushOnRunQueue(cap, tso); 
     566    // no-op, at the moment; if removeFromRunQueue was implemented it'd be 
     567    // removeFromRunQueue(cap, tso); 
     568    // pushOnRunQueue(cap, tso); 
    587569} 
    588570 
    589571/* ---------------------------------------------------------------------------- 
     
    700682#if defined(THREADED_RTS) 
    701683 
    702684    Capability *free_caps[n_capabilities], *cap0; 
    703     nat i, n_free_caps; 
     685    nat i, j, n_free_caps; 
    704686 
    705687    // migration can be turned off with +RTS -qm 
    706688    if (!RtsFlags.ParFlags.migrate) return; 
     
    742724    //   - giving low priority to moving long-lived threads 
    743725 
    744726    if (n_free_caps > 0) { 
    745         StgTSO *prev, *t, *next; 
     727        StgTSO *t; 
    746728#ifdef SPARK_PUSHING 
    747729        rtsBool pushed_to_all; 
    748730#endif 
     
    759741        pushed_to_all = rtsFalse; 
    760742#endif 
    761743 
    762         if (cap->run_queue_hd != END_TSO_QUEUE) { 
    763             prev = cap->run_queue_hd; 
    764             t = prev->_link; 
    765             prev->_link = END_TSO_QUEUE; 
    766             for (; t != END_TSO_QUEUE; t = next) { 
    767                 next = t->_link; 
    768                 t->_link = END_TSO_QUEUE; 
     744        // go through all of the TSOs in the run queue and decide where 
     745        // they should go 
     746        // XXX We can create the new heap more efficiently O(n) by just 
     747        // blitting them in and then re-heapifying 
     748        if (!emptyRunQueue(cap)) { 
     749            StgWord64 k; 
     750            PQueue *oq = cap->run_pqueue; 
     751            cap->run_pqueue = newPQueue(oq->size); 
     752            insertPQueue(cap->run_pqueue, oq->elements[0]); 
     753            for (j = 1; j < oq->size; j++) { 
     754                t = oq->elements[j]; 
    769755                if (t->bound == task->incall // don't move my bound thread 
    770                     || tsoLocked(t)) {  // don't move a locked thread 
    771                     setTSOLink(cap, prev, t); 
    772                     setTSOPrev(cap, t, prev); 
    773                     prev = t; 
    774                 } else if (i == n_free_caps) { 
     756                    || tsoLocked(t)) {  // don't move a locked thread 
     757                    insertPQueue(cap->run_pqueue, t); 
     758                } else if (i == n_free_caps) { 
    775759#ifdef SPARK_PUSHING 
    776                     pushed_to_all = rtsTrue; 
     760                    pushed_to_all = rtsTrue; 
    777761#endif 
    778                     i = 0; 
    779                     // keep one for us 
    780                     setTSOLink(cap, prev, t); 
    781                     setTSOPrev(cap, t, prev); 
    782                     prev = t; 
    783                 } else { 
    784                     leaveRunQueue(cap,t); 
    785                     joinRunQueue(free_caps[i],t); 
     762                    i = 0; 
     763                    insertPQueue(cap->run_pqueue, t); // keep one for us 
     764                } else { 
     765                    leaveRunQueue(cap, t); 
     766                    joinRunQueue(free_caps[i], t); 
    786767 
    787768                    traceEventMigrateThread (cap, t, free_caps[i]->no); 
    788769 
    789                     if (t->bound) { t->bound->task->cap = free_caps[i]; } 
    790                     t->cap = free_caps[i]; 
    791                     i++; 
    792                 } 
    793             } 
    794             cap->run_queue_tl = prev; 
    795  
    796             IF_DEBUG(sanity, checkRunQueue(cap)); 
    797         } 
     770                    if (t->bound) { t->bound->task->cap = free_caps[i]; } 
     771                    t->cap = free_caps[i]; 
     772                    i++; 
     773                } 
     774            } 
     775            freePQueue(oq); 
     776        } 
    798777 
    799778#ifdef SPARK_PUSHING 
    800779        /* JB I left this code in place, it would work but is not necessary */ 
  • rts/Schedule.h

    diff --git a/rts/Schedule.h b/rts/Schedule.h
    index e4425af..9a0a05d 100644
    a b  
    128128 
    129129#if !IN_STG_CODE 
    130130 
     131EXTERN_INLINE void 
     132annulTSO(StgTSO *tso); 
     133 
     134EXTERN_INLINE void 
     135annulTSO(StgTSO *tso) { 
     136    // hack to make some invariants with regards to block_info and _link work 
     137    // this is called whereever we would have stepped all over the 
     138    // fields in the linked list implementation 
     139    ASSERT(tso->_link == END_TSO_QUEUE); 
     140    tso->block_info.closure = (StgClosure*)END_TSO_QUEUE; 
     141} 
     142 
    131143/* END_TSO_QUEUE and friends now defined in includes/StgMiscClosures.h */ 
    132144 
    133145/* Add a thread to the end of the run queue. 
     
    140152EXTERN_INLINE void 
    141153appendToRunQueue (Capability *cap, StgTSO *tso) 
    142154{ 
    143     ASSERT(tso->_link == END_TSO_QUEUE); 
    144     if (cap->run_queue_hd == END_TSO_QUEUE) { 
    145         cap->run_queue_hd = tso; 
    146         tso->block_info.prev = END_TSO_QUEUE; 
    147     } else { 
    148         setTSOLink(cap, cap->run_queue_tl, tso); 
    149         setTSOPrev(cap, tso, cap->run_queue_tl); 
    150     } 
    151     cap->run_queue_tl = tso; 
     155    tso->ss_pass = cap->ss_pass++;                                          // [EMU] 
     156    //tso->ss_pass += STRIDE1 / tso->ss_tickets;                            // [EMU] 
     157    annulTSO(tso); 
     158    insertPQueue(cap->run_pqueue, tso); 
    152159} 
    153160 
    154161EXTERN_INLINE void 
     
    157164EXTERN_INLINE void 
    158165joinRunQueue(Capability *cap, StgTSO *tso) 
    159166{ 
     167    tso->ss_pass = cap->ss_pass++; 
    160168    appendToRunQueue(cap, tso); 
    161169} 
    162170 
     
    169177EXTERN_INLINE void 
    170178pushOnRunQueue (Capability *cap, StgTSO *tso) 
    171179{ 
    172     setTSOLink(cap, tso, cap->run_queue_hd); 
    173     tso->block_info.prev = END_TSO_QUEUE; 
    174     if (cap->run_queue_hd != END_TSO_QUEUE) { 
    175         setTSOPrev(cap, cap->run_queue_hd, tso); 
    176     } 
    177     cap->run_queue_hd = tso; 
    178     if (cap->run_queue_tl == END_TSO_QUEUE) { 
    179         cap->run_queue_tl = tso; 
     180    annulTSO(tso);                                                          // [EMU] 
     181    //tso->ss_pass += STRIDE1 / tso->ss_tickets;                            // [EMU] 
     182    if (cap->promoted_run_queue_hd == END_TSO_QUEUE) { 
     183        // annulTSO invariant used here 
     184    } else { 
     185        setTSOLink(cap, tso, cap->promoted_run_queue_hd); 
    180186    } 
     187    cap->promoted_run_queue_hd = tso; 
    181188} 
    182189 
    183190EXTERN_INLINE void 
     
    186193EXTERN_INLINE void 
    187194fastJoinRunQueue(Capability *cap, StgTSO *tso) 
    188195{ 
     196    tso->ss_pass = cap->ss_pass++;                                          // [EMU] 
    189197    pushOnRunQueue(cap, tso); 
    190198} 
    191199 
     
    193201 */ 
    194202INLINE_HEADER StgTSO * 
    195203popRunQueue (Capability *cap) 
    196 {  
    197     StgTSO *t = cap->run_queue_hd; 
    198     ASSERT(t != END_TSO_QUEUE); 
    199     cap->run_queue_hd = t->_link; 
    200     if (t->_link != END_TSO_QUEUE) { 
    201         t->_link->block_info.prev = END_TSO_QUEUE; 
    202     } 
    203     t->_link = END_TSO_QUEUE; // no write barrier req'd 
    204     if (cap->run_queue_hd == END_TSO_QUEUE) { 
    205         cap->run_queue_tl = END_TSO_QUEUE; 
     204{ 
     205    if (cap->promoted_run_queue_hd == END_TSO_QUEUE) { 
     206        StgTSO *t = deleteMinPQueue(cap->run_pqueue); 
     207        StgTSO *next = peekMinPQueue(cap->run_pqueue); 
     208        /* if (next != NULL) { 
     209            if (cap->ss_pass < next->ss_pass) { 
     210                cap->ss_pass = next->ss_pass; 
     211            } 
     212        } */                                                                // [EMU] 
     213        return t; 
     214    } else { 
     215        StgTSO *t = cap->promoted_run_queue_hd; 
     216        cap->promoted_run_queue_hd = t->_link; 
     217        t->_link = END_TSO_QUEUE; // no write barrier req'd 
     218        return t; 
    206219    } 
    207     return t; 
    208220} 
    209221 
    210222INLINE_HEADER StgTSO * 
    211223peekRunQueue (Capability *cap) 
    212224{ 
    213     return cap->run_queue_hd; 
     225    if (cap->promoted_run_queue_hd == END_TSO_QUEUE) { 
     226        return peekMinPQueue(cap->run_pqueue); 
     227    } else { 
     228        return cap->promoted_run_queue_hd; 
     229    } 
    214230} 
    215231 
    216232EXTERN_INLINE void 
    217 leaveRunQueue (Capability *cap, StgTSO *tso); 
     233leaveRunQueue(Capability *cap, StgTSO *tso); 
    218234 
    219235EXTERN_INLINE void 
    220 leaveRunQueue (Capability *cap, StgTSO *tso) 
     236leaveRunQueue(Capability *cap, StgTSO *tso) 
    221237{ 
    222238    // XXX implement me 
    223239} 
    224240 
    225 void removeFromRunQueue (Capability *cap, StgTSO *tso); 
    226241extern void promoteInRunQueue (Capability *cap, StgTSO *tso); 
    227242 
    228243/* Add a thread to the end of the blocked queue. 
     
    252267INLINE_HEADER rtsBool 
    253268emptyRunQueue(Capability *cap) 
    254269{ 
    255     return emptyQueue(cap->run_queue_hd); 
     270    return cap->promoted_run_queue_hd == END_TSO_QUEUE && peekMinPQueue(cap->run_pqueue) == NULL; 
    256271} 
    257272 
    258273/* assumes that the queue is not empty; so combine this with 
     
    260275INLINE_HEADER rtsBool 
    261276singletonRunQueue(Capability *cap) 
    262277{ 
    263     ASSERT(!emptyRunQueue(cap)); 
    264     return cap->run_queue_hd->_link == END_TSO_QUEUE; 
     278    if (cap->promoted_run_queue_hd != END_TSO_QUEUE) { 
     279        if (cap->run_pqueue->size != 0) return 0; 
     280        return cap->promoted_run_queue_hd->_link == END_TSO_QUEUE; 
     281    } else { 
     282        return cap->run_pqueue->size == 1; 
     283    } 
    265284} 
    266285 
    267286INLINE_HEADER void 
    268287truncateRunQueue(Capability *cap) 
    269288{ 
    270     cap->run_queue_hd = END_TSO_QUEUE; 
    271     cap->run_queue_tl = END_TSO_QUEUE; 
     289    cap->promoted_run_queue_hd = END_TSO_QUEUE; 
     290    truncatePQueue(cap->run_pqueue); 
    272291} 
    273292 
    274293#if !defined(THREADED_RTS) 
  • rts/Threads.c

    diff --git a/rts/Threads.c b/rts/Threads.c
    index 758d368..a0dfbc7f 100644
    a b  
    113113    tso->trec = NO_TREC; 
    114114 
    115115    tso->ss_tickets = DEFAULT_TICKETS; 
     116    tso->ss_pass = cap->ss_pass; 
    116117 
    117118#ifdef PROFILING 
    118119    tso->prof.cccs = CCS_MAIN; 
     
    844845  for (i = 0; i < n_capabilities; i++) { 
    845846      cap = &capabilities[i]; 
    846847      debugBelch("threads on capability %d:\n", cap->no); 
    847       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->_link) { 
    848           printThreadStatus(t); 
    849       } 
     848      // XXX do the promoted queue 
     849      iteratePQueue(cap->run_pqueue, printThreadStatus); 
    850850  } 
    851851 
    852852  debugBelch("other threads:\n"); 
  • rts/WSDeque.c

    diff --git a/rts/WSDeque.c b/rts/WSDeque.c
    index 8efd1bb..0de37d2 100644
    a b  
    5050 * newWSDeque 
    5151 * -------------------------------------------------------------------------- */ 
    5252 
    53 /* internal helpers ... */ 
    54  
    55 static StgWord 
    56 roundUp2(StgWord val) 
    57 { 
    58     StgWord rounded = 1; 
    59      
    60     /* StgWord is unsigned anyway, only catch 0 */ 
    61     if (val == 0) { 
    62         barf("DeQue,roundUp2: invalid size 0 requested"); 
    63     } 
    64     /* at least 1 bit set, shift up to its place */ 
    65     do { 
    66         rounded = rounded << 1; 
    67     } while (0 != (val = val>>1)); 
    68     return rounded; 
    69 } 
    70  
    7153WSDeque * 
    7254newWSDeque (nat size) 
    7355{ 
  • rts/sm/Sanity.c

    diff --git a/rts/sm/Sanity.c b/rts/sm/Sanity.c
    index f0e1659..a166035 100644
    a b  
    789789void 
    790790checkRunQueue(Capability *cap) 
    791791{ 
    792     StgTSO *prev, *tso; 
    793     prev = END_TSO_QUEUE; 
    794     for (tso = cap->run_queue_hd; tso != END_TSO_QUEUE;  
    795          prev = tso, tso = tso->_link) { 
    796         ASSERT(prev == END_TSO_QUEUE || prev->_link == tso); 
    797         ASSERT(tso->block_info.prev == prev); 
    798     } 
    799     ASSERT(cap->run_queue_tl == prev); 
     792    // [SSS] no-op 
    800793} 
    801794 
    802795/* ----------------------------------------------------------------------------- 
  • rts/sm/Storage.c

    diff --git a/rts/sm/Storage.c b/rts/sm/Storage.c
    index 518ae0d..d541fff 100644
    a b  
    878878} 
    879879 
    880880void 
    881 setTSOPrev (Capability *cap, StgTSO *tso, StgTSO *target) 
    882 { 
    883     if (tso->dirty == 0) { 
    884         tso->dirty = 1; 
    885         recordClosureMutated(cap,(StgClosure*)tso); 
    886     } 
    887     tso->block_info.prev = target; 
    888 } 
    889  
    890 void 
    891881dirty_TSO (Capability *cap, StgTSO *tso) 
    892882{ 
    893883    if (tso->dirty == 0) {