ruby-changes:20016
From: kosaki <ko1@a...>
Date: Mon, 13 Jun 2011 23:15:01 +0900 (JST)
Subject: [ruby-changes:20016] kosaki:r32064 (trunk): * thread_pthread.c: rewrite GVL completely.
kosaki 2011-06-13 23:14:53 +0900 (Mon, 13 Jun 2011)

  New Revision: 32064

  http://svn.ruby-lang.org/cgi-bin/viewvc.cgi?view=rev&revision=32064

  Log:
    * thread_pthread.c: rewrite GVL completely.
    * thread_win32.c: ditto.
    * thread_pthread.h: ditto.
    * vm_core.h: ditto.
    * thread.c: ditto.

  Modified files:
    trunk/ChangeLog
    trunk/thread.c
    trunk/thread_pthread.c
    trunk/thread_pthread.h
    trunk/thread_win32.c
    trunk/vm_core.h

Index: thread_win32.c
===================================================================
--- thread_win32.c (revision 32063)
+++ thread_win32.c (revision 32064)
@@ -13,7 +13,7 @@
 
 #include <process.h>
 
-#define WIN32_WAIT_TIMEOUT 10 /* 10 ms */
+#define TIME_QUANTUM_USEC (100 * 1000)
 #define RB_CONDATTR_CLOCK_MONOTONIC 1 /* no effect */
 
 #undef Sleep
