root/rts/RaiseAsync.c

Revision 014f1e1feee4c85a82f787ef8f01b44072051172, 33.8 KB (checked in by Simon Marlow <marlowsd@…>, 3 months ago)

raiseAsync: cope with ATOMICALLY_FRAMES inside UPDATE_FRAMES (#5866)

  • Property mode set to 100644
Line 
1/* ---------------------------------------------------------------------------
2 *
3 * (c) The GHC Team, 1998-2006
4 *
5 * Asynchronous exceptions
6 *
7 * --------------------------------------------------------------------------*/
8
9#include "PosixSource.h"
10#include "Rts.h"
11
12#include "sm/Storage.h"
13#include "Threads.h"
14#include "Trace.h"
15#include "RaiseAsync.h"
16#include "Schedule.h"
17#include "Updates.h"
18#include "STM.h"
19#include "sm/Sanity.h"
20#include "Profiling.h"
21#include "Messages.h"
22#if defined(mingw32_HOST_OS)
23#include "win32/IOManager.h"
24#endif
25
26static StgTSO* raiseAsync (Capability *cap,
27                           StgTSO *tso,
28                           StgClosure *exception,
29                           rtsBool stop_at_atomically,
30                           StgUpdateFrame *stop_here);
31
32static void removeFromQueues(Capability *cap, StgTSO *tso);
33
34static void removeFromMVarBlockedQueue (StgTSO *tso);
35
36static void blockedThrowTo (Capability *cap, 
37                            StgTSO *target, MessageThrowTo *msg);
38
39static void throwToSendMsg (Capability *cap USED_IF_THREADS,
40                            Capability *target_cap USED_IF_THREADS, 
41                            MessageThrowTo *msg USED_IF_THREADS);
42
43/* -----------------------------------------------------------------------------
44   throwToSingleThreaded
45
46   This version of throwTo is safe to use if and only if one of the
47   following holds:
48   
49     - !THREADED_RTS
50
51     - all the other threads in the system are stopped (eg. during GC).
52
53     - we surely own the target TSO (eg. we just took it from the
54       run queue of the current capability, or we are running it).
55
56   It doesn't cater for blocking the source thread until the exception
57   has been raised.
58   -------------------------------------------------------------------------- */
59
60static void
61throwToSingleThreaded__ (Capability *cap, StgTSO *tso, StgClosure *exception, 
62                         rtsBool stop_at_atomically, StgUpdateFrame *stop_here)
63{
64    // Thread already dead?
65    if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
66        return;
67    }
68
69    // Remove it from any blocking queues
70    removeFromQueues(cap,tso);
71
72    raiseAsync(cap, tso, exception, stop_at_atomically, stop_here);
73}
74
75void
76throwToSingleThreaded (Capability *cap, StgTSO *tso, StgClosure *exception)
77{
78    throwToSingleThreaded__(cap, tso, exception, rtsFalse, NULL);
79}
80
81void
82throwToSingleThreaded_ (Capability *cap, StgTSO *tso, StgClosure *exception,
83                        rtsBool stop_at_atomically)
84{
85    throwToSingleThreaded__ (cap, tso, exception, stop_at_atomically, NULL);
86}
87
88void // cannot return a different TSO
89suspendComputation (Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
90{
91    throwToSingleThreaded__ (cap, tso, NULL, rtsFalse, stop_here);
92}
93
94/* -----------------------------------------------------------------------------
95   throwTo
96
97   This function may be used to throw an exception from one thread to
98   another, during the course of normal execution.  This is a tricky
99   task: the target thread might be running on another CPU, or it
100   may be blocked and could be woken up at any point by another CPU.
101   We have some delicate synchronisation to do.
102
103   The underlying scheme when multiple Capabilities are in use is
104   message passing: when the target of a throwTo is on another
105   Capability, we send a message (a MessageThrowTo closure) to that
106   Capability.
107
108   If the throwTo needs to block because the target TSO is masking
109   exceptions (the TSO_BLOCKEX flag), then the message is placed on
110   the blocked_exceptions queue attached to the target TSO.  When the
111   target TSO enters the unmasked state again, it must check the
112   queue.  The blocked_exceptions queue is not locked; only the
113   Capability owning the TSO may modify it.
114
115   To make things simpler for throwTo, we always create the message
116   first before deciding what to do.  The message may get sent, or it
117   may get attached to a TSO's blocked_exceptions queue, or the
118   exception may get thrown immediately and the message dropped,
119   depending on the current state of the target.
120
121   Currently we send a message if the target belongs to another
122   Capability, and it is
123
124     - NotBlocked, BlockedOnMsgThrowTo,
125       BlockedOnCCall_Interruptible
126
127     - or it is masking exceptions (TSO_BLOCKEX)
128
129   Currently, if the target is BlockedOnMVar, BlockedOnSTM, or
130   BlockedOnBlackHole then we acquire ownership of the TSO by locking
131   its parent container (e.g. the MVar) and then raise the exception.
132   We might change these cases to be more message-passing-like in the
133   future.
134 
135   Returns:
136
137   NULL               exception was raised, ok to continue
138
139   MessageThrowTo *   exception was not raised; the source TSO
140                      should now put itself in the state
141                      BlockedOnMsgThrowTo, and when it is ready
142                      it should unlock the message using
143                      unlockClosure(msg, &stg_MSG_THROWTO_info);
144                      If it decides not to raise the exception after
145                      all, it can revoke it safely with
146                      unlockClosure(msg, &stg_MSG_NULL_info);
147
148   -------------------------------------------------------------------------- */
149
150MessageThrowTo *
151throwTo (Capability *cap,       // the Capability we hold
152         StgTSO *source,        // the TSO sending the exception (or NULL)
153         StgTSO *target,        // the TSO receiving the exception
154         StgClosure *exception) // the exception closure
155{
156    MessageThrowTo *msg;
157
158    msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo));
159    // message starts locked; the caller has to unlock it when it is
160    // ready.
161    SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM);
162    msg->source      = source;
163    msg->target      = target;
164    msg->exception   = exception;
165
166    switch (throwToMsg(cap, msg))
167    {
168    case THROWTO_SUCCESS:
169        return NULL;
170    case THROWTO_BLOCKED:
171    default:
172        return msg;
173    }
174}
175   
176
177nat
178throwToMsg (Capability *cap, MessageThrowTo *msg)
179{
180    StgWord status;
181    StgTSO *target = msg->target;
182    Capability *target_cap;
183
184    goto check_target;
185
186retry:
187    write_barrier();
188    debugTrace(DEBUG_sched, "throwTo: retrying...");
189
190check_target:
191    ASSERT(target != END_TSO_QUEUE);
192
193    // Thread already dead?
194    if (target->what_next == ThreadComplete
195        || target->what_next == ThreadKilled) {
196        return THROWTO_SUCCESS;
197    }
198
199    debugTraceCap(DEBUG_sched, cap,
200                  "throwTo: from thread %lu to thread %lu",
201                  (unsigned long)msg->source->id, 
202                  (unsigned long)msg->target->id);
203
204#ifdef DEBUG
205    traceThreadStatus(DEBUG_sched, target);
206#endif
207
208    target_cap = target->cap;
209    if (target->cap != cap) {
210        throwToSendMsg(cap, target_cap, msg);
211        return THROWTO_BLOCKED;
212    }
213
214    status = target->why_blocked;
215   
216    switch (status) {
217    case NotBlocked:
218    {
219        if ((target->flags & TSO_BLOCKEX) == 0) {
220            // It's on our run queue and not blocking exceptions
221            raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
222            return THROWTO_SUCCESS;
223        } else {
224            blockedThrowTo(cap,target,msg);
225            return THROWTO_BLOCKED;
226        }
227    }
228
229    case BlockedOnMsgThrowTo:
230    {
231        const StgInfoTable *i;
232        MessageThrowTo *m;
233
234        m = target->block_info.throwto;
235
236        // target is local to this cap, but has sent a throwto
237        // message to another cap.
238        //
239        // The source message is locked.  We need to revoke the
240        // target's message so that we can raise the exception, so
241        // we attempt to lock it.
242
243        // There's a possibility of a deadlock if two threads are both
244        // trying to throwTo each other (or more generally, a cycle of
245        // threads).  To break the symmetry we compare the addresses
246        // of the MessageThrowTo objects, and the one for which m <
247        // msg gets to spin, while the other can only try to lock
248        // once, but must then back off and unlock both before trying
249        // again.
250        if (m < msg) {
251            i = lockClosure((StgClosure *)m);
252        } else {
253            i = tryLockClosure((StgClosure *)m);
254            if (i == NULL) {
255//            debugBelch("collision\n");
256                throwToSendMsg(cap, target->cap, msg);
257                return THROWTO_BLOCKED;
258            }
259        }
260
261        if (i == &stg_MSG_NULL_info) {
262            // we know there's a MSG_TRY_WAKEUP on the way, so we
263            // might as well just do it now.  The message will
264            // be a no-op when it arrives.
265            unlockClosure((StgClosure*)m, i);
266            tryWakeupThread(cap, target);
267            goto retry;
268        }
269
270        if (i != &stg_MSG_THROWTO_info) {
271            // if it's a MSG_NULL, this TSO has been woken up by another Cap
272            unlockClosure((StgClosure*)m, i);
273            goto retry;
274        }
275
276        if ((target->flags & TSO_BLOCKEX) &&
277            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
278            unlockClosure((StgClosure*)m, i);
279            blockedThrowTo(cap,target,msg);
280            return THROWTO_BLOCKED;
281        }
282
283        // nobody else can wake up this TSO after we claim the message
284        doneWithMsgThrowTo(m);
285
286        raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
287        return THROWTO_SUCCESS;
288    }
289
290    case BlockedOnMVar:
291    {
292        /*
293          To establish ownership of this TSO, we need to acquire a
294          lock on the MVar that it is blocked on.
295        */
296        StgMVar *mvar;
297        StgInfoTable *info USED_IF_THREADS;
298       
299        mvar = (StgMVar *)target->block_info.closure;
300
301        // ASSUMPTION: tso->block_info must always point to a
302        // closure.  In the threaded RTS it does.
303        switch (get_itbl(mvar)->type) {
304        case MVAR_CLEAN:
305        case MVAR_DIRTY:
306            break;
307        default:
308            goto retry;
309        }
310
311        info = lockClosure((StgClosure *)mvar);
312
313        // we have the MVar, let's check whether the thread
314        // is still blocked on the same MVar.
315        if (target->why_blocked != BlockedOnMVar
316            || (StgMVar *)target->block_info.closure != mvar) {
317            unlockClosure((StgClosure *)mvar, info);
318            goto retry;
319        }
320
321        if (target->_link == END_TSO_QUEUE) {
322            // the MVar operation has already completed.  There is a
323            // MSG_TRY_WAKEUP on the way, but we can just wake up the
324            // thread now anyway and ignore the message when it
325            // arrives.
326            unlockClosure((StgClosure *)mvar, info);
327            tryWakeupThread(cap, target);
328            goto retry;
329        }
330
331        if ((target->flags & TSO_BLOCKEX) &&
332            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
333            blockedThrowTo(cap,target,msg);
334            unlockClosure((StgClosure *)mvar, info);
335            return THROWTO_BLOCKED;
336        } else {
337            // revoke the MVar operation
338            removeFromMVarBlockedQueue(target);
339            raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
340            unlockClosure((StgClosure *)mvar, info);
341            return THROWTO_SUCCESS;
342        }
343    }
344
345    case BlockedOnBlackHole:
346    {
347        if (target->flags & TSO_BLOCKEX) {
348            // BlockedOnBlackHole is not interruptible.
349            blockedThrowTo(cap,target,msg);
350            return THROWTO_BLOCKED;
351        } else {
352            // Revoke the message by replacing it with IND. We're not
353            // locking anything here, so we might still get a TRY_WAKEUP
354            // message from the owner of the blackhole some time in the
355            // future, but that doesn't matter.
356            ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info);
357            OVERWRITE_INFO(target->block_info.bh, &stg_IND_info);
358            raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
359            return THROWTO_SUCCESS;
360        }
361    }
362
363    case BlockedOnSTM:
364        lockTSO(target);
365        // Unblocking BlockedOnSTM threads requires the TSO to be
366        // locked; see STM.c:unpark_tso().
367        if (target->why_blocked != BlockedOnSTM) {
368            unlockTSO(target);
369            goto retry;
370        }
371        if ((target->flags & TSO_BLOCKEX) &&
372            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
373            blockedThrowTo(cap,target,msg);
374            unlockTSO(target);
375            return THROWTO_BLOCKED;
376        } else {
377            raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
378            unlockTSO(target);
379            return THROWTO_SUCCESS;
380        }
381
382    case BlockedOnCCall_Interruptible:
383#ifdef THREADED_RTS
384    {
385        Task *task = NULL;
386        // walk suspended_ccalls to find the correct worker thread
387        InCall *incall;
388        for (incall = cap->suspended_ccalls; incall != NULL; incall = incall->next) {
389            if (incall->suspended_tso == target) {
390                task = incall->task;
391                break;
392            }
393        }
394        if (task != NULL) {
395            blockedThrowTo(cap, target, msg);
396            if (!((target->flags & TSO_BLOCKEX) &&
397                  ((target->flags & TSO_INTERRUPTIBLE) == 0))) {
398                interruptWorkerTask(task);
399            }
400            return THROWTO_BLOCKED;
401        } else {
402            debugTraceCap(DEBUG_sched, cap, "throwTo: could not find worker thread to kill");
403        }
404        // fall to next
405    }
406#endif
407    case BlockedOnCCall:
408        blockedThrowTo(cap,target,msg);
409        return THROWTO_BLOCKED;
410
411#ifndef THREADEDED_RTS
412    case BlockedOnRead:
413    case BlockedOnWrite:
414    case BlockedOnDelay:
415#if defined(mingw32_HOST_OS)
416    case BlockedOnDoProc:
417#endif
418        if ((target->flags & TSO_BLOCKEX) &&
419            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
420            blockedThrowTo(cap,target,msg);
421            return THROWTO_BLOCKED;
422        } else {
423            removeFromQueues(cap,target);
424            raiseAsync(cap, target, msg->exception, rtsFalse, NULL);
425            return THROWTO_SUCCESS;
426        }
427#endif
428
429    case ThreadMigrating:
430        // if is is ThreadMigrating and tso->cap is ours, then it
431        // *must* be migrating *to* this capability.  If it were
432        // migrating away from the capability, then tso->cap would
433        // point to the destination.
434        //
435        // There is a MSG_WAKEUP in the message queue for this thread,
436        // but we can just do it preemptively:
437        tryWakeupThread(cap, target);
438        // and now retry, the thread should be runnable.
439        goto retry;
440
441    default:
442        barf("throwTo: unrecognised why_blocked (%d)", target->why_blocked);
443    }
444    barf("throwTo");
445}
446
447static void
448throwToSendMsg (Capability *cap STG_UNUSED,
449                Capability *target_cap USED_IF_THREADS, 
450                MessageThrowTo *msg USED_IF_THREADS)
451           
452{
453#ifdef THREADED_RTS
454    debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
455
456    sendMessage(cap, target_cap, (Message*)msg);
457#endif
458}
459
460// Block a throwTo message on the target TSO's blocked_exceptions
461// queue.  The current Capability must own the target TSO in order to
462// modify the blocked_exceptions queue.
463static void
464blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
465{
466    debugTraceCap(DEBUG_sched, cap, "throwTo: blocking on thread %lu",
467                  (unsigned long)target->id);
468
469    ASSERT(target->cap == cap);
470
471    msg->link = target->blocked_exceptions;
472    target->blocked_exceptions = msg;
473    dirty_TSO(cap,target); // we modified the blocked_exceptions queue
474}
475
476/* -----------------------------------------------------------------------------
477   Waking up threads blocked in throwTo
478
479   There are two ways to do this: maybePerformBlockedException() will
480   perform the throwTo() for the thread at the head of the queue
481   immediately, and leave the other threads on the queue.
482   maybePerformBlockedException() also checks the TSO_BLOCKEX flag
483   before raising an exception.
484
485   awakenBlockedExceptionQueue() will wake up all the threads in the
486   queue, but not perform any throwTo() immediately.  This might be
487   more appropriate when the target thread is the one actually running
488   (see Exception.cmm).
489
490   Returns: non-zero if an exception was raised, zero otherwise.
491   -------------------------------------------------------------------------- */
492
493int
494maybePerformBlockedException (Capability *cap, StgTSO *tso)
495{
496    MessageThrowTo *msg;
497    const StgInfoTable *i;
498    StgTSO *source;
499
500    if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) {
501        if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
502            awakenBlockedExceptionQueue(cap,tso);
503            return 1;
504        } else {
505            return 0;
506        }
507    }
508
509    if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE && 
510        (tso->flags & TSO_BLOCKEX) != 0) {
511        debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
512    }
513
514    if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE
515        && ((tso->flags & TSO_BLOCKEX) == 0
516            || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) {
517
518        // We unblock just the first thread on the queue, and perform
519        // its throw immediately.
520    loop:
521        msg = tso->blocked_exceptions;
522        if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0;
523        i = lockClosure((StgClosure*)msg);
524        tso->blocked_exceptions = (MessageThrowTo*)msg->link;
525        if (i == &stg_MSG_NULL_info) {
526            unlockClosure((StgClosure*)msg,i);
527            goto loop;
528        }
529
530        throwToSingleThreaded(cap, msg->target, msg->exception);
531        source = msg->source;
532        doneWithMsgThrowTo(msg);
533        tryWakeupThread(cap, source);
534        return 1;
535    }
536    return 0;
537}
538
539// awakenBlockedExceptionQueue(): Just wake up the whole queue of
540// blocked exceptions.
541
542void
543awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
544{
545    MessageThrowTo *msg;
546    const StgInfoTable *i;
547    StgTSO *source;
548
549    for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE;
550         msg = (MessageThrowTo*)msg->link) {
551        i = lockClosure((StgClosure *)msg);
552        if (i != &stg_MSG_NULL_info) {
553            source = msg->source;
554            doneWithMsgThrowTo(msg);
555            tryWakeupThread(cap, source);
556        } else {
557            unlockClosure((StgClosure *)msg,i);
558        }
559    }
560    tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
561}   
562
563/* -----------------------------------------------------------------------------
564   Remove a thread from blocking queues.
565
566   This is for use when we raise an exception in another thread, which
567   may be blocked.
568
569   Precondition: we have exclusive access to the TSO, via the same set
570   of conditions as throwToSingleThreaded() (c.f.).
571   -------------------------------------------------------------------------- */
572
573static void
574removeFromMVarBlockedQueue (StgTSO *tso)
575{
576    StgMVar *mvar = (StgMVar*)tso->block_info.closure;
577    StgMVarTSOQueue *q = (StgMVarTSOQueue*)tso->_link;
578
579    if (q == (StgMVarTSOQueue*)END_TSO_QUEUE) {
580        // already removed from this MVar
581        return;
582    }
583
584    // Assume the MVar is locked. (not assertable; sometimes it isn't
585    // actually WHITEHOLE'd).
586
587    // We want to remove the MVAR_TSO_QUEUE object from the queue.  It
588    // isn't doubly-linked so we can't actually remove it; instead we
589    // just overwrite it with an IND if possible and let the GC short
590    // it out.  However, we have to be careful to maintain the deque
591    // structure:
592
593    if (mvar->head == q) {
594        mvar->head = q->link;
595        OVERWRITE_INFO(q, &stg_IND_info);
596        if (mvar->tail == q) {
597            mvar->tail = (StgMVarTSOQueue*)END_TSO_QUEUE;
598        }
599    }
600    else if (mvar->tail == q) {
601        // we can't replace it with an IND in this case, because then
602        // we lose the tail pointer when the GC shorts out the IND.
603        // So we use MSG_NULL as a kind of non-dupable indirection;
604        // these are ignored by takeMVar/putMVar.
605        OVERWRITE_INFO(q, &stg_MSG_NULL_info);
606    }
607    else {
608        OVERWRITE_INFO(q, &stg_IND_info);
609    }
610
611    // revoke the MVar operation
612    tso->_link = END_TSO_QUEUE;
613}
614
615static void
616removeFromQueues(Capability *cap, StgTSO *tso)
617{
618  switch (tso->why_blocked) {
619
620  case NotBlocked:
621  case ThreadMigrating:
622      return;
623
624  case BlockedOnSTM:
625    // Be careful: nothing to do here!  We tell the scheduler that the
626    // thread is runnable and we leave it to the stack-walking code to
627    // abort the transaction while unwinding the stack.  We should
628    // perhaps have a debugging test to make sure that this really
629    // happens and that the 'zombie' transaction does not get
630    // committed.
631    goto done;
632
633  case BlockedOnMVar:
634      removeFromMVarBlockedQueue(tso);
635      goto done;
636
637  case BlockedOnBlackHole:
638      // nothing to do
639      goto done;
640
641  case BlockedOnMsgThrowTo:
642  {
643      MessageThrowTo *m = tso->block_info.throwto;
644      // The message is locked by us, unless we got here via
645      // deleteAllThreads(), in which case we own all the
646      // capabilities.
647      // ASSERT(m->header.info == &stg_WHITEHOLE_info);
648
649      // unlock and revoke it at the same time
650      doneWithMsgThrowTo(m);
651      break;
652  }
653
654#if !defined(THREADED_RTS)
655  case BlockedOnRead:
656  case BlockedOnWrite:
657#if defined(mingw32_HOST_OS)
658  case BlockedOnDoProc:
659#endif
660      removeThreadFromDeQueue(cap, &blocked_queue_hd, &blocked_queue_tl, tso);
661#if defined(mingw32_HOST_OS)
662      /* (Cooperatively) signal that the worker thread should abort
663       * the request.
664       */
665      abandonWorkRequest(tso->block_info.async_result->reqID);
666#endif
667      goto done;
668
669  case BlockedOnDelay:
670        removeThreadFromQueue(cap, &sleeping_queue, tso);
671        goto done;
672#endif
673
674  default:
675      barf("removeFromQueues: %d", tso->why_blocked);
676  }
677
678 done:
679  tso->why_blocked = NotBlocked;
680  appendToRunQueue(cap, tso);
681}
682
683/* -----------------------------------------------------------------------------
684 * raiseAsync()
685 *
686 * The following function implements the magic for raising an
687 * asynchronous exception in an existing thread.
688 *
689 * We first remove the thread from any queue on which it might be
690 * blocked.  The possible blockages are MVARs, BLOCKING_QUEUEs, and
691 * TSO blocked_exception queues.
692 *
693 * We strip the stack down to the innermost CATCH_FRAME, building
694 * thunks in the heap for all the active computations, so they can
695 * be restarted if necessary.  When we reach a CATCH_FRAME, we build
696 * an application of the handler to the exception, and push it on
697 * the top of the stack.
698 *
699 * How exactly do we save all the active computations?  We create an
700 * AP_STACK for every UpdateFrame on the stack.  Entering one of these
701 * AP_STACKs pushes everything from the corresponding update frame
702 * upwards onto the stack.  (Actually, it pushes everything up to the
703 * next update frame plus a pointer to the next AP_STACK object.
704 * Entering the next AP_STACK object pushes more onto the stack until we
705 * reach the last AP_STACK object - at which point the stack should look
706 * exactly as it did when we killed the TSO and we can continue
707 * execution by entering the closure on top of the stack.
708 *
709 * We can also kill a thread entirely - this happens if either (a) the
710 * exception passed to raiseAsync is NULL, or (b) there's no
711 * CATCH_FRAME on the stack.  In either case, we strip the entire
712 * stack and replace the thread with a zombie.
713 *
714 * ToDo: in THREADED_RTS mode, this function is only safe if either
715 * (a) we hold all the Capabilities (eg. in GC, or if there is only
716 * one Capability), or (b) we own the Capability that the TSO is
717 * currently blocked on or on the run queue of.
718 *
719 * -------------------------------------------------------------------------- */
720
721static StgTSO *
722raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, 
723           rtsBool stop_at_atomically, StgUpdateFrame *stop_here)
724{
725    StgRetInfoTable *info;
726    StgPtr sp, frame;
727    StgClosure *updatee;
728    nat i;
729    StgStack *stack;
730
731    debugTraceCap(DEBUG_sched, cap,
732                  "raising exception in thread %ld.", (long)tso->id);
733   
734#if defined(PROFILING)
735    /*
736     * Debugging tool: on raising an  exception, show where we are.
737     * See also Exception.cmm:stg_raisezh.
738     * This wasn't done for asynchronous exceptions originally; see #1450
739     */
740    if (RtsFlags.ProfFlags.showCCSOnException && exception != NULL)
741    {
742        fprintCCS_stderr(tso->prof.cccs,exception,tso);
743    }
744#endif
745    // ASSUMES: the thread is not already complete or dead
746    // Upper layers should deal with that.
747    ASSERT(tso->what_next != ThreadComplete && 
748           tso->what_next != ThreadKilled);
749
750    // only if we own this TSO (except that deleteThread() calls this
751    ASSERT(tso->cap == cap);
752
753    stack = tso->stackobj;
754
755    // mark it dirty; we're about to change its stack.
756    dirty_TSO(cap, tso);
757    dirty_STACK(cap, stack);
758
759    sp = stack->sp;
760   
761    if (stop_here != NULL) {
762        updatee = stop_here->updatee;
763    } else {
764        updatee = NULL;
765    }
766
767    // The stack freezing code assumes there's a closure pointer on
768    // the top of the stack, so we have to arrange that this is the case...
769    //
770    if (sp[0] == (W_)&stg_enter_info) {
771        sp++;
772    } else {
773        sp--;
774        sp[0] = (W_)&stg_dummy_ret_closure;
775    }
776
777    frame = sp + 1;
778    while (stop_here == NULL || frame < (StgPtr)stop_here) {
779
780        // 1. Let the top of the stack be the "current closure"
781        //
782        // 2. Walk up the stack until we find either an UPDATE_FRAME or a
783        // CATCH_FRAME.
784        //
785        // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
786        // current closure applied to the chunk of stack up to (but not
787        // including) the update frame.  This closure becomes the "current
788        // closure".  Go back to step 2.
789        //
790        // 4. If it's a CATCH_FRAME, then leave the exception handler on
791        // top of the stack applied to the exception.
792        //
793        // 5. If it's a STOP_FRAME, then kill the thread.
794        //
795        // 6. If it's an UNDERFLOW_FRAME, then continue with the next
796        //    stack chunk.
797        //
798        // NB: if we pass an ATOMICALLY_FRAME then abort the associated
799        // transaction
800       
801        info = get_ret_itbl((StgClosure *)frame);
802
803        switch (info->i.type) {
804
805        case UPDATE_FRAME:
806        {
807            StgAP_STACK * ap;
808            nat words;
809           
810            // First build an AP_STACK consisting of the stack chunk above the
811            // current update frame, with the top word on the stack as the
812            // fun field.
813            //
814            words = frame - sp - 1;
815            ap = (StgAP_STACK *)allocate(cap,AP_STACK_sizeW(words));
816           
817            ap->size = words;
818            ap->fun  = (StgClosure *)sp[0];
819            sp++;
820            for(i=0; i < (nat)words; ++i) {
821                ap->payload[i] = (StgClosure *)*sp++;
822            }
823           
824            SET_HDR(ap,&stg_AP_STACK_info,
825                    ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
826            TICK_ALLOC_UP_THK(words+1,0);
827           
828            //IF_DEBUG(scheduler,
829            //       debugBelch("sched: Updating ");
830            //       printPtr((P_)((StgUpdateFrame *)frame)->updatee);
831            //       debugBelch(" with ");
832            //       printObj((StgClosure *)ap);
833            //  );
834
835            if (((StgUpdateFrame *)frame)->updatee == updatee) {
836                // If this update frame points to the same closure as
837                // the update frame further down the stack
838                // (stop_here), then don't perform the update.  We
839                // want to keep the blackhole in this case, so we can
840                // detect and report the loop (#2783).
841                ap = (StgAP_STACK*)updatee;
842            } else {
843                // Perform the update
844                // TODO: this may waste some work, if the thunk has
845                // already been updated by another thread.
846                updateThunk(cap, tso, 
847                            ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
848            }
849
850            sp += sizeofW(StgUpdateFrame) - 1;
851            sp[0] = (W_)ap; // push onto stack
852            frame = sp + 1;
853            continue; //no need to bump frame
854        }
855
856        case UNDERFLOW_FRAME:
857        {
858            StgAP_STACK * ap;
859            nat words;
860           
861            // First build an AP_STACK consisting of the stack chunk above the
862            // current update frame, with the top word on the stack as the
863            // fun field.
864            //
865            words = frame - sp - 1;
866            ap = (StgAP_STACK *)allocate(cap,AP_STACK_sizeW(words));
867           
868            ap->size = words;
869            ap->fun  = (StgClosure *)sp[0];
870            sp++;
871            for(i=0; i < (nat)words; ++i) {
872                ap->payload[i] = (StgClosure *)*sp++;
873            }
874           
875            SET_HDR(ap,&stg_AP_STACK_NOUPD_info,
876                    ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
877            TICK_ALLOC_SE_THK(words+1,0);
878
879            stack->sp = sp;
880            threadStackUnderflow(cap,tso);
881            stack = tso->stackobj;
882            sp = stack->sp;
883
884            sp--;
885            sp[0] = (W_)ap;
886            frame = sp + 1;
887            continue;
888        }
889
890        case STOP_FRAME:
891        {
892            // We've stripped the entire stack, the thread is now dead.
893            tso->what_next = ThreadKilled;
894            stack->sp = frame + sizeofW(StgStopFrame);
895            goto done;
896        }
897
898        case CATCH_FRAME:
899            // If we find a CATCH_FRAME, and we've got an exception to raise,
900            // then build the THUNK raise(exception), and leave it on
901            // top of the CATCH_FRAME ready to enter.
902            //
903        {
904            StgCatchFrame *cf = (StgCatchFrame *)frame;
905            StgThunk *raise;
906           
907            if (exception == NULL) break;
908
909            // we've got an exception to raise, so let's pass it to the
910            // handler in this frame.
911            //
912            raise = (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
913            TICK_ALLOC_SE_THK(1,0);
914            SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
915            raise->payload[0] = exception;
916           
917            // throw away the stack from Sp up to the CATCH_FRAME.
918            //
919            sp = frame - 1;
920           
921            /* Ensure that async exceptions are blocked now, so we don't get
922             * a surprise exception before we get around to executing the
923             * handler.
924             */
925            tso->flags |= TSO_BLOCKEX;
926            if ((cf->exceptions_blocked & TSO_INTERRUPTIBLE) == 0) {
927                tso->flags &= ~TSO_INTERRUPTIBLE;
928            } else {
929                tso->flags |= TSO_INTERRUPTIBLE;
930            }
931
932            /* Put the newly-built THUNK on top of the stack, ready to execute
933             * when the thread restarts.
934             */
935            sp[0] = (W_)raise;
936            sp[-1] = (W_)&stg_enter_info;
937            stack->sp = sp-1;
938            tso->what_next = ThreadRunGHC;
939            goto done;
940        }
941           
942        case ATOMICALLY_FRAME:
943            if (stop_at_atomically) {
944                ASSERT(tso->trec->enclosing_trec == NO_TREC);
945                stmCondemnTransaction(cap, tso -> trec);
946                stack->sp = frame - 2;
947                // The ATOMICALLY_FRAME expects to be returned a
948                // result from the transaction, which it stores in the
949                // stack frame.  Hence we arrange to return a dummy
950                // result, so that the GC doesn't get upset (#3578).
951                // Perhaps a better way would be to have a different
952                // ATOMICALLY_FRAME instance for condemned
953                // transactions, but I don't fully understand the
954                // interaction with STM invariants.
955                stack->sp[1] = (W_)&stg_NO_TREC_closure;
956                stack->sp[0] = (W_)&stg_gc_unpt_r1_info;
957                tso->what_next = ThreadRunGHC;
958                goto done;
959            }
960            else
961            {
962                // Freezing an STM transaction.  Just aborting the
963                // transaction would be wrong; this is what we used to
964                // do, and it goes wrong if the ATOMICALLY_FRAME ever
965                // gets back onto the stack again, which it will do if
966                // the transaction is inside unsafePerformIO or
967                // unsafeInterleaveIO and hence inside an UPDATE_FRAME.
968                //
969                // So we want to make it so that if the enclosing
970                // computation is resumed, we will re-execute the
971                // transaction.  We therefore:
972                //
973                //   1. abort the current transaction
974                //   2. replace the stack up to and including the
975                //      atomically frame with a closure representing
976                //      a call to "atomically x", where x is the code
977                //      of the transaction.
978                //   3. continue stripping the stack
979                //
980                StgTRecHeader *trec = tso->trec;
981                StgTRecHeader *outer = trec->enclosing_trec;
982
983                StgThunk *atomically;
984                StgAtomicallyFrame *af = (StgAtomicallyFrame*)frame;
985
986                debugTraceCap(DEBUG_stm, cap,
987                              "raiseAsync: freezing atomically frame")
988                stmAbortTransaction(cap, trec);
989                stmFreeAbortedTRec(cap, trec);
990                tso->trec = outer;
991
992                atomically = (StgThunk*)allocate(cap,sizeofW(StgThunk)+1);
993                TICK_ALLOC_SE_THK(1,0);
994                SET_HDR(atomically,&stg_atomically_info,af->header.prof.ccs);
995                atomically->payload[0] = af->code;
996
997                // discard stack up to and including the ATOMICALLY_FRAME
998                frame += sizeofW(StgAtomicallyFrame);
999                sp = frame - 1;
1000
1001                // replace the ATOMICALLY_FRAME with call to atomically#
1002                sp[0] = (W_)atomically;
1003                continue;
1004            }
1005
1006        case CATCH_STM_FRAME:
1007        case CATCH_RETRY_FRAME:
1008            // CATCH frames within an atomically block: abort the
1009            // inner transaction and continue.  Eventually we will
1010            // hit the outer transaction that will get frozen (see
1011            // above).
1012            //
1013            // In this case (unlike ordinary exceptions) we do not care
1014            // whether the transaction is valid or not because its
1015            // possible validity cannot have caused the exception
1016            // and will not be visible after the abort.
1017        {
1018            StgTRecHeader *trec = tso -> trec;
1019            StgTRecHeader *outer = trec -> enclosing_trec;
1020            debugTraceCap(DEBUG_stm, cap,
1021                          "found atomically block delivering async exception");
1022            stmAbortTransaction(cap, trec);
1023            stmFreeAbortedTRec(cap, trec);
1024            tso -> trec = outer;
1025            break;
1026        };
1027
1028        default:
1029            break;
1030        }
1031
1032        // move on to the next stack frame
1033        frame += stack_frame_sizeW((StgClosure *)frame);
1034    }
1035
1036done:
1037    IF_DEBUG(sanity, checkTSO(tso));
1038
1039    // wake it up
1040    if (tso->why_blocked != NotBlocked) {
1041        tso->why_blocked = NotBlocked;
1042        appendToRunQueue(cap,tso);
1043    }       
1044
1045    return tso;
1046}
1047
Note: See TracBrowser for help on using the browser.