ruby-changes:52144
From: normal <ko1@a...>
Date: Tue, 14 Aug 2018 06:34:26 +0900 (JST)
Subject: [ruby-changes:52144] normal:r64352 (trunk): thread_pthread.c: eliminate timer thread by restructuring GVL
normal	2018-08-14 06:34:20 +0900 (Tue, 14 Aug 2018)

  New Revision: 64352

  https://svn.ruby-lang.org/cgi-bin/viewvc.cgi?view=revision&revision=64352

  Log:
    thread_pthread.c: eliminate timer thread by restructuring GVL

    This reverts commit 194a6a2c68e9c8a3536b24db18ceac87535a6051 (r64203).
    Race conditions which caused the original reversion will be fixed
    in the subsequent commit.

    [ruby-core:88360] [Misc #14937]

  Modified files:
    trunk/internal.h
    trunk/process.c
    trunk/signal.c
    trunk/test/ruby/test_io.rb
    trunk/test/ruby/test_process.rb
    trunk/test/ruby/test_thread.rb
    trunk/thread.c
    trunk/thread_pthread.c
    trunk/thread_pthread.h
    trunk/thread_win32.c
    trunk/vm_core.h
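The restructured GVL turns the threads contending for the lock into the timer: waiters queue up, and whichever waiter currently holds the timer role sleeps with a timed wait so it can periodically ask the GVL owner to yield. The following is a minimal, self-contained sketch of that idea only; gvl_t, request_timeslice() and the single shared condition variable are illustrative simplifications, not the committed code (Ruby keeps one condition variable per native thread and builds the waitq on ubf_list, as the thread_pthread.c hunks below show).

    #include <errno.h>
    #include <pthread.h>
    #include <stdbool.h>
    #include <time.h>

    typedef struct {
        pthread_mutex_t lock;
        pthread_cond_t  cond;   /* one condvar per waiting thread in the real design */
        bool acquired;          /* GVL currently owned? */
        bool timer_active;      /* does some waiter already act as the timer? */
    } gvl_t;

    static volatile int timeslice_requested;

    static void
    request_timeslice(void)
    {
        /* stand-in for setting the owner's interrupt flag */
        timeslice_requested = 1;
    }

    static void
    gvl_acquire_sketch(gvl_t *gvl)
    {
        pthread_mutex_lock(&gvl->lock);
        while (gvl->acquired) {
            if (!gvl->timer_active) {
                /* become the designated timer for one 100ms quantum */
                struct timespec deadline;
                clock_gettime(CLOCK_REALTIME, &deadline);
                deadline.tv_nsec += 100 * 1000 * 1000;
                if (deadline.tv_nsec >= 1000000000L) {
                    deadline.tv_sec++;
                    deadline.tv_nsec -= 1000000000L;
                }

                gvl->timer_active = true;
                if (pthread_cond_timedwait(&gvl->cond, &gvl->lock, &deadline) == ETIMEDOUT)
                    request_timeslice();   /* owner kept running: ask it to yield */
                gvl->timer_active = false;
            }
            else {
                /* some other waiter is the timer; sleep untimed */
                pthread_cond_wait(&gvl->cond, &gvl->lock);
            }
        }
        gvl->acquired = true;
        pthread_mutex_unlock(&gvl->lock);
    }

    static void
    gvl_release_sketch(gvl_t *gvl)
    {
        pthread_mutex_lock(&gvl->lock);
        gvl->acquired = false;
        pthread_cond_signal(&gvl->cond);   /* hand the GVL to one waiter */
        pthread_mutex_unlock(&gvl->lock);
    }

In the committed code the timer role additionally drives timer_thread_function() (timeslice interrupts and signal handling) and, on acquisition, is handed off to the longest-waiting thread in the waitq, as gvl_acquire_common() below does.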
Index: test/ruby/test_thread.rb
===================================================================
--- test/ruby/test_thread.rb	(revision 64351)
+++ test/ruby/test_thread.rb	(revision 64352)
@@ -952,15 +952,16 @@ _eom https://github.com/ruby/ruby/blob/trunk/test/ruby/test_thread.rb#L952
   def test_thread_timer_and_interrupt
     bug5757 = '[ruby-dev:44985]'
     pid = nil
-    cmd = 'Signal.trap(:INT, "DEFAULT"); r,=IO.pipe; Thread.start {Thread.pass until Thread.main.stop?; puts; STDOUT.flush}; r.read'
+    cmd = 'Signal.trap(:INT, "DEFAULT"); pipe=IO.pipe; Thread.start {Thread.pass until Thread.main.stop?; puts; STDOUT.flush}; pipe[0].read'
     opt = {}
     opt[:new_pgroup] = true if /mswin|mingw/ =~ RUBY_PLATFORM
     s, t, _err = EnvUtil.invoke_ruby(['-e', cmd], "", true, true, opt) do |in_p, out_p, err_p, cpid|
+      assert IO.select([out_p], nil, nil, 10), 'subprocess not ready'
       out_p.gets
       pid = cpid
       t0 = Time.now.to_f
       Process.kill(:SIGINT, pid)
-      Process.wait(pid)
+      Timeout.timeout(10) { Process.wait(pid) }
       t1 = Time.now.to_f
       [$?, t1 - t0, err_p.read]
     end
Index: test/ruby/test_process.rb
===================================================================
--- test/ruby/test_process.rb	(revision 64351)
+++ test/ruby/test_process.rb	(revision 64352)
@@ -1767,7 +1767,7 @@ class TestProcess < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/ruby/test_process.rb#L1767
           puts Dir.entries("/proc/self/task") - %W[. ..]
         end
         bug4920 = '[ruby-dev:43873]'
-        assert_equal(2, data.size, bug4920)
+        assert_include(1..2, data.size, bug4920)
        assert_not_include(data.map(&:to_i), pid)
       end
     else # darwin
Index: test/ruby/test_io.rb
===================================================================
--- test/ruby/test_io.rb	(revision 64351)
+++ test/ruby/test_io.rb	(revision 64352)
@@ -3564,7 +3564,8 @@ __END__ https://github.com/ruby/ruby/blob/trunk/test/ruby/test_io.rb#L3564
   end
 
   def test_race_gets_and_close
-    assert_separately([], "#{<<-"begin;"}\n#{<<-"end;"}")
+    opt = { signal: :ABRT, timeout: 200 }
+    assert_separately([], "#{<<-"begin;"}\n#{<<-"end;"}", opt)
     bug13076 = '[ruby-core:78845] [Bug #13076]'
     begin;
       10.times do |i|
@@ -3586,9 +3587,9 @@ __END__ https://github.com/ruby/ruby/blob/trunk/test/ruby/test_io.rb#L3587
           w.close
           r.close
         end
-        assert_nothing_raised(IOError, bug13076) {
-          t.each(&:join)
-        }
+        t.each do |th|
+          assert_same(th, th.join(2), bug13076)
+        end
       end
     end;
   end
Index: signal.c
===================================================================
--- signal.c	(revision 64351)
+++ signal.c	(revision 64352)
@@ -709,9 +709,6 @@ signal_enque(int sig) https://github.com/ruby/ruby/blob/trunk/signal.c#L709
 
 static rb_atomic_t sigchld_hit;
 
-/* Prevent compiler from reordering access */
-#define ACCESS_ONCE(type,x) (*((volatile type *)&(x)))
-
 static RETSIGTYPE
 sighandler(int sig)
 {
@@ -730,7 +727,7 @@ sighandler(int sig) https://github.com/ruby/ruby/blob/trunk/signal.c#L727
     else {
 	signal_enque(sig);
     }
-    rb_thread_wakeup_timer_thread();
+    rb_thread_wakeup_timer_thread(sig);
 #if !defined(BSD_SIGNAL) && !defined(POSIX_SIGNAL)
     ruby_signal(sig, sighandler);
 #endif
@@ -764,7 +761,6 @@ rb_enable_interrupt(void) https://github.com/ruby/ruby/blob/trunk/signal.c#L761
 #ifdef HAVE_PTHREAD_SIGMASK
     sigset_t mask;
     sigemptyset(&mask);
-    sigaddset(&mask, RUBY_SIGCHLD); /* timer-thread handles this */
     pthread_sigmask(SIG_SETMASK, &mask, NULL);
 #endif
 }
@@ -1077,7 +1073,6 @@ rb_trap_exit(void) https://github.com/ruby/ruby/blob/trunk/signal.c#L1073
 
 void ruby_waitpid_all(rb_vm_t *); /* process.c */
 
-/* only runs in the timer-thread */
 void
 ruby_sigchld_handler(rb_vm_t *vm)
 {
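With the dedicated timer thread gone, sighandler() above forwards the signal number to rb_thread_wakeup_timer_thread(), which must restrict itself to async-signal-safe calls because it runs inside the handler. The committed code does this through the pipe held in timer_thread_pipe (see the thread_pthread.c hunks further down); the sketch here only illustrates the general self-pipe pattern, under assumed names (wakeup_fd, setup_wakeup_pipe(), wait_for_wakeup() are illustrative, and SIGINT stands in for whatever signal is trapped).

    #include <fcntl.h>
    #include <poll.h>
    #include <signal.h>
    #include <unistd.h>

    static int wakeup_fd[2] = { -1, -1 };   /* [0] = read end, [1] = write end */

    static void
    handler(int sig)
    {
        const char c = 0;
        /* write(2) is async-signal-safe; EAGAIN just means a wakeup is already pending */
        (void)write(wakeup_fd[1], &c, 1);
        (void)sig;
    }

    static int
    setup_wakeup_pipe(void)
    {
        if (pipe(wakeup_fd) < 0) return -1;
        for (int i = 0; i < 2; i++) {
            fcntl(wakeup_fd[i], F_SETFL, fcntl(wakeup_fd[i], F_GETFL) | O_NONBLOCK);
            fcntl(wakeup_fd[i], F_SETFD, FD_CLOEXEC);
        }
        signal(SIGINT, handler);
        return 0;
    }

    static void
    wait_for_wakeup(int timeout_msec)
    {
        struct pollfd pfd = { .fd = wakeup_fd[0], .events = POLLIN };
        if (poll(&pfd, 1, timeout_msec) > 0) {
            char buf[64];
            while (read(wakeup_fd[0], buf, sizeof(buf)) > 0)
                ;                       /* drain all pending wakeups */
        }
    }

Draining the read end in a loop matters because several signals can arrive before the polling thread wakes up; each handler invocation writes one byte.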
Index: vm_core.h
===================================================================
--- vm_core.h	(revision 64351)
+++ vm_core.h	(revision 64352)
@@ -564,10 +564,12 @@ typedef struct rb_vm_struct { https://github.com/ruby/ruby/blob/trunk/vm_core.h#L564
     VALUE self;
 
     rb_global_vm_lock_t gvl;
-    rb_nativethread_lock_t thread_destruct_lock;
 
     struct rb_thread_struct *main_thread;
-    struct rb_thread_struct *running_thread;
+
+    /* persists across uncontended GVL release/acquire for time slice */
+    const struct rb_thread_struct *running_thread;
+
 #ifdef USE_SIGALTSTACK
     void *main_altstack;
 #endif
@@ -1583,7 +1585,7 @@ void rb_vm_pop_frame(rb_execution_contex https://github.com/ruby/ruby/blob/trunk/vm_core.h#L1585
 void rb_thread_start_timer_thread(void);
 void rb_thread_stop_timer_thread(void);
 void rb_thread_reset_timer_thread(void);
-void rb_thread_wakeup_timer_thread(void);
+void rb_thread_wakeup_timer_thread(int);
 
 static inline void
 rb_vm_living_threads_init(rb_vm_t *vm)
Index: thread_pthread.c
===================================================================
--- thread_pthread.c	(revision 64351)
+++ thread_pthread.c	(revision 64352)
@@ -45,27 +45,21 @@ void rb_native_cond_broadcast(rb_nativet https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L45
 void rb_native_cond_wait(rb_nativethread_cond_t *cond, rb_nativethread_lock_t *mutex);
 void rb_native_cond_initialize(rb_nativethread_cond_t *cond);
 void rb_native_cond_destroy(rb_nativethread_cond_t *cond);
-static void rb_thread_wakeup_timer_thread_low(void);
 static void clear_thread_cache_altstack(void);
-
-#define TIMER_THREAD_MASK    (1)
-#define TIMER_THREAD_SLEEPY  (2|TIMER_THREAD_MASK)
-#define TIMER_THREAD_BUSY    (4|TIMER_THREAD_MASK)
-
-#if defined(HAVE_POLL) && defined(HAVE_FCNTL) && defined(F_GETFL) && \
-    defined(F_SETFL) && defined(O_NONBLOCK) && \
-    defined(F_GETFD) && defined(F_SETFD) && defined(FD_CLOEXEC)
-/* The timer thread sleeps while only one Ruby thread is running. */
-# define TIMER_IMPL TIMER_THREAD_SLEEPY
-#else
-# define TIMER_IMPL TIMER_THREAD_BUSY
-#endif
-
-static struct {
-    pthread_t id;
-    int created;
-} timer_thread;
-#define TIMER_THREAD_CREATED_P() (timer_thread.created != 0)
+static void ubf_wakeup_all_threads(void);
+static int ubf_threads_empty(void);
+static int native_cond_timedwait(rb_nativethread_cond_t *, pthread_mutex_t *,
+                                 const struct timespec *);
+static const struct timespec *sigwait_timeout(rb_thread_t *, int sigwait_fd,
+                                              const struct timespec *,
+                                              int *drained_p);
+
+#define TIMER_THREAD_CREATED_P() (timer_thread_pipe.owner_process == getpid())
+
+/* for testing, and in case we come across a platform w/o pipes: */
+#define BUSY_WAIT_SIGNALS (0)
+#define THREAD_INVALID ((const rb_thread_t *)-1)
+static const rb_thread_t *sigwait_th;
 
 #ifdef HAVE_SCHED_YIELD
 #define native_thread_yield() (void)sched_yield()
@@ -82,49 +76,96 @@ static pthread_condattr_t *condattr_mono https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L76
 static const void *const condattr_monotonic = NULL;
 #endif
 
+/* 100ms.  10ms is too small for user level thread scheduling
+ * on recent Linux (tested on 2.6.35)
+ */
+#define TIME_QUANTUM_USEC (100 * 1000)
+
+static struct timespec native_cond_timeout(rb_nativethread_cond_t *,
+                                           struct timespec rel);
+
 static void
-gvl_acquire_common(rb_vm_t *vm)
+gvl_acquire_common(rb_vm_t *vm, rb_thread_t *th)
 {
     if (vm->gvl.acquired) {
+        native_thread_data_t *nd = &th->native_thread_data;
 
-	if (!vm->gvl.waiting++) {
-	    /*
-	     * Wake up timer thread iff timer thread is slept.
-	     * When timer thread is polling mode, we don't want to
-	     * make confusing timer thread interval time.
-	     */
-	    rb_thread_wakeup_timer_thread_low();
-	}
+        VM_ASSERT(th->unblock.func == 0 && "we reuse ubf_list for GVL waitq");
 
-	while (vm->gvl.acquired) {
-	    rb_native_cond_wait(&vm->gvl.cond, &vm->gvl.lock);
-	}
+        list_add_tail(&vm->gvl.waitq, &nd->ubf_list);
+        do {
+            if (!vm->gvl.timer) {
+                static struct timespec ts;
+                static int err = ETIMEDOUT;
+
+                /*
+                 * become designated timer thread to kick vm->gvl.acquired
+                 * periodically.  Continue on old timeout if it expired:
+                 */
+                if (err == ETIMEDOUT) {
+                    ts.tv_sec = 0;
+                    ts.tv_nsec = TIME_QUANTUM_USEC * 1000;
+                    ts = native_cond_timeout(&nd->cond.gvlq, ts);
+                }
+                vm->gvl.timer = th;
+                err = native_cond_timedwait(&nd->cond.gvlq, &vm->gvl.lock, &ts);
+                vm->gvl.timer = 0;
+                ubf_wakeup_all_threads();
+
+                /*
+                 * Timeslice.  We can't touch thread_destruct_lock here,
+                 * as the process may fork while this thread is contending
+                 * for GVL:
+                 */
+                if (vm->gvl.acquired) timer_thread_function();
+            }
+            else {
+                rb_native_cond_wait(&nd->cond.gvlq, &vm->gvl.lock);
+            }
+        } while (vm->gvl.acquired);
 
-	--vm->gvl.waiting;
+        list_del_init(&nd->ubf_list);
 
-	if (vm->gvl.need_yield) {
-	    vm->gvl.need_yield = 0;
+        if (vm->gvl.need_yield) {
+            vm->gvl.need_yield = 0;
             rb_native_cond_signal(&vm->gvl.switch_cond);
-	}
+        }
     }
+    vm->gvl.acquired = th;
+    /*
+     * Designate the next gvl.timer thread, favor the last thread in
+     * the waitq since it will be in waitq longest
+     */
+    if (!vm->gvl.timer) {
+        native_thread_data_t *last;
 
-    vm->gvl.acquired = 1;
+        last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list);
+        if (last) {
+            rb_native_cond_signal(&last->cond.gvlq);
+        }
+        else if (!ubf_threads_empty()) {
+            rb_thread_wakeup_timer_thread(0);
+        }
+    }
 }
 
 static void
 gvl_acquire(rb_vm_t *vm, rb_thread_t *th)
 {
     rb_native_mutex_lock(&vm->gvl.lock);
-    gvl_acquire_common(vm);
+    gvl_acquire_common(vm, th);
     rb_native_mutex_unlock(&vm->gvl.lock);
 }
 
-static void
+static native_thread_data_t *
 gvl_release_common(rb_vm_t *vm)
 {
+    native_thread_data_t *next;
     vm->gvl.acquired = 0;
-    if (vm->gvl.waiting > 0)
-	rb_native_cond_signal(&vm->gvl.cond);
+    next = list_top(&vm->gvl.waitq, native_thread_data_t, ubf_list);
+    if (next) rb_native_cond_signal(&next->cond.gvlq);
+
+    return next;
 }
 
 static void
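The timed wait in gvl_acquire_common() above depends on native_cond_timeout() turning the relative TIME_QUANTUM_USEC into an absolute deadline on whichever clock the condition variable was initialized with (condattr_monotonic when the platform provides it). A rough sketch of that pattern follows; cond_init_monotonic(), quantum_deadline() and wait_one_quantum() are illustrative names, not the helpers in thread_pthread.c.

    #include <errno.h>
    #include <pthread.h>
    #include <time.h>

    #define TIME_QUANTUM_USEC (100 * 1000)

    static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t cond;

    /* initialize the condvar to use CLOCK_MONOTONIC when the platform has it */
    static void
    cond_init_monotonic(void)
    {
        pthread_condattr_t attr;
        pthread_condattr_init(&attr);
    #ifdef CLOCK_MONOTONIC
        pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
    #endif
        pthread_cond_init(&cond, &attr);
        pthread_condattr_destroy(&attr);
    }

    /* relative quantum -> absolute deadline on the condvar's clock */
    static struct timespec
    quantum_deadline(void)
    {
        struct timespec ts;
    #ifdef CLOCK_MONOTONIC
        clock_gettime(CLOCK_MONOTONIC, &ts);
    #else
        clock_gettime(CLOCK_REALTIME, &ts);
    #endif
        ts.tv_nsec += TIME_QUANTUM_USEC * 1000L;
        if (ts.tv_nsec >= 1000000000L) {
            ts.tv_sec++;
            ts.tv_nsec -= 1000000000L;
        }
        return ts;
    }

    /* returns ETIMEDOUT when the quantum expires with no signal; call
     * cond_init_monotonic() once before using this */
    static int
    wait_one_quantum(void)
    {
        struct timespec deadline = quantum_deadline();
        int err;

        pthread_mutex_lock(&lock);
        err = pthread_cond_timedwait(&cond, &lock, &deadline);
        pthread_mutex_unlock(&lock);
        return err;
    }

Using a monotonic clock keeps the quantum immune to wall-clock adjustments; the committed code falls back to the default CLOCK_REALTIME where pthread_condattr_setclock() is unavailable.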
@@ -138,34 +179,38 @@ gvl_release(rb_vm_t *vm) https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L179
 static void
 gvl_yield(rb_vm_t *vm, rb_thread_t *th)
 {
-    rb_native_mutex_lock(&vm->gvl.lock);
+    native_thread_data_t *next;
 
-    gvl_release_common(vm);
+    rb_native_mutex_lock(&vm->gvl.lock);
+    next = gvl_release_common(vm);
 
     /* An another thread is processing GVL yield. */
     if (UNLIKELY(vm->gvl.wait_yield)) {
-	while (vm->gvl.wait_yield)
+        while (vm->gvl.wait_yield)
 	    rb_native_cond_wait(&vm->gvl.switch_wait_cond, &vm->gvl.lock);
-	goto acquire;
     }
-
-    if (vm->gvl.waiting > 0) {
-	/* Wait until another thread task take GVL. */
-	vm->gvl.need_yield = 1;
-	vm->gvl.wait_yield = 1;
-	while (vm->gvl.need_yield)
+    else if (next) {
+        /* Wait until another thread task takes GVL. */
+        vm->gvl.need_yield = 1;
+        vm->gvl.wait_yield = 1;
+        while (vm->gvl.need_yield)
 	    rb_native_cond_wait(&vm->gvl.switch_cond, &vm->gvl.lock);
-	vm->gvl.wait_yield = 0;
+        vm->gvl.wait_yield = 0;
+        rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
     }
     else {
-	rb_native_mutex_unlock(&vm->gvl.lock);
-	sched_yield();
+        rb_native_mutex_unlock(&vm->gvl.lock);
+        /*
+         * GVL was not contended when we released, so we have no potential
+         * contenders for reacquisition.  Perhaps they are stuck in blocking
+         * region w/o GVL, too, so we kick them:
+         */
+        ubf_wakeup_all_threads();
+        native_thread_yield();
 	rb_native_mutex_lock(&vm->gvl.lock);
+        rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
     }
-
-    rb_native_cond_broadcast(&vm->gvl.switch_wait_cond);
-  acquire:
-    gvl_acquire_common(vm);
+    gvl_acquire_common(vm, th);
     rb_native_mutex_unlock(&vm->gvl.lock);
 }
 
@@ -173,11 +218,11 @@ static void https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L218
 gvl_init(rb_vm_t *vm)
 {
     rb_native_mutex_initialize(&vm->gvl.lock);
-    rb_native_cond_initialize(&vm->gvl.cond);
     rb_native_cond_initialize(&vm->gvl.switch_cond);
    rb_native_cond_initialize(&vm->gvl.switch_wait_cond);
+    list_head_init(&vm->gvl.waitq);
     vm->gvl.acquired = 0;
-    vm->gvl.waiting = 0;
+    vm->gvl.timer = 0;
     vm->gvl.need_yield = 0;
     vm->gvl.wait_yield = 0;
 }
@@ -185,10 +230,16 @@ gvl_init(rb_vm_t *vm) https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L230
 
 static void
 gvl_destroy(rb_vm_t *vm)
 {
-    rb_native_cond_destroy(&vm->gvl.switch_wait_cond);
-    rb_native_cond_destroy(&vm->gvl.switch_cond);
-    rb_native_cond_destroy(&vm->gvl.cond);
-    rb_native_mutex_destroy(&vm->gvl.lock);
+    /*
+     * only called once at VM shutdown (not atfork), another thread
+     * may still grab vm->gvl.lock when calling gvl_release at
+     * the end of thread_start_func_2
+     */
+    if (0) {
+        rb_native_cond_destroy(&vm->gvl.switch_wait_cond);
+        rb_native_cond_destroy(&vm->gvl.switch_cond);
+        rb_native_mutex_destroy(&vm->gvl.lock);
+    }
     clear_thread_cache_altstack();
 }
Perhaps they are stuck in blocking + * region w/o GVL, too, so we kick them: + */ + ubf_wakeup_all_threads(); + native_thread_yield(); rb_native_mutex_lock(&vm->gvl.lock); + rb_native_cond_broadcast(&vm->gvl.switch_wait_cond); } - - rb_native_cond_broadcast(&vm->gvl.switch_wait_cond); - acquire: - gvl_acquire_common(vm); + gvl_acquire_common(vm, th); rb_native_mutex_unlock(&vm->gvl.lock); } @@ -173,11 +218,11 @@ static void https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L218 gvl_init(rb_vm_t *vm) { rb_native_mutex_initialize(&vm->gvl.lock); - rb_native_cond_initialize(&vm->gvl.cond); rb_native_cond_initialize(&vm->gvl.switch_cond); rb_native_cond_initialize(&vm->gvl.switch_wait_cond); + list_head_init(&vm->gvl.waitq); vm->gvl.acquired = 0; - vm->gvl.waiting = 0; + vm->gvl.timer = 0; vm->gvl.need_yield = 0; vm->gvl.wait_yield = 0; } @@ -185,10 +230,16 @@ gvl_init(rb_vm_t *vm) https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L230 static void gvl_destroy(rb_vm_t *vm) { - rb_native_cond_destroy(&vm->gvl.switch_wait_cond); - rb_native_cond_destroy(&vm->gvl.switch_cond); - rb_native_cond_destroy(&vm->gvl.cond); - rb_native_mutex_destroy(&vm->gvl.lock); + /* + * only called once at VM shutdown (not atfork), another thread + * may still grab vm->gvl.lock when calling gvl_release at + * the end of thread_start_func_2 + */ + if (0) { + rb_native_cond_destroy(&vm->gvl.switch_wait_cond); + rb_native_cond_destroy(&vm->gvl.switch_cond); + rb_native_mutex_destroy(&vm->gvl.lock); + } clear_thread_cache_altstack(); } @@ -433,7 +484,9 @@ native_thread_init(rb_thread_t *th) https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L484 #ifdef USE_UBF_LIST list_node_init(&nd->ubf_list); #endif - rb_native_cond_initialize(&nd->sleep_cond); + rb_native_cond_initialize(&nd->cond.gvlq); + if (&nd->cond.gvlq != &nd->cond.intr) + rb_native_cond_initialize(&nd->cond.intr); ruby_thread_set_native(th); } @@ -444,7 +497,11 @@ native_thread_init(rb_thread_t *th) https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L497 static void native_thread_destroy(rb_thread_t *th) { - rb_native_cond_destroy(&th->native_thread_data.sleep_cond); + native_thread_data_t *nd = &th->native_thread_data; + + rb_native_cond_destroy(&nd->cond.gvlq); + if (&nd->cond.gvlq != &nd->cond.intr) + rb_native_cond_destroy(&nd->cond.intr); /* * prevent false positive from ruby_thread_has_gvl_p if that @@ -1012,17 +1069,6 @@ native_thread_create(rb_thread_t *th) https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L1069 return err; } -#if (TIMER_IMPL & TIMER_THREAD_MASK) -static void -native_thread_join(pthread_t th) -{ - int err = pthread_join(th, 0); - if (err) { - rb_raise(rb_eThreadError, "native_thread_join() failed (%d)", err); - } -} -#endif /* TIMER_THREAD_MASK */ - #if USE_NATIVE_THREAD_PRIORITY static void @@ -1064,15 +1110,15 @@ ubf_pthread_cond_signal(void *ptr) https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L1110 { rb_thread_t *th = (rb_thread_t *)ptr; thread_debug("ubf_pthread_cond_signal (%p)\n", (void *)th); - rb_native_cond_signal(&th->native_thread_data.sleep_cond); + rb_native_cond_signal(&th->native_thread_data.cond.intr); } static void -native_sleep(rb_thread_t *th, struct timespec *timeout_rel) +native_cond_sleep(rb_thread_t *th, struct timespec *timeout_rel) { struct timespec timeout; rb_nativethread_lock_t *lock = &th->interrupt_lock; - rb_nativethread_cond_t *cond = &th->native_thread_data.sleep_cond; + rb_nativethread_cond_t *cond = &th->native_thread_data.cond.intr; if (timeout_rel) { /* 
@@ -1164,17 +1210,30 @@ static void https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L1210
 ubf_select(void *ptr)
 {
     rb_thread_t *th = (rb_thread_t *)ptr;
+    rb_vm_t *vm = th->vm;
+
     register_ubf_list(th);
 
     /*
      * ubf_wakeup_thread() doesn't guarantee to wake up a target thread.
      * Therefore, we repeatedly call ubf_wakeup_thread() until a target thread
-     * exit from ubf function.
-     * In the other hands, we shouldn't call rb_thread_wakeup_timer_thread()
-     * if running on timer thread because it may make endless wakeups.
+     * exit from ubf function.  We must designate a timer-thread to perform
+     * this operation.
      */
-    if (!pthread_equal(pthread_self(), timer_thread.id))
-	rb_thread_wakeup_timer_thread();
+    rb_native_mutex_lock(&vm->gvl.lock);
+    if (!vm->gvl.timer) {
+        native_thread_data_t *last;
+
+        last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list);
+        if (last) {
+            rb_native_cond_signal(&last->cond.gvlq);
+        }
+        else {
+            rb_thread_wakeup_timer_thread(0);
+        }
+    }
+    rb_native_mutex_unlock(&vm->gvl.lock);
+
     ubf_wakeup_thread(th);
 }
 
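As the comment in ubf_select() notes, one wakeup is not guaranteed to interrupt the target thread, so some thread has to keep re-sending it until the target actually leaves its blocking region; that duty now falls to whichever thread holds the gvl.timer role (or, failing that, the signal pipe). A sketch of the underlying retry idea follows, with assumed names: SIGVTALRM, in_blocking and kick_until_unblocked() are illustrative, not the ubf_wakeup_thread() internals.

    #include <pthread.h>
    #include <signal.h>
    #include <stdatomic.h>
    #include <string.h>
    #include <unistd.h>

    /* set while the target thread is inside an interruptible blocking call */
    static atomic_int in_blocking;

    static void
    noop_handler(int sig)
    {
        (void)sig;              /* only purpose: make blocking syscalls return EINTR */
    }

    static void
    install_noop_handler(void)
    {
        struct sigaction sa;
        memset(&sa, 0, sizeof(sa));
        sa.sa_handler = noop_handler;   /* sa_flags stays 0: no SA_RESTART */
        sigemptyset(&sa.sa_mask);
        sigaction(SIGVTALRM, &sa, NULL);
    }

    /*
     * A single pthread_kill() can race with the target entering the blocking
     * call, so keep kicking until the target reports it has left it.
     */
    static void
    kick_until_unblocked(pthread_t target)
    {
        while (atomic_load(&in_blocking)) {
            pthread_kill(target, SIGVTALRM);
            usleep(1000);       /* back off; the real code retries on the timer quantum */
        }
    }
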
@@ -1211,39 +1270,16 @@ static int ubf_threads_empty(void) { ret https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L1270
 #define TT_DEBUG 0
 #define WRITE_CONST(fd, str) (void)(write((fd),(str),sizeof(str)-1)<0)
 
-/* 100ms.  10ms is too small for user level thread scheduling
- * on recent Linux (tested on 2.6.35)
- */
-#define TIME_QUANTUM_USEC (100 * 1000)
-
-#if TIMER_IMPL == TIMER_THREAD_SLEEPY
 static struct {
-    /*
-     * Read end of each pipe is closed inside timer thread for shutdown
-     * Write ends are closed by a normal Ruby thread during shutdown
-     */
+    /* pipes are closed in forked children when owner_process does not match */
     int normal[2];
-    int low[2];
 
     /* volatile for signal handler use: */
     volatile rb_pid_t owner_process;
 } timer_thread_pipe = {
     {-1, -1},
-    {-1, -1}, /* low priority */
 };
 
-NORETURN(static void async_bug_fd(const char *mesg, int errno_arg, int fd));
-static void
-async_bug_fd(const char *mesg, int errno_arg, int fd)
-{
-    char buff[64];
-    size_t n = strlcpy(buff, mesg, sizeof(buff));
-    if (n < sizeof(buff)-3) {
-	ruby_snprintf(buff+n, sizeof(buff)-n, "(%d)", fd);
-    }
-    rb_async_bug_errno(buff, errno_arg);
-}
-
 /* only use signal-safe system calls here */
 static void
 rb_thread_wakeup_timer_thread_fd(int fd)
@@ -1275,49 +1311,33 @@ rb_thread_wakeup_timer_thread_fd(int fd) https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L1311
 }
 
 void
-rb_thread_wakeup_timer_thread(void)
+rb_thread_wakeup_timer_thread(int sig)
 {
     /* must be safe inside sighandler, so no mutex */
     if (timer_thread_pipe.owner_process == getpid()) {
-	rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]);
-    }
-}
+        rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]);
 
-static void
-rb_thread_wakeup_timer_thread_low(void)
-{
-    if (timer_thread_pipe.owner_process == getpid()) {
-	rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.low[1]);
-    }
-}
+        /*
+         * system_working check is required because vm and main_thread are
+         * freed during shutdown
+         */
+        if (sig && system_working) {
+            volatile rb_execution_context_t *ec;
+            rb_vm_t *vm = GET_VM();
+            rb_thread_t *mth;
+
+            /*
+             * FIXME: root VM and main_thread should be static and not
+             * on heap for maximum safety (and startup/shutdown speed)
+             */
+            if (!vm) return;
+            mth = vm->main_thread;
+            if (!mth || !system_working) return;
 
-/* VM-dependent API is not available for this function */
-static void
-consume_communication_pipe(int fd)
-{
-#define CCP_READ_BUFF (... truncated)

--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/