[前][次][番号順一覧][スレッド一覧]

ruby-changes:20196

From: ko1 <ko1@a...>
Date: Mon, 27 Jun 2011 09:30:58 +0900 (JST)
Subject: [ruby-changes:20196] ko1:r32244 (trunk): * thread_pthread.c: Stop polling in the timer thread when there are

ko1	2011-06-27 09:30:41 +0900 (Mon, 27 Jun 2011)

  New Revision: 32244

  http://svn.ruby-lang.org/cgi-bin/viewvc.cgi?view=rev&revision=32244

  Log:
    * thread_pthread.c: Stop polling in the timer thread when there are
      no waiting threads.  If there are 2 or more runnable threads,
      the timer thread does polling.  Avoiding polling saves power
      on several computers (0.2W per Ruby process, when I measured).
      If an outside event such as a signal or Thread#kill occurs
      while the timer thread is not polling, then wake up
      the timer thread using the communication pipe (the timer thread
      waits on this communication pipe with select(2)).
      The discussion about this modification can be found in the post
      [ruby-core:33456] and other related posts.
      Note that Eric Wong and KOSAKI Motohiro made huge
      contributions to this modification.  Thanks.
    * thread_pthread.c (rb_thread_wakeup_timer_thread): add a function.
      This function wakes up the timer thread using communication-pipe.
    * thread.c (rb_thread_stop_timer_thread): add a parameter which
      specifies whether to close the communication pipe or not.
    * thread.c (rb_thread_terminate_all): do not stop the timer thread
      here (ruby_cleanup() terminates the timer thread).
    * signal.c: wake up timer thread using
      rb_thread_wakeup_timer_thread() from signal handler.
    * eval.c (ruby_cleanup): use rb_thread_stop_timer_thread(1).
    * process.c: use rb_thread_stop_timer_thread(0)
      (reuse communication-pipe).
    * thread_win32.c (rb_thread_wakeup_timer_thread): add a dummy
      function.
    * vm_core.h: add and fix decl. of functions.

  Modified files:
    trunk/ChangeLog
    trunk/eval.c
    trunk/process.c
    trunk/signal.c
    trunk/thread.c
    trunk/thread_pthread.c
    trunk/thread_win32.c
    trunk/vm_core.h

Index: thread_win32.c
===================================================================
--- thread_win32.c	(revision 32243)
+++ thread_win32.c	(revision 32244)
@@ -725,6 +725,12 @@
     return 0;
 }
 
