ruby-changes:72454
From: Jean <ko1@a...>
Date: Fri, 8 Jul 2022 00:49:16 +0900 (JST)
Subject: [ruby-changes:72454] 587d2d199b (master): thread_pthread.c: call SUSPENDED event when entering native_sleep
https://git.ruby-lang.org/ruby.git/commit/?id=587d2d199b From 587d2d199b3f783d03266d42d066949f8a4824d3 Mon Sep 17 00:00:00 2001 From: Jean Boussier <jean.boussier@g...> Date: Thu, 7 Jul 2022 15:20:35 +0200 Subject: thread_pthread.c: call SUSPENDED event when entering native_sleep [Bug #18900] Thread#join and a few other codepaths are using native sleep as a way to suspend the current thread. So we should call the relevant hook when this happen, otherwise some thread may transition directly from `RESUMED` to `READY`. --- .../thread/instrumentation/instrumentation.c | 30 ++++++ test/-ext-/thread/test_instrumentation_api.rb | 102 +++++++++++---------- thread_pthread.c | 2 + 3 files changed, 86 insertions(+), 48 deletions(-) diff --git a/ext/-test-/thread/instrumentation/instrumentation.c b/ext/-test-/thread/instrumentation/instrumentation.c index 517af7e339..2bbce1179a 100644 --- a/ext/-test-/thread/instrumentation/instrumentation.c +++ b/ext/-test-/thread/instrumentation/instrumentation.c @@ -8,6 +8,19 @@ static rb_atomic_t resumed_count = 0; https://github.com/ruby/ruby/blob/trunk/ext/-test-/thread/instrumentation/instrumentation.c#L8 static rb_atomic_t suspended_count = 0; static rb_atomic_t exited_count = 0; +#if __STDC_VERSION__ >= 201112 + #define RB_THREAD_LOCAL_SPECIFIER _Thread_local +#elif defined(__GNUC__) && !defined(RB_THREAD_LOCAL_SPECIFIER_IS_UNSUPPORTED) + /* note that ICC (linux) and Clang are covered by __GNUC__ */ + #define RB_THREAD_LOCAL_SPECIFIER __thread +#else + #define RB_THREAD_LOCAL_SPECIFIER +#endif + +static RB_THREAD_LOCAL_SPECIFIER unsigned int local_ready_count = 0; +static RB_THREAD_LOCAL_SPECIFIER unsigned int local_resumed_count = 0; +static RB_THREAD_LOCAL_SPECIFIER unsigned int local_suspended_count = 0; + void ex_callback(rb_event_flag_t event, const rb_internal_thread_event_data_t *event_data, void *user_data) { @@ -17,12 +30,15 @@ ex_callback(rb_event_flag_t event, const rb_internal_thread_event_data_t *event_ https://github.com/ruby/ruby/blob/trunk/ext/-test-/thread/instrumentation/instrumentation.c#L30 break; case RUBY_INTERNAL_THREAD_EVENT_READY: RUBY_ATOMIC_INC(ready_count); + local_ready_count++; break; case RUBY_INTERNAL_THREAD_EVENT_RESUMED: RUBY_ATOMIC_INC(resumed_count); + local_resumed_count++; break; case RUBY_INTERNAL_THREAD_EVENT_SUSPENDED: RUBY_ATOMIC_INC(suspended_count); + local_suspended_count++; break; case RUBY_INTERNAL_THREAD_EVENT_EXITED: RUBY_ATOMIC_INC(exited_count); @@ -44,6 +60,16 @@ thread_counters(VALUE thread) https://github.com/ruby/ruby/blob/trunk/ext/-test-/thread/instrumentation/instrumentation.c#L60 return array; } +static VALUE +thread_local_counters(VALUE thread) +{ + VALUE array = rb_ary_new2(3); + rb_ary_push(array, UINT2NUM(local_ready_count)); + rb_ary_push(array, UINT2NUM(local_resumed_count)); + rb_ary_push(array, UINT2NUM(local_suspended_count)); + return array; +} + static VALUE thread_reset_counters(VALUE thread) { @@ -52,6 +78,9 @@ thread_reset_counters(VALUE thread) https://github.com/ruby/ruby/blob/trunk/ext/-test-/thread/instrumentation/instrumentation.c#L78 RUBY_ATOMIC_SET(resumed_count, 0); RUBY_ATOMIC_SET(suspended_count, 0); RUBY_ATOMIC_SET(exited_count, 0); + local_ready_count = 0; + local_resumed_count = 0; + local_suspended_count = 0; return Qtrue; } @@ -104,6 +133,7 @@ Init_instrumentation(void) https://github.com/ruby/ruby/blob/trunk/ext/-test-/thread/instrumentation/instrumentation.c#L133 VALUE mBug = rb_define_module("Bug"); VALUE klass = rb_define_module_under(mBug, "ThreadInstrumentation"); rb_define_singleton_method(klass, "counters", thread_counters, 0); + rb_define_singleton_method(klass, "local_counters", thread_local_counters, 0); rb_define_singleton_method(klass, "reset_counters", thread_reset_counters, 0); rb_define_singleton_method(klass, "register_callback", thread_register_callback, 0); rb_define_singleton_method(klass, "unregister_callback", thread_unregister_callback, 0); diff --git a/test/-ext-/thread/test_instrumentation_api.rb b/test/-ext-/thread/test_instrumentation_api.rb index fe91c942c7..78e499c473 100644 --- a/test/-ext-/thread/test_instrumentation_api.rb +++ b/test/-ext-/thread/test_instrumentation_api.rb @@ -3,76 +3,82 @@ require 'envutil' https://github.com/ruby/ruby/blob/trunk/test/-ext-/thread/test_instrumentation_api.rb#L3 class TestThreadInstrumentation < Test::Unit::TestCase def setup - pend("TODO: No windows support yet") if /mswin|mingw|bccwin/ =~ RUBY_PLATFORM - end - - THREADS_COUNT = 3 + pend("No windows support") if /mswin|mingw|bccwin/ =~ RUBY_PLATFORM - def test_thread_instrumentation require '-test-/thread/instrumentation' Bug::ThreadInstrumentation.reset_counters Bug::ThreadInstrumentation::register_callback + end - begin - threads = threaded_cpu_work - assert_equal [false] * THREADS_COUNT, threads.map(&:status) - counters = Bug::ThreadInstrumentation.counters - counters.each do |c| - assert_predicate c, :nonzero?, "Call counters: #{counters.inspect}" - end - - assert_equal THREADS_COUNT, counters.first - assert_in_delta THREADS_COUNT, counters.last, 1 # It's possible that a thread didn't execute its EXIT hook yet. - ensure - Bug::ThreadInstrumentation::unregister_callback + def teardown + return if /mswin|mingw|bccwin/ =~ RUBY_PLATFORM + Bug::ThreadInstrumentation::unregister_callback + end + + THREADS_COUNT = 3 + + def test_thread_instrumentation + threads = threaded_cpu_work + assert_equal [false] * THREADS_COUNT, threads.map(&:status) + counters = Bug::ThreadInstrumentation.counters + counters.each do |c| + assert_predicate c, :nonzero?, "Call counters: #{counters.inspect}" end + + assert_equal THREADS_COUNT, counters.first + assert_in_delta THREADS_COUNT, counters.last, 1 # It's possible that a thread didn't execute its EXIT hook yet. + end + + def test_join_counters # Bug #18900 + thr = Thread.new { fib(30) } + Bug::ThreadInstrumentation.reset_counters + thr.join + assert_equal [1, 1, 1], Bug::ThreadInstrumentation.local_counters end def test_thread_instrumentation_fork_safe skip "No fork()" unless Process.respond_to?(:fork) - require '-test-/thread/instrumentation' - Bug::ThreadInstrumentation::register_callback - read_pipe, write_pipe = IO.pipe - begin - pid = fork do - Bug::ThreadInstrumentation.reset_counters - threads = threaded_cpu_work - write_pipe.write(Marshal.dump(threads.map(&:status))) - write_pipe.write(Marshal.dump(Bug::ThreadInstrumentation.counters)) - write_pipe.close - exit!(0) - end + pid = fork do + Bug::ThreadInstrumentation.reset_counters + threads = threaded_cpu_work + write_pipe.write(Marshal.dump(threads.map(&:status))) + write_pipe.write(Marshal.dump(Bug::ThreadInstrumentation.counters)) write_pipe.close - _, status = Process.wait2(pid) - assert_predicate status, :success? - - thread_statuses = Marshal.load(read_pipe) - assert_equal [false] * THREADS_COUNT, thread_statuses - - counters = Marshal.load(read_pipe) - read_pipe.close - counters.each do |c| - assert_predicate c, :nonzero?, "Call counters: #{counters.inspect}" - end - - assert_equal THREADS_COUNT, counters.first - assert_in_delta THREADS_COUNT, counters.last, 1 # It's possible that a thread didn't execute its EXIT hook yet. - ensure - Bug::ThreadInstrumentation::unregister_callback + exit!(0) end + write_pipe.close + _, status = Process.wait2(pid) + assert_predicate status, :success? + + thread_statuses = Marshal.load(read_pipe) + assert_equal [false] * THREADS_COUNT, thread_statuses + + counters = Marshal.load(read_pipe) + read_pipe.close + counters.each do |c| + assert_predicate c, :nonzero?, "Call counters: #{counters.inspect}" + end + + assert_equal THREADS_COUNT, counters.first + assert_in_delta THREADS_COUNT, counters.last, 1 # It's possible that a thread didn't execute its EXIT hook yet. end def test_thread_instrumentation_unregister - require '-test-/thread/instrumentation' + Bug::ThreadInstrumentation::unregister_callback assert Bug::ThreadInstrumentation::register_and_unregister_callbacks end private - def threaded_cpu_work - THREADS_COUNT.times.map { Thread.new { 100.times { |i| i + i } } }.each(&:join) + def fib(n = 20) + return n if n <= 1 + fib(n-1) + fib(n-2) + end + + def threaded_cpu_work(size = 20) + THREADS_COUNT.times.map { Thread.new { fib(size) } }.each(&:join) end end diff --git a/thread_pthread.c b/thread_pthread.c index 02a3640bdb..8597479765 100644 --- a/thread_pthread.c +++ b/thread_pthread.c @@ -2337,6 +2337,8 @@ native_sleep(rb_thread_t *th, rb_hrtime_t *rel) https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L2337 int sigwait_fd = rb_sigwait_fd_get(th); rb_ractor_blocking_threads_inc(th->ractor, __FILE__, __LINE__); + RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED); + if (sigwait_fd >= 0) { rb_native_mutex_lock(&th->interrupt_lock); th->unblock.func = ubf_sigwait; -- cgit v1.2.1 -- ML: ruby-changes@q... Info: http://www.atdot.net/~ko1/quickml/