ruby-changes:5782
From: shyouhei <ko1@a...>
Date: Sun, 15 Jun 2008 22:49:41 +0900 (JST)
Subject: [ruby-changes:5782] Ruby:r17288 (ruby_1_8_5): merge revision(s) 16379:
shyouhei 2008-06-15 22:49:29 +0900 (Sun, 15 Jun 2008) New Revision: 17288 Modified files: branches/ruby_1_8_5/ChangeLog branches/ruby_1_8_5/version.h branches/ruby_1_8_5/win32/win32.c Log: merge revision(s) 16379: * win32/win32.c (rb_w32_select): backport from trunk. [ruby-talk:300743] http://svn.ruby-lang.org/cgi-bin/viewvc.cgi/branches/ruby_1_8_5/win32/win32.c?r1=17288&r2=17287&diff_format=u http://svn.ruby-lang.org/cgi-bin/viewvc.cgi/branches/ruby_1_8_5/version.h?r1=17288&r2=17287&diff_format=u http://svn.ruby-lang.org/cgi-bin/viewvc.cgi/branches/ruby_1_8_5/ChangeLog?r1=17288&r2=17287&diff_format=u Index: ruby_1_8_5/ChangeLog =================================================================== --- ruby_1_8_5/ChangeLog (revision 17287) +++ ruby_1_8_5/ChangeLog (revision 17288) @@ -1,3 +1,8 @@ +Sun Jun 15 22:49:12 2008 NAKAMURA Usaku <usa@r...> + + * win32/win32.c (rb_w32_select): backport from trunk. + [ruby-talk:300743] + Sun Jun 15 22:47:37 2008 Nobuyoshi Nakada <nobu@r...> * lib/delegate.rb (SimpleDelegator::dup): removed needless argument. Index: ruby_1_8_5/version.h =================================================================== --- ruby_1_8_5/version.h (revision 17287) +++ ruby_1_8_5/version.h (revision 17288) @@ -2,7 +2,7 @@ #define RUBY_RELEASE_DATE "2008-06-15" #define RUBY_VERSION_CODE 185 #define RUBY_RELEASE_CODE 20080615 -#define RUBY_PATCHLEVEL 205 +#define RUBY_PATCHLEVEL 206 #define RUBY_VERSION_MAJOR 1 #define RUBY_VERSION_MINOR 8 Index: ruby_1_8_5/win32/win32.c =================================================================== --- ruby_1_8_5/win32/win32.c (revision 17287) +++ ruby_1_8_5/win32/win32.c (revision 17288) @@ -371,6 +371,7 @@ } #endif +static CRITICAL_SECTION select_mutex; static BOOL fWinsock; static char *envarea; static void @@ -384,6 +385,7 @@ FreeEnvironmentStrings(envarea); envarea = NULL; } + DeleteCriticalSection(&select_mutex); } static void @@ -472,6 +474,8 @@ init_stdhandle(); + InitializeCriticalSection(&select_mutex); + atexit(exit_handler); // Initialize Winsock @@ -2043,87 +2047,250 @@ static int NtSocketsInitialized = 0; static int -extract_file_fd(fd_set *set, fd_set *fileset) +extract_fd(fd_set *dst, fd_set *src, int (*func)(SOCKET)) { - int idx; + int s = 0; + if (!src || !dst) return 0; - fileset->fd_count = 0; - if (!set) - return 0; - for (idx = 0; idx < set->fd_count; idx++) { - SOCKET fd = set->fd_array[idx]; + while (s < src->fd_count) { + SOCKET fd = src->fd_array[s]; - if (!is_socket(fd)) { - int i; + if (!func || (*func)(fd)) { /* move it to dst */ + int d; - for (i = 0; i < fileset->fd_count; i++) { - if (fileset->fd_array[i] == fd) { - break; - } + for (d = 0; d < dst->fd_count; d++) { + if (dst->fd_array[d] == fd) break; } - if (i == fileset->fd_count) { - if (fileset->fd_count < FD_SETSIZE) { - fileset->fd_array[i] = fd; - fileset->fd_count++; - } + if (d == dst->fd_count && dst->fd_count < FD_SETSIZE) { + dst->fd_array[dst->fd_count++] = fd; } + memmove( + &src->fd_array[s], + &src->fd_array[s+1], + sizeof(src->fd_array[0]) * (--src->fd_count - s)); } + else s++; } - return fileset->fd_count; + + return dst->fd_count; } +static int +is_not_socket(SOCKET sock) +{ + return !is_socket(sock); +} + +static int +is_pipe(SOCKET sock) /* DONT call this for SOCKET! it clains it is PIPE. */ +{ + int ret; + + RUBY_CRITICAL( + ret = (GetFileType((HANDLE)sock) == FILE_TYPE_PIPE) + ); + + return ret; +} + +static int +is_readable_pipe(SOCKET sock) /* call this for pipe only */ +{ + int ret; + DWORD n = 0; + + RUBY_CRITICAL( + if (PeekNamedPipe((HANDLE)sock, NULL, 0, NULL, &n, NULL)) { + ret = (n > 0); + } + else { + ret = (GetLastError() == ERROR_BROKEN_PIPE); /* pipe was closed */ + } + ); + + return ret; +} + +static int +is_console(SOCKET sock) /* DONT call this for SOCKET! */ +{ + int ret; + DWORD n = 0; + INPUT_RECORD ir; + + RUBY_CRITICAL( + ret = (PeekConsoleInput((HANDLE)sock, &ir, 1, &n)) + ); + + return ret; +} + +static int +is_readable_console(SOCKET sock) /* call this for console only */ +{ + int ret = 0; + DWORD n = 0; + INPUT_RECORD ir; + + RUBY_CRITICAL( + if (PeekConsoleInput((HANDLE)sock, &ir, 1, &n) && n > 0) { + if (ir.EventType == KEY_EVENT && ir.Event.KeyEvent.bKeyDown && + ir.Event.KeyEvent.uChar.AsciiChar) { + ret = 1; + } + else { + ReadConsoleInput((HANDLE)sock, &ir, 1, &n); + } + } + ); + + return ret; +} + +static int +do_select(int nfds, fd_set *rd, fd_set *wr, fd_set *ex, + struct timeval *timeout) +{ + int r = 0; + + if (nfds == 0) { + if (timeout) + rb_w32_sleep(timeout->tv_sec * 1000 + timeout->tv_usec / 1000); + else + rb_w32_sleep(INFINITE); + } + else { + RUBY_CRITICAL( + EnterCriticalSection(&select_mutex); + r = select(nfds, rd, wr, ex, timeout); + LeaveCriticalSection(&select_mutex); + if (r == SOCKET_ERROR) { + errno = map_errno(WSAGetLastError()); + r = -1; + } + ); + } + + return r; +} + +static inline int +subst(struct timeval *rest, const struct timeval *wait) +{ + while (rest->tv_usec < wait->tv_usec) { + if (rest->tv_sec <= wait->tv_sec) { + return 0; + } + rest->tv_sec -= 1; + rest->tv_usec += 1000 * 1000; + } + rest->tv_sec -= wait->tv_sec; + rest->tv_usec -= wait->tv_usec; + return 1; +} + +static inline int +compare(const struct timeval *t1, const struct timeval *t2) +{ + if (t1->tv_sec < t2->tv_sec) + return -1; + if (t1->tv_sec > t2->tv_sec) + return 1; + if (t1->tv_usec < t2->tv_usec) + return -1; + if (t1->tv_usec > t2->tv_usec) + return 1; + return 0; +} + +#undef Sleep long rb_w32_select (int nfds, fd_set *rd, fd_set *wr, fd_set *ex, struct timeval *timeout) { long r; - fd_set file_rd; - fd_set file_wr; -#ifdef USE_INTERRUPT_WINSOCK - fd_set trap; -#endif /* USE_INTERRUPT_WINSOCK */ - int file_nfds; + fd_set pipe_rd; + fd_set cons_rd; + fd_set else_rd; + fd_set else_wr; + int nonsock = 0; + if (nfds < 0 || (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0))) { + errno = EINVAL; + return -1; + } if (!NtSocketsInitialized) { StartSockets(); } + + // assume else_{rd,wr} (other than socket, pipe reader, console reader) + // are always readable/writable. but this implementation still has + // problem. if pipe's buffer is full, writing to pipe will block + // until some data is read from pipe. but ruby is single threaded system, + // so whole system will be blocked forever. + + else_rd.fd_count = 0; + nonsock += extract_fd(&else_rd, rd, is_not_socket); + + pipe_rd.fd_count = 0; + extract_fd(&pipe_rd, &else_rd, is_pipe); // should not call is_pipe for socket + + cons_rd.fd_count = 0; + extract_fd(&cons_rd, &else_rd, is_console); // ditto + + else_wr.fd_count = 0; + nonsock += extract_fd(&else_wr, wr, is_not_socket); + r = 0; if (rd && rd->fd_count > r) r = rd->fd_count; if (wr && wr->fd_count > r) r = wr->fd_count; if (ex && ex->fd_count > r) r = ex->fd_count; if (nfds > r) nfds = r; - if (nfds == 0 && timeout) { - Sleep(timeout->tv_sec * 1000 + timeout->tv_usec / 1000); - return 0; - } - file_nfds = extract_file_fd(rd, &file_rd); - file_nfds += extract_file_fd(wr, &file_wr); - if (file_nfds) + { - // assume normal files are always readable/writable - // fake read/write fd_set and return value - if (rd) *rd = file_rd; - if (wr) *wr = file_wr; - return file_nfds; + struct timeval rest; + struct timeval wait; + struct timeval zero; + if (timeout) rest = *timeout; + wait.tv_sec = 0; wait.tv_usec = 10 * 1000; // 10ms + zero.tv_sec = 0; zero.tv_usec = 0; // 0ms + do { + if (nonsock) { + // modifying {else,pipe,cons}_rd is safe because + // if they are modified, function returns immediately. + extract_fd(&else_rd, &pipe_rd, is_readable_pipe); + extract_fd(&else_rd, &cons_rd, is_readable_console); + } + + if (else_rd.fd_count || else_wr.fd_count) { + r = do_select(nfds, rd, wr, ex, &zero); // polling + if (r < 0) break; // XXX: should I ignore error and return signaled handles? + r += extract_fd(rd, &else_rd, NULL); // move all + r += extract_fd(wr, &else_wr, NULL); // move all + break; + } + else { + struct timeval *dowait = + compare(&rest, &wait) < 0 ? &rest : &wait; + + fd_set orig_rd; + fd_set orig_wr; + fd_set orig_ex; + if (rd) orig_rd = *rd; + if (wr) orig_wr = *wr; + if (ex) orig_ex = *ex; + r = do_select(nfds, rd, wr, ex, &zero); // polling + if (r != 0) break; // signaled or error + if (rd) *rd = orig_rd; + if (wr) *wr = orig_wr; + if (ex) *ex = orig_ex; + + // XXX: should check the time select spent + Sleep(dowait->tv_sec * 1000 + dowait->tv_usec / 1000); + } + } while (!timeout || subst(&rest, &wait)); } -#if USE_INTERRUPT_WINSOCK - if (ex) - trap = *ex; - else - trap.fd_count = 0; - if (trap.fd_count < FD_SETSIZE) - trap.fd_array[trap.fd_count++] = (SOCKET)interrupted_event; - // else unable to catch interrupt. - ex = &trap; -#endif /* USE_INTERRUPT_WINSOCK */ - - RUBY_CRITICAL({ - r = select(nfds, rd, wr, ex, timeout); - if (r == SOCKET_ERROR) { - errno = map_errno(WSAGetLastError()); - } - }); return r; } @@ -3258,7 +3425,6 @@ return 0; } -#undef Sleep #define yield_once() Sleep(0) #define yield_until(condition) do yield_once(); while (!(condition)) -- ML: ruby-changes@q... Info: http://www.atdot.net/~ko1/quickml/