[前][次][番号順一覧][スレッド一覧]

ruby-changes:74131

From: Samuel <ko1@a...>
Date: Thu, 20 Oct 2022 09:39:11 +0900 (JST)
Subject: [ruby-changes:74131] 7f175e5648 (master): Avoid missed wakeup with fiber scheduler and Fiber.blocking. (#6588)

https://git.ruby-lang.org/ruby.git/commit/?id=7f175e5648

From 7f175e564875b011efb43537907867dd08d659e8 Mon Sep 17 00:00:00 2001
From: Samuel Williams <samuel.williams@o...>
Date: Thu, 20 Oct 2022 13:38:52 +1300
Subject: Avoid missed wakeup with fiber scheduler and Fiber.blocking. (#6588)

* Ensure that blocked fibers don't prevent valid wakeups.
---
 test/fiber/scheduler.rb      | 11 +++++++++++
 test/fiber/test_scheduler.rb | 42 ++++++++++++++++++++++++++++++++++++++++++
 thread.c                     |  4 ++--
 thread_sync.c                | 25 +++++++++++++++++--------
 4 files changed, 72 insertions(+), 10 deletions(-)

diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb
index 3fd41ef6f1..204a297133 100644
--- a/test/fiber/scheduler.rb
+++ b/test/fiber/scheduler.rb
@@ -350,3 +350,14 @@ class SleepingUnblockScheduler < Scheduler https://github.com/ruby/ruby/blob/trunk/test/fiber/scheduler.rb#L350
     sleep(0.1)
   end
 end
+
+class SleepingBlockingScheduler < Scheduler
+  def kernel_sleep(duration = nil)
+    # Deliberately sleep in a blocking state which can trigger a deadlock if the implementation is not correct.
+    Fiber.blocking{sleep 0.0001}
+
+    self.block(:sleep, duration)
+
+    return true
+  end
+end
diff --git a/test/fiber/test_scheduler.rb b/test/fiber/test_scheduler.rb
index 5a24bff04f..300d30ad63 100644
--- a/test/fiber/test_scheduler.rb
+++ b/test/fiber/test_scheduler.rb
@@ -138,4 +138,46 @@ class TestFiberScheduler < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/fiber/test_scheduler.rb#L138
       Object.send(:remove_const, :TestFiberSchedulerAutoload)
     end
   end
+
+  def test_deadlock
+    mutex = Thread::Mutex.new
+    condition = Thread::ConditionVariable.new
+    q = 0.0001
+
+    signaller = Thread.new do
+      loop do
+        mutex.synchronize do
+          condition.signal
+        end
+        sleep q
+      end
+    end
+
+    i = 0
+
+    thread = Thread.new do
+      scheduler = SleepingBlockingScheduler.new
+      Fiber.set_scheduler scheduler
+
+      Fiber.schedule do
+        10.times do
+          mutex.synchronize do
+            condition.wait(mutex)
+            sleep q
+            i += 1
+          end
+        end
+      end
+    end
+
+    # Wait for 10 seconds at most... if it doesn't finish, it's deadlocked.
+    thread.join(10)
+
+    # If it's deadlocked, it will never finish, so this will be 0.
+    assert_equal 10, i
+  ensure
+    # Make sure the threads are dead...
+    thread.kill
+    signaller.kill
+  end
 end
diff --git a/thread.c b/thread.c
index e1b194861a..d8925e618e 100644
--- a/thread.c
+++ b/thread.c
@@ -404,7 +404,7 @@ rb_threadptr_join_list_wakeup(rb_thread_t *thread) https://github.com/ruby/ruby/blob/trunk/thread.c#L404
 
         rb_thread_t *target_thread = join_list->thread;
 
-        if (target_thread->scheduler != Qnil && rb_fiberptr_blocking(join_list->fiber) == 0) {
+        if (target_thread->scheduler != Qnil && join_list->fiber) {
             rb_fiber_scheduler_unblock(target_thread->scheduler, target_thread->self, rb_fiberptr_self(join_list->fiber));
         }
         else {
@@ -1091,7 +1091,7 @@ thread_join(rb_thread_t *target_th, VALUE timeout, rb_hrtime_t *limit) https://github.com/ruby/ruby/blob/trunk/thread.c#L1091
         struct rb_waiting_list waiter;
         waiter.next = target_th->join_list;
         waiter.thread = th;
-        waiter.fiber = fiber;
+        waiter.fiber = rb_fiberptr_blocking(fiber) ? NULL : fiber;
         target_th->join_list = &waiter;
 
         struct join_arg arg;
diff --git a/thread_sync.c b/thread_sync.c
index 3888534468..2bcf59137e 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -20,6 +20,16 @@ struct sync_waiter { https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L20
     struct ccan_list_node node;
 };
 
+static inline rb_fiber_t*
+nonblocking_fiber(rb_fiber_t *fiber)
+{
+    if (rb_fiberptr_blocking(fiber)) {
+        return NULL;
+    }
+
+    return fiber;
+}
+
 struct queue_sleep_arg {
     VALUE self;
     VALUE timeout;
@@ -37,8 +47,7 @@ sync_wakeup(struct ccan_list_head *head, long max) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L47
         ccan_list_del_init(&cur->node);
 
         if (cur->th->status != THREAD_KILLED) {
-
-            if (cur->th->scheduler != Qnil && rb_fiberptr_blocking(cur->fiber) == 0) {
+            if (cur->th->scheduler != Qnil && cur->fiber) {
                 rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
             }
             else {
@@ -306,7 +315,7 @@ do_mutex_lock(VALUE self, int interruptible_p) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L315
                 struct sync_waiter sync_waiter = {
                     .self = self,
                     .th = th,
-                    .fiber = fiber
+                    .fiber = nonblocking_fiber(fiber)
                 };
 
                 ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
@@ -339,7 +348,7 @@ do_mutex_lock(VALUE self, int interruptible_p) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L348
                 struct sync_waiter sync_waiter = {
                     .self = self,
                     .th = th,
-                    .fiber = fiber
+                    .fiber = nonblocking_fiber(fiber)
                 };
 
                 ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
@@ -437,7 +446,7 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L446
         ccan_list_for_each_safe(&mutex->waitq, cur, next, node) {
             ccan_list_del_init(&cur->node);
 
-            if (cur->th->scheduler != Qnil && rb_fiberptr_blocking(cur->fiber) == 0) {
+            if (cur->th->scheduler != Qnil && cur->fiber) {
                 rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
                 goto found;
             }
@@ -1051,7 +1060,7 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block, VALUE timeout) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1060
             assert(queue_closed_p(self) == 0);
 
             struct queue_waiter queue_waiter = {
-                .w = {.self = self, .th = ec->thread_ptr, .fiber = ec->fiber_ptr},
+                .w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)},
                 .as = {.q = q}
             };
 
@@ -1258,7 +1267,7 @@ rb_szqueue_push(rb_execution_context_t *ec, VALUE self, VALUE object, VALUE non_ https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1267
         else {
             rb_execution_context_t *ec = GET_EC();
             struct queue_waiter queue_waiter = {
-                .w = {.self = self, .th = ec->thread_ptr, .fiber = ec->fiber_ptr},
+                .w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)},
                 .as = {.sq = sq}
             };
 
@@ -1491,7 +1500,7 @@ rb_condvar_wait(int argc, VALUE *argv, VALUE self) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1500
     struct sync_waiter sync_waiter = {
         .self = args.mutex,
         .th = ec->thread_ptr,
-        .fiber = ec->fiber_ptr
+        .fiber = nonblocking_fiber(ec->fiber_ptr)
     };
 
     ccan_list_add_tail(&cv->waitq, &sync_waiter.node);
-- 
cgit v1.2.3


--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/

[前][次][番号順一覧][スレッド一覧]