ruby-changes:63030
From: Samuel <ko1@a...>
Date: Mon, 21 Sep 2020 08:49:03 +0900 (JST)
Subject: [ruby-changes:63030] 70f08f1eed (master): Make `Thread#join` non-blocking.
https://git.ruby-lang.org/ruby.git/commit/?id=70f08f1eed From 70f08f1eed1df4579fef047d28fc3c807183fcfa Mon Sep 17 00:00:00 2001 From: Samuel Williams <samuel.williams@o...> Date: Mon, 21 Sep 2020 09:54:08 +1200 Subject: Make `Thread#join` non-blocking. diff --git a/internal/scheduler.h b/internal/scheduler.h index 73915ad..186f4bd 100644 --- a/internal/scheduler.h +++ b/internal/scheduler.h @@ -19,7 +19,7 @@ VALUE rb_scheduler_close(VALUE scheduler); https://github.com/ruby/ruby/blob/trunk/internal/scheduler.h#L19 VALUE rb_scheduler_kernel_sleep(VALUE scheduler, VALUE duration); VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv); -VALUE rb_scheduler_block(VALUE scheduler, VALUE blocker); +VALUE rb_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout); VALUE rb_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber); VALUE rb_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout); diff --git a/scheduler.c b/scheduler.c index 2dfecaf..8ec5039 100644 --- a/scheduler.c +++ b/scheduler.c @@ -61,9 +61,9 @@ VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv) https://github.com/ruby/ruby/blob/trunk/scheduler.c#L61 return rb_funcallv(scheduler, id_kernel_sleep, argc, argv); } -VALUE rb_scheduler_block(VALUE scheduler, VALUE blocker) +VALUE rb_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout) { - return rb_funcall(scheduler, id_block, 1, blocker); + return rb_funcall(scheduler, id_block, 2, blocker, timeout); } VALUE rb_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber) diff --git a/spec/ruby/core/thread/join_spec.rb b/spec/ruby/core/thread/join_spec.rb index f3c5cdc..6477e17 100644 --- a/spec/ruby/core/thread/join_spec.rb +++ b/spec/ruby/core/thread/join_spec.rb @@ -19,8 +19,13 @@ describe "Thread#join" do https://github.com/ruby/ruby/blob/trunk/spec/ruby/core/thread/join_spec.rb#L19 t.join(0).should equal(t) t.join(0.0).should equal(t) t.join(nil).should equal(t) + end + + it "raises TypeError if the argument is not a valid timeout" do + t = Thread.new {sleep} -> { t.join(:foo) }.should raise_error TypeError -> { t.join("bar") }.should raise_error TypeError + t.kill end it "returns nil if it is not finished when given a timeout" do diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb index 10854aa..d93d0f1 100644 --- a/test/fiber/scheduler.rb +++ b/test/fiber/scheduler.rb @@ -22,7 +22,7 @@ class Scheduler https://github.com/ruby/ruby/blob/trunk/test/fiber/scheduler.rb#L22 @closed = false @lock = Mutex.new - @locking = 0 + @blocking = 0 @ready = [] end @@ -47,7 +47,7 @@ class Scheduler https://github.com/ruby/ruby/blob/trunk/test/fiber/scheduler.rb#L47 def run @urgent = IO.pipe - while @readable.any? or @writable.any? or @waiting.any? or @locking.positive? + while @readable.any? or @writable.any? or @waiting.any? or @blocking.positive? # Can only handle file descriptors up to 1024... readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout) @@ -142,12 +142,22 @@ class Scheduler https://github.com/ruby/ruby/blob/trunk/test/fiber/scheduler.rb#L142 end # Used when blocking on synchronization (Mutex#lock, Queue#pop, SizedQueue#push, ...) - def block(blocker) - # p [__method__, blocker] - @locking += 1 + def block(blocker, timeout = nil) + # p [__method__, blocker, timeout] + @blocking += 1 + + if timeout + @waiting[Fiber.current] = current_time + timeout + end + Fiber.yield ensure - @locking -= 1 + @blocking -= 1 + + # Remove from @waiting in the case #unblock was called before the timeout expired: + if timeout + @waiting.delete(Fiber.current) + end end # Used when synchronization wakes up a previously-blocked fiber (Mutex#unlock, Queue#push, ...). diff --git a/thread.c b/thread.c index 53bfbe8..35a35f2 100644 --- a/thread.c +++ b/thread.c @@ -544,6 +544,32 @@ terminate_all(rb_ractor_t *r, const rb_thread_t *main_thread) https://github.com/ruby/ruby/blob/trunk/thread.c#L544 } } +static void +rb_threadptr_join_list_wakeup(rb_thread_t *thread) +{ + struct rb_waiting_list *join_list = thread->join_list; + + while (join_list) { + rb_thread_t *target_thread = join_list->thread; + + if (target_thread->scheduler != Qnil) { + rb_scheduler_unblock(target_thread->scheduler, target_thread->self, rb_fiberptr_self(join_list->fiber)); + } else { + rb_threadptr_interrupt(target_thread); + + switch (target_thread->status) { + case THREAD_STOPPED: + case THREAD_STOPPED_FOREVER: + target_thread->status = THREAD_RUNNABLE; + default: + break; + } + } + + join_list = join_list->next; + } +} + void rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th) { @@ -758,7 +784,6 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start) https://github.com/ruby/ruby/blob/trunk/thread.c#L784 { STACK_GROW_DIR_DETECTION; enum ruby_tag_type state; - rb_thread_list_t *join_list; VALUE errinfo = Qnil; size_t size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE); rb_thread_t *ractor_main_th = th->ractor->threads.main; @@ -860,20 +885,9 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start) https://github.com/ruby/ruby/blob/trunk/thread.c#L885 rb_threadptr_interrupt(ractor_main_th); } - /* wake up joining threads */ - join_list = th->join_list; - while (join_list) { - rb_threadptr_interrupt(join_list->th); - switch (join_list->th->status) { - case THREAD_STOPPED: case THREAD_STOPPED_FOREVER: - join_list->th->status = THREAD_RUNNABLE; - default: break; - } - join_list = join_list->next; - } - - rb_threadptr_unlock_all_locking_mutexes(th); - rb_check_deadlock(th->ractor); + rb_threadptr_join_list_wakeup(th); + rb_threadptr_unlock_all_locking_mutexes(th); + rb_check_deadlock(th->ractor); rb_fiber_close(th->ec->fiber_ptr); } @@ -1105,129 +1119,152 @@ rb_thread_create_ractor(rb_ractor_t *g, VALUE args, VALUE proc) https://github.com/ruby/ruby/blob/trunk/thread.c#L1119 struct join_arg { - rb_thread_t *target, *waiting; - rb_hrtime_t *limit; + struct rb_waiting_list *waiting_list; + rb_thread_t *target; + VALUE timeout; }; static VALUE remove_from_join_list(VALUE arg) { struct join_arg *p = (struct join_arg *)arg; - rb_thread_t *target_th = p->target, *th = p->waiting; + rb_thread_t *target_thread = p->target; - if (target_th->status != THREAD_KILLED) { - rb_thread_list_t **p = &target_th->join_list; + if (target_thread->status != THREAD_KILLED) { + struct rb_waiting_list **join_list = &target_thread->join_list; - while (*p) { - if ((*p)->th == th) { - *p = (*p)->next; - break; - } - p = &(*p)->next; - } + while (*join_list) { + if (*join_list == p->waiting_list) { + *join_list = (*join_list)->next; + break; + } + + join_list = &(*join_list)->next; + } } return Qnil; } +static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double); + static VALUE thread_join_sleep(VALUE arg) { struct join_arg *p = (struct join_arg *)arg; - rb_thread_t *target_th = p->target, *th = p->waiting; - rb_hrtime_t end = 0; + rb_thread_t *target_th = p->target, *th = p->waiting_list->thread; + rb_hrtime_t end = 0, rel = 0, *limit = 0; - if (p->limit) { - end = rb_hrtime_add(*p->limit, rb_hrtime_now()); + /* + * This supports INFINITY and negative values, so we can't use + * rb_time_interval right now... + */ + if (p->timeout == Qnil) { + /* unlimited */ + } + else if (FIXNUM_P(p->timeout)) { + rel = rb_sec2hrtime(NUM2TIMET(p->timeout)); + limit = &rel; + } + else { + limit = double2hrtime(&rel, rb_num2dbl(p->timeout)); + } + + if (limit) { + end = rb_hrtime_add(*limit, rb_hrtime_now()); } while (target_th->status != THREAD_KILLED) { - if (!p->limit) { - th->status = THREAD_STOPPED_FOREVER; + if (th->scheduler != Qnil) { + rb_scheduler_block(th->scheduler, target_th->self, p->timeout); + } else if (!limit) { + th->status = THREAD_STOPPED_FOREVER; rb_ractor_sleeper_threads_inc(th->ractor); - rb_check_deadlock(th->ractor); - native_sleep(th, 0); + rb_check_deadlock(th->ractor); + native_sleep(th, 0); rb_ractor_sleeper_threads_dec(th->ractor); - } - else { - if (hrtime_update_expire(p->limit, end)) { - thread_debug("thread_join: timeout (thid: %"PRI_THREAD_ID")\n", - thread_id_str(target_th)); - return Qfalse; - } - th->status = THREAD_STOPPED; - native_sleep(th, p->limit); - } - RUBY_VM_CHECK_INTS_BLOCKING(th->ec); - th->status = THREAD_RUNNABLE; - thread_debug("thread_join: interrupted (thid: %"PRI_THREAD_ID", status: %s)\n", - thread_id_str(target_th), thread_status_name(target_th, TRUE)); + } + else { + if (hrtime_update_expire(limit, end)) { + thread_debug("thread_join: timeout (thid: %"PRI_THREAD_ID")\n", + thread_id_str(target_th)); + return Qfalse; + } + th->status = THREAD_STOPPED; + native_sleep(th, limit); + } + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + th->status = THREAD_RUNNABLE; + thread_debug("thread_join: interrupted (thid: %"PRI_THREAD_ID", status: %s)\n", + thread_id_str(target_th), thread_status_name(target_th, TRUE)); } return Qtrue; } static VALUE -thread_join(rb_thread_t *target (... truncated) -- ML: ruby-changes@q... Info: http://www.atdot.net/~ko1/quickml/