[前][次][番号順一覧][スレッド一覧]

ruby-changes:63030

From: Samuel <ko1@a...>
Date: Mon, 21 Sep 2020 08:49:03 +0900 (JST)
Subject: [ruby-changes:63030] 70f08f1eed (master): Make `Thread#join` non-blocking.

https://git.ruby-lang.org/ruby.git/commit/?id=70f08f1eed

From 70f08f1eed1df4579fef047d28fc3c807183fcfa Mon Sep 17 00:00:00 2001
From: Samuel Williams <samuel.williams@o...>
Date: Mon, 21 Sep 2020 09:54:08 +1200
Subject: Make `Thread#join` non-blocking.


diff --git a/internal/scheduler.h b/internal/scheduler.h
index 73915ad..186f4bd 100644
--- a/internal/scheduler.h
+++ b/internal/scheduler.h
@@ -19,7 +19,7 @@ VALUE rb_scheduler_close(VALUE scheduler); https://github.com/ruby/ruby/blob/trunk/internal/scheduler.h#L19
 VALUE rb_scheduler_kernel_sleep(VALUE scheduler, VALUE duration);
 VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv);
 
-VALUE rb_scheduler_block(VALUE scheduler, VALUE blocker);
+VALUE rb_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout);
 VALUE rb_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber);
 
 VALUE rb_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout);
diff --git a/scheduler.c b/scheduler.c
index 2dfecaf..8ec5039 100644
--- a/scheduler.c
+++ b/scheduler.c
@@ -61,9 +61,9 @@ VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv) https://github.com/ruby/ruby/blob/trunk/scheduler.c#L61
     return rb_funcallv(scheduler, id_kernel_sleep, argc, argv);
 }
 
-VALUE rb_scheduler_block(VALUE scheduler, VALUE blocker)
+VALUE rb_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
 {
-    return rb_funcall(scheduler, id_block, 1, blocker);
+    return rb_funcall(scheduler, id_block, 2, blocker, timeout);
 }
 
 VALUE rb_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber)
diff --git a/spec/ruby/core/thread/join_spec.rb b/spec/ruby/core/thread/join_spec.rb
index f3c5cdc..6477e17 100644
--- a/spec/ruby/core/thread/join_spec.rb
+++ b/spec/ruby/core/thread/join_spec.rb
@@ -19,8 +19,13 @@ describe "Thread#join" do https://github.com/ruby/ruby/blob/trunk/spec/ruby/core/thread/join_spec.rb#L19
     t.join(0).should equal(t)
     t.join(0.0).should equal(t)
     t.join(nil).should equal(t)
+  end
+
+  it "raises TypeError if the argument is not a valid timeout" do
+    t = Thread.new {sleep}
     -> { t.join(:foo) }.should raise_error TypeError
     -> { t.join("bar") }.should raise_error TypeError
+    t.kill
   end
 
   it "returns nil if it is not finished when given a timeout" do
diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb
index 10854aa..d93d0f1 100644
--- a/test/fiber/scheduler.rb
+++ b/test/fiber/scheduler.rb
@@ -22,7 +22,7 @@ class Scheduler https://github.com/ruby/ruby/blob/trunk/test/fiber/scheduler.rb#L22
     @closed = false
 
     @lock = Mutex.new
-    @locking = 0
+    @blocking = 0
     @ready = []
   end
 
@@ -47,7 +47,7 @@ class Scheduler https://github.com/ruby/ruby/blob/trunk/test/fiber/scheduler.rb#L47
   def run
     @urgent = IO.pipe
 
-    while @readable.any? or @writable.any? or @waiting.any? or @locking.positive?
+    while @readable.any? or @writable.any? or @waiting.any? or @blocking.positive?
       # Can only handle file descriptors up to 1024...
       readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout)
 
@@ -142,12 +142,22 @@ class Scheduler https://github.com/ruby/ruby/blob/trunk/test/fiber/scheduler.rb#L142
   end
 
   # Used when blocking on synchronization (Mutex#lock, Queue#pop, SizedQueue#push, ...)