+void
+rb_thread_wakeup_timer_thread(void)
+{
+    /* do nothing */
+}
+
 static void
 rb_thread_create_timer_thread(void)
 {
Index: ChangeLog
===================================================================
--- ChangeLog	(revision 32243)
+++ ChangeLog	(revision 32244)
@@ -1,3 +1,40 @@
+Mon Jun 27 09:07:42 2011  Koichi Sasada  <ko1@a...>
+
+	* thread_pthread.c: Stop polling in the timer thread when there are
+	  no waiting thread.  If there are 2 or more runnable threads,
+	  the timer thread does polling.  Avoid polling makes power save
+	  for several computers (0.2W per a Ruby process, when I measured).
+	  If outside-event such as signal or Thread#kill was occuerred
+	  when the timer thread does not do polling, then wake-up
+	  the timer thread using communication-pipe (the timer thread
+	  waits this communication-pipe with select(2)).
+	  The discussion about this modification can be found from the post
+	  [ruby-core:33456] and other related posts.
+	  Note that Eric Wong and KOSAKI Motohiro give us the huge
+	  contributions for this modification.  Thanks.
+
+	* thread_pthread.c (rb_thread_wakeup_timer_thread): add a function.
+	  This function wakes up the timer thread using communication-pipe.
+
+	* thread.c (rb_thread_stop_timer_thread): add a parameter which
+	  specify closing communication-pipe or not.
+
+	* thread.c (rb_thread_terminate_all): do not stop timer thread here
+	  (ruby_cleanup() terminate timer thread).
+
+	* signal.c: wake up timer thread using
+	  rb_thread_wakeup_timer_thread() from signal handler.
+
+	* eval.c (ruby_cleanup): use rb_thread_stop_timer_thread(1).
+
+	* process.c: use rb_thread_stop_timer_thread(0)
+	  (reuse communication-pipe).
+
+	* thread_win32.c (rb_thread_wakeup_timer_thread): add a dummy
+	  function.
+
+	* vm_core.h: add and fix decl. of functions.
+
 Mon Jun 27 08:01:19 2011  Tadayoshi Funaba  <tadf@d...>
 
 	* ext/date/date_parse.c: should use ALLOCA_N.
Index: thread_pthread.c
===================================================================
--- thread_pthread.c	(revision 32243)
+++ thread_pthread.c	(revision 32244)
@@ -19,6 +19,11 @@
 #ifdef HAVE_THR_STKSEGMENT
 #include <thread.h>
 #endif
+#if HAVE_FCNTL_H
+#include <fcntl.h>
+#elif HAVE_SYS_FCNTL_H
+#include <sys/fcntl.h>
+#endif
 
 static void native_mutex_lock(pthread_mutex_t *lock);
 static void native_mutex_unlock(pthread_mutex_t *lock);
@@ -44,10 +49,17 @@
 __gvl_acquire(rb_vm_t *vm)
 {
     if (vm->gvl.acquired) {
+
 	vm->gvl.waiting++;
+	if (vm->gvl.waiting == 1) {
+	    /* transit to polling mode */
+	    rb_thread_wakeup_timer_thread();
+	}
+
 	while (vm->gvl.acquired) {
 	    native_cond_wait(&vm->gvl.cond, &vm->gvl.lock);
 	}
+
 	vm->gvl.waiting--;
 
 	if (vm->gvl.need_yield) {
@@ -973,47 +985,125 @@
 #endif /* USE_SIGNAL_THREAD_LIST */
 
 static pthread_t timer_thread_id;
-static rb_thread_cond_t timer_thread_cond;
-static pthread_mutex_t timer_thread_lock = PTHREAD_MUTEX_INITIALIZER;
+static int timer_thread_pipe[2] = {-1, -1};
+static int timer_thread_pipe_owner_process;
 
+#define TT_DEBUG 0
+
+void
+rb_thread_wakeup_timer_thread(void)
+{
+    int result;
+
+    /* already opened */
+    if (timer_thread_pipe_owner_process == getpid()) {
+	const char *buff = "!";
+      retry:
+	if ((result = write(timer_thread_pipe[1], buff, 1)) <= 0) {
+	    switch (errno) {
+	      case EINTR: goto retry;
+	      case EAGAIN:
+#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
+	      case EWOULDBLOCK:
+#endif
+		break;
+	      default:
+		rb_bug_errno("rb_thread_wakeup_timer_thread - write", errno);
+	    }
+	}
+	if (TT_DEBUG) fprintf(stderr, "rb_thread_wakeup_timer_thread: write\n");
+    }
+    else {
+	/* ignore wakeup */
+    }
+}
+
+static int
+consume_communication_pipe(void)
+{
+    const size_t buff_size = 1024;
+    char buff[buff_size];
+    int result;
+  retry:
+    result = read(timer_thread_pipe[0], buff, buff_size);
+    if (result < 0) {
+	switch (errno) {
+	  case EINTR: goto retry;
+	  default:
+	    rb_bug_errno("consume_communication_pipe: read", errno);
+	}
+    }
+    return result;
+}
+
+static void
+close_communication_pipe(void)
+{
+    if (close(timer_thread_pipe[0]) < 0) {
+	rb_bug_errno("native_stop_timer_thread - close(ttp[0])", errno);
+    }
+    if (close(timer_thread_pipe[1]) < 0) {
+	rb_bug_errno("native_stop_timer_thread - close(ttp[1])", errno);
+    }
+    timer_thread_pipe[0] = timer_thread_pipe[1] = -1;
+}
+
 /* 100ms.  10ms is too small for user level thread scheduling
  * on recent Linux (tested on 2.6.35)
  */
 #define TIME_QUANTUM_USEC (100 * 1000)
 
 static void *
-thread_timer(void *dummy)
+thread_timer(void *p)
 {
-    struct timespec time_quantum;
-    struct timespec timeout;
+    rb_global_vm_lock_t *gvl = (rb_global_vm_lock_t *)p;
+    int result;
+    int len;
+    struct timeval timeout;
 
-    time_quantum.tv_sec = 0;
-    time_quantum.tv_nsec = TIME_QUANTUM_USEC * 1000;
+    if (TT_DEBUG) fprintf(stderr, "start timer thread\n");
 
-    native_mutex_lock(&timer_thread_lock);
-    native_cond_broadcast(&timer_thread_cond);
-    timeout = native_cond_timeout(&timer_thread_cond, time_quantum);
-
     while (system_working > 0) {
-	int err;
+	fd_set rfds;
 
-	err = native_cond_timedwait(&timer_thread_cond, &timer_thread_lock,
-				    &timeout);
-	if (err == 0) {
-	    /*
-	     * Spurious wakeup or native_stop_timer_thread() was called.
-	     * We need to recheck a system_working state.
-	     */
+	/* timer function */
+	ping_signal_thread_list();
+	timer_thread_function(0);
+	if (TT_DEBUG) fprintf(stderr, "tick\n");
+
+	/* wait */
+	FD_ZERO(&rfds);
+	FD_SET(timer_thread_pipe[0], &rfds);
+
+	if (gvl->waiting > 0) {
+	    timeout.tv_sec = 0;
+	    timeout.tv_usec = TIME_QUANTUM_USEC;
+
+	    /* polling (TIME_QUANTUM_USEC usec) */
+	    result = select(timer_thread_pipe[0] + 1, &rfds, 0, 0, &timeout); 
 	}
-	else if (err == ETIMEDOUT) {
-	    ping_signal_thread_list();
-	    timer_thread_function(dummy);
-	    timeout = native_cond_timeout(&timer_thread_cond, time_quantum);
+	else {
+	    /* wait (infinite) */
+	    result = select(timer_thread_pipe[0] + 1, &rfds, 0, 0, 0); 
 	}
-	else
-	    rb_bug_errno("thread_timer/timedwait", err);
+
+	if (result == 0) {
+	    /* maybe timeout */
+	}
+	else if (result > 0) {
+	    len = consume_communication_pipe();
+	}
+	else { /* result < 0 */
+	    if (errno == EINTR) {
+		/* interrupted. ignore */
+	    }
+	    else {
+		rb_bug_errno("thread_timer: select", errno);
+	    }
+	}
     }
-    native_mutex_unlock(&timer_thread_lock);
+
+    if (TT_DEBUG) fprintf(stderr, "finish timer thread\n");
     return NULL;
 }
 
@@ -1027,36 +1117,78 @@
 	int err;
 
 	pthread_attr_init(&attr);
-	native_cond_initialize(&timer_thread_cond, RB_CONDATTR_CLOCK_MONOTONIC);
 #ifdef PTHREAD_STACK_MIN
 	pthread_attr_setstacksize(&attr,
 				  PTHREAD_STACK_MIN + (THREAD_DEBUG ? BUFSIZ : 0));
 #endif
-	native_mutex_lock(&timer_thread_lock);
-	err = pthread_create(&timer_thread_id, &attr, thread_timer, 0);
+
+	/* communication pipe with timer thread and signal handler */
+	if (timer_thread_pipe_owner_process != getpid()) {
+	    if (timer_thread_pipe[0] != -1) {
+		/* close pipe of parent process */
+		close_communication_pipe();
+	    }
+
+	    err = pipe(timer_thread_pipe);
+	    if (err != 0) {
+		rb_bug_errno("thread_timer: Failed to create communication pipe for timer thread", errno);
+	    }
+#if defined(HAVE_FCNTL) && defined(F_GETFL) && defined(F_SETFL)
+	    {
+		int oflags;
+#if defined(O_NONBLOCK)
+		oflags |= O_NONBLOCK;
+		fcntl(timer_thread_pipe[1], F_SETFL, oflags);
+#endif /* defined(O_NONBLOCK) */
+#if defined(FD_CLOEXEC)
+		oflags = fcntl(timer_thread_pipe[0], F_GETFD);
+		fcntl(timer_thread_pipe[0], F_SETFD, oflags | FD_CLOEXEC);
+		oflags = fcntl(timer_thread_pipe[1], F_GETFD);
+		fcntl(timer_thread_pipe[1], F_SETFD, oflags | FD_CLOEXEC);
+#endif /* defined(FD_CLOEXEC) */
+	    }
+#endif /* defined(HAVE_FCNTL) && defined(F_GETFL) && defined(F_SETFL) */
+
+	    /* validate pipe on this process */
+	    timer_thread_pipe_owner_process = getpid();
+	}
+
+	/* create timer thread */
+	if (timer_thread_id) {
+	    rb_bug("rb_thread_create_timer_thread: Timer thread was already created\n");
+	}
+	err = pthread_create(&timer_thread_id, &attr, thread_timer, &GET_VM()->gvl);
 	if (err != 0) {
-	    native_mutex_unlock(&timer_thread_lock);
 	    fprintf(stderr, "[FATAL] Failed to create timer thread (errno: %d)\n", err);
 	    exit(EXIT_FAILURE);
 	}
-	native_cond_wait(&timer_thread_cond, &timer_thread_lock);
-	native_mutex_unlock(&timer_thread_lock);
     }
+
     rb_disable_interrupt(); /* only timer thread recieve signal */
 }
 
 static int
-native_stop_timer_thread(void)
+native_stop_timer_thread(int close_anyway)
 {
     int stopped;
-    native_mutex_lock(&timer_thread_lock);
     stopped = --system_working <= 0;
+
+    if (TT_DEBUG) fprintf(stderr, "stop timer thread\n");
     if (stopped) {
-	native_cond_signal(&timer_thread_cond);
-    }
-    native_mutex_unlock(&timer_thread_lock);
-    if (stopped) {
+	/* join */
+	rb_thread_wakeup_timer_thread();
 	native_thread_join(timer_thread_id);
+	if (TT_DEBUG) fprintf(stderr, "joined timer thread\n");
+	timer_thread_id = 0;
+
+	/* close communication pipe */
+	if (close_anyway) {
+	    /* TODO: Uninstall all signal handlers or mask all signals.
+	     *       This pass is cleaning phase.  It is too rare case
+             *       to generate problem, so we remains it in TODO.
+	     */
+	    close_communication_pipe();
+	}
     }
     return stopped;
 }
@@ -1064,7 +1196,7 @@
 static void
 native_reset_timer_thread(void)
 {
-    timer_thread_id = 0;
+    if (TT_DEBUG)  fprintf(stderr, "reset timer thread\n");
 }
 
 #ifdef HAVE_SIGALTSTACK
Index: vm_core.h
===================================================================
--- vm_core.h	(revision 32243)
+++ vm_core.h	(revision 32244)
@@ -651,8 +651,10 @@
                  const VALUE *argv, const rb_method_entry_t *me);
 
 void rb_thread_start_timer_thread(void);
-void rb_thread_stop_timer_thread(void);
+void rb_thread_stop_timer_thread(int);
 void rb_thread_reset_timer_thread(void);
+void rb_thread_wakeup_timer_thread(void);
+
 int ruby_thread_has_gvl_p(void);
 VALUE rb_make_backtrace(void);
 typedef int rb_backtrace_iter_func(void *, VALUE, int, VALUE);
Index: thread.c
===================================================================
--- thread.c	(revision 32243)
+++ thread.c	(revision 32244)
@@ -363,7 +363,6 @@
 	}
 	POP_TAG();
     }
-    rb_thread_stop_timer_thread();
 }
 
 static void
