Ticket #7606: common.patch

File common.patch, 14.4 KB (added by ezyang, 4 months ago)

Common functionality used by both schedulers

  • compiler/prelude/primops.txt.pp

    diff --git a/compiler/prelude/primops.txt.pp b/compiler/prelude/primops.txt.pp
    index 6d551d9..0ff7e2c 100644
    a b  
    18391839   out_of_line = True 
    18401840   has_side_effects = True 
    18411841 
     1842primop  SetTicketsOp "setTickets#" GenPrimOp 
     1843  ThreadId# -> Int# -> State# RealWorld -> State# RealWorld 
     1844  with 
     1845  out_of_line = True 
     1846  has_side_effects = True 
     1847 
     1848primop  GetTicketsOp "getTickets#" GenPrimOp 
     1849  ThreadId# -> State# RealWorld -> (# State# RealWorld, Int# #) 
     1850  with 
     1851  out_of_line = True 
     1852  has_side_effects = True 
     1853 
     1854primop  ModifyTicketsOp "modifyTickets#" GenPrimOp 
     1855  ThreadId# -> Int# -> Int# -> Int# -> State# RealWorld -> (# State# RealWorld, Int# #) 
     1856  with 
     1857  out_of_line = True 
     1858  has_side_effects = True 
     1859 
    18421860------------------------------------------------------------------------ 
    18431861section "Weak pointers" 
    18441862------------------------------------------------------------------------ 
  • includes/rts/storage/TSO.h

    diff --git a/includes/rts/storage/TSO.h b/includes/rts/storage/TSO.h
    index 82f5a75..20a67a2 100644
    a b  
    168168     */ 
    169169    StgWord32  tot_stack_size; 
    170170 
     171    // These are bounded above by STRIDE1, which is less than a max 32-bit word. 
     172    StgWord32 ss_tickets; 
     173    // 64-bit to prevent overflows; only ever accessed by the task which owns TSO. 
     174    StgWord64 ss_pass; 
     175 
    171176} *StgTSOPtr; 
    172177 
    173178typedef struct StgStack_ { 
  • includes/stg/MiscClosures.h

    diff --git a/includes/stg/MiscClosures.h b/includes/stg/MiscClosures.h
    index 61e6b09..e6d5fcd 100644
    a b  
    402402RTS_FUN_DECL(stg_unmaskAsyncExceptionszh); 
    403403RTS_FUN_DECL(stg_myThreadIdzh); 
    404404RTS_FUN_DECL(stg_labelThreadzh); 
     405RTS_FUN_DECL(stg_getTicketszh); 
     406RTS_FUN_DECL(stg_setTicketszh); 
     407RTS_FUN_DECL(stg_modifyTicketszh); 
    405408RTS_FUN_DECL(stg_isCurrentThreadBoundzh); 
    406409RTS_FUN_DECL(stg_threadStatuszh); 
    407410 
  • rts/Capability.h

    diff --git a/rts/Capability.h b/rts/Capability.h
    index 3348f88..81322c8 100644
    a b  
    5858    StgTSO *run_queue_hd; 
    5959    StgTSO *run_queue_tl; 
    6060 
     61    // [SSS] Stride scheduling extensions.  The Task with this 
     62    // Capability has exclusive access to this variable. 
     63    StgWord64 ss_pass; 
     64 
    6165    // Tasks currently making safe foreign calls.  Doubly-linked. 
    6266    // When returning, a task first acquires the Capability before 
    6367    // removing itself from this list, so that the GC can find all 
  • rts/Linker.c

    diff --git a/rts/Linker.c b/rts/Linker.c
    index fa1de89..c673bac 100644
    a b  
    11301130      SymI_HasProto(stg_mkApUpd0zh)                                     \ 
    11311131      SymI_HasProto(stg_myThreadIdzh)                                   \ 
    11321132      SymI_HasProto(stg_labelThreadzh)                                  \ 
     1133      SymI_HasProto(stg_getTicketszh)                                   \ 
     1134      SymI_HasProto(stg_setTicketszh)                                   \ 
     1135      SymI_HasProto(stg_modifyTicketszh)                                \ 
    11331136      SymI_HasProto(stg_newArrayzh)                                     \ 
    11341137      SymI_HasProto(stg_newArrayArrayzh)                                \ 
    11351138      SymI_HasProto(stg_newBCOzh)                                       \ 
  • rts/PrimOps.cmm

    diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm
    index ebcee6a..a737508 100644
    a b  
    628628  return (); 
    629629} 
    630630 
     631stg_setTicketszh ( gcptr threadid, W_ n ) 
     632{ 
     633  ccall setTickets(threadid "ptr", n); 
     634  return (); 
     635} 
     636 
     637stg_getTicketszh ( gcptr threadid ) 
     638{ 
     639  W_ r; 
     640  (r) = ccall getTickets(threadid "ptr"); 
     641  return (r); 
     642} 
     643 
     644stg_modifyTicketszh ( gcptr threadid, W_ n, W_ d, W_ x ) 
     645{ 
     646  W_ r; 
     647  (r) = ccall modifyTickets(threadid "ptr", n, d, x); 
     648  return (r); 
     649} 
     650 
    631651stg_isCurrentThreadBoundzh (/* no args */) 
    632652{ 
    633653  W_ r; 
  • rts/RaiseAsync.c

    diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c
    index f5669cb..a32749c 100644
    a b  
    683683 
    684684 done: 
    685685  tso->why_blocked = NotBlocked; 
    686   appendToRunQueue(cap, tso); 
     686  joinRunQueue(cap, tso); 
    687687} 
    688688 
    689689/* ----------------------------------------------------------------------------- 
     
    10451045    // wake it up 
    10461046    if (tso->why_blocked != NotBlocked) { 
    10471047        tso->why_blocked = NotBlocked; 
    1048         appendToRunQueue(cap,tso); 
     1048        joinRunQueue(cap,tso); 
    10491049    }         
    10501050 
    10511051    return tso; 
  • rts/Schedule.c

    diff --git a/rts/Schedule.c b/rts/Schedule.c
    index a21b312..3de00c5 100644
    a b  
    148148static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ); 
    149149static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
    150150                                    nat prev_what_next ); 
    151 static void scheduleHandleThreadBlocked( StgTSO *t ); 
     151static void scheduleHandleThreadBlocked( Capability *cap, StgTSO *t ); 
    152152static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task, 
    153153                                             StgTSO *t ); 
    154154static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc); 
     
    537537        break; 
    538538 
    539539    case ThreadBlocked: 
    540         scheduleHandleThreadBlocked(t); 
     540        scheduleHandleThreadBlocked(cap, t); 
    541541        break; 
    542542 
    543543    case ThreadFinished: 
     
    781781                    setTSOPrev(cap, t, prev); 
    782782                    prev = t; 
    783783                } else { 
    784                     appendToRunQueue(free_caps[i],t); 
     784                    leaveRunQueue(cap,t); 
     785                    joinRunQueue(free_caps[i],t); 
    785786 
    786787                    traceEventMigrateThread (cap, t, free_caps[i]->no); 
    787788 
     
    12171218 * -------------------------------------------------------------------------- */ 
    12181219 
    12191220static void 
    1220 scheduleHandleThreadBlocked( StgTSO *t 
    1221 #if !defined(DEBUG) 
    1222     STG_UNUSED 
    1223 #endif 
    1224     ) 
     1221scheduleHandleThreadBlocked( Capability *cap, StgTSO *t ) 
    12251222{ 
    12261223 
    12271224      // We don't need to do anything.  The thread is blocked, and it 
     
    12341231    //      threadPaused() might have raised a blocked throwTo 
    12351232    //      exception, see maybePerformBlockedException(). 
    12361233 
     1234    leaveRunQueue(cap, t); 
     1235 
    12371236#ifdef DEBUG 
    12381237    traceThreadStatus(DEBUG_sched, t); 
    12391238#endif 
     
    12571256    // blocked mode (see #2910). 
    12581257    awakenBlockedExceptionQueue (cap, t); 
    12591258 
     1259    leaveRunQueue(cap, t); 
     1260 
    12601261      // 
    12611262      // Check whether the thread that just completed was a bound 
    12621263      // thread, and if so return with the result.   
     
    12771278              // queue also ensures that the garbage collector knows about 
    12781279              // this thread and its return value (it gets dropped from the 
    12791280              // step->threads list so there's no other way to find it). 
    1280               appendToRunQueue(cap,t); 
     1281              joinRunQueue(cap,t); 
    12811282              return rtsFalse; 
    12821283#else 
    12831284              // this cannot happen in the threaded RTS, because a 
     
    22962297{ 
    22972298    // The thread goes at the *end* of the run-queue, to avoid possible 
    22982299    // starvation of any threads already on the queue. 
    2299     appendToRunQueue(cap,tso); 
     2300    joinRunQueue(cap,tso); 
    23002301} 
    23012302 
    23022303void 
     
    23072308#if defined(THREADED_RTS) 
    23082309    cpu %= enabled_capabilities; 
    23092310    if (cpu == cap->no) { 
    2310         appendToRunQueue(cap,tso); 
     2311        joinRunQueue(cap,tso); 
    23112312    } else { 
    23122313        migrateThread(cap, tso, &capabilities[cpu]); 
    23132314    } 
    23142315#else 
    2315     appendToRunQueue(cap,tso); 
     2316    joinRunQueue(cap,tso); 
    23162317#endif 
    23172318} 
    23182319 
     
    23372338    task->incall->ret = ret; 
    23382339    task->incall->stat = NoStatus; 
    23392340 
    2340     appendToRunQueue(cap,tso); 
     2341    joinRunQueue(cap,tso); 
    23412342 
    23422343    DEBUG_ONLY( id = tso->id ); 
    23432344    debugTrace(DEBUG_sched, "new bound thread (%lu)", (unsigned long)id); 
  • rts/Schedule.h

    diff --git a/rts/Schedule.h b/rts/Schedule.h
    index 8b7caea..e4425af 100644
    a b  
    116116 
    117117void resurrectThreads (StgTSO *); 
    118118 
     119// STRIDE1 defines the maximum resolution we can achieve in scheduling. 
     120#define STRIDE1 (1 << 20) 
     121// Defualt tickets is set to STRIDE1, so that the IO manager gets 
     122// maximum priority. 
     123#define DEFAULT_TICKETS (1 << 20) 
     124 
    119125/* ----------------------------------------------------------------------------- 
    120126 * Some convenient macros/inline functions... 
    121127 */ 
     
    145151    cap->run_queue_tl = tso; 
    146152} 
    147153 
     154EXTERN_INLINE void 
     155joinRunQueue(Capability *cap, StgTSO *tso); 
     156 
     157EXTERN_INLINE void 
     158joinRunQueue(Capability *cap, StgTSO *tso) 
     159{ 
     160    appendToRunQueue(cap, tso); 
     161} 
     162 
    148163/* Push a thread on the beginning of the run queue. 
    149164 * ASSUMES: cap->running_task is the current task. 
    150165 */ 
     
    165180    } 
    166181} 
    167182 
     183EXTERN_INLINE void 
     184fastJoinRunQueue(Capability *cap, StgTSO *tso); 
     185 
     186EXTERN_INLINE void 
     187fastJoinRunQueue(Capability *cap, StgTSO *tso) 
     188{ 
     189    pushOnRunQueue(cap, tso); 
     190} 
     191 
    168192/* Pop the first thread off the runnable queue. 
    169193 */ 
    170194INLINE_HEADER StgTSO * 
     
    189213    return cap->run_queue_hd; 
    190214} 
    191215 
     216EXTERN_INLINE void 
     217leaveRunQueue (Capability *cap, StgTSO *tso); 
     218 
     219EXTERN_INLINE void 
     220leaveRunQueue (Capability *cap, StgTSO *tso) 
     221{ 
     222    // XXX implement me 
     223} 
     224 
    192225void removeFromRunQueue (Capability *cap, StgTSO *tso); 
    193226extern void promoteInRunQueue (Capability *cap, StgTSO *tso); 
    194227 
  • rts/Sparks.c

    diff --git a/rts/Sparks.c b/rts/Sparks.c
    index 4241656..4e9b5a5 100644
    a b  
    4545 
    4646    traceEventCreateSparkThread(cap, tso->id); 
    4747 
    48     appendToRunQueue(cap,tso); 
     48    joinRunQueue(cap,tso); 
    4949} 
    5050 
    5151/* -------------------------------------------------------------------------- 
  • rts/Threads.c

    diff --git a/rts/Threads.c b/rts/Threads.c
    index b617616..758d368 100644
    a b  
    112112 
    113113    tso->trec = NO_TREC; 
    114114 
     115    tso->ss_tickets = DEFAULT_TICKETS; 
     116 
    115117#ifdef PROFILING 
    116118    tso->prof.cccs = CCS_MAIN; 
    117119#endif 
     
    136138} 
    137139 
    138140/* --------------------------------------------------------------------------- 
     141 * Ticket allocations on threads 
     142 * ------------------------------------------------------------------------ */ 
     143 
     144#define TICKET_ERROR (STRIDE1 + 1) 
     145 
     146void 
     147setTickets(StgTSO *tso, W_ tickets) 
     148{ 
     149    if (tickets > STRIDE1) { 
     150        barf("setTickets: too many tickets"); 
     151    } else if (tickets <= 0) { 
     152        barf("setTickets: too few tickets"); 
     153    } 
     154    ACQUIRE_LOCK(&sched_mutex); 
     155    tso->ss_tickets = tickets; 
     156    RELEASE_LOCK(&sched_mutex); 
     157} 
     158 
     159W_ 
     160modifyTickets(StgTSO *tso, W_ n, W_ d, W_ x) 
     161{ 
     162    ACQUIRE_LOCK(&sched_mutex); 
     163    W_ tickets = (tso->ss_tickets * n) / d + x; 
     164    W_ delta; 
     165    if (tickets > STRIDE1 || tickets <= 0) { 
     166        delta = TICKET_ERROR; 
     167        goto cleanup; 
     168    } 
     169    delta = tso->ss_tickets - tickets; 
     170    tso->ss_tickets = tickets; 
     171cleanup: 
     172    RELEASE_LOCK(&sched_mutex); 
     173    return delta; 
     174} 
     175 
     176W_ 
     177getTickets(StgTSO *tso) 
     178{ 
     179    return tso->ss_tickets; 
     180} 
     181 
     182/* --------------------------------------------------------------------------- 
    139183 * Comparing Thread ids. 
    140184 * 
    141185 * This is used from STG land in the implementation of the 
     
    296340    // just run the thread now, if the BH is not really available, 
    297341    // we'll block again. 
    298342    tso->why_blocked = NotBlocked; 
    299     appendToRunQueue(cap,tso); 
     343    joinRunQueue(cap, tso); 
    300344 
    301345    // We used to set the context switch flag here, which would 
    302346    // trigger a context switch a short time in the future (at the end 
     
    322366    traceEventMigrateThread (from, tso, to->no); 
    323367    // ThreadMigrating tells the target cap that it needs to be added to 
    324368    // the run queue when it receives the MSG_TRY_WAKEUP. 
     369    leaveRunQueue(from, tso); 
    325370    tso->why_blocked = ThreadMigrating; 
    326371    tso->cap = to; 
    327372    tryWakeupThread(from, tso); 
  • rts/Threads.h

    diff --git a/rts/Threads.h b/rts/Threads.h
    index 6d26610..39794f3 100644
    a b  
    1313 
    1414#define END_BLOCKED_EXCEPTIONS_QUEUE ((MessageThrowTo*)END_TSO_QUEUE) 
    1515 
     16void setTickets(StgTSO *tso, W_ n); 
     17W_ modifyTickets(StgTSO *tso, W_ n, W_ d, W_ x); 
     18W_ getTickets(StgTSO *tso); 
     19 
    1620StgTSO * unblockOne (Capability *cap, StgTSO *tso); 
    1721StgTSO * unblockOne_ (Capability *cap, StgTSO *tso, rtsBool allow_migrate); 
    1822 
  • rts/posix/Select.c

    diff --git a/rts/posix/Select.c b/rts/posix/Select.c
    index 3d92a46..b39338b 100644
    a b  
    107107        tso->_link = END_TSO_QUEUE; 
    108108        IF_DEBUG(scheduler,debugBelch("Waking up sleeping thread %lu\n", (unsigned long)tso->id)); 
    109109        // MainCapability: this code is !THREADED_RTS 
    110         pushOnRunQueue(&MainCapability,tso); 
     110        fastJoinRunQueue(&MainCapability,tso); 
    111111        flag = rtsTrue; 
    112112    } 
    113113    return flag; 
     
    305305                IF_DEBUG(scheduler,debugBelch("Waking up blocked thread %lu\n", (unsigned long)tso->id)); 
    306306                  tso->why_blocked = NotBlocked; 
    307307                  tso->_link = END_TSO_QUEUE; 
    308                   pushOnRunQueue(&MainCapability,tso); 
     308                  fastJoinRunQueue(&MainCapability,tso); 
    309309              } else { 
    310310                  if (prev == NULL) 
    311311                      blocked_queue_hd = tso; 
  • rts/win32/AsyncIO.c

    diff --git a/rts/win32/AsyncIO.c b/rts/win32/AsyncIO.c
    index 979df0c..cfd2c0f 100644
    a b  
    303303                        // save the StgAsyncIOResult in the 
    304304                        // stg_block_async_info stack frame, because 
    305305                        // the block_info field will be overwritten by 
    306                         // pushOnRunQueue(). 
     306                        // fastJoinRunQueue(). 
    307307                        tso->stackobj->sp[1] = (W_)tso->block_info.async_result; 
    308                         pushOnRunQueue(&MainCapability, tso); 
     308                        fastJoinRunQueue(&MainCapability, tso); 
    309309                        break; 
    310310                    } 
    311311                    break;