ruby-changes:67029
From: Samuel <ko1@a...>
Date: Tue, 3 Aug 2021 19:24:05 +0900 (JST)
Subject: [ruby-changes:67029] 2d4f29e77e (master): Fix potential hang when joining threads.
https://git.ruby-lang.org/ruby.git/commit/?id=2d4f29e77e From 2d4f29e77e883c29e35417799f8001b8046cde03 Mon Sep 17 00:00:00 2001 From: Samuel Williams <samuel.williams@o...> Date: Wed, 28 Jul 2021 19:55:55 +1200 Subject: Fix potential hang when joining threads. If the thread termination invokes user code after `th->status` becomes `THREAD_KILLED`, and the user unblock function causes that `th->status` to become something else (e.g. `THREAD_RUNNING`), threads waiting in `thread_join_sleep` will hang forever. We move the unblock function call to before the thread status is updated, and allow threads to join as soon as `th->value` becomes defined. This reverts commit 6505c77501f1924571b2fe620c5c7b31ede0cd22. --- test/fiber/scheduler.rb | 16 ++++++++-- test/fiber/test_thread.rb | 39 ++++++++++++++++++++++++ thread.c | 75 ++++++++++++++++++++++++++++++----------------- vm.c | 17 ++++++----- 4 files changed, 111 insertions(+), 36 deletions(-) diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb index af64e4e..2785561 100644 --- a/test/fiber/scheduler.rb +++ b/test/fiber/scheduler.rb @@ -112,8 +112,10 @@ class Scheduler https://github.com/ruby/ruby/blob/trunk/test/fiber/scheduler.rb#L112 self.run ensure - @urgent.each(&:close) - @urgent = nil + if @urgent + @urgent.each(&:close) + @urgent = nil + end @closed = true @@ -240,3 +242,13 @@ class BrokenUnblockScheduler < Scheduler https://github.com/ruby/ruby/blob/trunk/test/fiber/scheduler.rb#L242 raise "Broken unblock!" end end + +class SleepingUnblockScheduler < Scheduler + # This method is invoked when the thread is exiting. + def unblock(blocker, fiber) + super + + # This changes the current thread state to `THREAD_RUNNING` which causes `thread_join_sleep` to hang. + sleep(0.1) + end +end diff --git a/test/fiber/test_thread.rb b/test/fiber/test_thread.rb index 843604b..5c25c43 100644 --- a/test/fiber/test_thread.rb +++ b/test/fiber/test_thread.rb @@ -20,6 +20,31 @@ class TestFiberThread < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/fiber/test_thread.rb#L20 assert_equal :done, thread.value end + def test_thread_join_implicit + sleeping = false + finished = false + + thread = Thread.new do + scheduler = Scheduler.new + Fiber.set_scheduler scheduler + + Fiber.schedule do + sleeping = true + sleep(0.1) + finished = true + end + + :done + end + + Thread.pass until sleeping + + thread.join + + assert_equal :done, thread.value + assert finished, "Scheduler thread's task should be finished!" + end + def test_thread_join_blocking thread = Thread.new do scheduler = Scheduler.new @@ -66,4 +91,18 @@ class TestFiberThread < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/fiber/test_thread.rb#L91 thread.join end end + + def test_thread_join_hang + thread = Thread.new do + scheduler = SleepingUnblockScheduler.new + + Fiber.set_scheduler scheduler + + Fiber.schedule do + Thread.new{sleep(0.01)}.value + end + end + + thread.join + end end diff --git a/thread.c b/thread.c index 81c1aaa..7d10545 100644 --- a/thread.c +++ b/thread.c @@ -632,6 +632,7 @@ thread_cleanup_func_before_exec(void *th_ptr) https://github.com/ruby/ruby/blob/trunk/thread.c#L632 { rb_thread_t *th = th_ptr; th->status = THREAD_KILLED; + // The thread stack doesn't exist in the forked process: th->ec->machine.stack_start = th->ec->machine.stack_end = NULL; @@ -688,7 +689,7 @@ rb_vm_proc_local_ep(VALUE proc) https://github.com/ruby/ruby/blob/trunk/thread.c#L689 VALUE rb_vm_invoke_proc_with_self(rb_execution_context_t *ec, rb_proc_t *proc, VALUE self, int argc, const VALUE *argv, int kw_splat, VALUE passed_block_handler); -static void +static VALUE thread_do_start_proc(rb_thread_t *th) { VALUE args = th->invoke_arg.proc.args; @@ -702,7 +703,6 @@ thread_do_start_proc(rb_thread_t *th) https://github.com/ruby/ruby/blob/trunk/thread.c#L703 th->ec->root_lep = rb_vm_proc_local_ep(procval); th->ec->root_svar = Qfalse; - EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef); vm_check_ints_blocking(th->ec); if (th->invoke_type == thread_invoke_type_ractor_proc) { @@ -713,11 +713,12 @@ thread_do_start_proc(rb_thread_t *th) https://github.com/ruby/ruby/blob/trunk/thread.c#L713 rb_ractor_receive_parameters(th->ec, th->ractor, args_len, (VALUE *)args_ptr); vm_check_ints_blocking(th->ec); - // kick thread - th->value = rb_vm_invoke_proc_with_self(th->ec, proc, self, - args_len, args_ptr, - th->invoke_arg.proc.kw_splat, - VM_BLOCK_HANDLER_NONE); + return rb_vm_invoke_proc_with_self( + th->ec, proc, self, + args_len, args_ptr, + th->invoke_arg.proc.kw_splat, + VM_BLOCK_HANDLER_NONE + ); } else { args_len = RARRAY_LENINT(args); @@ -733,17 +734,12 @@ thread_do_start_proc(rb_thread_t *th) https://github.com/ruby/ruby/blob/trunk/thread.c#L734 vm_check_ints_blocking(th->ec); - // kick thread - th->value = rb_vm_invoke_proc(th->ec, proc, - args_len, args_ptr, - th->invoke_arg.proc.kw_splat, - VM_BLOCK_HANDLER_NONE); - } - - EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef); - - if (th->invoke_type == thread_invoke_type_ractor_proc) { - rb_ractor_atexit(th->ec, th->value); + return rb_vm_invoke_proc( + th->ec, proc, + args_len, args_ptr, + th->invoke_arg.proc.kw_splat, + VM_BLOCK_HANDLER_NONE + ); } } @@ -751,20 +747,33 @@ static void https://github.com/ruby/ruby/blob/trunk/thread.c#L747 thread_do_start(rb_thread_t *th) { native_set_thread_name(th); + VALUE result = Qundef; + + EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, 0, Qundef); switch (th->invoke_type) { case thread_invoke_type_proc: + result = thread_do_start_proc(th); + break; + case thread_invoke_type_ractor_proc: - thread_do_start_proc(th); + result = thread_do_start_proc(th); + rb_ractor_atexit(th->ec, result); break; + case thread_invoke_type_func: - th->value = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg); + result = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg); break; + case thread_invoke_type_none: rb_bug("unreachable"); } rb_fiber_scheduler_set(Qnil); + + th->value = result; + + EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_THREAD_END, th->self, 0, 0, 0, Qundef); } void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec); @@ -817,6 +826,9 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start) https://github.com/ruby/ruby/blob/trunk/thread.c#L826 thread_debug("thread start (get lock): %p\n", (void *)th); + // Ensure that we are not joinable. + VM_ASSERT(th->value == Qundef); + EC_PUSH_TAG(th->ec); if ((state = EC_EXEC_TAG()) == TAG_NONE) { @@ -857,6 +869,12 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start) https://github.com/ruby/ruby/blob/trunk/thread.c#L869 th->value = Qnil; } + // The thread is effectively finished and can be joined. + VM_ASSERT(th->value != Qundef); + + rb_threadptr_join_list_wakeup(th); + rb_threadptr_unlock_all_locking_mutexes(th); + if (th->invoke_type == thread_invoke_type_ractor_proc) { rb_thread_terminate_all(th); rb_ractor_teardown(th->ec); @@ -874,9 +892,6 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start) https://github.com/ruby/ruby/blob/trunk/thread.c#L892 rb_threadptr_raise(ractor_main_th, 1, &errinfo); } - rb_threadptr_join_list_wakeup(th); - rb_threadptr_unlock_all_locking_mutexes(th); - EC_POP_TAG(); rb_ec_clear_current_thread_trace_func(th->ec); @@ -1153,6 +1168,12 @@ remove_from_join_list(VALUE arg) https://github.com/ruby/ruby/blob/trunk/thread.c#L1168 static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double); +static int +thread_finished(rb_thread_t *th) +{ + return th->status == THREAD_KILLED || th->value != Qundef; +} + static VALUE thread_join_sleep(VALUE arg) { @@ -1179,7 +1200,7 @@ thread_join_sleep(VALUE arg) https://github.com/ruby/ruby/blob/trunk/thread.c#L1200 end = rb_hrtime_add(*limit, rb_hrtime_now()); } - while (target_th->status != THREAD_KILLED) { + while (!thread_finished(target_th)) { VALUE scheduler = rb_fiber_scheduler_current(); if (scheduler != Qnil) { @@ -3319,11 +3340,11 @@ rb_thread_status(VALUE thread) https://github.com/ruby/ruby/blob/trunk/thread.c#L3340 static VALUE rb_thread_alive_p(VALUE thread) { - if (rb_threadptr_dead(rb_thread_ptr(thread))) { - return Qfalse; + if (thread_finished(rb_thread_ptr(thread))) { + return Qfalse; } else { - return Qtrue; + return Qtrue; } } diff --git a/vm.c b/vm.c index b743bbf..307c595 100644 --- a/vm.c +++ b/vm.c @@ -3075,6 +3075,8 @@ th_init(rb_thread_t *th, VALUE self) https://github.com/ruby/ruby/blob/trunk/vm.c#L3075 th->thread_id_string[0] = '\0'; #endif + th->value = Qundef; + #if OPT_CALL_THREADED_CODE th->retval = Qundef; #endif @@ -3087,16 +3089,17 @@ static VALUE https://github.com/ruby/ruby/blob/trunk/vm.c#L3089 ruby_thread_init(VALUE self) { rb_thread_t *th = GET_THREAD(); - rb_thread_t *targe_th = rb_thread_ptr(self); + rb_thread_t *target_th = rb_thread_ (... truncated) -- ML: ruby-changes@q... Info: http://www.atdot.net/~ko1/quickml/