[前][次][番号順一覧][スレッド一覧]

ruby-changes:24379

From: ko1 <ko1@a...>
Date: Wed, 18 Jul 2012 14:46:51 +0900 (JST)
Subject: [ruby-changes:24379] ko1:r36430 (trunk): * thread.c (rb_threadptr_async_errinfo_*): manage async errors queue.

ko1	2012-07-18 14:46:40 +0900 (Wed, 18 Jul 2012)

  New Revision: 36430

  http://svn.ruby-lang.org/cgi-bin/viewvc.cgi?view=rev&revision=36430

  Log:
    * thread.c (rb_threadptr_async_errinfo_*): manage async errors queue.
      Async events such as an exception thrown by Thread#raise,
      Thread#kill and thread termination (after main thread termination)
      will be queued to th->async_errinfo_queue.
      - clear: clear the queue.
      - enque: enque err object into queue.
      - deque: deque err object from queue.
      - active_p: return 1 if the queue should be checked.
      rb_thread_t#thrown_errinfo was removed.
    * vm_core.h: add declarations of rb_threadptr_async_errinfo_*.
      remove rb_thread_t#thrown_errinfo field and
      add rb_thread_t#async_errinfo_queue (queue body: Array),
      rb_thread_t#async_errinfo_queue_checked (flag),
      rb_thread_t#async_errinfo_mask_stack(Array, not used yet).
    * vm.c (rb_thread_mark): fix a mark function.
    * cont.c (rb_fiber_start): enque an error.
    * process.c (after_fork): clear async errinfo queue.

  Modified files:
    trunk/ChangeLog
    trunk/cont.c
    trunk/process.c
    trunk/thread.c
    trunk/vm.c
    trunk/vm_core.h

Index: ChangeLog
===================================================================
--- ChangeLog	(revision 36429)
+++ ChangeLog	(revision 36430)
@@ -1,3 +1,27 @@
+Wed Jul 18 14:16:51 2012  Koichi Sasada  <ko1@a...>
+
+	* thread.c (rb_threadptr_async_errinfo_*): manage async errors queue.
+	  Async events such as an exception thrown by Thread#raise,
+	  Thread#kill and thread termination (after main thread termination)
+	  will be queued to th->async_errinfo_queue.
+	  - clear: clear the queue.
+	  - enque: enque err object into queue.
+	  - deque: deque err object from queue.
+	  - active_p: return 1 if the queue should be checked.
+	  rb_thread_t#thrown_errinfo was removed.
+
+	* vm_core.h: add declarations of rb_threadptr_async_errinfo_*.
+	  remove rb_thread_t#thrown_errinfo field and
+	  add rb_thread_t#async_errinfo_queue (queue body: Array),
+	      rb_thread_t#async_errinfo_queue_checked (flag),
+	      rb_thread_t#async_errinfo_mask_stack(Array, not used yet).
+
+	* vm.c (rb_thread_mark): fix a mark function.
+
+	* cont.c (rb_fiber_start): enque an error.
+
+	* process.c (after_fork): clear async errinfo queue.
+
 Wed Jul 18 14:25:55 2012  URABE Shyouhei  <shyouhei@r...>
 
 	* pack.c: (ditto) bitwise operations are not char.  Apply explicit
Index: vm_core.h
===================================================================
--- vm_core.h	(revision 36429)
+++ vm_core.h	(revision 36430)
@@ -446,9 +446,14 @@
     VALUE thgroup;
     VALUE value;
 
+    /* temporary place of errinfo */
     VALUE errinfo;
-    VALUE thrown_errinfo;
 
+    /* async errinfo queue */
+    VALUE async_errinfo_queue;
+    int async_errinfo_queue_checked;
+    VALUE async_errinfo_mask_stack;
+
     rb_atomic_t interrupt_flag;
     rb_thread_lock_t interrupt_lock;
     struct rb_unblock_callback unblock;
