[前][次][番号順一覧][スレッド一覧]

ruby-changes:19925

From: akr <ko1@a...>
Date: Fri, 10 Jun 2011 00:03:20 +0900 (JST)
Subject: [ruby-changes:19925] akr:r31971 (trunk): * io.c: fix IO.copy_stream interrupt handling.

akr	2011-06-10 00:02:46 +0900 (Fri, 10 Jun 2011)

  New Revision: 31971

  http://svn.ruby-lang.org/cgi-bin/viewvc.cgi?view=rev&revision=31971

  Log:
    * io.c: fix IO.copy_stream interrupt handling.
      based on the patch by Eric Wong.  [ruby-core:36156]
    
    * vm_core.h (rb_thread_call_with_gvl): don't declare here.
    
    * thread.c: include internal.h.
      (rb_thread_execute_interrupts): new function.
    
    * internal.h (rb_thread_execute_interrupts): declared.
      (rb_thread_call_with_gvl): declared.

  Modified files:
    trunk/ChangeLog
    trunk/internal.h
    trunk/io.c
    trunk/test/ruby/test_io.rb
    trunk/thread.c
    trunk/vm_core.h

Index: ChangeLog
===================================================================
--- ChangeLog	(revision 31970)
+++ ChangeLog	(revision 31971)
@@ -1,3 +1,16 @@
+Thu Jun  9 23:57:53 2011  Tanaka Akira  <akr@f...>
+
+	* io.c: fix IO.copy_stream interrupt handling.
+	  based on the patch by Eric Wong.  [ruby-core:36156]
+
+	* vm_core.h (rb_thread_call_with_gvl): don't declare here.
+
+	* thread.c: include internal.h.
+	  (rb_thread_execute_interrupts): new function.
+
+	* internal.h (rb_thread_execute_interrupts): declared.
+	  (rb_thread_call_with_gvl): declared.
+
 Thu Jun  9 23:34:01 2011  CHIKANAGA Tomoyuki  <nagachika00@g...>
 
 	* gc.c (rb_objspace_call_finalizer): use rb_typeddata_is_kind_of() for
Index: vm_core.h
===================================================================
--- vm_core.h	(revision 31970)
+++ vm_core.h	(revision 31971)
@@ -656,7 +656,6 @@
 void rb_thread_start_timer_thread(void);
 void rb_thread_stop_timer_thread(void);
 void rb_thread_reset_timer_thread(void);
-void *rb_thread_call_with_gvl(void *(*func)(void *), void *data1);
 int ruby_thread_has_gvl_p(void);
 VALUE rb_make_backtrace(void);
 typedef int rb_backtrace_iter_func(void *, VALUE, int, VALUE);
Index: io.c
===================================================================
--- io.c	(revision 31970)
+++ io.c	(revision 31971)
@@ -14,6 +14,7 @@
 #include "ruby/ruby.h"
 #include "ruby/io.h"
 #include "dln.h"
+#include "internal.h"
 #include <ctype.h>
 #include <errno.h>
 
@@ -8517,13 +8518,57 @@
     VALUE th;
 };
 
+static void *
+exec_interrupts(void *arg)
+{
+    VALUE th = (VALUE)arg;
+    rb_thread_execute_interrupts(th);
+    return NULL;
+}
+
+/*
+ * returns TRUE if the preceding system call was interrupted
+ * so we can continue.  If the thread was interrupted, we
+ * reacquire the GVL to execute interrupts before continuing.
+ */
 static int
