ruby-changes:24419
From: ko1 <ko1@a...>
Date: Thu, 19 Jul 2012 23:19:54 +0900 (JST)
Subject: [ruby-changes:24419] ko1:r36470 (trunk): * thread.c (rb_thread_s_control_interrupt,
ko1 2012-07-19 23:19:40 +0900 (Thu, 19 Jul 2012) New Revision: 36470 http://svn.ruby-lang.org/cgi-bin/viewvc.cgi?view=rev&revision=36470 Log: * thread.c (rb_thread_s_control_interrupt, rb_thread_s_check_interrupt): added for Thread.control_interrupt and Thread.check_interrupt. See details on rdoc. I'll make a ticket for this feature. * test/ruby/test_thread.rb: add a test for Thread.control_interrupt. * thread.c (rb_threadptr_raise): make a new exception object even if argc is 0. * thread.c (rb_thread_kill): kill thread immediately if target thread is current thread. * vm_core.h (RUBY_VM_CHECK_INTS_BLOCKING): added. CHECK_INTS while/after blocking operation. * vm_core.h (RUBY_VM_CHECK_INTS): require rb_thread_t ptr. * cont.c (fiber_switch): use replaced RUBY_VM_CHECK_INTS(). * eval.c (ruby_cleanup): ditto. * insns.def: ditto. * process.c (rb_waitpid): ditto. * vm_eval.c (vm_call0): ditto. * vm_insnhelper.c (vm_call_method): ditto. Modified files: trunk/ChangeLog trunk/cont.c trunk/eval.c trunk/insns.def trunk/process.c trunk/test/ruby/test_thread.rb trunk/thread.c trunk/vm_core.h trunk/vm_eval.c trunk/vm_insnhelper.c Index: ChangeLog =================================================================== --- ChangeLog (revision 36469) +++ ChangeLog (revision 36470) @@ -1,3 +1,36 @@ +Thu Jul 19 15:08:40 2012 Koichi Sasada <ko1@a...> + + * thread.c (rb_thread_s_control_interrupt, + rb_thread_s_check_interrupt): added for + Thread.control_intgerrupt and Thread.check_interrupt. + See details on rdoc. + I'll make an ticket for this feature. + + * test/ruby/test_thread.rb: add a test for Thread.control_intgerrupt. + + * thread.c (rb_threadptr_raise): make a new exception object + even if argc is 0. + + * thread.c (rb_thread_kill): kill thread immediately if target thread + is current thread. + + * vm_core.h (RUBY_VM_CHECK_INTS_BLOCKING): added. + CHECK_INTS while/after blocking operation. + + * vm_core.h (RUBY_VM_CHECK_INTS): require rb_thread_t ptr. 
+ + * cont.c (fiber_switch): use replaced RUBY_VM_CHECK_INTS(). + + * eval.c (ruby_cleanup): ditto. + + * insns.def: ditto. + + * process.c (rb_waitpid): ditto. + + * vm_eval.c (vm_call0): ditto. + + * vm_insnhelper.c (vm_call_method): ditto. + Thu Jul 19 22:46:48 2012 Tanaka Akira <akr@f...> * test/ruby/test_io.rb: remove temporally files early. Index: insns.def =================================================================== --- insns.def (revision 36469) +++ insns.def (revision 36470) @@ -1086,7 +1086,7 @@ } } - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); if (UNLIKELY(VM_FRAME_TYPE_FINISH_P(GET_CFP()))) { #if OPT_CALL_THREADED_CODE @@ -1117,7 +1117,7 @@ (VALUE throwobj) (VALUE val) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); val = vm_throw(th, GET_CFP(), throw_state, throwobj); THROW_EXCEPTION(val); /* unreachable */ @@ -1138,7 +1138,7 @@ () () { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); JUMP(dst); } @@ -1154,7 +1154,7 @@ () { if (RTEST(val)) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); JUMP(dst); } } @@ -1171,7 +1171,7 @@ () { if (!RTEST(val)) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); JUMP(dst); } } @@ -1220,7 +1220,7 @@ } else if (ic->ic_value.value == Qundef) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); rb_thread_schedule(); goto retry; } Index: vm_core.h =================================================================== --- vm_core.h (revision 36469) +++ vm_core.h (revision 36470) @@ -769,25 +769,27 @@ void rb_threadptr_check_signal(rb_thread_t *mth); void rb_threadptr_signal_raise(rb_thread_t *th, int sig); void rb_threadptr_signal_exit(rb_thread_t *th); -void rb_threadptr_execute_interrupts(rb_thread_t *); +void rb_threadptr_execute_interrupts(rb_thread_t *, int); void rb_threadptr_interrupt(rb_thread_t *th); void rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th); void rb_threadptr_async_errinfo_clear(rb_thread_t *th); void rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v); -VALUE 
rb_threadptr_async_errinfo_deque(rb_thread_t *th); int rb_threadptr_async_errinfo_active_p(rb_thread_t *th); void rb_thread_lock_unlock(rb_thread_lock_t *); void rb_thread_lock_destroy(rb_thread_lock_t *); -#define RUBY_VM_CHECK_INTS_TH(th) do { \ +#define RUBY_VM_CHECK_INTS_BLOCKING(th) do { \ if (UNLIKELY((th)->interrupt_flag)) { \ - rb_threadptr_execute_interrupts(th); \ + rb_threadptr_execute_interrupts(th, 1); \ } \ } while (0) -#define RUBY_VM_CHECK_INTS() \ - RUBY_VM_CHECK_INTS_TH(GET_THREAD()) +#define RUBY_VM_CHECK_INTS(th) do { \ + if (UNLIKELY((th)->interrupt_flag)) { \ + rb_threadptr_execute_interrupts(th, 0); \ + } \ +} while (0) /* tracer */ void Index: vm_eval.c =================================================================== --- vm_eval.c (revision 36469) +++ vm_eval.c (revision 36470) @@ -103,7 +103,7 @@ if (!klass || !(me = rb_method_entry(klass, id))) { return method_missing(recv, id, argc, argv, NOEX_SUPER); } - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); if (!(def = me->def)) return Qnil; goto again; } @@ -138,7 +138,7 @@ rb_bug("vm_call0: unsupported method type (%d)", def->type); val = Qundef; } - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS(th); return val; } Index: thread.c =================================================================== --- thread.c (revision 36469) +++ thread.c (revision 36470) @@ -265,7 +265,7 @@ struct rb_unblock_callback *old) { check_ints: - RUBY_VM_CHECK_INTS(); /* check signal or so */ + RUBY_VM_CHECK_INTS(th); /* check signal or so */ native_mutex_lock(&th->interrupt_lock); if (th->interrupt_flag) { native_mutex_unlock(&th->interrupt_lock); @@ -545,7 +545,7 @@ static VALUE thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS)) { - rb_thread_t *th; + rb_thread_t *th, *current_th = GET_THREAD(); int err; if (OBJ_FROZEN(GET_THREAD()->thgroup)) { @@ -559,12 +559,12 @@ th->first_proc = fn ? 
Qfalse : rb_block_proc(); th->first_args = args; /* GC: shouldn't put before above line */ - th->priority = GET_THREAD()->priority; - th->thgroup = GET_THREAD()->thgroup; + th->priority = current_th->priority; + th->thgroup = current_th->thgroup; th->async_errinfo_queue = rb_ary_new(); th->async_errinfo_queue_checked = 0; - th->async_errinfo_mask_stack = rb_ary_new(); + th->async_errinfo_mask_stack = rb_ary_dup(current_th->async_errinfo_mask_stack); native_mutex_initialize(&th->interrupt_lock); if (GET_VM()->event_hooks != NULL) @@ -859,7 +859,7 @@ if (deadlockable) { th->vm->sleeper--; } - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); } while (th->status == status); th->status = prev_status; } @@ -896,7 +896,7 @@ th->status = THREAD_STOPPED; do { native_sleep(th, &tv); - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); getclockofday(&tvn); if (to.tv_sec < tvn.tv_sec) break; if (to.tv_sec == tvn.tv_sec && to.tv_usec <= tvn.tv_usec) break; @@ -968,9 +968,9 @@ void rb_thread_polling(void) { - RUBY_VM_CHECK_INTS(); if (!rb_thread_alone()) { rb_thread_t *th = GET_THREAD(); + RUBY_VM_CHECK_INTS_BLOCKING(th); sleep_for_polling(th); } } @@ -985,7 +985,7 @@ void rb_thread_check_ints(void) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(GET_THREAD()); } /* @@ -1013,7 +1013,7 @@ rb_thread_wait_for(rb_time_timeval(INT2FIX(sec))); } -static void rb_threadptr_execute_interrupts_common(rb_thread_t *); +static void rb_threadptr_execute_interrupts_common(rb_thread_t *, int blocking); static void rb_thread_schedule_limits(unsigned long limits_us) @@ -1040,7 +1040,7 @@ rb_thread_schedule_limits(0); if (UNLIKELY(GET_THREAD()->interrupt_flag)) { - rb_threadptr_execute_interrupts_common(GET_THREAD()); + rb_threadptr_execute_interrupts_common(GET_THREAD(), 0); } } @@ -1076,7 +1076,7 @@ rb_thread_t *th = GET_THREAD(); blocking_region_end(th, region); xfree(region); - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); errno = saved_errno; } @@ -1181,7 
+1181,7 @@ }, ubf, data2); if (!skip_checkints) { - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); } errno = saved_errno; @@ -1237,7 +1237,7 @@ JUMP_TAG(state); } /* TODO: check func() */ - RUBY_VM_CHECK_INTS(); + RUBY_VM_CHECK_INTS_BLOCKING(th); errno = saved_errno; @@ -1350,13 +1350,349 @@ return Qnil; } +/*****************************************************/ + /* + * rb_threadptr_async_errinfo_* - manage async errors queue * + * Async events such as an exception throwed by Thread#raise, + * Thread#kill and thread termination (after main thread termination) + * will be queued to th->async_errinfo_queue. + * - clear: clear the queue. + * - enque: enque err object into queue. + * - deque: deque err object from queue. + * - active_p: return 1 if the queue should be checked. + * + * All rb_threadptr_async_errinfo_* functions are called by + * a GVL acquired thread, of course. + * Note that all "rb_" prefix APIs need GVL to call. */ +void +rb_threadptr_async_errinfo_clear(rb_thread_t *th) +{ + rb_ary_clear(th->async_errinfo_queue); +} + +void +rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v) +{ + rb_ary_push(th->async_errinfo_queue, v); + th->async_errinfo_queue_checked = 0; +} + +enum interrupt_timing { + INTERRUPT_NONE, + INTERRUPT_IMMEDIATE, + INTERRUPT_ON_BLOCKING, + INTERRUPT_NEVER +}; + +static enum interrupt_timing +rb_threadptr_async_errinfo_check_mask(rb_thread_t *th, VALUE err) +{ + VALUE mask; + long mask_stack_len = RARRAY_LEN(th->async_errinfo_mask_stack); + VALUE *mask_stack = RARRAY_PTR(th->async_errinfo_mask_stack); + VALUE ancestors = rb_mod_ancestors(err); /* TODO: GC guard */ + long ancestors_len = RARRAY_LEN(ancestors); + VALUE *ancestors_ptr = RARRAY_PTR(ancestors); + int i, j; + + for (i=0; i<mask_stack_len; i++) { + mask = mask_stack[mask_stack_len-(i+1)]; + + for (j=0; j<ancestors_len; j++) { + VALUE klass = ancestors_ptr[j]; + VALUE sym; + + /* TODO: remove rb_intern() */ + if ((sym = rb_hash_aref(mask, klass)) != Qnil) { 
+ if (sym == ID2SYM(rb_intern("immediate"))) { + return INTERRUPT_IMMEDIATE; + } + else if (sym == ID2SYM(rb_intern("on_blocking"))) { + return INTERRUPT_ON_BLOCKING; + } + else if (sym == ID2SYM(rb_intern("never"))) { + return INTERRUPT_NEVER; + } + else { + rb_raise(rb_eThreadError, "unknown mask signature"); + } + } + } + /* try to next mask */ + } + return INTERRUPT_NONE; +} + +static int +rb_threadptr_async_errinfo_empty_p(rb_thread_t *th) +{ + return RARRAY_LEN(th->async_errinfo_queue) == 0; +} + +static VALUE +rb_threadptr_async_errinfo_deque(rb_thread_t *th, enum interrupt_timing timing) +{ +#if 1 /* 1 to enable Thread#control_interrupt, 0 to ignore it */ + int i; + + for (i=0; i<RARRAY_LEN(th->async_errinfo_queue); i++) { + VALUE err = RARRAY_PTR(th->async_errinfo_queue)[i]; + + enum interrupt_timing mask_timing = rb_threadptr_async_errinfo_check_mask(th, CLASS_OF(err)); + + switch (mask_timing) { + case INTERRUPT_ON_BLOCKING: + if (timing != INTERRUPT_ON_BLOCKING) { + break; + } + /* fall through */ + case INTERRUPT_NONE: /* default: IMMEDIATE */ + case INTERRUPT_IMMEDIATE: + rb_ary_delete_at(th->async_errinfo_queue, i); + return err; + case INTERRUPT_NEVER: + break; + } + } + + th->async_errinfo_queue_checked = 1; + return Qundef; +#else + VALUE err = rb_ary_shift(th->async_errinfo_queue); + if (rb_threadptr_async_errinfo_empty_p(th)) { + th->async_errinfo_queue_checked = 1; + } + return err; +#endif +} + +int +rb_threadptr_async_errinfo_active_p(rb_thread_t *th) +{ + if (th->async_errinfo_queue_checked || rb_threadptr_async_errinfo_empty_p(th)) { + return 0; + } + else { + return 1; + } +} + +static VALUE +rb_threadptr_interrupt_mask(rb_thread_t *th, VALUE mask, VALUE (*func)(rb_thread_t *th)) +{ + VALUE r = Qnil; + int state; + + rb_ary_push(th->async_errinfo_mask_stack, mask); + if (!rb_threadptr_async_errinfo_empty_p(th)) { + th->async_errinfo_queue_checked = 0; + RUBY_VM_SET_INTERRUPT(th); + } + + TH_PUSH_TAG(th); + if ((state = EXEC_TAG()) == 0) { 
+ r = func(th); + } + TH_POP_TAG(); + + rb_ary_pop(th->async_errinfo_mask_stack); + if (!rb_threadptr_async_errinfo_empty_p(th)) { + th->async_errinfo_queue_checked = 0; + RUBY_VM_SET_INTERRUPT(th); + } + + if (state) { + JUMP_TAG(state); + } + + return r; +} + +/* + * call-seq: + * Thread.control_interrupt(hash) { ... } -> result of the block + * + * Thread.control_interrupt controls interrupt timing. + * + * _interrupt_ means asynchronous event and corresponding procedure + * by Thread#raise, Thread#kill, signal trap (not supported yet) + * and main thread termination (if main thread terminates, then all + * other thread will be killed). + * + * _hash_ has pairs of ExceptionClass and TimingSymbol. TimingSymbol + * is one of them: + * - :immediate Invoke interrupt immediately. + * - :on_blocking Invoke interrupt while _BlockingOperation_. + * - :never Never invoke interrupt. + * + * _BlockingOperation_ means that the operation will block the calling thread, + * such as read and write. On CRuby implementation, _BlockingOperation_ is + * operation executed without GVL. + * + * Masked interrupts are delayed until they are enabled. + * This method is similar to sigprocmask(3). + * + * TODO (DOC): control_interrupt is stacked. + * TODO (DOC): check ancestors. + * TODO (DOC): to prevent all interrupt, {Object => :never} works. + * + * NOTE: Asynchronous interrupts are difficult to use. + * If you need to communicate between threads, + * please consider to use another way such as Queue. + * Or use them with deep understanding about this method. + * + * + * # example: Guard from Thread#raise + * th = Thread.new do + * Thead.control_interrupt(RuntimeError => :never) { + * begin + * # Thread#raise doesn't interrupt here. + * # You can write resource allocation code safely. + * Thread.control_interrupt(RuntimeError => :immediate) { + * # ... + * # It is possible to be interrupted by Thread#raise. + * } + * ensure + * # Thread#raise doesn't interrupt here. 
+ * # You can write resource dealocation code safely. + * end + * } + * end + * Thread.pass + * # ... + * th.raise "stop" + * + * # example: Guard from TimeoutError + * require 'timeout' + * Thread.control_interrupt(TimeoutError => :never) { + * timeout(10){ + * # TimeoutError doesn't occur here + * Thread.control_interrupt(TimeoutError => :on_blocking) { + * # possible to be killed by TimeoutError + * # while blocking operation + * } + * # TimeoutError doesn't occur here + * } + * } + * + * # example: Stack control settings + * Thread.control_interrupt(FooError => :never) { + * Thread.control_interrupt(BarError => :never) { + * # FooError and BarError are prohibited. + * } + * } + * + * # example: check ancestors + * Thread.control_interrupt(Exception => :never) { + * # all exceptions inherited from Exception are prohibited. + * } + * + */ + +static VALUE +control_interrupt_func(rb_thread_t *th) +{ + return rb_yield(Qnil); +} + +static VALUE +rb_thread_s_control_interrupt(VALUE self, VALUE mask_arg) +{ + if (!rb_block_given_p()) { + rb_raise(rb_eArgError, "block is needed."); + } + + return rb_threadptr_interrupt_mask(GET_THREAD(), + rb_convert_type(mask_arg, T_HASH, "Hash", "to_hash"), + control_interrupt_func); +} + +/* + * call-seq: + * Thread.check_interrupt() -> nil + * + * Check queued interrupts. + * + * If there are queued interrupts, process respective procedures. + * + * This method can be defined as the following Ruby code: + * + * def Thread.check_interrupt + * Thread.control_interrupt(Object => :immediate) { + * Thread.pass + * } + * end + * + * Examples: + * + * th = Thread.new{ + * Thread.control_interrupt(RuntimeError => :on_blocking){ + * while true + * ... + * # reach safe point to invoke interrupt + * Thread.check_interrupt + * ... + * end + * } + * } + * ... + * th.raise # stop thread + * + * NOTE: This example can be described by the another code. + * You need to keep to avoid asynchronous interrupts. 
+ * + * flag = true + * th = Thread.new{ + * Thread.control_interrupt(RuntimeError => :on_blocking){ + * while true + * ... + * # reach safe point to invoke interrupt + * break if flag == false + * ... + * end + * } + * } + * ... + * flag = false # stop thread + */ + +static VALUE +check_interrupt_func(rb_thread_t *th) +{ + RUBY_VM_CHECK_INTS(th); + return Qnil; +} + +static VALUE +rb_thread_s_check_interrupt(VALUE self) +{ + rb_thread_t *th = GET_THREAD(); + + if (!rb_threadptr_async_errinfo_empty_p(th)) { + VALUE mask = rb_hash_new(); + rb_hash_aset(mask, rb_cObject, ID2SYM(rb_intern("immediate"))); + rb_threadptr_interrupt_mask(GET_THREAD(), mask, check_interrupt_func); + } + + return Qnil; +} + static void -rb_threadptr_execute_interrupts_common(rb_thread_t *th) +rb_threadptr_to_kill(rb_thread_t *th) { + rb_threadptr_async_errinfo_clear(th); + th->status = THREAD_TO_KILL; + th->errinfo = INT2FIX(TAG_FATAL); + TH_JUMP_TAG(th, TAG_FATAL); +} + +static void +rb_threadptr_execute_interrupts_common(rb_thread_t *th, int blocking_timing) +{ rb_atomic_t interrupt; if (th->raised_flag) return; @@ -1378,16 +1714,16 @@ /* exception from another thread */ if (rb_threadptr_async_errinfo_active_p(th)) { - VALUE err = rb_threadptr_async_errinfo_deque(th); + VALUE err = rb_threadptr_async_errinfo_deque(th, blocking_timing ? 
INTERRUPT_ON_BLOCKING : INTERRUPT_NONE); thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err); - if (err == eKillSignal /* Thread#kill receieved */ || - err == eTerminateSignal /* Terminate thread */ ) { - rb_threadptr_async_errinfo_clear(th); - th->status = THREAD_TO_KILL; - th->errinfo = INT2FIX(TAG_FATAL); - TH_JUMP_TAG(th, TAG_FATAL); + if (err == Qundef) { + /* no error */ } + else if (err == eKillSignal /* Thread#kill receieved */ || + err == eTerminateSignal /* Terminate thread */ ) { + rb_threadptr_to_kill(th); + } else { rb_exc_raise(err); } @@ -1417,9 +1753,9 @@ } void -rb_threadptr_execute_interrupts(rb_thread_t *th) +rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing) { - rb_threadptr_execute_interrupts_common(th); + rb_threadptr_execute_interrupts_common(th, blocking_timing); } void @@ -1427,61 +1763,9 @@ { rb_thread_t *th; GetThreadPtr(thval, th); - rb_threadptr_execute_interrupts_common(th); + rb_threadptr_execute_interrupts_common(th, 1); } -/*****************************************************/ - -/* - * rb_threadptr_async_errinfo_* - manage async errors queue - * - * Async events such as an exception throwed by Thread#raise, - * Thread#kill and thread termination (after main thread termination) - * will be queued to th->async_errinfo_queue. - * - clear: clear the queue. - * - enque: enque err object into queue. - * - deque: deque err object from queue. - * - active_p: return 1 if the queue should be checked. - * - * All rb_threadptr_async_errinfo_* functions are called by - * a GVL acquired thread, of course. - * Note that all "rb_" prefix APIs need GVL to call. 
- */ - -void -rb_threadptr_async_errinfo_clear(rb_thread_t *th) -{ - rb_ary_clear(th->async_errinfo_queue); -} - -void -rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v) -{ - rb_ary_push(th->async_errinfo_queue, v); - th->async_errinfo_queue_checked = 0; -} - -VALUE -rb_threadptr_async_errinfo_deque(rb_thread_t *th) -{ - VALUE err = rb_ary_shift(th->async_errinfo_queue); - if (RARRAY_LEN(th->async_errinfo_queue) == 0) { - th->async_errinfo_queue_checked = 1; - } - return err; -} - -int -rb_threadptr_async_errinfo_active_p(rb_thread_t *th) -{ - if (th->async_errinfo_queue_checked) { - return 0; - } - else { - return RARRAY_LEN(th->async_errinfo_queue) > 0; - } -} - static void rb_threadptr_ready(rb_thread_t *th) { @@ -1497,7 +1781,12 @@ return Qnil; } - exc = rb_make_exception(argc, argv); + if (argc == 0) { + exc = rb_exc_new(rb_eRuntimeError, 0, 0); + } + else { + exc = rb_make_exception(argc, argv); + } rb_threadptr_async_errinfo_enque(th, exc); (... truncated) -- ML: ruby-changes@q... Info: http://www.atdot.net/~ko1/quickml/