@@ -767,6 +772,10 @@
 void rb_threadptr_execute_interrupts(rb_thread_t *);
 void rb_threadptr_interrupt(rb_thread_t *th);
 void rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th);
+void rb_threadptr_async_errinfo_clear(rb_thread_t *th);
+void rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v);
+VALUE rb_threadptr_async_errinfo_deque(rb_thread_t *th);
+int rb_threadptr_async_errinfo_active_p(rb_thread_t *th);
 
 void rb_thread_lock_unlock(rb_thread_lock_t *);
 void rb_thread_lock_destroy(rb_thread_lock_t *);
Index: thread.c
===================================================================
--- thread.c	(revision 36429)
+++ thread.c	(revision 36430)
@@ -68,7 +68,6 @@
 static void sleep_forever(rb_thread_t *th, int nodeadlock);
 static double timeofday(void);
 static int rb_threadptr_dead(rb_thread_t *th);
-
 static void rb_check_deadlock(rb_vm_t *vm);
 
 #define eKillSignal INT2FIX(0)
@@ -131,7 +130,6 @@
     blocking_region_begin(__th, &__region, (ubf), (ubfarg)); \
     exec; \
     blocking_region_end(__th, &__region); \
-    RUBY_VM_CHECK_INTS(); \
 } while(0)
 
 #if THREAD_DEBUG
@@ -313,9 +311,9 @@
 
     if (th != main_thread) {
 	thread_debug("terminate_i: %p\n", (void *)th);
+	rb_threadptr_async_errinfo_enque(th, eTerminateSignal);
+	th->status = THREAD_TO_KILL;
 	rb_threadptr_interrupt(th);
-	th->thrown_errinfo = eTerminateSignal;
-	th->status = THREAD_TO_KILL;
     }
     else {
 	thread_debug("terminate_i: main thread (%p)\n", (void *)th);
@@ -564,6 +562,10 @@
     th->priority = GET_THREAD()->priority;
     th->thgroup = GET_THREAD()->thgroup;
 
+    th->async_errinfo_queue = rb_ary_new();
+    th->async_errinfo_queue_checked = 0;
+    th->async_errinfo_mask_stack = rb_ary_new();
+
     native_mutex_initialize(&th->interrupt_lock);
     if (GET_VM()->event_hooks != NULL)
 	th->event_flags |= RUBY_EVENT_VM;
@@ -1133,6 +1135,10 @@
 	val = func(data1);
 	saved_errno = errno;
     }, ubf, data2);
+
+    /* TODO: check */
+    RUBY_VM_CHECK_INTS();
+
     errno = saved_errno;
 
     return val;
@@ -1144,13 +1150,28 @@
     VALUE val;
     rb_thread_t *th = GET_THREAD();
     int saved_errno = 0;
+    int state;
 
     th->waiting_fd = fd;
-    BLOCKING_REGION({
-	val = func(data1);
-	saved_errno = errno;
-    }, ubf_select, th);
+
+    TH_PUSH_TAG(th);
+    if ((state = EXEC_TAG()) == 0) {
+	BLOCKING_REGION({
+	    val = func(data1);
+	    saved_errno = errno;
+	}, ubf_select, th);
+    }
+    TH_POP_TAG();
+
+    /* always clear waiting_fd, even after a non-local jump */
     th->waiting_fd = -1;
+
+    if (state) {
+	JUMP_TAG(state);
+    }
+    /* TODO: check func() */
+    RUBY_VM_CHECK_INTS();
+
     errno = saved_errno;
 
     return val;
@@ -1294,12 +1315,14 @@
 	}
 
 	/* exception from another thread */