-  def block(blocker)
-    # p [__method__, blocker]
-    @locking += 1
+  def block(blocker, timeout = nil)
+    # p [__method__, blocker, timeout]
+    @blocking += 1
+
+    if timeout
+      @waiting[Fiber.current] = current_time + timeout
+    end
+
     Fiber.yield
   ensure
-    @locking -= 1
+    @blocking -= 1
+
+    # Remove from @waiting in the case #unblock was called before the timeout expired:
+    if timeout
+      @waiting.delete(Fiber.current)
+    end
   end
 
   # Used when synchronization wakes up a previously-blocked fiber (Mutex#unlock, Queue#push, ...).
diff --git a/thread.c b/thread.c
index 53bfbe8..35a35f2 100644
--- a/thread.c
+++ b/thread.c
@@ -544,6 +544,32 @@ terminate_all(rb_ractor_t *r, const rb_thread_t *main_thread) https://github.com/ruby/ruby/blob/trunk/thread.c#L544
     }
 }
 
+static void
+rb_threadptr_join_list_wakeup(rb_thread_t *thread)
+{
+    struct rb_waiting_list *join_list = thread->join_list;
+
+    while (join_list) {
+        rb_thread_t *target_thread = join_list->thread;
+
+        if (target_thread->scheduler != Qnil) {
+            rb_scheduler_unblock(target_thread->scheduler, target_thread->self, rb_fiberptr_self(join_list->fiber));
+        } else {
+            rb_threadptr_interrupt(target_thread);
+
+            switch (target_thread->status) {
+                case THREAD_STOPPED:
+                case THREAD_STOPPED_FOREVER:
+                    target_thread->status = THREAD_RUNNABLE;
+                default:
+                    break;
+            }
+        }
+
+        join_list = join_list->next;
+    }
+}
+
 void
 rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th)
 {
@@ -758,7 +784,6 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start) https://github.com/ruby/ruby/blob/trunk/thread.c#L784
 {
     STACK_GROW_DIR_DETECTION;
     enum ruby_tag_type state;
-    rb_thread_list_t *join_list;
     VALUE errinfo = Qnil;
     size_t size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE);
     rb_thread_t *ractor_main_th = th->ractor->threads.main;
@@ -860,20 +885,9 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start) https://github.com/ruby/ruby/blob/trunk/thread.c#L885
             rb_threadptr_interrupt(ractor_main_th);
 	}
 
-	/* wake up joining threads */
-	join_list = th->join_list;
-	while (join_list) {
-	    rb_threadptr_interrupt(join_list->th);
-	    switch (join_list->th->status) {
-	      case THREAD_STOPPED: case THREAD_STOPPED_FOREVER:
-		join_list->th->status = THREAD_RUNNABLE;
-	      default: break;
-	    }
-	    join_list = join_list->next;
-	}
-
-	rb_threadptr_unlock_all_locking_mutexes(th);
-	rb_check_deadlock(th->ractor);
+        rb_threadptr_join_list_wakeup(th);
+        rb_threadptr_unlock_all_locking_mutexes(th);
+        rb_check_deadlock(th->ractor);
 
         rb_fiber_close(th->ec->fiber_ptr);
     }
