[前][次][番号順一覧][スレッド一覧]

ruby-changes:62928

From: Benoit <ko1@a...>
Date: Mon, 14 Sep 2020 13:44:42 +0900 (JST)
Subject: [ruby-changes:62928] 178c1b0922 (master): Make Mutex per-Fiber instead of per-Thread

https://git.ruby-lang.org/ruby.git/commit/?id=178c1b0922

From 178c1b0922dc727897d81d7cfe9c97d5ffa97fd9 Mon Sep 17 00:00:00 2001
From: Benoit Daloze <eregontp@g...>
Date: Sat, 5 Sep 2020 16:26:24 +1200
Subject: Make Mutex per-Fiber instead of per-Thread

* Enables Mutex to be used as synchronization between multiple Fibers
  of the same Thread.
* With a Fiber scheduler we can yield to another Fiber on contended
  Mutex#lock instead of blocking the entire thread.
* This also makes the behavior of Mutex consistent across CRuby, JRuby and TruffleRuby.
* [Feature #16792]

diff --git a/cont.c b/cont.c
index d228107..0304f4c 100644
--- a/cont.c
+++ b/cont.c
@@ -851,6 +851,12 @@ NOINLINE(static VALUE cont_capture(volatile int *volatile stat)); https://github.com/ruby/ruby/blob/trunk/cont.c#L851
         if (!(th)->ec->tag) rb_raise(rb_eThreadError, "not running thread"); \
     } while (0)
 
+rb_thread_t*
+rb_fiber_threadptr(const rb_fiber_t *fiber)
+{
+    return fiber->cont.saved_ec.thread_ptr;
+}
+
 static VALUE
 cont_thread_value(const rb_context_t *cont)
 {
@@ -1146,6 +1152,11 @@ cont_new(VALUE klass) https://github.com/ruby/ruby/blob/trunk/cont.c#L1152
     return cont;
 }
 
+VALUE rb_fiberptr_self(struct rb_fiber_struct *fiber)
+{
+    return fiber->cont.self;
+}
+
 void
 rb_fiber_init_mjit_cont(struct rb_fiber_struct *fiber)
 {
diff --git a/internal/cont.h b/internal/cont.h
index 81874aa..a365cbe 100644
--- a/internal/cont.h
+++ b/internal/cont.h
@@ -20,4 +20,6 @@ void rb_fiber_reset_root_local_storage(struct rb_thread_struct *); https://github.com/ruby/ruby/blob/trunk/internal/cont.h#L20
 void ruby_register_rollback_func_for_ensure(VALUE (*ensure_func)(VALUE), VALUE (*rollback_func)(VALUE));
 void rb_fiber_init_mjit_cont(struct rb_fiber_struct *fiber);
 
+VALUE rb_fiberptr_self(struct rb_fiber_struct *fiber);
+
 #endif /* INTERNAL_CONT_H */
diff --git a/internal/scheduler.h b/internal/scheduler.h
index f5a41af..44872e3 100644
--- a/internal/scheduler.h
+++ b/internal/scheduler.h
@@ -17,6 +17,9 @@ VALUE rb_scheduler_timeout(struct timeval *timeout); https://github.com/ruby/ruby/blob/trunk/internal/scheduler.h#L17
 VALUE rb_scheduler_kernel_sleep(VALUE scheduler, VALUE duration);
 VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv);
 
+VALUE rb_scheduler_mutex_lock(VALUE scheduler, VALUE mutex);
+VALUE rb_scheduler_mutex_unlock(VALUE scheduler, VALUE mutex, VALUE fiber);
+
 VALUE rb_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout);
 VALUE rb_scheduler_io_wait_readable(VALUE scheduler, VALUE io);
 VALUE rb_scheduler_io_wait_writable(VALUE scheduler, VALUE io);
diff --git a/scheduler.c b/scheduler.c
index 9821d07..9ecc40c 100644
--- a/scheduler.c
+++ b/scheduler.c
@@ -12,6 +12,8 @@ https://github.com/ruby/ruby/blob/trunk/scheduler.c#L12
 #include "ruby/io.h"
 
 static ID id_kernel_sleep;
+static ID id_mutex_lock;
+static ID id_mutex_unlock;
 static ID id_io_read;
 static ID id_io_write;
 static ID id_io_wait;
