ruby-changes:62926
From: Samuel <ko1@a...>
Date: Mon, 14 Sep 2020 13:44:37 +0900 (JST)
Subject: [ruby-changes:62926] 0f613cc5f1 (master): Add support for ConditionVariable.
https://git.ruby-lang.org/ruby.git/commit/?id=0f613cc5f1 From 0f613cc5f1bbe319ab916be905de263523ef5402 Mon Sep 17 00:00:00 2001 From: Samuel Williams <samuel.williams@o...> Date: Fri, 11 Sep 2020 20:47:25 +1200 Subject: Add support for ConditionVariable. diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb index b03058a..7003d88 100644 --- a/test/fiber/scheduler.rb +++ b/test/fiber/scheduler.rb @@ -97,7 +97,9 @@ class Scheduler https://github.com/ruby/ruby/blob/trunk/test/fiber/scheduler.rb#L97 end def kernel_sleep(duration = nil) - @waiting[Fiber.current] = current_time + duration + if duration + @waiting[Fiber.current] = current_time + duration + end Fiber.yield diff --git a/test/fiber/test_mutex.rb b/test/fiber/test_mutex.rb index 393a44f..2103412 100644 --- a/test/fiber/test_mutex.rb +++ b/test/fiber/test_mutex.rb @@ -47,6 +47,43 @@ class TestFiberMutex < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/fiber/test_mutex.rb#L47 thread.join end + def test_condition_variable + mutex = Mutex.new + condition = ConditionVariable.new + + signalled = 0 + + thread = Thread.new do + scheduler = Scheduler.new + Thread.current.scheduler = scheduler + + Fiber.schedule do + mutex.synchronize do + 3.times do + condition.wait(mutex) + signalled += 1 + end + end + end + + Fiber.schedule do + 3.times do + mutex.synchronize do + condition.signal + end + + sleep 0.1 + end + end + + scheduler.run + end + + thread.join + + assert signalled > 1 + end + def test_mutex_deadlock err = /No live threads left. Deadlock\?/ assert_in_out_err %W[-I#{__dir__} -], <<-RUBY, ['in synchronize'], err, success: false diff --git a/thread_sync.c b/thread_sync.c index 9dd3b32..bd60231 100644 --- a/thread_sync.c +++ b/thread_sync.c @@ -4,8 +4,16 @@ https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L4 static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable; static VALUE rb_eClosedQueueError; +/* Mutex */ +typedef struct rb_mutex_struct { + rb_fiber_t *fiber; + struct rb_mutex_struct *next_mutex; + struct list_head waitq; /* protected by GVL */ +} rb_mutex_t; + /* sync_waiter is always on-stack */ struct sync_waiter { + VALUE self; rb_thread_t *th; rb_fiber_t *fiber; struct list_node node; @@ -19,12 +27,17 @@ sync_wakeup(struct list_head *head, long max) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L27 struct sync_waiter *cur = 0, *next; list_for_each_safe(head, cur, next, node) { - list_del_init(&cur->node); - if (cur->th->status != THREAD_KILLED) { - rb_threadptr_interrupt(cur->th); - cur->th->status = THREAD_RUNNABLE; - if (--max == 0) return; - } + list_del_init(&cur->node); + + if (cur->th->scheduler != Qnil) { + rb_scheduler_mutex_unlock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber)); + } + + if (cur->th->status != THREAD_KILLED) { + rb_threadptr_interrupt(cur->th); + cur->th->status = THREAD_RUNNABLE; + if (--max == 0) return; + } } } @@ -40,16 +53,6 @@ wakeup_all(struct list_head *head) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L53 sync_wakeup(head, LONG_MAX); } -/* Mutex */ - -typedef struct rb_mutex_struct { - VALUE self; - - rb_fiber_t *fiber; - struct rb_mutex_struct *next_mutex; - struct list_head waitq; /* protected by GVL */ -} rb_mutex_t; - #if defined(HAVE_WORKING_FORK) static void rb_mutex_abandon_all(rb_mutex_t *mutexes); static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th); @@ -151,7 +154,6 @@ mutex_alloc(VALUE klass) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L154 obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex); - mutex->self = obj; list_head_init(&mutex->waitq); return obj; } @@ -247,8 +249,8 @@ mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L249 static VALUE do_mutex_lock(VALUE self, int interruptible_p) { - rb_thread_t *th = GET_THREAD(); rb_execution_context_t *ec = GET_EC(); + rb_thread_t *th = ec->thread_ptr; rb_fiber_t *fiber = ec->fiber_ptr; rb_mutex_t *mutex = mutex_ptr(self); @@ -260,6 +262,7 @@ do_mutex_lock(VALUE self, int interruptible_p) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L262 if (rb_mutex_trylock(self) == Qfalse) { struct sync_waiter w = { + .self = self, .th = th, .fiber = fiber }; @@ -398,7 +401,7 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L401 list_del_init(&cur->node); if (cur->th->scheduler != Qnil) { - rb_scheduler_mutex_unlock(cur->th->scheduler, mutex->self, rb_fiberptr_self(cur->fiber)); + rb_scheduler_mutex_unlock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber)); } switch (cur->th->status) { @@ -498,7 +501,6 @@ rb_mutex_wait_for(VALUE time) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L501 VALUE rb_mutex_sleep(VALUE self, VALUE timeout) { - time_t beg, end; struct timeval t; if (!NIL_P(timeout)) { @@ -506,18 +508,23 @@ rb_mutex_sleep(VALUE self, VALUE timeout) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L508 } rb_mutex_unlock(self); - beg = time(0); - if (NIL_P(timeout)) { - rb_ensure(rb_mutex_sleep_forever, Qnil, mutex_lock_uninterruptible, self); + time_t beg = time(0); + + VALUE scheduler = rb_thread_current_scheduler(); + if (scheduler != Qnil) { + rb_scheduler_kernel_sleep(scheduler, timeout); + mutex_lock_uninterruptible(self); + } else { + if (NIL_P(timeout)) { + rb_ensure(rb_mutex_sleep_forever, Qnil, mutex_lock_uninterruptible, self); + } else { + rb_hrtime_t rel = rb_timeval2hrtime(&t); + rb_ensure(rb_mutex_wait_for, (VALUE)&rel, mutex_lock_uninterruptible, self); + } } - else { - rb_hrtime_t rel = rb_timeval2hrtime(&t); - rb_ensure(rb_mutex_wait_for, (VALUE)&rel, - mutex_lock_uninterruptible, self); - } RUBY_VM_CHECK_INTS_BLOCKING(GET_EC()); - end = time(0) - beg; + time_t end = time(0) - beg; return INT2FIX(end); } @@ -1429,13 +1436,19 @@ delete_from_waitq(VALUE v) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1436 static VALUE rb_condvar_wait(int argc, VALUE *argv, VALUE self) { + rb_execution_context_t *ec = GET_EC(); + struct rb_condvar *cv = condvar_ptr(self); struct sleep_call args; - struct sync_waiter w; rb_scan_args(argc, argv, "11", &args.mutex, &args.timeout); - w.th = GET_THREAD(); + struct sync_waiter w = { + .self = args.mutex, + .th = ec->thread_ptr, + .fiber = ec->fiber_ptr, + }; + list_add_tail(&cv->waitq, &w.node); rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&w); -- cgit v0.10.2 -- ML: ruby-changes@q... Info: http://www.atdot.net/~ko1/quickml/