@@ -2985,9 +2984,9 @@
 }
 
 void
-rb_thread_stop_timer_thread(void)
+rb_thread_stop_timer_thread(int close_anyway)
 {
-    if (timer_thread_id && native_stop_timer_thread()) {
+    if (timer_thread_id && native_stop_timer_thread(close_anyway)) {
 	native_reset_timer_thread();
     }
 }
Index: eval.c
===================================================================
--- eval.c	(revision 32243)
+++ eval.c	(revision 32244)
@@ -146,7 +146,7 @@
     ex = error_handle(ex);
     ruby_finalize_1();
     POP_TAG();
-    rb_thread_stop_timer_thread();
+    rb_thread_stop_timer_thread(1);
 
 #if EXIT_SUCCESS != 0 || EXIT_FAILURE != 1
     switch (ex) {
Index: process.c
===================================================================
--- process.c	(revision 32243)
+++ process.c	(revision 32244)
@@ -1014,7 +1014,7 @@
 	 * multiple threads. Therefore we have to kill internal threads at once.
 	 * [ruby-core: 10583]
 	 */
-	rb_thread_stop_timer_thread();
+	rb_thread_stop_timer_thread(0);
     }
 }
 
Index: signal.c
===================================================================
--- signal.c	(revision 32243)
+++ signal.c	(revision 32244)
@@ -507,6 +507,7 @@
 {
     ATOMIC_INC(signal_buff.cnt[sig]);
     ATOMIC_INC(signal_buff.size);
+    rb_thread_wakeup_timer_thread();
 #if !defined(BSD_SIGNAL) && !defined(POSIX_SIGNAL)
     ruby_signal(sig, sighandler);
 #endif

--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/

[前][次][番号順一覧][スレッド一覧]