-	if (th->thrown_errinfo) {
-	    VALUE err = th->thrown_errinfo;
-	    th->thrown_errinfo = 0;
+	if (rb_threadptr_async_errinfo_active_p(th)) {
+	    VALUE err = rb_threadptr_async_errinfo_deque(th);
 	    thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err);
 
-	    if (err == eKillSignal || err == eTerminateSignal) {
+	    if (err == eKillSignal      /* Thread#kill received */  ||
+		err == eTerminateSignal /* Terminate thread */ ) {
+		rb_threadptr_async_errinfo_clear(th);
+		th->status = THREAD_TO_KILL;
 		th->errinfo = INT2FIX(TAG_FATAL);
 		TH_JUMP_TAG(th, TAG_FATAL);
 	    }
@@ -1353,6 +1376,59 @@
 
 /*****************************************************/
 
+/*
+ * rb_threadptr_async_errinfo_* - manage async errors queue
+ *
+ * Async events such as an exception thrown by Thread#raise,
+ * Thread#kill and thread termination (after main thread termination)
+ * will be queued to th->async_errinfo_queue.
+ * - clear: clear the queue.
+ * - enque: enque err object into queue.
+ * - deque: deque err object from queue.
+ * - active_p: return 1 if the queue should be checked.
+ *
+ * All rb_threadptr_async_errinfo_* functions are called by
+ * a GVL acquired thread, of course.
+ * Note that all "rb_" prefix APIs need GVL to call.
+ */
+
+void
+rb_threadptr_async_errinfo_clear(rb_thread_t *th)
+{
+    rb_ary_clear(th->async_errinfo_queue);
+}
+
+void
+rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v)
+{
+    rb_ary_push(th->async_errinfo_queue, v);
+    th->async_errinfo_queue_checked = 0;
+}
+
+VALUE
+rb_threadptr_async_errinfo_deque(rb_thread_t *th)
+{
+    VALUE err = rb_ary_shift(th->async_errinfo_queue);
+    if (RARRAY_LEN(th->async_errinfo_queue) == 0) {
+	th->async_errinfo_queue_checked = 1;
+    }
+    return err;
+}
+
+int
+rb_threadptr_async_errinfo_active_p(rb_thread_t *th)
+{
+    if (th->async_errinfo_queue_checked) {
+	return 0;
+    }
+    else {
+	return RARRAY_LEN(th->async_errinfo_queue) > 0;
+    }
+}
+
+VALUE
+rb_thread
+
 static void
 rb_threadptr_ready(rb_thread_t *th)
 {
@@ -1364,19 +1440,13 @@
 {
     VALUE exc;
 
-  again:
     if (rb_threadptr_dead(th)) {
 	return Qnil;
     }
 
-    if (th->thrown_errinfo != 0 || th->raised_flag) {
-	rb_thread_schedule();
-	goto again;
-    }
-
     exc = rb_make_exception(argc, argv);
-    th->thrown_errinfo = exc;
-    rb_threadptr_ready(th);
+    rb_threadptr_async_errinfo_enque(th, exc);
+    rb_threadptr_interrupt(th);
     return Qnil;
 }
 
@@ -1436,13 +1506,6 @@
     return 1;
 }
 
-#define THREAD_IO_WAITING_P(th) (			\
-	((th)->status == THREAD_STOPPED ||		\
-	 (th)->status == THREAD_STOPPED_FOREVER) &&	\
-	(th)->blocking_region_buffer &&			\
-	(th)->unblock.func == ubf_select &&		\
-	1)
-
 static int
 thread_fd_close_i(st_data_t key, st_data_t val, st_data_t data)
 {
@@ -1450,14 +1513,10 @@
     rb_thread_t *th;
     GetThreadPtr((VALUE)key, th);
 
-    if (THREAD_IO_WAITING_P(th)) {
-	native_mutex_lock(&th->interrupt_lock);
-	if (THREAD_IO_WAITING_P(th) && th->waiting_fd == fd) {
-	    th->thrown_errinfo = th->vm->special_exceptions[ruby_error_closed_stream];
-	    RUBY_VM_SET_INTERRUPT(th);
-	    (th->unblock.func)(th->unblock.arg);
-	}
-	native_mutex_unlock(&th->interrupt_lock);
+    if (th->waiting_fd == fd) {
+	VALUE err = th->vm->special_exceptions[ruby_error_closed_stream];
+	rb_threadptr_async_errinfo_enque(th, err);
+	rb_threadptr_interrupt(th);
     }
     return ST_CONTINUE;
 }
@@ -1530,10 +1589,9 @@
 
     thread_debug("rb_thread_kill: %p (%p)\n", (void *)th, (void *)th->thread_id);
 
+    rb_threadptr_async_errinfo_enque(th, eKillSignal);
+    th->status = THREAD_TO_KILL;
     rb_threadptr_interrupt(th);
-    th->thrown_errinfo = eKillSignal;
-    th->status = THREAD_TO_KILL;
-
     return thread;
 }
 
