[前][次][番号順一覧][スレッド一覧]

ruby-changes:68128

From: Samuel <ko1@a...>
Date: Sun, 26 Sep 2021 20:50:14 +0900 (JST)
Subject: [ruby-changes:68128] 5e9ec35104 (ruby_3_0): Wake up join list within thread EC context. (#4471)

https://git.ruby-lang.org/ruby.git/commit/?id=5e9ec35104

From 5e9ec351044fb74f07f2a45a0dab1e226159b7e6 Mon Sep 17 00:00:00 2001
From: Samuel Williams <samuel.williams@o...>
Date: Mon, 14 Jun 2021 17:56:53 +1200
Subject: Wake up join list within thread EC context. (#4471)

* Wake up join list within thread EC context.

* Consume items from join list so that they are not re-executed.

If `rb_fiber_scheduler_unblock` raises an exception, it can result in a
segfault if `rb_threadptr_join_list_wakeup` is not called within a valid EC.
This change moves `rb_threadptr_join_list_wakeup` into the thread's top-level
EC, which initially caused an infinite loop because, on exception, the wakeup
would be retried. We explicitly remove items from the thread's join list to
avoid this situation.

* Verify the required scheduler interface.

* Test several scheduler hook methods with a broken `unblock` implementation.
---
 scheduler.c                  |  24 +++++++
 test/fiber/scheduler.rb      |   8 +++
 test/fiber/test_scheduler.rb |  18 ++++-
 test/fiber/test_sleep.rb     |  22 ++++++
 test/fiber/test_thread.rb    |  20 ++++++
 thread.c                     | 156 +++++++++++++++++++++----------------------
 6 files changed, 167 insertions(+), 81 deletions(-)

diff --git a/scheduler.c b/scheduler.c
index 88db433..66cbfc6 100644
--- a/scheduler.c
+++ b/scheduler.c
@@ -49,12 +49,36 @@ rb_scheduler_get(void) https://github.com/ruby/ruby/blob/trunk/scheduler.c#L49
     return thread->scheduler;
 }
 
+static void
+verify_interface(VALUE scheduler)
+{
+    if (!rb_respond_to(scheduler, id_block)) {
+        rb_raise(rb_eArgError, "Scheduler must implement #block!");
+    }
+
+    if (!rb_respond_to(scheduler, id_unblock)) {
+        rb_raise(rb_eArgError, "Scheduler must implement #unblock!");
+    }
+
+    if (!rb_respond_to(scheduler, id_kernel_sleep)) {
+        rb_raise(rb_eArgError, "Scheduler must implement #kernel_sleep!");
+    }
+
+    if (!rb_respond_to(scheduler, id_io_wait)) {
+        rb_raise(rb_eArgError, "Scheduler must implement #io_wait!");
+    }
+}
+
 VALUE
 rb_scheduler_set(VALUE scheduler)
 {
     rb_thread_t *thread = GET_THREAD();
     VM_ASSERT(thread);
 
+    if (scheduler != Qnil) {
+        verify_interface(scheduler);
+    }
+
     // We invoke Scheduler#close when setting it to something else, to ensure the previous scheduler runs to completion before changing the scheduler. That way, we do not need to consider interactions, e.g., of a Fiber from the previous scheduler with the new scheduler.
     if (thread->scheduler != Qnil) {
         rb_scheduler_close(thread->scheduler);
diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb
index b3c3eaf..0553e38 100644
--- a/test/fiber/scheduler.rb
+++ b/test/fiber/scheduler.rb
@@ -188,3 +188,11 @@ class Scheduler https://github.com/ruby/ruby/blob/trunk/test/fiber/scheduler.rb#L188
     return fiber
   end
 end
+
+class BrokenUnblockScheduler < Scheduler
+  def unblock(blocker, fiber)
+    super
+
+    raise "Broken unblock!"
+  end
+end
diff --git a/test/fiber/test_scheduler.rb b/test/fiber/test_scheduler.rb
index 72bde9f..d1fb89d 100644
--- a/test/fiber/test_scheduler.rb
+++ b/test/fiber/test_scheduler.rb
@@ -66,9 +66,23 @@ class TestFiberScheduler < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/fiber/test_scheduler.rb#L66
     RUBY
   end
 
-  def test_optional_close
+  def test_minimal_interface
+    scheduler = Object.new
+
+    def scheduler.block
+    end
+
+    def scheduler.unblock
+    end
+
+    def scheduler.io_wait
+    end
+
+    def scheduler.kernel_sleep
+    end
+
     thread = Thread.new do
-      Fiber.set_scheduler Object.new
+      Fiber.set_scheduler scheduler
     end
 
     thread.join
diff --git a/test/fiber/test_sleep.rb b/test/fiber/test_sleep.rb
index e882766..8443697 100644
--- a/test/fiber/test_sleep.rb
+++ b/test/fiber/test_sleep.rb
@@ -43,4 +43,26 @@ class TestFiberSleep < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/fiber/test_sleep.rb#L43
 
     assert_operator seconds, :>=, 2, "actual: %p" % seconds
   end
+
+  def test_broken_sleep
+    thread = Thread.new do
+      Thread.current.report_on_exception = false
+
+      scheduler = Scheduler.new
+
+      def scheduler.kernel_sleep(duration = nil)
+        raise "Broken sleep!"
+      end
+
+      Fiber.set_scheduler scheduler
+
+      Fiber.schedule do
+        sleep 0
+      end
+    end
+
+    assert_raise(RuntimeError) do
+      thread.join
+    end
+  end
 end
diff --git a/test/fiber/test_thread.rb b/test/fiber/test_thread.rb
index 5fc80f0..b7323d7 100644
--- a/test/fiber/test_thread.rb
+++ b/test/fiber/test_thread.rb
@@ -42,4 +42,24 @@ class TestFiberThread < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/fiber/test_thread.rb#L42
 
     assert_equal :done, thread.value
   end
+
+  def test_broken_unblock
+    thread = Thread.new do
+      Thread.current.report_on_exception = false
+
+      scheduler = BrokenUnblockScheduler.new
+
+      Fiber.set_scheduler scheduler
+
+      Fiber.schedule do
+        Thread.new{}.join
+      end
+
+      scheduler.run
+    end
+
+    assert_raise(RuntimeError) do
+      thread.join
+    end
+  end
 end
diff --git a/thread.c b/thread.c
index ec7a9b7..508772c 100644
--- a/thread.c
+++ b/thread.c
@@ -539,9 +539,12 @@ terminate_all(rb_ractor_t *r, const rb_thread_t *main_thread) https://github.com/ruby/ruby/blob/trunk/thread.c#L539
 static void
 rb_threadptr_join_list_wakeup(rb_thread_t *thread)
 {
-    struct rb_waiting_list *join_list = thread->join_list;
+    while (thread->join_list) {
+        struct rb_waiting_list *join_list = thread->join_list;
+
+        // Consume the entry from the join list:
+        thread->join_list = join_list->next;
 
-    while (join_list) {
         rb_thread_t *target_thread = join_list->thread;
 
         if (target_thread->scheduler != Qnil && rb_fiberptr_blocking(join_list->fiber) == 0) {
@@ -557,25 +560,20 @@ rb_threadptr_join_list_wakeup(rb_thread_t *thread) https://github.com/ruby/ruby/blob/trunk/thread.c#L560
                     break;
             }
         }
-
-        join_list = join_list->next;
     }
 }
 
 void
 rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th)
 {
-    const char *err;
-    rb_mutex_t *mutex;
-    rb_mutex_t *mutexes = th->keeping_mutexes;
+    while (th->keeping_mutexes) {
+        rb_mutex_t *mutex = th->keeping_mutexes;
+        th->keeping_mutexes = mutex->next_mutex;
+
+        /* rb_warn("mutex #<%p> remains to be locked by terminated thread", (void *)mutexes); */
 
-    while (mutexes) {
-	mutex = mutexes;
-	/* rb_warn("mutex #<%p> remains to be locked by terminated thread",
-		(void *)mutexes); */
-	mutexes = mutex->next_mutex;
-	err = rb_mutex_unlock_th(mutex, th, mutex->fiber);
-	if (err) rb_bug("invalid keeping_mutexes: %s", err);
+        const char *error_message = rb_mutex_unlock_th(mutex, th, mutex->fiber);
+        if (error_message) rb_bug("invalid keeping_mutexes: %s", error_message);
     }
 }
 
@@ -816,87 +814,87 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start) https://github.com/ruby/ruby/blob/trunk/thread.c#L814
     th->ec->machine.stack_start = STACK_DIR_UPPER(vm_stack + size, vm_stack);
     th->ec->machine.stack_maxsize -= size * sizeof(VALUE);
 
-    {
-	thread_debug("thread start (get lock): %p\n", (void *)th);
+    thread_debug("thread start (get lock): %p\n", (void *)th);
 
-	EC_PUSH_TAG(th->ec);
-	if ((state = EC_EXEC_TAG()) == TAG_NONE) {
-            SAVE_ROOT_JMPBUF(th, thread_do_start(th));
-	}
-	else {
-	    errinfo = th->ec->errinfo;
+    EC_PUSH_TAG(th->ec);
 
-            if (state == TAG_FATAL) {
-                if (th->invoke_type == thread_invoke_type_ractor_proc) {
-                    rb_ractor_atexit(th->ec, Qnil);
-                }
-		/* fatal error within this thread, need to stop whole script */
-	    }
-	    else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) {
-		/* exit on main_thread. */
-	    }
-	    else {
-                if (th->report_on_exception) {
-		    VALUE mesg = rb_thread_to_s(th->self);
-		    rb_str_cat_cstr(mesg, " terminated with exception (report_on_exception is true):\n");
-		    rb_write_error_str(mesg);
-		    rb_ec_error_print(th->ec, errinfo);
-		}
+    if ((state = EC_EXEC_TAG()) == TAG_NONE) {
+        SAVE_ROOT_JMPBUF(th, thread_do_start(th));
+    } else {
+        errinfo = th->ec->errinfo;
 
-                if (th->invoke_type == thread_invoke_type_ractor_proc) {
-                    rb_ractor_atexit_exception(th->ec);
-                }
+        if (state == TAG_FATAL) {
+            if (th->invoke_type == thread_invoke_type_ractor_proc) {
+                rb_ractor_atexit(th->ec, Qnil);
+            }
+            /* fatal error within this thread, need to stop whole script */
+        }
+        else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) {
+            /* exit on main_thread. */
+        }
+        else {
+            if (th->report_on_exception) {
+                VALUE mesg = rb_thread_to_s(th->self);
+                rb_str_cat_cstr(mesg, " terminated with exception (report_on_exception is true):\n");
+                rb_write_error_str(mesg);
+                rb_ec_error_print(th->ec, errinfo);
+            }
 
-                if (th->vm->thread_abort_on_exception ||
-                    th->abort_on_exception || RTEST(ruby_debug)) {
-		    /* exit on main_thread */
-		}
-		else {
-		    errinfo = Qnil;
-		}
-	    }
-	    th->value = Qnil;
-	}
+            if (th->invoke_type == thread_invoke_type_ractor_proc) {
+                rb_ractor_atexit_exception(th->ec);
+            }
 
-        if (th->invoke_type == thread_invoke_type_ractor_proc) {
-            rb_thread_terminate_all(th);
-            rb_ractor_teardown(th->ec);
+            if (th->vm->thread_abort_on_exception ||
+                th->abort_on_exception || RTEST(ruby_debug)) {
+                /* exit on main_thread */
+            }
+            else {
+                errinfo = Qnil;
+          (... truncated)

--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/

[前][次][番号順一覧][スレッド一覧]