[前][次][番号順一覧][スレッド一覧]

ruby-changes:18827

From: nobu <ko1@a...>
Date: Sat, 12 Feb 2011 14:44:32 +0900 (JST)
Subject: [ruby-changes:18827] Ruby:r30852 (trunk): * thread.c (rb_thread_io_blocking_region): new function to run

nobu	2011-02-12 14:44:23 +0900 (Sat, 12 Feb 2011)

  New Revision: 30852

  http://svn.ruby-lang.org/cgi-bin/viewvc.cgi?view=rev&revision=30852

  Log:
    * thread.c (rb_thread_io_blocking_region): new function to run a
      blocking region for a given fd with the GIL released.
    * thread.c (rb_thread_fd_close): implement.  [ruby-core:35203]

  Modified files:
    trunk/ChangeLog
    trunk/ext/socket/basicsocket.c
    trunk/ext/socket/init.c
    trunk/ext/socket/rubysocket.h
    trunk/ext/socket/udpsocket.c
    trunk/ext/socket/unixsocket.c
    trunk/include/ruby/intern.h
    trunk/io.c
    trunk/test/socket/test_socket.rb
    trunk/thread.c
    trunk/vm_core.h

Index: include/ruby/intern.h
===================================================================
--- include/ruby/intern.h	(revision 30851)
+++ include/ruby/intern.h	(revision 30852)
@@ -797,6 +797,7 @@
 int rb_thread_interrupted(VALUE thval);
 VALUE rb_thread_blocking_region(rb_blocking_function_t *func, void *data1,
 				rb_unblock_function_t *ubf, void *data2);
+VALUE rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd);
 #define RUBY_UBF_IO ((rb_unblock_function_t *)-1)
 #define RUBY_UBF_PROCESS ((rb_unblock_function_t *)-1)
 VALUE rb_mutex_new(void);
Index: ChangeLog
===================================================================
--- ChangeLog	(revision 30851)
+++ ChangeLog	(revision 30852)
@@ -1,5 +1,10 @@
-Sat Feb 12 14:42:11 2011  Nobuyoshi Nakada  <nobu@r...>
+Sat Feb 12 14:44:20 2011  Nobuyoshi Nakada  <nobu@r...>
 
+	* thread.c (rb_thread_io_blocking_region): new function to run
+	  blocking region with GIL released, for fd.
+
+	* thread.c (rb_thread_fd_close): implement.  [ruby-core:35203]
+
 	* vm.c (th_init): rename from th_init2.
 
 Sat Feb 12 14:41:36 2011  Nobuyoshi Nakada  <nobu@r...>
Index: vm_core.h
===================================================================
--- vm_core.h	(revision 30851)
+++ vm_core.h	(revision 30852)
@@ -260,6 +260,7 @@
     ruby_error_reenter,
     ruby_error_nomemory,
     ruby_error_sysstack,
+    ruby_error_closed_stream,
     ruby_special_error_count
 };
 
@@ -395,6 +396,8 @@
     /* passing state */
     int state;
 
+    int waiting_fd;
+
     /* for rb_iterate */
     const rb_block_t *passed_block;
 
Index: io.c
===================================================================
--- io.c	(revision 30851)
+++ io.c	(revision 30852)
@@ -605,7 +605,7 @@
     iis.buf = buf;
     iis.capa = count;
 
-    return (ssize_t)rb_thread_blocking_region(internal_read_func, &iis, RUBY_UBF_IO, 0);
+    return (ssize_t)rb_thread_io_blocking_region(internal_read_func, &iis, fd);
 }
 
 static ssize_t
@@ -616,7 +616,7 @@
     iis.buf = buf;
     iis.capa = count;
 
-    return (ssize_t)rb_thread_blocking_region(internal_write_func, &iis, RUBY_UBF_IO, 0);
+    return (ssize_t)rb_thread_io_blocking_region(internal_write_func, &iis, fd);
 }
 
 static long
@@ -653,7 +653,8 @@
 static VALUE
 io_flush_buffer_async(VALUE arg)
 {
-    return rb_thread_blocking_region(io_flush_buffer_sync, (void *)arg, RUBY_UBF_IO, 0);
+    rb_io_t *fptr = (rb_io_t *)arg;
+    return rb_thread_io_blocking_region(io_flush_buffer_sync, fptr, fptr->fd);
 }
 
 static inline int
@@ -7475,7 +7476,7 @@
     ias.offset = offset;
     ias.len    = len;
 