@@ -2592,6 +2650,9 @@
 	    result = native_fd_select(n, read, write, except, timeout, th);
 	    if (result < 0) lerrno = errno;
 	}, ubf_select, th);
+
+    RUBY_VM_CHECK_INTS();
+
     errno = lerrno;
 
     if (result < 0) {
@@ -2815,6 +2876,8 @@
 	if (result < 0) lerrno = errno;
     }, ubf_select, GET_THREAD());
 
+    RUBY_VM_CHECK_INTS();
+
     if (result < 0) {
 	errno = lerrno;
 	switch (errno) {
@@ -4707,6 +4770,10 @@
 	    gvl_init(th->vm);
 	    gvl_acquire(th->vm, th);
 	    native_mutex_initialize(&th->interrupt_lock);
+
+	    th->async_errinfo_queue = rb_ary_new();
+	    th->async_errinfo_queue_checked = 0;
+	    th->async_errinfo_mask_stack = rb_ary_new();
 	}
     }
 
Index: process.c
===================================================================
--- process.c	(revision 36429)
+++ process.c	(revision 36430)
@@ -1055,7 +1055,7 @@
 }
 
 #define before_fork() before_exec()
-#define after_fork() (GET_THREAD()->thrown_errinfo = 0, after_exec())
+#define after_fork() (rb_threadptr_async_errinfo_clear(GET_THREAD()), after_exec())
 
 #include "dln.h"
 
Index: cont.c
===================================================================
--- cont.c	(revision 36429)
+++ cont.c	(revision 36430)
@@ -1164,11 +1164,11 @@
 
     if (state) {
 	if (state == TAG_RAISE) {
-	    th->thrown_errinfo = th->errinfo;
+	    rb_threadptr_async_errinfo_enque(th, th->errinfo);
 	}
 	else {
-	    th->thrown_errinfo =
-	      rb_vm_make_jump_tag_but_local_jump(state, th->errinfo);
+	    VALUE err = rb_vm_make_jump_tag_but_local_jump(state, th->errinfo);
+	    rb_threadptr_async_errinfo_enque(th, err);
 	}
 	RUBY_VM_SET_INTERRUPT(th);
     }
Index: vm.c
===================================================================
--- vm.c	(revision 36429)
+++ vm.c	(revision 36430)
@@ -1645,7 +1645,8 @@
 	RUBY_MARK_UNLESS_NULL(th->thgroup);
 	RUBY_MARK_UNLESS_NULL(th->value);
 	RUBY_MARK_UNLESS_NULL(th->errinfo);
-	RUBY_MARK_UNLESS_NULL(th->thrown_errinfo);
+	RUBY_MARK_UNLESS_NULL(th->async_errinfo_queue);
+	RUBY_MARK_UNLESS_NULL(th->async_errinfo_mask_stack);
 	RUBY_MARK_UNLESS_NULL(th->root_svar);
 	RUBY_MARK_UNLESS_NULL(th->top_self);
 	RUBY_MARK_UNLESS_NULL(th->top_wrapper);

--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/

[前][次][番号順一覧][スレッド一覧]