@@ -680,7 +680,7 @@
 timer_thread_func(void *dummy)
 {
     thread_debug("timer_thread\n");
-    while (WaitForSingleObject(timer_thread_lock, WIN32_WAIT_TIMEOUT) ==
+    while (WaitForSingleObject(timer_thread_lock, TIME_QUANTUM_USEC/1000) ==
            WAIT_TIMEOUT) {
         timer_thread_function(dummy);
     }
Index: ChangeLog
===================================================================
--- ChangeLog (revision 32063)
+++ ChangeLog (revision 32064)
@@ -1,3 +1,11 @@
+Mon Jun 13 23:06:12 2011 KOSAKI Motohiro <kosaki.motohiro@g...>
+
+        * thread_pthread.c: rewrite GVL completely.
+        * thread_win32.c: ditto.
+        * thread_pthread.h: ditto.
+        * vm_core.h: ditto.
+        * thread.c: ditto.
+
 Mon Jun 13 23:11:52 2011 Tanaka Akira <akr@f...>
 
         * test/socket/test_unix.rb: don't use Thread.abort_on_exception.
Index: thread_pthread.c
===================================================================
--- thread_pthread.c (revision 32063)
+++ thread_pthread.c (revision 32064)
@@ -37,92 +37,79 @@
 #define USE_MONOTONIC_COND 0
 #endif
 
-#define GVL_SIMPLE_LOCK 0
 #define GVL_DEBUG 0
 
 static void
-gvl_show_waiting_threads(rb_vm_t *vm)
+__gvl_acquire(rb_vm_t *vm)
 {
-    rb_thread_t *th = vm->gvl.waiting_threads;
-    int i = 0;
-    while (th) {
-        fprintf(stderr, "waiting (%d): %p\n", i++, (void *)th);
-        th = th->native_thread_data.gvl_next;
+
+    if (vm->gvl.acquired) {
+        vm->gvl.waiting++;
+        while (vm->gvl.acquired) {
+            native_cond_wait(&vm->gvl.cond, &vm->gvl.lock);
+        }
+        vm->gvl.waiting--;
+
+        if (vm->gvl.need_yield) {
+            vm->gvl.need_yield = 0;
+            native_cond_signal(&vm->gvl.switch_cond);
+        }
     }
+
+    vm->gvl.acquired = 1;
 }
 
-#if !GVL_SIMPLE_LOCK
 static void
-gvl_waiting_push(rb_vm_t *vm, rb_thread_t *th)
+gvl_acquire(rb_vm_t *vm, rb_thread_t *th)
 {
-    th->native_thread_data.gvl_next = 0;
-
-    if (vm->gvl.waiting_threads) {
-        vm->gvl.waiting_last_thread->native_thread_data.gvl_next = th;
-        vm->gvl.waiting_last_thread = th;
-    }
-    else {
-        vm->gvl.waiting_threads = th;
-        vm->gvl.waiting_last_thread = th;
-    }
-    th = vm->gvl.waiting_threads;
-    vm->gvl.waiting++;
+    native_mutex_lock(&vm->gvl.lock);
+    __gvl_acquire(vm);
+    native_mutex_unlock(&vm->gvl.lock);
 }
 
 static void
-gvl_waiting_shift(rb_vm_t *vm, rb_thread_t *th)
+__gvl_release(rb_vm_t *vm)
 {
-    vm->gvl.waiting_threads = vm->gvl.waiting_threads->native_thread_data.gvl_next;
-    vm->gvl.waiting--;
+    vm->gvl.acquired = 0;
+    if (vm->gvl.waiting > 0)
+        native_cond_signal(&vm->gvl.cond);
 }
-#endif
 
 static void
-gvl_acquire(rb_vm_t *vm, rb_thread_t *th)
+gvl_release(rb_vm_t *vm)
 {
-#if GVL_SIMPLE_LOCK
     native_mutex_lock(&vm->gvl.lock);
-#else
-    native_mutex_lock(&vm->gvl.lock);
-    if (vm->gvl.waiting > 0 || vm->gvl.acquired != 0) {
-        if (GVL_DEBUG) fprintf(stderr, "gvl acquire (%p): sleep\n", (void *)th);
-        gvl_waiting_push(vm, th);
-        if (GVL_DEBUG) gvl_show_waiting_threads(vm);
-
-        while (vm->gvl.acquired != 0 || vm->gvl.waiting_threads != th) {
-            native_cond_wait(&th->native_thread_data.gvl_cond, &vm->gvl.lock);
-        }
-        gvl_waiting_shift(vm, th);
-    }
-    else {
-        /* do nothing */
-    }
-    vm->gvl.acquired = 1;
+    __gvl_release(vm);
     native_mutex_unlock(&vm->gvl.lock);
-#endif
-    if (GVL_DEBUG) gvl_show_waiting_threads(vm);
-    if (GVL_DEBUG) fprintf(stderr, "gvl acquire (%p): acquire\n", (void *)th);
 }
 
+#define HAVE_GVL_YIELD 1
 static void
-gvl_release(rb_vm_t *vm)
+gvl_yield(rb_vm_t *vm, rb_thread_t *th)
 {
-#if GVL_SIMPLE_LOCK
-    native_mutex_unlock(&vm->gvl.lock);
-#else
     native_mutex_lock(&vm->gvl.lock);
-    if (vm->gvl.waiting > 0) {
-        rb_thread_t *th = vm->gvl.waiting_threads;
-        if (GVL_DEBUG) fprintf(stderr, "gvl release (%p): wakeup: %p\n", (void *)GET_THREAD(), (void *)th);
-        native_cond_signal(&th->native_thread_data.gvl_cond);
+
+    /* An another thread is processing GVL yield. */
+    if (vm->gvl.need_yield) {
+        native_mutex_unlock(&vm->gvl.lock);
+        return;
     }
-    else {
-        if (GVL_DEBUG) fprintf(stderr, "gvl release (%p): wakeup: %p\n", (void *)GET_THREAD(), NULL);
-        /* do nothing */
+
+    if (vm->gvl.waiting > 0)
+        vm->gvl.need_yield = 1;
+
+    __gvl_release(vm);
+    if (vm->gvl.need_yield) {
+        /* Wait until another thread task take GVL. */
+        native_cond_wait(&vm->gvl.switch_cond, &vm->gvl.lock);
+    } else {
+        native_mutex_unlock(&vm->gvl.lock);
+        sched_yield();
+        native_mutex_lock(&vm->gvl.lock);
     }
-    vm->gvl.acquired = 0;
+
+    __gvl_acquire(vm);
     native_mutex_unlock(&vm->gvl.lock);
-#endif
 }
 
 static void
@@ -130,15 +117,12 @@
 {
     if (GVL_DEBUG) fprintf(stderr, "gvl init\n");
 
-#if GVL_SIMPLE_LOCK
     native_mutex_initialize(&vm->gvl.lock);
-#else
-    native_mutex_initialize(&vm->gvl.lock);
-    vm->gvl.waiting_threads = 0;
-    vm->gvl.waiting_last_thread = 0;
-    vm->gvl.waiting = 0;
+    native_cond_initialize(&vm->gvl.cond, RB_CONDATTR_CLOCK_MONOTONIC);
+    native_cond_initialize(&vm->gvl.switch_cond, RB_CONDATTR_CLOCK_MONOTONIC);
     vm->gvl.acquired = 0;
-#endif
+    vm->gvl.waiting = 0;
+    vm->gvl.need_yield = 0;
 }
 
 static void
@@ -990,6 +974,11 @@
 static rb_thread_cond_t timer_thread_cond;
 static pthread_mutex_t timer_thread_lock = PTHREAD_MUTEX_INITIALIZER;
 
+/* 100ms. 10ms is too small for user level thread scheduling
+ * on recent Linux (tested on 2.6.35)
+ */
+#define TIME_QUANTUM_USEC (100 * 1000)
+
 static void *
 thread_timer(void *dummy)
 {
@@ -997,7 +986,7 @@
     struct timespec timeout;
 
     timeout_10ms.tv_sec = 0;
-    timeout_10ms.tv_nsec = 10 * 1000 * 1000;
+    timeout_10ms.tv_nsec = TIME_QUANTUM_USEC * 1000;
 
     native_mutex_lock(&timer_thread_lock);
     native_cond_broadcast(&timer_thread_cond);
Index: thread_pthread.h
===================================================================
--- thread_pthread.h (revision 32063)
+++ thread_pthread.h (revision 32064)
@@ -35,11 +35,17 @@
 #include <semaphore.h>
 
 typedef struct rb_global_vm_lock_struct {
+    /* fast path */
+    unsigned long acquired;
     pthread_mutex_t lock;
-    struct rb_thread_struct * volatile waiting_threads;
-    struct rb_thread_struct *waiting_last_thread;
-    int waiting;
-    int volatile acquired;
+
+    /* slow path */
+    unsigned long waiting;
+    rb_thread_cond_t cond;
+
+    /* yield */
+    rb_thread_cond_t switch_cond;
+    unsigned long need_yield;
 } rb_global_vm_lock_t;
 
 #endif /* RUBY_THREAD_PTHREAD_H */
Index: vm_core.h
===================================================================
--- vm_core.h (revision 32063)
+++ vm_core.h (revision 32064)
@@ -419,7 +419,6 @@
     rb_thread_id_t thread_id;
     enum rb_thread_status status;
     int priority;
-    int slice;
 
     native_thread_data_t native_thread_data;
     void *blocking_region_buffer;
@@ -484,6 +483,7 @@
 #ifdef USE_SIGALTSTACK
     void *altstack;
 #endif
+    unsigned long running_time_us;
 } rb_thread_t;
 
 /* iseq.c */
@@ -673,6 +673,9 @@
 #define GET_THREAD() ruby_current_thread
 #define rb_thread_set_current_raw(th) (void)(ruby_current_thread = (th))
 #define rb_thread_set_current(th) do { \
+    if ((th)->vm->running_thread != (th)) { \
+        (th)->vm->running_thread->running_time_us = 0; \
+    } \
     rb_thread_set_current_raw(th); \
     (th)->vm->running_thread = (th); \
 } while (0)
Index: thread.c
===================================================================
--- thread.c (revision 32063)
+++ thread.c (revision 32064)
@@ -1015,7 +1015,7 @@
 static void rb_threadptr_execute_interrupts_rec(rb_thread_t *, int);
 
 static void
-rb_thread_schedule_rec(int sched_depth)
+rb_thread_schedule_rec(int sched_depth, unsigned long limits_us)
 {
     thread_debug("rb_thread_schedule\n");
     if (!rb_thread_alone()) {
@@ -1024,11 +1024,19 @@
         thread_debug("rb_thread_schedule/switch start\n");
 
         RB_GC_SAVE_MACHINE_CONTEXT(th);
+
+#if HAVE_GVL_YIELD
+        {
+            if (th->running_time_us >= limits_us)
+                gvl_yield(th->vm, th);
+        }
+#else
         gvl_release(th->vm);
         {
             native_thread_yield();
         }
         gvl_acquire(th->vm, th);
+#endif
 
         rb_thread_set_current(th);
         thread_debug("rb_thread_schedule/switch done\n");
@@ -1042,7 +1050,7 @@
 void
 rb_thread_schedule(void)
 {
-    rb_thread_schedule_rec(0);
+    rb_thread_schedule_rec(0, 0);
 }
 
 /* blocking region */
@@ -1333,23 +1341,20 @@
         }
 
         if (!sched_depth && timer_interrupt) {
-            sched_depth++;
+            unsigned long limits_us = 250 * 1000;
+
+            if (th->priority > 0)
+                limits_us <<= th->priority;
+            else
+                limits_us >>= -th->priority;
+
+            if (status == THREAD_RUNNABLE)
+                th->running_time_us += TIME_QUANTUM_USEC;
+
+            sched_depth++;
             EXEC_EVENT_HOOK(th, RUBY_EVENT_SWITCH, th->cfp->self, 0, 0);
 
-            if (th->slice > 0) {
-                th->slice--;
-            }
-            else {
-              reschedule:
-                rb_thread_schedule_rec(sched_depth+1);
-                if (th->slice < 0) {
-                    th->slice++;
-                    goto reschedule;
-                }
-                else {
-                    th->slice = th->priority;
-                }
-            }
+            rb_thread_schedule_rec(sched_depth+1, limits_us);
         }
     }
 }
@@ -2293,7 +2298,6 @@
         priority = RUBY_THREAD_PRIORITY_MIN;
     }
     th->priority = priority;
-    th->slice = priority;
 #endif
     return INT2NUM(th->priority);
 }

--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/