ruby-changes:62928
From: Benoit <ko1@a...>
Date: Mon, 14 Sep 2020 13:44:42 +0900 (JST)
Subject: [ruby-changes:62928] 178c1b0922 (master): Make Mutex per-Fiber instead of per-Thread
https://git.ruby-lang.org/ruby.git/commit/?id=178c1b0922 From 178c1b0922dc727897d81d7cfe9c97d5ffa97fd9 Mon Sep 17 00:00:00 2001 From: Benoit Daloze <eregontp@g...> Date: Sat, 5 Sep 2020 16:26:24 +1200 Subject: Make Mutex per-Fiber instead of per-Thread * Enables Mutex to be used as synchronization between multiple Fibers of the same Thread. * With a Fiber scheduler we can yield to another Fiber on contended Mutex#lock instead of blocking the entire thread. * This also makes the behavior of Mutex consistent across CRuby, JRuby and TruffleRuby. * [Feature #16792] diff --git a/cont.c b/cont.c index d228107..0304f4c 100644 --- a/cont.c +++ b/cont.c @@ -851,6 +851,12 @@ NOINLINE(static VALUE cont_capture(volatile int *volatile stat)); https://github.com/ruby/ruby/blob/trunk/cont.c#L851 if (!(th)->ec->tag) rb_raise(rb_eThreadError, "not running thread"); \ } while (0) +rb_thread_t* +rb_fiber_threadptr(const rb_fiber_t *fiber) +{ + return fiber->cont.saved_ec.thread_ptr; +} + static VALUE cont_thread_value(const rb_context_t *cont) { @@ -1146,6 +1152,11 @@ cont_new(VALUE klass) https://github.com/ruby/ruby/blob/trunk/cont.c#L1152 return cont; } +VALUE rb_fiberptr_self(struct rb_fiber_struct *fiber) +{ + return fiber->cont.self; +} + void rb_fiber_init_mjit_cont(struct rb_fiber_struct *fiber) { diff --git a/internal/cont.h b/internal/cont.h index 81874aa..a365cbe 100644 --- a/internal/cont.h +++ b/internal/cont.h @@ -20,4 +20,6 @@ void rb_fiber_reset_root_local_storage(struct rb_thread_struct *); https://github.com/ruby/ruby/blob/trunk/internal/cont.h#L20 void ruby_register_rollback_func_for_ensure(VALUE (*ensure_func)(VALUE), VALUE (*rollback_func)(VALUE)); void rb_fiber_init_mjit_cont(struct rb_fiber_struct *fiber); +VALUE rb_fiberptr_self(struct rb_fiber_struct *fiber); + #endif /* INTERNAL_CONT_H */ diff --git a/internal/scheduler.h b/internal/scheduler.h index f5a41af..44872e3 100644 --- a/internal/scheduler.h +++ b/internal/scheduler.h @@ -17,6 +17,9 @@ VALUE rb_scheduler_timeout(struct timeval *timeout); https://github.com/ruby/ruby/blob/trunk/internal/scheduler.h#L17 VALUE rb_scheduler_kernel_sleep(VALUE scheduler, VALUE duration); VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv); +VALUE rb_scheduler_mutex_lock(VALUE scheduler, VALUE mutex); +VALUE rb_scheduler_mutex_unlock(VALUE scheduler, VALUE mutex, VALUE fiber); + VALUE rb_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout); VALUE rb_scheduler_io_wait_readable(VALUE scheduler, VALUE io); VALUE rb_scheduler_io_wait_writable(VALUE scheduler, VALUE io); diff --git a/scheduler.c b/scheduler.c index 9821d07..9ecc40c 100644 --- a/scheduler.c +++ b/scheduler.c @@ -12,6 +12,8 @@ https://github.com/ruby/ruby/blob/trunk/scheduler.c#L12 #include "ruby/io.h" static ID id_kernel_sleep; +static ID id_mutex_lock; +static ID id_mutex_unlock; static ID id_io_read; static ID id_io_write; static ID id_io_wait; @@ -20,6 +22,8 @@ void https://github.com/ruby/ruby/blob/trunk/scheduler.c#L22 Init_Scheduler(void) { id_kernel_sleep = rb_intern_const("kernel_sleep"); + id_mutex_lock = rb_intern_const("mutex_lock"); + id_mutex_unlock = rb_intern_const("mutex_unlock"); id_io_read = rb_intern_const("io_read"); id_io_write = rb_intern_const("io_write"); id_io_wait = rb_intern_const("io_wait"); @@ -44,6 +48,16 @@ VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv) https://github.com/ruby/ruby/blob/trunk/scheduler.c#L48 return rb_funcallv(scheduler, id_kernel_sleep, argc, argv); } +VALUE rb_scheduler_mutex_lock(VALUE scheduler, VALUE mutex) +{ + return rb_funcall(scheduler, id_mutex_lock, 1, mutex); +} + +VALUE rb_scheduler_mutex_unlock(VALUE scheduler, VALUE mutex, VALUE fiber) +{ + return rb_funcall(scheduler, id_mutex_unlock, 2, mutex, fiber); +} + VALUE rb_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout) { return rb_funcall(scheduler, id_io_wait, 3, io, events, timeout); diff --git a/spec/ruby/core/mutex/owned_spec.rb b/spec/ruby/core/mutex/owned_spec.rb index e660625..f881622 100644 --- a/spec/ruby/core/mutex/owned_spec.rb +++ b/spec/ruby/core/mutex/owned_spec.rb @@ -40,4 +40,16 @@ describe "Mutex#owned?" do https://github.com/ruby/ruby/blob/trunk/spec/ruby/core/mutex/owned_spec.rb#L40 m.owned?.should be_false end end + + ruby_version_is "2.8" do + it "is held per Fiber" do + m = Mutex.new + m.lock + + Fiber.new do + m.locked?.should == true + m.owned?.should == false + end.resume + end + end end diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb index 1f690b4..fa05daf 100644 --- a/test/fiber/scheduler.rb +++ b/test/fiber/scheduler.rb @@ -14,6 +14,12 @@ class Scheduler https://github.com/ruby/ruby/blob/trunk/test/fiber/scheduler.rb#L14 @readable = {} @writable = {} @waiting = {} + + @urgent = nil + + @lock = Mutex.new + @locking = 0 + @ready = [] end attr :readable @@ -35,9 +41,11 @@ class Scheduler https://github.com/ruby/ruby/blob/trunk/test/fiber/scheduler.rb#L41 end def run - while @readable.any? or @writable.any? or @waiting.any? + @urgent = IO.pipe + + while @readable.any? or @writable.any? or @waiting.any? or @locking.positive? # Can only handle file descriptors up to 1024... - readable, writable = IO.select(@readable.keys, @writable.keys, [], next_timeout) + readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout) # puts "readable: #{readable}" if readable&.any? # puts "writable: #{writable}" if writable&.any? @@ -63,7 +71,24 @@ class Scheduler https://github.com/ruby/ruby/blob/trunk/test/fiber/scheduler.rb#L71 end end end + + if @ready.any? + # Clear out the urgent notification pipe. + @urgent.first.read_nonblock(1024) + + ready = nil + + @lock.synchronize do + ready, @ready = @ready, Array.new + end + + ready.each do |fiber| + fiber.resume + end + end end + ensure + @urgent.each(&:close) end def current_time @@ -95,6 +120,23 @@ class Scheduler https://github.com/ruby/ruby/blob/trunk/test/fiber/scheduler.rb#L120 return true end + def mutex_lock(mutex) + @locking += 1 + Fiber.yield + ensure + @locking -= 1 + end + + def mutex_unlock(mutex, fiber) + @lock.synchronize do + @ready << fiber + + if @urgent + @urgent.last.write('.') + end + end + end + def fiber(&block) fiber = Fiber.new(blocking: false, &block) diff --git a/test/fiber/test_mutex.rb b/test/fiber/test_mutex.rb index 5179959..393a44f 100644 --- a/test/fiber/test_mutex.rb +++ b/test/fiber/test_mutex.rb @@ -14,7 +14,7 @@ class TestFiberMutex < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/fiber/test_mutex.rb#L14 assert_equal Thread.scheduler, scheduler mutex.synchronize do - assert_nil Thread.scheduler + assert Thread.scheduler end end end @@ -22,7 +22,35 @@ class TestFiberMutex < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/fiber/test_mutex.rb#L22 thread.join end + def test_mutex_interleaved_locking + mutex = Mutex.new + + thread = Thread.new do + scheduler = Scheduler.new + Thread.current.scheduler = scheduler + + Fiber.schedule do + mutex.lock + sleep 0.1 + mutex.unlock + end + + Fiber.schedule do + mutex.lock + sleep 0.1 + mutex.unlock + end + + scheduler.run + end + + thread.join + end + def test_mutex_deadlock + err = /No live threads left. Deadlock\?/ + assert_in_out_err %W[-I#{__dir__} -], <<-RUBY, ['in synchronize'], err, success: false + require 'scheduler' mutex = Mutex.new thread = Thread.new do @@ -30,18 +58,18 @@ class TestFiberMutex < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/fiber/test_mutex.rb#L58 Thread.current.scheduler = scheduler Fiber.schedule do - assert_equal Thread.scheduler, scheduler + raise unless Thread.scheduler == scheduler mutex.synchronize do + puts 'in synchronize' Fiber.yield end end - assert_raise ThreadError do - mutex.lock - end + mutex.lock end thread.join + RUBY end end diff --git a/thread.c b/thread.c index d0ebfff..c4ff5aa 100644 --- a/thread.c +++ b/thread.c @@ -75,11 +75,13 @@ https://github.com/ruby/ruby/blob/trunk/thread.c#L75 #include "hrtime.h" #include "internal.h" #include "internal/class.h" +#include "internal/cont.h" #include "internal/error.h" #include "internal/hash.h" #include "internal/io.h" #include "internal/object.h" #include "internal/proc.h" +#include "internal/scheduler.h" #include "internal/signal.h" #include "internal/thread.h" #include "internal/time.h" @@ -548,7 +550,7 @@ rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th) https://github.com/ruby/ruby/blob/trunk/thread.c#L550 /* rb_warn("mutex #<%p> remains to be locked by terminated thread", (void *)mutexes); */ mutexes = mutex->next_mutex; - err = rb_mutex_unlock_th(mutex, th); + err = rb_mutex_unlock_th(mutex, th, mutex->fiber); if (err) rb_bug("invalid keeping_mutexes: %s", err); } } @@ -5040,7 +5042,7 @@ rb_thread_shield_wait(VALUE self) https://github.com/ruby/ruby/blob/trunk/thread.c#L5042 if (!mutex) return Qfalse; m = mutex_ptr(mutex); - if (m->th == GET_THREAD()) return Qnil; + if (m->fiber == GET_EC()->fiber_ptr) return Qnil; rb_thread_shield_waiting_inc(self); rb_mutex_lock(mutex); rb_thread_shield_waiting_dec(self); @@ -5540,7 +5542,7 @@ debug_deadlock_check(rb_ractor_t *r, VALUE msg) https://github.com/ruby/ruby/blob/trunk/thread.c#L5542 (... truncated) -- ML: ruby-changes@q... Info: http://www.atdot.net/~ko1/quickml/