-    if (rv = (int)rb_thread_blocking_region(io_advise_internal, &ias, RUBY_UBF_IO, 0))
+    if (rv = (int)rb_thread_io_blocking_region(io_advise_internal, &ias, fptr->fd))
 	/* posix_fadvise(2) doesn't set errno. On success it returns 0; otherwise
 	   it returns the error code. */
 	rb_syserr_fail(rv, RSTRING_PTR(fptr->pathv));
Index: thread.c
===================================================================
--- thread.c	(revision 30851)
+++ thread.c	(revision 30852)
@@ -73,6 +73,8 @@
 static const VALUE eTerminateSignal = INT2FIX(1);
 static volatile int system_working = 1;
 
+#define closed_stream_error GET_VM()->special_exceptions[ruby_error_closed_stream]
+
 inline static void
 st_delete_wrap(st_table *table, st_data_t key)
 {
@@ -1122,6 +1124,7 @@
     rb_thread_t *th = GET_THREAD();
     int saved_errno = 0;
 
+    th->waiting_fd = -1;
     if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) {
 	ubf = ubf_select;
 	data2 = th;
@@ -1136,6 +1139,23 @@
     return val;
 }
 
+VALUE
+rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
+{
+    VALUE val;
+    rb_thread_t *th = GET_THREAD();
+    int saved_errno = 0;
+
+    th->waiting_fd = fd;
+    BLOCKING_REGION({
+	val = func(data1);
+	saved_errno = errno;
+    }, ubf_select, th);
+    errno = saved_errno;
+
+    return val;
+}
+
 /* alias of rb_thread_blocking_region() */
 
 VALUE
@@ -1427,10 +1447,36 @@
     return 1;
 }
 