-maygvl_copy_stream_wait_read(struct copy_stream_struct *stp)
+maygvl_copy_stream_continue_p(int has_gvl, struct copy_stream_struct *stp)
 {
+    switch (errno) {
+      case EINTR:
+#if defined(ERESTART)
+      case ERESTART:
+#endif
+	if (rb_thread_interrupted(stp->th))
+            if (has_gvl)
+                rb_thread_execute_interrupts(stp->th);
+            else
+                rb_thread_call_with_gvl(exec_interrupts, (void *)stp->th);
+	return TRUE;
+    }
+    return FALSE;
+}
+
+static int
+maygvl_select(int has_gvl, int n, rb_fdset_t *rfds, rb_fdset_t *wfds, rb_fdset_t *efds, struct timeval *timeout)
+{
+    if (has_gvl)
+        return rb_thread_fd_select(n, rfds, wfds, efds, timeout);
+    else
+        return rb_fd_select(n, rfds, wfds, efds, timeout);
+}
+
+static int
+maygvl_copy_stream_wait_read(int has_gvl, struct copy_stream_struct *stp)
+{
     int ret;
-    rb_fd_zero(&stp->fds);
-    rb_fd_set(stp->src_fd, &stp->fds);
-    ret = rb_fd_select(rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
+
+    do {
+	rb_fd_zero(&stp->fds);
+	rb_fd_set(stp->src_fd, &stp->fds);
+        ret = maygvl_select(has_gvl, rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
+    } while (ret == -1 && maygvl_copy_stream_continue_p(has_gvl, stp));
+
     if (ret == -1) {
         stp->syserr = "select";
         stp->error_no = errno;
@@ -8536,9 +8581,13 @@
 nogvl_copy_stream_wait_write(struct copy_stream_struct *stp)
 {
     int ret;
-    rb_fd_zero(&stp->fds);
-    rb_fd_set(stp->dst_fd, &stp->fds);
-    ret = rb_fd_select(rb_fd_max(&stp->fds), NULL, &stp->fds, NULL, NULL);
+
+    do {
+	rb_fd_zero(&stp->fds);
+	rb_fd_set(stp->dst_fd, &stp->fds);
+        ret = rb_fd_select(rb_fd_max(&stp->fds), NULL, &stp->fds, NULL, NULL);
+    } while (ret == -1 && maygvl_copy_stream_continue_p(0, stp));
+
     if (ret == -1) {
         stp->syserr = "select";
         stp->error_no = errno;
@@ -8600,13 +8649,13 @@
 
 #ifdef USE_SENDFILE
 static int
-maygvl_copy_stream_wait_readwrite(struct copy_stream_struct *stp)
+maygvl_copy_stream_wait_readwrite(int has_gvl, struct copy_stream_struct *stp)
 {
     int ret;
     rb_fd_zero(&stp->fds);
     rb_fd_set(stp->src_fd, &stp->fds);
     rb_fd_set(stp->dst_fd, &stp->fds);
-    ret = rb_fd_select(rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
+    ret = maygvl_select(has_gvl, rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
     if (ret == -1) {
         stp->syserr = "select";
         stp->error_no = errno;
@@ -8685,6 +8734,8 @@
         }
     }
     if (ss == -1) {
+	if (maygvl_copy_stream_continue_p(0, stp))
+	    goto retry_sendfile;
         switch (errno) {
 	  case EINVAL:
 #ifdef ENOSYS
@@ -8695,10 +8746,8 @@
 #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
 	  case EWOULDBLOCK:
 #endif
-            if (maygvl_copy_stream_wait_readwrite(stp) == -1)
+            if (maygvl_copy_stream_wait_readwrite(0, stp) == -1)
                 return -1;
-            if (rb_thread_interrupted(stp->th))
-                return -1;
             goto retry_sendfile;
         }
         stp->syserr = "sendfile";
@@ -8710,12 +8759,22 @@
 #endif
 
 static ssize_t
-maygvl_copy_stream_read(struct copy_stream_struct *stp, char *buf, size_t len, off_t offset)
+maygvl_read(int has_gvl, int fd, void *buf, size_t count)
 {
+    if (has_gvl)
+        return rb_read_internal(fd, buf, count);
+    else
+        return read(fd, buf, count);
+}
+
+static ssize_t
+maygvl_copy_stream_read(int has_gvl, struct copy_stream_struct *stp, char *buf, size_t len, off_t offset)
+{
     ssize_t ss;
   retry_read:
-    if (offset == (off_t)-1)
-        ss = read(stp->src_fd, buf, len);
+    if (offset == (off_t)-1) {
+        ss = maygvl_read(has_gvl, stp->src_fd, buf, len);
+    }
     else {
 #ifdef HAVE_PREAD
         ss = pread(stp->src_fd, buf, len, offset);
@@ -8728,12 +8787,14 @@
         return 0;
     }
     if (ss == -1) {
+	if (maygvl_copy_stream_continue_p(has_gvl, stp))
+	    goto retry_read;
         switch (errno) {
 	  case EAGAIN:
 #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
 	  case EWOULDBLOCK:
 #endif
-            if (maygvl_copy_stream_wait_read(stp) == -1)
+            if (maygvl_copy_stream_wait_read(has_gvl, stp) == -1)
                 return -1;
             goto retry_read;
 #ifdef ENOSYS
@@ -8757,6 +8818,8 @@
     while (len) {
         ss = write(stp->dst_fd, buf+off, len);
         if (ss == -1) {
+	    if (maygvl_copy_stream_continue_p(0, stp))
+		continue;
             if (errno == EAGAIN || errno == EWOULDBLOCK) {
                 if (nogvl_copy_stream_wait_write(stp) == -1)
                     return -1;
@@ -8811,12 +8874,12 @@
             len = sizeof(buf);
         }
         if (use_pread) {
-            ss = maygvl_copy_stream_read(stp, buf, len, src_offset);
+            ss = maygvl_copy_stream_read(0, stp, buf, len, src_offset);
             if (0 < ss)
                 src_offset += ss;
         }
         else {
-            ss = maygvl_copy_stream_read(stp, buf, len, (off_t)-1);
+            ss = maygvl_copy_stream_read(0, stp, buf, len, (off_t)-1);
         }
         if (ss <= 0) /* EOF or error */
             return;
@@ -8827,9 +8890,6 @@
 
         if (!use_eof)
             copy_length -= ss;
-
-        if (rb_thread_interrupted(stp->th))
-            return;
     }
 }
 
@@ -8890,7 +8950,7 @@
             ssize_t ss;
             rb_thread_wait_fd(stp->src_fd);
             rb_str_resize(buf, buflen);
-            ss = maygvl_copy_stream_read(stp, RSTRING_PTR(buf), l, off);
+            ss = maygvl_copy_stream_read(1, stp, RSTRING_PTR(buf), l, off);
             if (ss == -1)
                 return Qnil;
             if (ss == 0)
Index: thread.c
===================================================================
--- thread.c	(revision 31970)
+++ thread.c	(revision 31971)
@@ -46,6 +46,7 @@
 
 #include "eval_intern.h"
 #include "gc.h"
+#include "internal.h"
 #include "ruby/io.h"
 
 #ifndef USE_NATIVE_THREAD_PRIORITY
@@ -1359,6 +1360,12 @@
 }
 
 void
+rb_thread_execute_interrupts(VALUE th)
+{
+    rb_threadptr_execute_interrupts_rec((rb_thread_t *)th, 0);
+}
+
+void
 rb_gc_mark_threads(void)
 {
     rb_bug("deprecated function rb_gc_mark_threads is called");
Index: internal.h
===================================================================
--- internal.h	(revision 31970)
+++ internal.h	(revision 31971)
@@ -30,6 +30,9 @@
 VALUE rb_obj_is_thread(VALUE obj);
 VALUE rb_obj_is_mutex(VALUE obj);
 
+void rb_thread_execute_interrupts(VALUE th);
+void *rb_thread_call_with_gvl(void *(*func)(void *), void *data1);
+
 #if defined(__cplusplus)
 #if 0
 { /* satisfy cc-mode */
Index: test/ruby/test_io.rb
===================================================================
--- test/ruby/test_io.rb	(revision 31970)
+++ test/ruby/test_io.rb	(revision 31971)
@@ -78,6 +78,14 @@
     }
   end
 
+  def trapping_usr1
+    @usr1_rcvd  = 0
+    trap(:USR1) { @usr1_rcvd += 1 }
+    yield
+    ensure
+      trap(:USR1, "DEFAULT")
+  end
+
   def test_pipe
     r, w = IO.pipe
     assert_instance_of(IO, r)
@@ -594,6 +602,30 @@
           result = t.value
           assert_equal(megacontent, result)
         }
+        with_socketpair {|s1, s2|
+          begin
+            s1.nonblock = true
+          rescue Errno::EBADF
+            skip "nonblocking IO for pipe is not implemented"
+          end
+          trapping_usr1 do
+            nr = 10
+            pid = fork do
+              s1.close
+              IO.select([s2])
+              Process.kill(:USR1, Process.ppid)
+              s2.read
+            end
+            s2.close
+            nr.times do
+              assert_equal megacontent.bytesize, IO.copy_stream("megasrc", s1)
+            end
+            assert_equal(1, @usr1_rcvd)
+            s1.close
+            _, status = Process.waitpid2(pid)
+            assert status.success?, status.inspect
+          end
+        }
       end
     }
   end

--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/

[前][次][番号順一覧][スレッド一覧]