@@ -20,6 +22,8 @@ void https://github.com/ruby/ruby/blob/trunk/scheduler.c#L22
 Init_Scheduler(void)
 {
     id_kernel_sleep = rb_intern_const("kernel_sleep");
+    id_mutex_lock = rb_intern_const("mutex_lock");
+    id_mutex_unlock = rb_intern_const("mutex_unlock");
     id_io_read = rb_intern_const("io_read");
     id_io_write = rb_intern_const("io_write");
     id_io_wait = rb_intern_const("io_wait");
@@ -44,6 +48,16 @@ VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv) https://github.com/ruby/ruby/blob/trunk/scheduler.c#L48
     return rb_funcallv(scheduler, id_kernel_sleep, argc, argv);
 }
 
+VALUE rb_scheduler_mutex_lock(VALUE scheduler, VALUE mutex)
+{
+    return rb_funcall(scheduler, id_mutex_lock, 1, mutex);
+}
+
+VALUE rb_scheduler_mutex_unlock(VALUE scheduler, VALUE mutex, VALUE fiber)
+{
+    return rb_funcall(scheduler, id_mutex_unlock, 2, mutex, fiber);
+}
+
 VALUE rb_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
 {
     return rb_funcall(scheduler, id_io_wait, 3, io, events, timeout);
diff --git a/spec/ruby/core/mutex/owned_spec.rb b/spec/ruby/core/mutex/owned_spec.rb
index e660625..f881622 100644
--- a/spec/ruby/core/mutex/owned_spec.rb
+++ b/spec/ruby/core/mutex/owned_spec.rb
@@ -40,4 +40,16 @@ describe "Mutex#owned?" do https://github.com/ruby/ruby/blob/trunk/spec/ruby/core/mutex/owned_spec.rb#L40
       m.owned?.should be_false
     end
   end
+
+  ruby_version_is "2.8" do
+    it "is held per Fiber" do
+      m = Mutex.new
+      m.lock
+
+      Fiber.new do
+        m.locked?.should == true
+        m.owned?.should == false
+      end.resume
+    end
+  end
 end
diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb
index 1f690b4..fa05daf 100644
--- a/test/fiber/scheduler.rb
+++ b/test/fiber/scheduler.rb
@@ -14,6 +14,12 @@ class Scheduler https://github.com/ruby/ruby/blob/trunk/test/fiber/scheduler.rb#L14
     @readable = {}
     @writable = {}
     @waiting = {}
+
+    @urgent = nil
+
+    @lock = Mutex.new
+    @locking = 0
+    @ready = []
   end
 
   attr :readable
@@ -35,9 +41,11 @@ class Scheduler https://github.com/ruby/ruby/blob/trunk/test/fiber/scheduler.rb#L41
   end
 
   def run
-    while @readable.any? or @writable.any? or @waiting.any?
+    @urgent = IO.pipe
+
+    while @readable.any? or @writable.any? or @waiting.any? or @locking.positive?
       # Can only handle file descriptors up to 1024...
-      readable, writable = IO.select(@readable.keys, @writable.keys, [], next_timeout)
+      readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout)
 
       # puts "readable: #{readable}" if readable&.any?
       # puts "writable: #{writable}" if writable&.any?
@@ -63,7 +71,24 @@ class Scheduler https://github.com/ruby/ruby/blob/trunk/test/fiber/scheduler.rb#L71
           end
         end
       end
+
+      if @ready.any?
+        # Clear out the urgent notification pipe.
+        @urgent.first.read_nonblock(1024)
+
+        ready = nil
+
+        @lock.synchronize do
+          ready, @ready = @ready, Array.new
+        end
+
+        ready.each do |fiber|
+          fiber.resume
+        end
+      end
     end
+  ensure
+    @urgent.each(&:close)
   end
 
   def current_time
@@ -95,6 +120,23 @@ class Scheduler https://github.com/ruby/ruby/blob/trunk/test/fiber/scheduler.rb#L120
     return true
   end
 