@@ -1105,129 +1119,152 @@ rb_thread_create_ractor(rb_ractor_t *g, VALUE args, VALUE proc) https://github.com/ruby/ruby/blob/trunk/thread.c#L1119
 
 
 struct join_arg {
-    rb_thread_t *target, *waiting;
-    rb_hrtime_t *limit;
+    struct rb_waiting_list *waiting_list;
+    rb_thread_t *target;
+    VALUE timeout;
 };
 
 static VALUE
 remove_from_join_list(VALUE arg)
 {
     struct join_arg *p = (struct join_arg *)arg;
-    rb_thread_t *target_th = p->target, *th = p->waiting;
+    rb_thread_t *target_thread = p->target;
 
-    if (target_th->status != THREAD_KILLED) {
-	rb_thread_list_t **p = &target_th->join_list;
+    if (target_thread->status != THREAD_KILLED) {
+        struct rb_waiting_list **join_list = &target_thread->join_list;
 
-	while (*p) {
-	    if ((*p)->th == th) {
-		*p = (*p)->next;
-		break;
-	    }
-	    p = &(*p)->next;
-	}
+        while (*join_list) {
+            if (*join_list == p->waiting_list) {
+                *join_list = (*join_list)->next;
+                break;
+            }
+            
+            join_list = &(*join_list)->next;
+        }
     }
 
     return Qnil;
 }
 
+static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double);
+
 static VALUE
 thread_join_sleep(VALUE arg)
 {
     struct join_arg *p = (struct join_arg *)arg;
-    rb_thread_t *target_th = p->target, *th = p->waiting;
-    rb_hrtime_t end = 0;
+    rb_thread_t *target_th = p->target, *th = p->waiting_list->thread;
+    rb_hrtime_t end = 0, rel = 0, *limit = 0;
 
-    if (p->limit) {
-        end = rb_hrtime_add(*p->limit, rb_hrtime_now());
+    /*
+     * This supports INFINITY and negative values, so we can't use
+     * rb_time_interval right now...
+     */
+    if (p->timeout == Qnil) {
+        /* unlimited */
+    }
+    else if (FIXNUM_P(p->timeout)) {
+        rel = rb_sec2hrtime(NUM2TIMET(p->timeout));
+        limit = &rel;
+    }
+    else {
+        limit = double2hrtime(&rel, rb_num2dbl(p->timeout));
+    }
+
+    if (limit) {
+        end = rb_hrtime_add(*limit, rb_hrtime_now());
     }
 
     while (target_th->status != THREAD_KILLED) {
-	if (!p->limit) {
-	    th->status = THREAD_STOPPED_FOREVER;
+        if (th->scheduler != Qnil) {
+            rb_scheduler_block(th->scheduler, target_th->self, p->timeout);
+        } else if (!limit) {
+            th->status = THREAD_STOPPED_FOREVER;
             rb_ractor_sleeper_threads_inc(th->ractor);
-	    rb_check_deadlock(th->ractor);
-	    native_sleep(th, 0);
+            rb_check_deadlock(th->ractor);
+            native_sleep(th, 0);
             rb_ractor_sleeper_threads_dec(th->ractor);
-	}
-	else {
-            if (hrtime_update_expire(p->limit, end)) {
-		thread_debug("thread_join: timeout (thid: %"PRI_THREAD_ID")\n",
-			     thread_id_str(target_th));
-		return Qfalse;
-	    }
-	    th->status = THREAD_STOPPED;
-	    native_sleep(th, p->limit);
-	}
-	RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
-	th->status = THREAD_RUNNABLE;
-	thread_debug("thread_join: interrupted (thid: %"PRI_THREAD_ID", status: %s)\n",
-		     thread_id_str(target_th), thread_status_name(target_th, TRUE));
+        }
+        else {
+            if (hrtime_update_expire(limit, end)) {
+                thread_debug("thread_join: timeout (thid: %"PRI_THREAD_ID")\n",
+                             thread_id_str(target_th));
+                return Qfalse;
+            }
+            th->status = THREAD_STOPPED;
+            native_sleep(th, limit);
+        }
+        RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
+        th->status = THREAD_RUNNABLE;
+        thread_debug("thread_join: interrupted (thid: %"PRI_THREAD_ID", status: %s)\n",
+                     thread_id_str(target_th), thread_status_name(target_th, TRUE));
     }
     return Qtrue;
 }
 
 static VALUE
-thread_join(rb_thread_t *target (... truncated)

--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/

[前][次][番号順一覧][スレッド一覧]