[前][次][番号順一覧][スレッド一覧]

ruby-changes:62926

From: Samuel <ko1@a...>
Date: Mon, 14 Sep 2020 13:44:37 +0900 (JST)
Subject: [ruby-changes:62926] 0f613cc5f1 (master): Add support for ConditionVariable.

https://git.ruby-lang.org/ruby.git/commit/?id=0f613cc5f1

From 0f613cc5f1bbe319ab916be905de263523ef5402 Mon Sep 17 00:00:00 2001
From: Samuel Williams <samuel.williams@o...>
Date: Fri, 11 Sep 2020 20:47:25 +1200
Subject: Add support for ConditionVariable.


diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb
index b03058a..7003d88 100644
--- a/test/fiber/scheduler.rb
+++ b/test/fiber/scheduler.rb
@@ -97,7 +97,9 @@ class Scheduler https://github.com/ruby/ruby/blob/trunk/test/fiber/scheduler.rb#L97
   end
 
   def kernel_sleep(duration = nil)
-    @waiting[Fiber.current] = current_time + duration
+    if duration
+      @waiting[Fiber.current] = current_time + duration
+    end
 
     Fiber.yield
 
diff --git a/test/fiber/test_mutex.rb b/test/fiber/test_mutex.rb
index 393a44f..2103412 100644
--- a/test/fiber/test_mutex.rb
+++ b/test/fiber/test_mutex.rb
@@ -47,6 +47,43 @@ class TestFiberMutex < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/fiber/test_mutex.rb#L47
     thread.join
   end
 
+  def test_condition_variable
+    mutex = Mutex.new
+    condition = ConditionVariable.new
+
+    signalled = 0
+
+    thread = Thread.new do
+      scheduler = Scheduler.new
+      Thread.current.scheduler = scheduler
+
+      Fiber.schedule do
+        mutex.synchronize do
+          3.times do
+            condition.wait(mutex)
+            signalled += 1
+          end
+        end
+      end
+
+      Fiber.schedule do
+        3.times do
+          mutex.synchronize do
+            condition.signal
+          end
+
+          sleep 0.1
+        end
+      end
+
+      scheduler.run
+    end
+
+    thread.join
+
+    assert signalled > 1
+  end
+
   def test_mutex_deadlock
     err = /No live threads left. Deadlock\?/
     assert_in_out_err %W[-I#{__dir__} -], <<-RUBY, ['in synchronize'], err, success: false
diff --git a/thread_sync.c b/thread_sync.c
index 9dd3b32..bd60231 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -4,8 +4,16 @@ https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L4
 static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
 static VALUE rb_eClosedQueueError;
 
+/* Mutex */
+typedef struct rb_mutex_struct {
+    rb_fiber_t *fiber;
+    struct rb_mutex_struct *next_mutex;
+    struct list_head waitq; /* protected by GVL */
+} rb_mutex_t;
+
 /* sync_waiter is always on-stack */
 struct sync_waiter {
+    VALUE self;
     rb_thread_t *th;
     rb_fiber_t *fiber;
     struct list_node node;
@@ -19,12 +27,17 @@ sync_wakeup(struct list_head *head, long max) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L27
     struct sync_waiter *cur = 0, *next;
 
     list_for_each_safe(head, cur, next, node) {
-	list_del_init(&cur->node);
-	if (cur->th->status != THREAD_KILLED) {
-	    rb_threadptr_interrupt(cur->th);
-	    cur->th->status = THREAD_RUNNABLE;
-	    if (--max == 0) return;
-	}
+        list_del_init(&cur->node);
+
+        if (cur->th->scheduler != Qnil) {
+            rb_scheduler_mutex_unlock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
+        }
+
+        if (cur->th->status != THREAD_KILLED) {
+            rb_threadptr_interrupt(cur->th);
+            cur->th->status = THREAD_RUNNABLE;
+            if (--max == 0) return;
+        }
     }
 }
 
@@ -40,16 +53,6 @@ wakeup_all(struct list_head *head) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L53
     sync_wakeup(head, LONG_MAX);
 }
 
