[前][次][番号順一覧][スレッド一覧]

ruby-changes:5782

From: shyouhei <ko1@a...>
Date: Sun, 15 Jun 2008 22:49:41 +0900 (JST)
Subject: [ruby-changes:5782] Ruby:r17288 (ruby_1_8_5): merge revision(s) 16379:

shyouhei	2008-06-15 22:49:29 +0900 (Sun, 15 Jun 2008)

  New Revision: 17288

  Modified files:
    branches/ruby_1_8_5/ChangeLog
    branches/ruby_1_8_5/version.h
    branches/ruby_1_8_5/win32/win32.c

  Log:
    merge revision(s) 16379:
    * win32/win32.c (rb_w32_select): backport from trunk.
      [ruby-talk:300743]


  http://svn.ruby-lang.org/cgi-bin/viewvc.cgi/branches/ruby_1_8_5/win32/win32.c?r1=17288&r2=17287&diff_format=u
  http://svn.ruby-lang.org/cgi-bin/viewvc.cgi/branches/ruby_1_8_5/version.h?r1=17288&r2=17287&diff_format=u
  http://svn.ruby-lang.org/cgi-bin/viewvc.cgi/branches/ruby_1_8_5/ChangeLog?r1=17288&r2=17287&diff_format=u

Index: ruby_1_8_5/ChangeLog
===================================================================
--- ruby_1_8_5/ChangeLog	(revision 17287)
+++ ruby_1_8_5/ChangeLog	(revision 17288)
@@ -1,3 +1,8 @@
+Sun Jun 15 22:49:12 2008  NAKAMURA Usaku  <usa@r...>
+
+	* win32/win32.c (rb_w32_select): backport from trunk.
+	  [ruby-talk:300743]
+
 Sun Jun 15 22:47:37 2008  Nobuyoshi Nakada  <nobu@r...>
 
 	* lib/delegate.rb (SimpleDelegator::dup): removed needless argument.
Index: ruby_1_8_5/version.h
===================================================================
--- ruby_1_8_5/version.h	(revision 17287)
+++ ruby_1_8_5/version.h	(revision 17288)
@@ -2,7 +2,7 @@
 #define RUBY_RELEASE_DATE "2008-06-15"
 #define RUBY_VERSION_CODE 185
 #define RUBY_RELEASE_CODE 20080615
-#define RUBY_PATCHLEVEL 205
+#define RUBY_PATCHLEVEL 206
 
 #define RUBY_VERSION_MAJOR 1
 #define RUBY_VERSION_MINOR 8
Index: ruby_1_8_5/win32/win32.c
===================================================================
--- ruby_1_8_5/win32/win32.c	(revision 17287)
+++ ruby_1_8_5/win32/win32.c	(revision 17288)
@@ -371,6 +371,7 @@
 }
 #endif
 
+static CRITICAL_SECTION select_mutex;
 static BOOL fWinsock;
 static char *envarea;
 static void
@@ -384,6 +385,7 @@
 	FreeEnvironmentStrings(envarea);
 	envarea = NULL;
     }
+    DeleteCriticalSection(&select_mutex);
 }
 
 static void
@@ -472,6 +474,8 @@
 
     init_stdhandle();
 
+    InitializeCriticalSection(&select_mutex);
+
     atexit(exit_handler);
 
     // Initialize Winsock
@@ -2043,87 +2047,250 @@
 static int NtSocketsInitialized = 0;
 
 static int