+#define THREAD_IO_WAITING_P(th) (			\
+	((th)->status == THREAD_STOPPED ||		\
+	 (th)->status == THREAD_STOPPED_FOREVER) &&	\
+	(th)->blocking_region_buffer &&			\
+	(th)->unblock.func == ubf_select &&		\
+	1)
+
+static int
+thread_fd_close_i(st_data_t key, st_data_t val, st_data_t data)
+{
+    int fd = (int)data;
+    rb_thread_t *th;
+    GetThreadPtr((VALUE)key, th);
+
+    if (THREAD_IO_WAITING_P(th)) {
+	native_mutex_lock(&th->interrupt_lock);
+	if (THREAD_IO_WAITING_P(th) && th->waiting_fd == fd) {
+	    th->errinfo = th->vm->special_exceptions[ruby_error_closed_stream];
+	    RUBY_VM_SET_INTERRUPT(th);
+	    (th->unblock.func)(th->unblock.arg);
+	}
+	native_mutex_unlock(&th->interrupt_lock);
+    }
+    return ST_CONTINUE;
+}
+
 void
 rb_thread_fd_close(int fd)
 {
-    /* TODO: fix me */
+    st_foreach(GET_THREAD()->vm->living_threads, thread_fd_close_i, (st_index_t)fd);
 }
 
 /*
@@ -4362,6 +4408,10 @@
 
     rb_define_method(rb_cThread, "inspect", rb_thread_inspect, 0);
 
+    closed_stream_error = rb_exc_new2(rb_eIOError, "stream closed");
+    OBJ_TAINT(closed_stream_error);
+    OBJ_FREEZE(closed_stream_error);
+
     cThGroup = rb_define_class("ThreadGroup", rb_cObject);
     rb_define_alloc_func(cThGroup, thgroup_s_alloc);
     rb_define_method(cThGroup, "list", thgroup_list, 0);
Index: ext/socket/rubysocket.h
===================================================================
--- ext/socket/rubysocket.h	(revision 30851)
+++ ext/socket/rubysocket.h	(revision 30852)
@@ -197,6 +197,7 @@
 #include "constdefs.h"
 
 #define BLOCKING_REGION(func, arg) (long)rb_thread_blocking_region((func), (arg), RUBY_UBF_IO, 0)
+#define BLOCKING_REGION_FD(func, arg) (long)rb_thread_io_blocking_region((func), (arg), (arg)->fd)
 
 #define SockAddrStringValue(v) rsock_sockaddr_string_value(&(v))
 #define SockAddrStringValuePtr(v) rsock_sockaddr_string_value_ptr(&(v))
Index: ext/socket/udpsocket.c
===================================================================
--- ext/socket/udpsocket.c	(revision 30851)
+++ ext/socket/udpsocket.c	(revision 30852)
@@ -177,7 +177,7 @@
 	arg.to = res->ai_addr;
 	arg.tolen = res->ai_addrlen;
 	rb_thread_fd_writable(arg.fd);
-	n = (int)BLOCKING_REGION(rsock_sendto_blocking, &arg);
+	n = (int)BLOCKING_REGION_FD(rsock_sendto_blocking, &arg);
 	if (n >= 0) {
 	    freeaddrinfo(res0);
 	    return INT2FIX(n);
Index: ext/socket/init.c
===================================================================
--- ext/socket/init.c	(revision 30851)
+++ ext/socket/init.c	(revision 30852)
@@ -129,7 +129,7 @@
 
     while (rb_io_check_closed(fptr),
 	   rb_thread_wait_fd(arg.fd),
-	   (slen = BLOCKING_REGION(recvfrom_blocking, &arg)) < 0) {
+	   (slen = BLOCKING_REGION_FD(recvfrom_blocking, &arg)) < 0) {
         if (!rb_io_wait_readable(fptr->fd)) {
             rb_sys_fail("recvfrom(2)");
         }
@@ -380,7 +380,7 @@
     if (socks) func = socks_connect_blocking;
 #endif
     for (;;) {
-	status = (int)BLOCKING_REGION(func, &arg);
+	status = (int)BLOCKING_REGION_FD(func, &arg);
 	if (status < 0) {
 	    switch (errno) {
 	      case EAGAIN:
@@ -515,7 +515,7 @@
     arg.len = len;
   retry:
     rb_thread_wait_fd(fd);
-    fd2 = (int)BLOCKING_REGION(accept_blocking, &arg);
+    fd2 = (int)BLOCKING_REGION_FD(accept_blocking, &arg);
     if (fd2 < 0) {
 	switch (errno) {
 	  case EMFILE:
Index: ext/socket/basicsocket.c
===================================================================
--- ext/socket/basicsocket.c	(revision 30851)
+++ ext/socket/basicsocket.c	(revision 30852)
@@ -559,7 +559,7 @@
     arg.fd = fptr->fd;
     arg.flags = NUM2INT(flags);
     while (rb_thread_fd_writable(arg.fd),
-	   (n = (int)BLOCKING_REGION(func, &arg)) < 0) {
+	   (n = (int)BLOCKING_REGION_FD(func, &arg)) < 0) {
 	if (rb_io_wait_writable(arg.fd)) {
 	    continue;
 	}
Index: ext/socket/unixsocket.c
===================================================================
--- ext/socket/unixsocket.c	(revision 30851)
+++ ext/socket/unixsocket.c	(revision 30852)
@@ -249,7 +249,7 @@
 
     arg.fd = fptr->fd;
     rb_thread_fd_writable(arg.fd);
-    if ((int)BLOCKING_REGION(sendmsg_blocking, &arg) == -1)
+    if ((int)BLOCKING_REGION_FD(sendmsg_blocking, &arg) == -1)
 	rb_sys_fail("sendmsg(2)");
 
     return Qnil;
@@ -335,7 +335,7 @@
 
     arg.fd = fptr->fd;
     rb_thread_wait_fd(arg.fd);
-    if ((int)BLOCKING_REGION(recvmsg_blocking, &arg) == -1)
+    if ((int)BLOCKING_REGION_FD(recvmsg_blocking, &arg) == -1)
 	rb_sys_fail("recvmsg(2)");
 
 #if FD_PASSING_BY_MSG_CONTROL
Index: test/socket/test_socket.rb
===================================================================
--- test/socket/test_socket.rb	(revision 30851)
+++ test/socket/test_socket.rb	(revision 30852)
@@ -410,4 +410,24 @@
     assert_equal(stamp.data[-8,8].unpack("Q")[0], t.subsec * 2**64)
   end
 
+  def test_closed_read
+    require 'timeout'
+    require 'socket'
+    bug4390 = '[ruby-core:35203]'
+    server = TCPServer.new("localhost", 0)
+    serv_thread = Thread.new {server.accept}
+    begin sleep(0.1) end until serv_thread.stop?
+    sock = TCPSocket.new("localhost", server.addr[1])
+    client_thread = Thread.new do
+      sock.readline
+    end
+    begin sleep(0.1) end until client_thread.stop?
+    Timeout.timeout(1) do
+      sock.close
+      sock = nil
+      assert_raise(IOError, bug4390) {client_thread.join}
+    end
+  ensure
+    server.close
+  end
 end if defined?(Socket)

--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/

[前][次][番号順一覧][スレッド一覧]