ruby-changes:72789
From: Jean <ko1@a...>
Date: Tue, 2 Aug 2022 18:04:50 +0900 (JST)
Subject: [ruby-changes:72789] e3aabe93aa (master): Implement Queue#pop(timeout: sec)
https://git.ruby-lang.org/ruby.git/commit/?id=e3aabe93aa From e3aabe93aae87a60ba7b8f1a0fd590534647e352 Mon Sep 17 00:00:00 2001 From: Jean Boussier <jean.boussier@g...> Date: Tue, 26 Jul 2022 17:40:00 +0200 Subject: Implement Queue#pop(timeout: sec) [Feature #18774] As well as `SizedQueue#pop(timeout: sec)` If both `non_block=true` and `timeout:` are supplied, ArgumentError is raised. --- .document | 1 + common.mk | 5 ++ hrtime.h | 10 ++++ inits.c | 1 + spec/ruby/shared/queue/deque.rb | 55 +++++++++++++++++++++ test/ruby/test_settracefunc.rb | 21 ++++---- test/ruby/test_thread_queue.rb | 35 ++++++++++++++ thread.c | 35 ++++++++++++-- thread_sync.c | 105 +++++++++++++++++----------------------- thread_sync.rb | 45 +++++++++++++++++ 10 files changed, 238 insertions(+), 75 deletions(-) create mode 100644 thread_sync.rb diff --git a/.document b/.document index 5494bcc7fe..ec2fa09326 100644 --- a/.document +++ b/.document @@ -24,6 +24,7 @@ pack.rb https://github.com/ruby/ruby/blob/trunk/.document#L24 ractor.rb string.rb timev.rb +thread_sync.rb trace_point.rb warning.rb diff --git a/common.mk b/common.mk index aeb87dfb55..4c49690e4a 100644 --- a/common.mk +++ b/common.mk @@ -1062,6 +1062,7 @@ BUILTIN_RB_SRCS = \ https://github.com/ruby/ruby/blob/trunk/common.mk#L1062 $(srcdir)/kernel.rb \ $(srcdir)/ractor.rb \ $(srcdir)/timev.rb \ + $(srcdir)/thread_sync.rb \ $(srcdir)/nilclass.rb \ $(srcdir)/prelude.rb \ $(srcdir)/gem_prelude.rb \ @@ -9447,6 +9448,7 @@ miniinit.$(OBJEXT): {$(VPATH)}st.h https://github.com/ruby/ruby/blob/trunk/common.mk#L9448 miniinit.$(OBJEXT): {$(VPATH)}subst.h miniinit.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).h miniinit.$(OBJEXT): {$(VPATH)}thread_native.h +miniinit.$(OBJEXT): {$(VPATH)}thread_sync.rb miniinit.$(OBJEXT): {$(VPATH)}timev.rb miniinit.$(OBJEXT): {$(VPATH)}trace_point.rb miniinit.$(OBJEXT): {$(VPATH)}vm_core.h @@ -15230,6 +15232,7 @@ thread.$(OBJEXT): {$(VPATH)}backward/2/limits.h https://github.com/ruby/ruby/blob/trunk/common.mk#L15232 thread.$(OBJEXT): {$(VPATH)}backward/2/long_long.h thread.$(OBJEXT): {$(VPATH)}backward/2/stdalign.h thread.$(OBJEXT): {$(VPATH)}backward/2/stdarg.h +thread.$(OBJEXT): {$(VPATH)}builtin.h thread.$(OBJEXT): {$(VPATH)}config.h thread.$(OBJEXT): {$(VPATH)}debug.h thread.$(OBJEXT): {$(VPATH)}debug_counter.h @@ -15412,6 +15415,8 @@ thread.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).c https://github.com/ruby/ruby/blob/trunk/common.mk#L15415 thread.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).h thread.$(OBJEXT): {$(VPATH)}thread_native.h thread.$(OBJEXT): {$(VPATH)}thread_sync.c +thread.$(OBJEXT): {$(VPATH)}thread_sync.rb +thread.$(OBJEXT): {$(VPATH)}thread_sync.rbinc thread.$(OBJEXT): {$(VPATH)}timev.h thread.$(OBJEXT): {$(VPATH)}vm_core.h thread.$(OBJEXT): {$(VPATH)}vm_debug.h diff --git a/hrtime.h b/hrtime.h index 4ac3d54723..80aff5deb3 100644 --- a/hrtime.h +++ b/hrtime.h @@ -36,6 +36,7 @@ https://github.com/ruby/ruby/blob/trunk/hrtime.h#L36 #define RB_HRTIME_PER_MSEC (RB_HRTIME_PER_USEC * (rb_hrtime_t)1000) #define RB_HRTIME_PER_SEC (RB_HRTIME_PER_MSEC * (rb_hrtime_t)1000) #define RB_HRTIME_MAX UINT64_MAX +#define RB_HRTIME_MIN ((rb_hrtime_t)0) /* * Lets try to support time travelers. Lets assume anybody with a time machine @@ -91,6 +92,15 @@ rb_hrtime_add(rb_hrtime_t a, rb_hrtime_t b) https://github.com/ruby/ruby/blob/trunk/hrtime.h#L92 return c; } +static inline rb_hrtime_t +rb_hrtime_sub(rb_hrtime_t a, rb_hrtime_t b) +{ + if (a < b) { + return RB_HRTIME_MIN; + } + return a - b; +} + /* * convert a timeval struct to rb_hrtime_t, clamping at RB_HRTIME_MAX */ diff --git a/inits.c b/inits.c index f41e88d838..22ba6d5a8c 100644 --- a/inits.c +++ b/inits.c @@ -98,6 +98,7 @@ rb_call_builtin_inits(void) https://github.com/ruby/ruby/blob/trunk/inits.c#L98 BUILTIN(array); BUILTIN(kernel); BUILTIN(timev); + BUILTIN(thread_sync); BUILTIN(yjit); BUILTIN(nilclass); BUILTIN(marshal); diff --git a/spec/ruby/shared/queue/deque.rb b/spec/ruby/shared/queue/deque.rb index 8b755dd9b7..ed32bd29c8 100644 --- a/spec/ruby/shared/queue/deque.rb +++ b/spec/ruby/shared/queue/deque.rb @@ -55,6 +55,61 @@ describe :queue_deq, shared: true do https://github.com/ruby/ruby/blob/trunk/spec/ruby/shared/queue/deque.rb#L55 t.join end + describe "with a timeout" do + ruby_version_is "3.2" do + it "returns an item if one is available in time" do + q = @object.call + + t = Thread.new { + q.send(@method, timeout: 1).should == 1 + } + Thread.pass until t.status == "sleep" && q.num_waiting == 1 + q << 1 + t.join + end + + it "returns nil if no item is available in time" do + q = @object.call + + t = Thread.new { + q.send(@method, timeout: 0.1).should == nil + } + t.join + end + + it "does nothing if the timeout is nil" do + q = @object.call + t = Thread.new { + q.send(@method, timeout: nil).should == 1 + } + t.join(0.2).should == nil + q << 1 + t.join + end + + it "raise TypeError if timeout is not a valid numeric" do + q = @object.call + -> { q.send(@method, timeout: "1") }.should raise_error( + TypeError, + "no implicit conversion to float from string", + ) + + -> { q.send(@method, timeout: false) }.should raise_error( + TypeError, + "no implicit conversion to float from false", + ) + end + + it "raise ArgumentError if non_block = true is passed too" do + q = @object.call + -> { q.send(@method, true, timeout: 1) }.should raise_error( + ArgumentError, + "can't set a timeout if non_block is enabled", + ) + end + end + end + describe "in non-blocking mode" do it "removes an item from the queue" do q = @object.call diff --git a/test/ruby/test_settracefunc.rb b/test/ruby/test_settracefunc.rb index 56d457c7d7..31946c8b71 100644 --- a/test/ruby/test_settracefunc.rb +++ b/test/ruby/test_settracefunc.rb @@ -2140,17 +2140,16 @@ CODE https://github.com/ruby/ruby/blob/trunk/test/ruby/test_settracefunc.rb#L2140 m2t_q.push 1 t.join - assert_equal ["c-return", base_line + 31], events[0] - assert_equal ["line", base_line + 32], events[1] - assert_equal ["line", base_line + 33], events[2] - assert_equal ["call", base_line + -6], events[3] - assert_equal ["return", base_line + -4], events[4] - assert_equal ["line", base_line + 34], events[5] - assert_equal ["line", base_line + 35], events[6] - assert_equal ["c-call", base_line + 35], events[7] # Thread.current - assert_equal ["c-return", base_line + 35], events[8] # Thread.current - assert_equal ["c-call", base_line + 35], events[9] # Thread#set_trace_func - assert_equal nil, events[10] + assert_equal ["line", base_line + 32], events[0] + assert_equal ["line", base_line + 33], events[1] + assert_equal ["call", base_line + -6], events[2] + assert_equal ["return", base_line + -4], events[3] + assert_equal ["line", base_line + 34], events[4] + assert_equal ["line", base_line + 35], events[5] + assert_equal ["c-call", base_line + 35], events[6] # Thread.current + assert_equal ["c-return", base_line + 35], events[7] # Thread.current + assert_equal ["c-call", base_line + 35], events[8] # Thread#set_trace_func + assert_equal nil, events[9] end def test_lineno_in_optimized_insn diff --git a/test/ruby/test_thread_queue.rb b/test/ruby/test_thread_queue.rb index ebf7ded3b9..aa4ea0a400 100644 --- a/test/ruby/test_thread_queue.rb +++ b/test/ruby/test_thread_queue.rb @@ -111,6 +111,23 @@ class TestThreadQueue < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/ruby/test_thread_queue.rb#L111 assert_equal(0, q.num_waiting) end + def test_queue_pop_timeout + q = Thread::Queue.new + q << 1 + assert_equal 1, q.pop(timeout: 1) + + t1 = Thread.new { q.pop(timeout: 1) } + assert_equal t1, t1.join(2) + assert_nil t1.value + + t2 = Thread.new { q.pop(timeout: 0.1) } + assert_equal t2, t2.join(0.2) + assert_nil t2.value + ensure + t1&.kill + t2&.kill + end + def test_queue_pop_non_block q = Thread::Queue.new assert_raise_with_message(ThreadError, /empty/) do @@ -126,6 +143,24 @@ class TestThreadQueue < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/ruby/test_thread_queue.rb#L143 assert_equal(0, q.num_waiting) end + def test_sized_queue_pop_timeout + q = Thread::SizedQueue.new(1) + + q << 1 + assert_equal 1, q.pop(timeout: 1) + + t1 = Thread.new { q.pop(timeout: 1) } + assert_equal t1, t1.join(2) + assert_nil t1.value + + t2 = Thread.new { q.pop(timeout: 0.1) } + assert_equal t2, t2.join(0.2) + assert_nil t2.value + ensure + t1&.kill + t2&.kill + end + def test_sized_queue_pop_non_block q = Thread::SizedQueue.new(1) assert_raise_with_message(ThreadError, /empty/) do diff --git a/thread.c b/thread.c index 411b6d7084..feb89d4352 100644 --- a/thread.c +++ b/thread.c @@ -132,7 +132,7 @@ rb_thread_local_storage(VALUE thread) https://github.com/ruby/ruby/blob/trunk/thread.c#L132 static int sleep_hrtime(rb_thread_t *, rb_hrtime_t, unsigned int fl); static void sleep_forever(rb_thread_t *th, unsigned int fl); -static void rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker); +static void rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hrtime_t end); static int rb_threadptr_dead(rb_thread_t *th); static void rb_check_deadlock(rb_ractor_t *r); (... truncated) -- ML: ruby-changes@q... Info: http://www.atdot.net/~ko1/quickml/