ruby-changes:52145
From: normal <ko1@a...>
Date: Tue, 14 Aug 2018 06:34:28 +0900 (JST)
Subject: [ruby-changes:52145] normal:r64353 (trunk): thread_pthread: use POSIX timer or thread to get rid of races
normal 2018-08-14 06:34:24 +0900 (Tue, 14 Aug 2018) New Revision: 64353 https://svn.ruby-lang.org/cgi-bin/viewvc.cgi?view=revision&revision=64353 Log: thread_pthread: use POSIX timer or thread to get rid of races This closes a race condition where the GVL is uncontended and a thread receives a signal immediately before calling the blocking function when releasing the GVL: 1) check interrupts 2) release GVL 3) blocking function If a signal fires after 1) but before 3), that thread may never wake up if the GVL is uncontended. We also need to wake up the ubf_list unconditionally on gvl_yield, because two threads can be yielding to each other while waiting on IO#close while waiting on threads in IO#read or IO#gets. [ruby-core:88360] [Misc #14937] Modified files: trunk/configure.ac trunk/thread_pthread.c Index: thread_pthread.c =================================================================== --- thread_pthread.c (revision 64352) +++ thread_pthread.c (revision 64353) @@ -34,6 +34,56 @@ https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L34 #if defined(__HAIKU__) #include <kernel/OS.h> #endif +#include <time.h> +#include <signal.h> + +#if defined(SIGVTALRM) && !defined(__CYGWIN__) +# define USE_UBF_LIST 1 +#endif + +/* + * UBF_TIMER and ubf_list both use SIGVTALRM. + * UBF_TIMER is to close TOCTTOU signal race on programs + * without GVL contention blocking read/write to sockets. + * + * ubf_list wakeups may be triggered periodically by UBF_TIMER on + * gvl_yield. 
+ */ +#define UBF_TIMER_NONE 0 +#define UBF_TIMER_POSIX 1 +#define UBF_TIMER_PTHREAD 2 + +#ifndef UBF_TIMER +# if defined(HAVE_TIMER_SETTIME) && defined(HAVE_TIMER_CREATE) && \ + defined(CLOCK_MONOTONIC) && defined(USE_UBF_LIST) + /* preferred */ +# define UBF_TIMER UBF_TIMER_POSIX +# elif defined(USE_UBF_LIST) + /* safe, but inefficient */ +# define UBF_TIMER UBF_TIMER_PTHREAD +# else + /* we'll be racy without SIGVTALRM for ubf_list */ +# define UBF_TIMER UBF_TIMER_NONE +# endif +#endif + +#if UBF_TIMER == UBF_TIMER_POSIX +static struct { + timer_t timerid; + rb_atomic_t armed; /* 0: disarmed, 1: arming, 2: armed */ + rb_pid_t owner; +} timer_posix; +#elif UBF_TIMER == UBF_TIMER_PTHREAD +static void *timer_pthread_fn(void *); +static struct { + int low[2]; + rb_atomic_t armed; /* boolean */ + rb_pid_t owner; + pthread_t thid; +} timer_pthread = { + { -1, -1 }, +}; +#endif void rb_native_mutex_lock(rb_nativethread_lock_t *lock); void rb_native_mutex_unlock(rb_nativethread_lock_t *lock); @@ -53,6 +103,7 @@ static int native_cond_timedwait(rb_nati https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L103 static const struct timespec *sigwait_timeout(rb_thread_t *, int sigwait_fd, const struct timespec *, int *drained_p); +static void rb_timer_disarm(void); #define TIMER_THREAD_CREATED_P() (timer_thread_pipe.owner_process == getpid()) @@ -79,11 +130,30 @@ static const void *const condattr_monoto https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L130 /* 100ms. 
10ms is too small for user level thread scheduling * on recent Linux (tested on 2.6.35) */ -#define TIME_QUANTUM_USEC (100 * 1000) +#define TIME_QUANTUM_MSEC (100) +#define TIME_QUANTUM_USEC (TIME_QUANTUM_MSEC * 1000) +#define TIME_QUANTUM_NSEC (TIME_QUANTUM_USEC * 1000) static struct timespec native_cond_timeout(rb_nativethread_cond_t *, struct timespec rel); +/* + * Designate the next gvl.timer thread, favor the last thread in + * the waitq since it will be in waitq longest + */ +static int +designate_timer_thread(rb_vm_t *vm) +{ + native_thread_data_t *last; + + last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list); + if (last) { + rb_native_cond_signal(&last->cond.gvlq); + return TRUE; + } + return FALSE; +} + static void gvl_acquire_common(rb_vm_t *vm, rb_thread_t *th) { @@ -98,13 +168,16 @@ gvl_acquire_common(rb_vm_t *vm, rb_threa https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L168 static struct timespec ts; static int err = ETIMEDOUT; + /* take over timing from timer */ + rb_timer_disarm(); + /* * become designated timer thread to kick vm->gvl.acquired * periodically. 
Continue on old timeout if it expired: */ if (err == ETIMEDOUT) { ts.tv_sec = 0; - ts.tv_nsec = TIME_QUANTUM_USEC * 1000; + ts.tv_nsec = TIME_QUANTUM_NSEC; ts = native_cond_timeout(&nd->cond.gvlq, ts); } vm->gvl.timer = th; @@ -132,18 +205,8 @@ gvl_acquire_common(rb_vm_t *vm, rb_threa https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L205 } } vm->gvl.acquired = th; - /* - * Designate the next gvl.timer thread, favor the last thread in - * the waitq since it will be in waitq longest - */ if (!vm->gvl.timer) { - native_thread_data_t *last; - - last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list); - if (last) { - rb_native_cond_signal(&last->cond.gvlq); - } - else if (!ubf_threads_empty()) { + if (!designate_timer_thread(vm) && !ubf_threads_empty()) { rb_thread_wakeup_timer_thread(0); } } @@ -181,6 +244,7 @@ gvl_yield(rb_vm_t *vm, rb_thread_t *th) https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L244 { native_thread_data_t *next; + ubf_wakeup_all_threads(); rb_native_mutex_lock(&vm->gvl.lock); next = gvl_release_common(vm); @@ -200,12 +264,6 @@ gvl_yield(rb_vm_t *vm, rb_thread_t *th) https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L264 } else { rb_native_mutex_unlock(&vm->gvl.lock); - /* - * GVL was not contended when we released, so we have no potential - * contenders for reacquisition. 
Perhaps they are stuck in blocking - * region w/o GVL, too, so we kick them: - */ - ubf_wakeup_all_threads(); native_thread_yield(); rb_native_mutex_lock(&vm->gvl.lock); rb_native_cond_broadcast(&vm->gvl.switch_wait_cond); @@ -427,8 +485,7 @@ native_cond_timeout(rb_nativethread_cond https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L485 #define native_cleanup_push pthread_cleanup_push #define native_cleanup_pop pthread_cleanup_pop -#if defined(SIGVTALRM) && !defined(__CYGWIN__) -#define USE_UBF_LIST 1 +#if defined(USE_UBF_LIST) static rb_nativethread_lock_t ubf_list_lock; #endif @@ -1189,7 +1246,10 @@ unregister_ubf_list(rb_thread_t *th) https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L1246 if (!list_empty((struct list_head*)node)) { rb_native_mutex_lock(&ubf_list_lock); - list_del_init(node); + list_del_init(node); + if (list_empty(&ubf_list_head) && !rb_signal_buff_size()) { + rb_timer_disarm(); + } rb_native_mutex_unlock(&ubf_list_lock); } } @@ -1222,13 +1282,7 @@ ubf_select(void *ptr) https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L1282 */ rb_native_mutex_lock(&vm->gvl.lock); if (!vm->gvl.timer) { - native_thread_data_t *last; - - last = list_tail(&vm->gvl.waitq, native_thread_data_t, ubf_list); - if (last) { - rb_native_cond_signal(&last->cond.gvlq); - } - else { + if (!designate_timer_thread(vm)) { rb_thread_wakeup_timer_thread(0); } } @@ -1310,18 +1364,54 @@ rb_thread_wakeup_timer_thread_fd(int fd) https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L1364 } } +static void +rb_timer_arm(rb_pid_t current) /* async signal safe */ +{ +#if UBF_TIMER == UBF_TIMER_POSIX + if (timer_posix.owner == current && !ATOMIC_CAS(timer_posix.armed, 0, 1)) { + struct itimerspec it; + + it.it_interval.tv_sec = it.it_value.tv_sec = 0; + it.it_interval.tv_nsec = it.it_value.tv_nsec = TIME_QUANTUM_NSEC; + + if (timer_settime(timer_posix.timerid, 0, &it, 0)) + rb_async_bug_errno("timer_settime (arm)", errno); + + switch (ATOMIC_CAS(timer_posix.armed, 
1, 2)) { + case 0: + /* somebody requested a disarm while we were arming */ + it.it_interval.tv_nsec = it.it_value.tv_nsec = 0; + if (timer_settime(timer_posix.timerid, 0, &it, 0)) + rb_async_bug_errno("timer_settime (disarm)", errno); + + case 1: return; /* success */ + case 2: + rb_async_bug_errno("UBF_TIMER_POSIX state 2 unexpected", EINVAL); + default: + rb_async_bug_errno("UBF_TIMER_POSIX unknown state", ERANGE); + } + } +#elif UBF_TIMER == UBF_TIMER_PTHREAD + if (current == timer_pthread.owner) { + if (ATOMIC_EXCHANGE(timer_pthread.armed, 1) == 0) + rb_thread_wakeup_timer_thread_fd(timer_pthread.low[1]); + } +#endif +} + void rb_thread_wakeup_timer_thread(int sig) { /* must be safe inside sighandler, so no mutex */ - if (timer_thread_pipe.owner_process == getpid()) { + rb_pid_t current = getpid(); + if (timer_thread_pipe.owner_process == current) { rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]); /* * system_working check is required because vm and main_thread are * freed during shutdown */ - if (sig && system_working) { + if (sig && system_working > 0) { volatile rb_execution_context_t *ec; rb_vm_t *vm = GET_VM(); rb_thread_t *mth; @@ -1332,18 +1422,24 @@ rb_thread_wakeup_timer_thread(int sig) https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L1422 */ if (!vm) return; mth = vm->main_thread; - if (!mth || !system_working) return; + if (!mth || system_working <= 0) return; /* this relies on GC for grace period before cont_free */ ec = ACCESS_ONCE(rb_execution_context_t *, mth->ec); - if (ec) RUBY_VM_SET_TRAP_INTERRUPT(ec); + if (ec) { + RUBY_VM_SET_TRAP_INTERRUPT(ec); + rb_timer_arm(current); + } } + else if (sig == 0 && system_working > 0) { + rb_timer_arm(current); + } } } #define CLOSE_INVALIDATE(expr) \ - close_invalidate(&timer_thread_pipe.expr,"close_invalidate: "#expr) + close_invalidate(&expr,"close_invalidate: "#expr) static void close_invalidate(int *fdp, const char *msg) { @@ -1446,6 +1542,52 @@ 
native_set_another_thread_name(rb_native https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L1542 } static void +rb_timer_invalidate(void) +{ +#if UBF_TIMER == UBF_TIMER_PTHREAD + CLOSE_INVALIDATE(timer_pthread.low[0]); + CLOSE_INVALIDATE(timer_pthread.low[1]); +#endif +} + +static void +rb_timer_pthread_create(rb_pid_t current) +{ +#if UBF_TIMER == UBF_TIMER_PTHREAD + int err; + if (timer_pthread.owner == current) + return; + + if (setup_communication_pipe_internal(timer_pthread.low) < 0) + return; + + err = pthread_create(&timer_pthread.thid, 0, timer_pthread_fn, GET_VM()); + if (!err) + timer_pthread.owner = current; + else + rb_warn("pthread_create failed for timer: %s, signals racy", + strerror(err)); +#endif +} + +static void +rb_timer_create(rb_pid_t current) +{ +#if UBF_TIMER == UBF_TIMER_POSIX + struct sigevent sev; + + sev.sigev_notify = SIGEV_SIGNAL; + sev.sigev_signo = SIGVTALRM; + sev.sigev_value.sival_ptr = &timer_posix; + if (!timer_create(CLOCK_MONOTONIC, &sev, &timer_posix.timerid)) + timer_posix.owner = current; + else + rb_warn("timer_create failed: %s, signals racy", strerror(errno)); +#endif + rb_timer_pthread_create(current); +} + +static void rb_thread_create_timer_thread(void) { /* we only create the pipe, and lazy-spawn */ @@ -1453,17 +1595,65 @@ rb_thread_create_timer_thread(void) https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L1595 rb_pid_t owner = timer_thread_pipe.owner_process; if (owner && owner != current) { - CLOSE_INVALIDATE(normal[0]); - CLOSE_INVALIDATE(normal[1]); + CLOSE_INVALIDATE(timer_thread_pipe.normal[0]); + CLOSE_INVALIDATE(timer_thread_pipe.normal[1]); + rb_timer_invalidate(); } if (setup_communication_pipe_internal(timer_thread_pipe.normal) < 0) return; if (owner != current) { /* validate pipe on this process */ + rb_timer_create(current); sigwait_th = THREAD_INVALID; timer_thread_pipe.owner_process = current; } + else { + /* UBF_TIMER_PTHREAD needs to recreate after fork */ + 
rb_timer_pthread_create(current); + } +} + +static void +rb_timer_disarm(void) +{ +#if UBF_TIMER == UBF_TIMER_POSIX + static const struct itimerspec zero; + rb_atomic_t armed = ATOMIC_EXCHANGE(timer_posix.armed, 0); + + if (LIKELY(armed) == 0) return; + switch (armed) { + case 1: return; /* rb_timer_arm was arming and will disarm itself */ + case 2: + if (timer_settime(timer_posix.timerid, 0, &zero, 0)) + rb_bug_errno("timer_settime (disarm)", errno); + return; + default: + rb_bug("UBF_TIMER_POSIX bad state: %u\n", (unsigned)armed); + } +#elif UBF_TIMER == UBF_TIMER_PTHREAD + ATOMIC_SET(timer_pthread.armed, 0); +#endif +} + +static void +rb_timer_destroy(void) +{ +#if UBF_TIMER == UBF_TIMER_PTHREAD + rb_pid_t current = getpid(); + if (timer_pthread.owner == current) { + int err; + + timer_pthread.owner = 0; + rb_timer_disarm(); + rb_thread_wakeup_timer_thread_fd(timer_pthread.low[1]); + err = pthread_join(timer_pthread.thid, 0); + if (err) { + rb_raise(rb_eThreadError, "native_thread_join() failed (%d)", err); + } + } +#endif +/* no need to destroy real POSIX timers */ } static int @@ -1471,6 +1661,8 @@ native_stop_timer_thread(void) https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L1661 { int stopped; stopped = --system_working <= 0; + if (stopped) + rb_timer_destroy(); if (TT_DEBUG) fprintf(stderr, "stop timer thread\n"); return stopped; @@ -1529,14 +1721,18 @@ ruby_stack_overflowed_p(const rb_thread_ https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L1721 int rb_reserved_fd_p(int fd) { - if ((fd == timer_thread_pipe.normal[0] || - fd == timer_thread_pipe.normal[1]) && - timer_thread_pipe.owner_process == getpid()) { /* async-signal-safe */ +#if UBF_TIMER == UBF_TIMER_PTHREAD + if (fd == timer_pthread.low[0] || fd == timer_pthread.low[1]) + goto check_pid; +#endif + if (fd == timer_thread_pipe.normal[0] || fd == timer_thread_pipe.normal[1]) + goto check_pid; + + return 0; +check_pid: + if (timer_thread_pipe.owner_process == getpid()) /* 
async-signal-safe */ return 1; - } - else { - return 0; - } + return 0; } rb_nativethread_id_t @@ -1600,8 +1796,17 @@ rb_sleep_cond_put(rb_nativethread_cond_t https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L1796 int rb_sigwait_fd_get(const rb_thread_t *th) { - if (timer_thread_pipe.owner_process == getpid() && + rb_pid_t current = getpid(); + + if (timer_thread_pipe.owner_process == current && timer_thread_pipe.normal[0] >= 0) { + + /* + * no need to keep firing the timer if any thread is sleeping + * on the signal self-pipe + */ + rb_timer_disarm(); + if (ATOMIC_PTR_CAS(sigwait_th, THREAD_INVALID, th) == THREAD_INVALID) { return timer_thread_pipe.normal[0]; } @@ -1719,4 +1924,37 @@ native_sleep(rb_thread_t *th, struct tim https://github.com/ruby/ruby/blob/trunk/thread_pthread.c#L1924 native_cond_sleep(th, timeout_rel); } } + +#if UBF_TIMER == UBF_TIMER_PTHREAD +static void * +timer_pthread_fn(void *p) +{ + rb_vm_t *vm = p; + pthread_t main_thread_id = vm->main_thread->thread_id; + struct pollfd pfd; + int timeout = -1; + + pfd.fd = timer_pthread.low[0]; + pfd.events = POLLIN; + + while (system_working > 0) { + (void)poll(&pfd, 1, timeout); + (void)consume_communication_pipe(pfd.fd); + + if (system_working > 0 && ATOMIC_CAS(timer_pthread.armed, 1, 1)) { + pthread_kill(main_thread_id, SIGVTALRM); + + if (rb_signal_buff_size() || !ubf_threads_empty()) { + timeout = TIME_QUANTUM_MSEC; + } + else { + ATOMIC_SET(timer_pthread.armed, 0); + timeout = -1; + } + } + } + + return 0; +} +#endif /* UBF_TIMER_PTHREAD */ #endif /* THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION */ Index: configure.ac =================================================================== --- configure.ac (revision 64352) +++ configure.ac (revision 64353) @@ -1998,6 +1998,14 @@ AS_IF([test x"$ac_cv_func_clock_gettime" https://github.com/ruby/ruby/blob/trunk/configure.ac#L1998 ]) ]) AC_CHECK_FUNCS(clock_getres) # clock_getres should be tested after clock_gettime test including librt test. 
+AC_CHECK_LIB([rt], [timer_create]) +AC_CHECK_LIB([rt], [timer_settime]) +AS_IF([test x"$ac_cv_lib_rt_timer_create" = xyes], [ + AC_DEFINE(HAVE_TIMER_CREATE, 1) +]) +AS_IF([test x"$ac_cv_lib_rt_timer_settime" = xyes], [ + AC_DEFINE(HAVE_TIMER_SETTIME, 1) +]) AC_CACHE_CHECK(for unsetenv returns a value, rb_cv_unsetenv_return_value, [AC_TRY_COMPILE([ -- ML: ruby-changes@q... Info: http://www.atdot.net/~ko1/quickml/