+  def mutex_lock(mutex)
+    @locking += 1
+    Fiber.yield
+  ensure
+    @locking -= 1
+  end
+
+  def mutex_unlock(mutex, fiber)
+    @lock.synchronize do
+      @ready << fiber
+
+      if @urgent
+        @urgent.last.write('.')
+      end
+    end
+  end
+
   def fiber(&block)
     fiber = Fiber.new(blocking: false, &block)
 
diff --git a/test/fiber/test_mutex.rb b/test/fiber/test_mutex.rb
index 5179959..393a44f 100644
--- a/test/fiber/test_mutex.rb
+++ b/test/fiber/test_mutex.rb
@@ -14,7 +14,7 @@ class TestFiberMutex < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/fiber/test_mutex.rb#L14
         assert_equal Thread.scheduler, scheduler
 
         mutex.synchronize do
-          assert_nil Thread.scheduler
+          assert Thread.scheduler
         end
       end
     end
@@ -22,7 +22,35 @@ class TestFiberMutex < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/fiber/test_mutex.rb#L22
     thread.join
   end
 
+  def test_mutex_interleaved_locking
+    mutex = Mutex.new
+
+    thread = Thread.new do
+      scheduler = Scheduler.new
+      Thread.current.scheduler = scheduler
+
+      Fiber.schedule do
+        mutex.lock
+        sleep 0.1
+        mutex.unlock
+      end
+
+      Fiber.schedule do
+        mutex.lock
+        sleep 0.1
+        mutex.unlock
+      end
+
+      scheduler.run
+    end
+
+    thread.join
+  end
+
   def test_mutex_deadlock
+    err = /No live threads left. Deadlock\?/
+    assert_in_out_err %W[-I#{__dir__} -], <<-RUBY, ['in synchronize'], err, success: false
+    require 'scheduler'
     mutex = Mutex.new
 
     thread = Thread.new do
@@ -30,18 +58,18 @@ class TestFiberMutex < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/fiber/test_mutex.rb#L58
       Thread.current.scheduler = scheduler
 
       Fiber.schedule do
-        assert_equal Thread.scheduler, scheduler
+        raise unless Thread.scheduler == scheduler
 
         mutex.synchronize do
+          puts 'in synchronize'
           Fiber.yield
         end
       end
 
-      assert_raise ThreadError do
-        mutex.lock
-      end
+      mutex.lock
     end
 
     thread.join
+    RUBY
   end
 end
diff --git a/thread.c b/thread.c
index d0ebfff..c4ff5aa 100644
--- a/thread.c
+++ b/thread.c
@@ -75,11 +75,13 @@ https://github.com/ruby/ruby/blob/trunk/thread.c#L75
 #include "hrtime.h"
 #include "internal.h"
 #include "internal/class.h"
+#include "internal/cont.h"
 #include "internal/error.h"
 #include "internal/hash.h"
 #include "internal/io.h"
 #include "internal/object.h"
 #include "internal/proc.h"
+#include "internal/scheduler.h"
 #include "internal/signal.h"
 #include "internal/thread.h"
 #include "internal/time.h"
@@ -548,7 +550,7 @@ rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th) https://github.com/ruby/ruby/blob/trunk/thread.c#L550
 	/* rb_warn("mutex #<%p> remains to be locked by terminated thread",
 		(void *)mutexes); */
 	mutexes = mutex->next_mutex;
-	err = rb_mutex_unlock_th(mutex, th);
+	err = rb_mutex_unlock_th(mutex, th, mutex->fiber);
 	if (err) rb_bug("invalid keeping_mutexes: %s", err);
     }
 }
@@ -5040,7 +5042,7 @@ rb_thread_shield_wait(VALUE self) https://github.com/ruby/ruby/blob/trunk/thread.c#L5042
 
     if (!mutex) return Qfalse;
     m = mutex_ptr(mutex);
-    if (m->th == GET_THREAD()) return Qnil;
+    if (m->fiber == GET_EC()->fiber_ptr) return Qnil;
     rb_thread_shield_waiting_inc(self);
     rb_mutex_lock(mutex);
     rb_thread_shield_waiting_dec(self);
@@ -5540,7 +5542,7 @@ debug_deadlock_check(rb_ractor_t *r, VALUE msg) https://github.com/ruby/ruby/blob/trunk/thread.c#L5542
    (... truncated)

--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/

[前][次][番号順一覧][スレッド一覧]