ruby-changes:68128
From: Samuel <ko1@a...>
Date: Sun, 26 Sep 2021 20:50:14 +0900 (JST)
Subject: [ruby-changes:68128] 5e9ec35104 (ruby_3_0): Wake up join list within thread EC context. (#4471)
https://git.ruby-lang.org/ruby.git/commit/?id=5e9ec35104 From 5e9ec351044fb74f07f2a45a0dab1e226159b7e6 Mon Sep 17 00:00:00 2001 From: Samuel Williams <samuel.williams@o...> Date: Mon, 14 Jun 2021 17:56:53 +1200 Subject: Wake up join list within thread EC context. (#4471) * Wake up join list within thread EC context. * Consume items from join list so that they are not re-executed. If `rb_fiber_scheduler_unblock` raises an exception, it can result in a segfault if `rb_threadptr_join_list_wakeup` is not within a valid EC. This change moves `rb_threadptr_join_list_wakeup` into the thread's top-level EC, which initially caused an infinite loop because the wakeup is retried on exception. We explicitly remove items from the thread's join list to avoid this situation. * Verify the required scheduler interface. * Test several scheduler hook methods with a broken `unblock` implementation. --- scheduler.c | 24 +++++++ test/fiber/scheduler.rb | 8 +++ test/fiber/test_scheduler.rb | 18 ++++- test/fiber/test_sleep.rb | 22 ++++++ test/fiber/test_thread.rb | 20 ++++++ thread.c | 156 +++++++++++++++++++++---------------------- 6 files changed, 167 insertions(+), 81 deletions(-) diff --git a/scheduler.c b/scheduler.c index 88db433..66cbfc6 100644 --- a/scheduler.c +++ b/scheduler.c @@ -49,12 +49,36 @@ rb_scheduler_get(void) https://github.com/ruby/ruby/blob/trunk/scheduler.c#L49 return thread->scheduler; } +static void +verify_interface(VALUE scheduler) +{ + if (!rb_respond_to(scheduler, id_block)) { + rb_raise(rb_eArgError, "Scheduler must implement #block!"); + } + + if (!rb_respond_to(scheduler, id_unblock)) { + rb_raise(rb_eArgError, "Scheduler must implement #unblock!"); + } + + if (!rb_respond_to(scheduler, id_kernel_sleep)) { + rb_raise(rb_eArgError, "Scheduler must implement #kernel_sleep!"); + } + + if (!rb_respond_to(scheduler, id_io_wait)) { + rb_raise(rb_eArgError, "Scheduler must implement #io_wait!"); + } +} + VALUE rb_scheduler_set(VALUE scheduler) { rb_thread_t *thread = 
GET_THREAD(); VM_ASSERT(thread); + if (scheduler != Qnil) { + verify_interface(scheduler); + } + // We invoke Scheduler#close when setting it to something else, to ensure the previous scheduler runs to completion before changing the scheduler. That way, we do not need to consider interactions, e.g., of a Fiber from the previous scheduler with the new scheduler. if (thread->scheduler != Qnil) { rb_scheduler_close(thread->scheduler); diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb index b3c3eaf..0553e38 100644 --- a/test/fiber/scheduler.rb +++ b/test/fiber/scheduler.rb @@ -188,3 +188,11 @@ class Scheduler https://github.com/ruby/ruby/blob/trunk/test/fiber/scheduler.rb#L188 return fiber end end + +class BrokenUnblockScheduler < Scheduler + def unblock(blocker, fiber) + super + + raise "Broken unblock!" + end +end diff --git a/test/fiber/test_scheduler.rb b/test/fiber/test_scheduler.rb index 72bde9f..d1fb89d 100644 --- a/test/fiber/test_scheduler.rb +++ b/test/fiber/test_scheduler.rb @@ -66,9 +66,23 @@ class TestFiberScheduler < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/fiber/test_scheduler.rb#L66 RUBY end - def test_optional_close + def test_minimal_interface + scheduler = Object.new + + def scheduler.block + end + + def scheduler.unblock + end + + def scheduler.io_wait + end + + def scheduler.kernel_sleep + end + thread = Thread.new do - Fiber.set_scheduler Object.new + Fiber.set_scheduler scheduler end thread.join diff --git a/test/fiber/test_sleep.rb b/test/fiber/test_sleep.rb index e882766..8443697 100644 --- a/test/fiber/test_sleep.rb +++ b/test/fiber/test_sleep.rb @@ -43,4 +43,26 @@ class TestFiberSleep < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/fiber/test_sleep.rb#L43 assert_operator seconds, :>=, 2, "actual: %p" % seconds end + + def test_broken_sleep + thread = Thread.new do + Thread.current.report_on_exception = false + + scheduler = Scheduler.new + + def scheduler.kernel_sleep(duration = nil) + 
raise "Broken sleep!" + end + + Fiber.set_scheduler scheduler + + Fiber.schedule do + sleep 0 + end + end + + assert_raise(RuntimeError) do + thread.join + end + end end diff --git a/test/fiber/test_thread.rb b/test/fiber/test_thread.rb index 5fc80f0..b7323d7 100644 --- a/test/fiber/test_thread.rb +++ b/test/fiber/test_thread.rb @@ -42,4 +42,24 @@ class TestFiberThread < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/fiber/test_thread.rb#L42 assert_equal :done, thread.value end + + def test_broken_unblock + thread = Thread.new do + Thread.current.report_on_exception = false + + scheduler = BrokenUnblockScheduler.new + + Fiber.set_scheduler scheduler + + Fiber.schedule do + Thread.new{}.join + end + + scheduler.run + end + + assert_raise(RuntimeError) do + thread.join + end + end end diff --git a/thread.c b/thread.c index ec7a9b7..508772c 100644 --- a/thread.c +++ b/thread.c @@ -539,9 +539,12 @@ terminate_all(rb_ractor_t *r, const rb_thread_t *main_thread) https://github.com/ruby/ruby/blob/trunk/thread.c#L539 static void rb_threadptr_join_list_wakeup(rb_thread_t *thread) { - struct rb_waiting_list *join_list = thread->join_list; + while (thread->join_list) { + struct rb_waiting_list *join_list = thread->join_list; + + // Consume the entry from the join list: + thread->join_list = join_list->next; - while (join_list) { rb_thread_t *target_thread = join_list->thread; if (target_thread->scheduler != Qnil && rb_fiberptr_blocking(join_list->fiber) == 0) { @@ -557,25 +560,20 @@ rb_threadptr_join_list_wakeup(rb_thread_t *thread) https://github.com/ruby/ruby/blob/trunk/thread.c#L560 break; } } - - join_list = join_list->next; } } void rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th) { - const char *err; - rb_mutex_t *mutex; - rb_mutex_t *mutexes = th->keeping_mutexes; + while (th->keeping_mutexes) { + rb_mutex_t *mutex = th->keeping_mutexes; + th->keeping_mutexes = mutex->next_mutex; + + /* rb_warn("mutex #<%p> remains to be locked by terminated 
thread", (void *)mutexes); */ - while (mutexes) { - mutex = mutexes; - /* rb_warn("mutex #<%p> remains to be locked by terminated thread", - (void *)mutexes); */ - mutexes = mutex->next_mutex; - err = rb_mutex_unlock_th(mutex, th, mutex->fiber); - if (err) rb_bug("invalid keeping_mutexes: %s", err); + const char *error_message = rb_mutex_unlock_th(mutex, th, mutex->fiber); + if (error_message) rb_bug("invalid keeping_mutexes: %s", error_message); } } @@ -816,87 +814,87 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start) https://github.com/ruby/ruby/blob/trunk/thread.c#L814 th->ec->machine.stack_start = STACK_DIR_UPPER(vm_stack + size, vm_stack); th->ec->machine.stack_maxsize -= size * sizeof(VALUE); - { - thread_debug("thread start (get lock): %p\n", (void *)th); + thread_debug("thread start (get lock): %p\n", (void *)th); - EC_PUSH_TAG(th->ec); - if ((state = EC_EXEC_TAG()) == TAG_NONE) { - SAVE_ROOT_JMPBUF(th, thread_do_start(th)); - } - else { - errinfo = th->ec->errinfo; + EC_PUSH_TAG(th->ec); - if (state == TAG_FATAL) { - if (th->invoke_type == thread_invoke_type_ractor_proc) { - rb_ractor_atexit(th->ec, Qnil); - } - /* fatal error within this thread, need to stop whole script */ - } - else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) { - /* exit on main_thread. 
*/ - } - else { - if (th->report_on_exception) { - VALUE mesg = rb_thread_to_s(th->self); - rb_str_cat_cstr(mesg, " terminated with exception (report_on_exception is true):\n"); - rb_write_error_str(mesg); - rb_ec_error_print(th->ec, errinfo); - } + if ((state = EC_EXEC_TAG()) == TAG_NONE) { + SAVE_ROOT_JMPBUF(th, thread_do_start(th)); + } else { + errinfo = th->ec->errinfo; - if (th->invoke_type == thread_invoke_type_ractor_proc) { - rb_ractor_atexit_exception(th->ec); - } + if (state == TAG_FATAL) { + if (th->invoke_type == thread_invoke_type_ractor_proc) { + rb_ractor_atexit(th->ec, Qnil); + } + /* fatal error within this thread, need to stop whole script */ + } + else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) { + /* exit on main_thread. */ + } + else { + if (th->report_on_exception) { + VALUE mesg = rb_thread_to_s(th->self); + rb_str_cat_cstr(mesg, " terminated with exception (report_on_exception is true):\n"); + rb_write_error_str(mesg); + rb_ec_error_print(th->ec, errinfo); + } - if (th->vm->thread_abort_on_exception || - th->abort_on_exception || RTEST(ruby_debug)) { - /* exit on main_thread */ - } - else { - errinfo = Qnil; - } - } - th->value = Qnil; - } + if (th->invoke_type == thread_invoke_type_ractor_proc) { + rb_ractor_atexit_exception(th->ec); + } - if (th->invoke_type == thread_invoke_type_ractor_proc) { - rb_thread_terminate_all(th); - rb_ractor_teardown(th->ec); + if (th->vm->thread_abort_on_exception || + th->abort_on_exception || RTEST(ruby_debug)) { + /* exit on main_thread */ + } + else { + errinfo = Qnil; + (... truncated) -- ML: ruby-changes@q... Info: http://www.atdot.net/~ko1/quickml/