-extract_file_fd(fd_set *set, fd_set *fileset)
+extract_fd(fd_set *dst, fd_set *src, int (*func)(SOCKET))
 {
-    int idx;
+    int s = 0;
+    if (!src || !dst) return 0;
 
-    fileset->fd_count = 0;
-    if (!set)
-	return 0;
-    for (idx = 0; idx < set->fd_count; idx++) {
-	SOCKET fd = set->fd_array[idx];
+    while (s < src->fd_count) {
+        SOCKET fd = src->fd_array[s];
 
-	if (!is_socket(fd)) {
-	    int i;
+	if (!func || (*func)(fd)) { /* move it to dst */
+	    int d;
 
-	    for (i = 0; i < fileset->fd_count; i++) {
-		if (fileset->fd_array[i] == fd) {
-		    break;
-		}
+	    for (d = 0; d < dst->fd_count; d++) {
+		if (dst->fd_array[d] == fd) break;
 	    }
-	    if (i == fileset->fd_count) {
-		if (fileset->fd_count < FD_SETSIZE) {
-		    fileset->fd_array[i] = fd;
-		    fileset->fd_count++;
-		}
+	    if (d == dst->fd_count && dst->fd_count < FD_SETSIZE) {
+		dst->fd_array[dst->fd_count++] = fd;
 	    }
+	    memmove(
+		&src->fd_array[s],
+		&src->fd_array[s+1], 
+		sizeof(src->fd_array[0]) * (--src->fd_count - s));
 	}
+	else s++;
     }
-    return fileset->fd_count;
+
+    return dst->fd_count;
 }
 
+static int
+is_not_socket(SOCKET sock)
+{
+    return !is_socket(sock);
+}
+
+static int
+is_pipe(SOCKET sock) /* DONT call this for SOCKET! it claims it is PIPE. */
+{
+    int ret;
+
+    RUBY_CRITICAL(
+	ret = (GetFileType((HANDLE)sock) == FILE_TYPE_PIPE)
+    );
+
+    return ret;
+}
+
+static int
+is_readable_pipe(SOCKET sock) /* call this for pipe only */
+{
+    int ret;
+    DWORD n = 0;
+
+    RUBY_CRITICAL(
+	if (PeekNamedPipe((HANDLE)sock, NULL, 0, NULL, &n, NULL)) {
+	    ret = (n > 0);
+	}
+	else {
+	    ret = (GetLastError() == ERROR_BROKEN_PIPE); /* pipe was closed */
+	}
+    );
+
+    return ret;
+}
+
+static int
+is_console(SOCKET sock) /* DONT call this for SOCKET! */
+{
+    int ret;
+    DWORD n = 0;
+    INPUT_RECORD ir;
+
+    RUBY_CRITICAL(
+	ret = (PeekConsoleInput((HANDLE)sock, &ir, 1, &n))
+    );
+
+    return ret;
+}
+
+static int
+is_readable_console(SOCKET sock) /* call this for console only */
+{
+    int ret = 0;
+    DWORD n = 0;
+    INPUT_RECORD ir;
+
+    RUBY_CRITICAL(
+	if (PeekConsoleInput((HANDLE)sock, &ir, 1, &n) && n > 0) {
+	    if (ir.EventType == KEY_EVENT && ir.Event.KeyEvent.bKeyDown &&
+		ir.Event.KeyEvent.uChar.AsciiChar) {
+		ret = 1;
+	    }
+	    else {
+		ReadConsoleInput((HANDLE)sock, &ir, 1, &n);
+	    }
+	}
+    );
+
+    return ret;
+}
+
+static int
+do_select(int nfds, fd_set *rd, fd_set *wr, fd_set *ex,
+            struct timeval *timeout)
+{
+    int r = 0;
+
+    if (nfds == 0) {
+	if (timeout)
+	    rb_w32_sleep(timeout->tv_sec * 1000 + timeout->tv_usec / 1000);
+	else
+	    rb_w32_sleep(INFINITE);
+    }
+    else {
+	RUBY_CRITICAL(
+	    EnterCriticalSection(&select_mutex);
+	    r = select(nfds, rd, wr, ex, timeout);
+	    LeaveCriticalSection(&select_mutex);
+	    if (r == SOCKET_ERROR) {
+		errno = map_errno(WSAGetLastError());
+		r = -1;
+	    }
+	);
+    }
+
+    return r;
+}
+
+static inline int
+subst(struct timeval *rest, const struct timeval *wait)
+{
+    while (rest->tv_usec < wait->tv_usec) {
+	if (rest->tv_sec <= wait->tv_sec) {
+	    return 0;
+	}
+	rest->tv_sec -= 1;
+	rest->tv_usec += 1000 * 1000;
+    }
+    rest->tv_sec -= wait->tv_sec;
+    rest->tv_usec -= wait->tv_usec;
+    return 1;
+}
+
+static inline int
+compare(const struct timeval *t1, const struct timeval *t2)
+{
+    if (t1->tv_sec < t2->tv_sec)
+	return -1;
+    if (t1->tv_sec > t2->tv_sec)
+	return 1;
+    if (t1->tv_usec < t2->tv_usec)
+	return -1;
+    if (t1->tv_usec > t2->tv_usec)
+	return 1;
+    return 0;
+}
+
+#undef Sleep
 long 
 rb_w32_select (int nfds, fd_set *rd, fd_set *wr, fd_set *ex,
 	       struct timeval *timeout)
 {
     long r;
-    fd_set file_rd;
-    fd_set file_wr;
-#ifdef USE_INTERRUPT_WINSOCK
-    fd_set trap;
-#endif /* USE_INTERRUPT_WINSOCK */
-    int file_nfds;
+    fd_set pipe_rd;
+    fd_set cons_rd;
+    fd_set else_rd;
+    fd_set else_wr;
+    int nonsock = 0;
 
+    if (nfds < 0 || (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0))) {
+	errno = EINVAL;
+	return -1;
+    }
     if (!NtSocketsInitialized) {
 	StartSockets();
     }
+
+    // assume else_{rd,wr} (other than socket, pipe reader, console reader)
+    // are always readable/writable. but this implementation still has
+    // problem. if pipe's buffer is full, writing to pipe will block
+    // until some data is read from pipe. but ruby is single threaded system,
+    // so whole system will be blocked forever.
+
+    else_rd.fd_count = 0;
+    nonsock += extract_fd(&else_rd, rd, is_not_socket);
+
+    pipe_rd.fd_count = 0;
+    extract_fd(&pipe_rd, &else_rd, is_pipe); // should not call is_pipe for socket
+
+    cons_rd.fd_count = 0;
+    extract_fd(&cons_rd, &else_rd, is_console); // ditto
+
+    else_wr.fd_count = 0;
+    nonsock += extract_fd(&else_wr, wr, is_not_socket);
+
     r = 0;
     if (rd && rd->fd_count > r) r = rd->fd_count;
     if (wr && wr->fd_count > r) r = wr->fd_count;
     if (ex && ex->fd_count > r) r = ex->fd_count;
     if (nfds > r) nfds = r;
-    if (nfds == 0 && timeout) {
-	Sleep(timeout->tv_sec * 1000 + timeout->tv_usec / 1000);
-	return 0;
-    }
-    file_nfds = extract_file_fd(rd, &file_rd);
-    file_nfds += extract_file_fd(wr, &file_wr);
-    if (file_nfds)
+
     {
-	// assume normal files are always readable/writable
-	// fake read/write fd_set and return value
-	if (rd) *rd = file_rd;
-	if (wr) *wr = file_wr;
-	return file_nfds;
+	struct timeval rest;
+	struct timeval wait;
+	struct timeval zero;
+	if (timeout) rest = *timeout;
+	wait.tv_sec = 0; wait.tv_usec = 10 * 1000; // 10ms
+	zero.tv_sec = 0; zero.tv_usec = 0;         //  0ms
+	do {
+	    if (nonsock) {
+		// modifying {else,pipe,cons}_rd is safe because
+		// if they are modified, function returns immediately.
+		extract_fd(&else_rd, &pipe_rd, is_readable_pipe);
+		extract_fd(&else_rd, &cons_rd, is_readable_console);
+	    }
+
+	    if (else_rd.fd_count || else_wr.fd_count) {
+		r = do_select(nfds, rd, wr, ex, &zero); // polling
+		if (r < 0) break; // XXX: should I ignore error and return signaled handles?
+		r += extract_fd(rd, &else_rd, NULL); // move all
+		r += extract_fd(wr, &else_wr, NULL); // move all
+		break;
+	    }
+	    else {
+		struct timeval *dowait =
+		    compare(&rest, &wait) < 0 ? &rest : &wait;
+
+		fd_set orig_rd;
+		fd_set orig_wr;
+		fd_set orig_ex;
+		if (rd) orig_rd = *rd;
+		if (wr) orig_wr = *wr;
+		if (ex) orig_ex = *ex;
+		r = do_select(nfds, rd, wr, ex, &zero);	// polling
+		if (r != 0) break; // signaled or error
+		if (rd) *rd = orig_rd;
+		if (wr) *wr = orig_wr;
+		if (ex) *ex = orig_ex;
+
+		// XXX: should check the time select spent
+		Sleep(dowait->tv_sec * 1000 + dowait->tv_usec / 1000);
+	    }
+	} while (!timeout || subst(&rest, &wait));
     }
 
-#if USE_INTERRUPT_WINSOCK
-    if (ex)
-	trap = *ex;
-    else
-	trap.fd_count = 0;
-    if (trap.fd_count < FD_SETSIZE)
-	trap.fd_array[trap.fd_count++] = (SOCKET)interrupted_event;
-    // else unable to catch interrupt.
-    ex = &trap;
-#endif /* USE_INTERRUPT_WINSOCK */
-
-    RUBY_CRITICAL({
-	r = select(nfds, rd, wr, ex, timeout);
-	if (r == SOCKET_ERROR) {
-	    errno = map_errno(WSAGetLastError());
-	}
-    });
     return r;
 }
 
@@ -3258,7 +3425,6 @@
     return 0;
 }
 
-#undef Sleep
 #define yield_once() Sleep(0)
 #define yield_until(condition) do yield_once(); while (!(condition))
 

--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/

[前][次][番号順一覧][スレッド一覧]