-/* Mutex */
-
-typedef struct rb_mutex_struct {
-    VALUE self;
-
-    rb_fiber_t *fiber;
-    struct rb_mutex_struct *next_mutex;
-    struct list_head waitq; /* protected by GVL */
-} rb_mutex_t;
-
 #if defined(HAVE_WORKING_FORK)
 static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
 static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
@@ -151,7 +154,6 @@ mutex_alloc(VALUE klass) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L154
 
     obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex);
 
-    mutex->self = obj;
     list_head_init(&mutex->waitq);
     return obj;
 }
@@ -247,8 +249,8 @@ mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L249
 static VALUE
 do_mutex_lock(VALUE self, int interruptible_p)
 {
-    rb_thread_t *th = GET_THREAD();
     rb_execution_context_t *ec = GET_EC();
+    rb_thread_t *th = ec->thread_ptr;
     rb_fiber_t *fiber = ec->fiber_ptr;
     rb_mutex_t *mutex = mutex_ptr(self);
 
@@ -260,6 +262,7 @@ do_mutex_lock(VALUE self, int interruptible_p) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L262
 
     if (rb_mutex_trylock(self) == Qfalse) {
         struct sync_waiter w = {
+            .self = self,
             .th = th,
             .fiber = fiber
         };
@@ -398,7 +401,7 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L401
 	    list_del_init(&cur->node);
 
             if (cur->th->scheduler != Qnil) {
-                rb_scheduler_mutex_unlock(cur->th->scheduler, mutex->self, rb_fiberptr_self(cur->fiber));
+                rb_scheduler_mutex_unlock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
             }
 
 	    switch (cur->th->status) {
@@ -498,7 +501,6 @@ rb_mutex_wait_for(VALUE time) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L501
 VALUE
 rb_mutex_sleep(VALUE self, VALUE timeout)
 {
-    time_t beg, end;
     struct timeval t;
 
     if (!NIL_P(timeout)) {
@@ -506,18 +508,23 @@ rb_mutex_sleep(VALUE self, VALUE timeout) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L508
     }
 
     rb_mutex_unlock(self);
-    beg = time(0);
-    if (NIL_P(timeout)) {
-	rb_ensure(rb_mutex_sleep_forever, Qnil, mutex_lock_uninterruptible, self);
+    time_t beg = time(0);
+
+    VALUE scheduler = rb_thread_current_scheduler();
+    if (scheduler != Qnil) {
+        rb_scheduler_kernel_sleep(scheduler, timeout);
+        mutex_lock_uninterruptible(self);
+    } else {
+        if (NIL_P(timeout)) {
+            rb_ensure(rb_mutex_sleep_forever, Qnil, mutex_lock_uninterruptible, self);
+        } else {
+            rb_hrtime_t rel = rb_timeval2hrtime(&t);
+            rb_ensure(rb_mutex_wait_for, (VALUE)&rel, mutex_lock_uninterruptible, self);
+        }
     }
-    else {
-        rb_hrtime_t rel = rb_timeval2hrtime(&t);
 
-        rb_ensure(rb_mutex_wait_for, (VALUE)&rel,
-                  mutex_lock_uninterruptible, self);
-    }
     RUBY_VM_CHECK_INTS_BLOCKING(GET_EC());
-    end = time(0) - beg;
+    time_t end = time(0) - beg;
     return INT2FIX(end);
 }
 
@@ -1429,13 +1436,19 @@ delete_from_waitq(VALUE v) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1436
 static VALUE
 rb_condvar_wait(int argc, VALUE *argv, VALUE self)
 {
+    rb_execution_context_t *ec = GET_EC();
+
     struct rb_condvar *cv = condvar_ptr(self);
     struct sleep_call args;
-    struct sync_waiter w;
 
     rb_scan_args(argc, argv, "11", &args.mutex, &args.timeout);
 
-    w.th = GET_THREAD();
+    struct sync_waiter w = {
+        .self = args.mutex,
+        .th = ec->thread_ptr,
+        .fiber = ec->fiber_ptr,
+    };
+
     list_add_tail(&cv->waitq, &w.node);
     rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&w);
 
-- 
cgit v0.10.2


--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/

[前][次][番号順一覧][スレッド一覧]