ruby-changes:53706
From: normal <ko1@a...>
Date: Thu, 22 Nov 2018 17:46:58 +0900 (JST)
Subject: [ruby-changes:53706] normal:r65922 (trunk): io + socket: make pipes and sockets nonblocking by default
normal 2018-11-22 17:46:51 +0900 (Thu, 22 Nov 2018) New Revision: 65922 https://svn.ruby-lang.org/cgi-bin/viewvc.cgi?view=revision&revision=65922 Log: io + socket: make pipes and sockets nonblocking by default All normal Ruby IO methods (IO#read, IO#gets, IO#write, ...) are capable of appearing to be "blocking" when presented with a file description with the O_NONBLOCK flag set; so there is little risk of incompatibility within Ruby-using programs. The biggest compatibility risk is when spawning external programs. As a result, stdin, stdout, and stderr are now always made blocking before exec-family calls. This change will make an event-oriented MJIT usable if it is waiting on pipes on POSIX-like platforms. It is ALSO necessary to take advantage of proposed lightweight concurrency (aka "auto-Fiber") or any similar proposal for network concurrency: https://bugs.ruby-lang.org/issues/13618 Named pipes (FIFOs) are NOT yet non-blocking by default since they are rarely used and may introduce compatibility problems and extra syscall overhead for a common path. Please revert this commit if there are problems while I am afk, since I am afk a lot lately. 
[ruby-core:89950] [Bug #14968] Modified files: trunk/ext/socket/init.c trunk/ext/socket/rubysocket.h trunk/ext/socket/socket.c trunk/io.c trunk/process.c trunk/spec/ruby/core/io/read_nonblock_spec.rb trunk/spec/ruby/core/io/write_nonblock_spec.rb trunk/spec/ruby/library/socket/socket/connect_spec.rb trunk/test/io/nonblock/test_flush.rb trunk/test/ruby/test_io.rb trunk/test/ruby/test_process.rb trunk/test/socket/test_basicsocket.rb trunk/thread.c trunk/win32/win32.c Index: test/socket/test_basicsocket.rb =================================================================== --- test/socket/test_basicsocket.rb (revision 65921) +++ test/socket/test_basicsocket.rb (revision 65922) @@ -159,8 +159,9 @@ class TestSocket_BasicSocket < Test::Uni https://github.com/ruby/ruby/blob/trunk/test/socket/test_basicsocket.rb#L159 set_nb = true buf = String.new if ssock.respond_to?(:nonblock?) - assert_not_predicate(ssock, :nonblock?) - assert_not_predicate(csock, :nonblock?) + assert_predicate(ssock, :nonblock?) + assert_predicate(csock, :nonblock?) + csock.nonblock = ssock.nonblock = false # Linux may use MSG_DONTWAIT to avoid setting O_NONBLOCK if RUBY_PLATFORM.match?(/linux/) && Socket.const_defined?(:MSG_DONTWAIT) Index: test/io/nonblock/test_flush.rb =================================================================== --- test/io/nonblock/test_flush.rb (revision 65921) +++ test/io/nonblock/test_flush.rb (revision 65922) @@ -53,6 +53,7 @@ class TestIONonblock < Test::Unit::TestC https://github.com/ruby/ruby/blob/trunk/test/io/nonblock/test_flush.rb#L53 def test_nonblock IO.pipe {|r, w| + w.nonblock = false assert_equal(false, w.nonblock?) w.nonblock do assert_equal(true, w.nonblock?) 
Index: test/ruby/test_io.rb =================================================================== --- test/ruby/test_io.rb (revision 65921) +++ test/ruby/test_io.rb (revision 65922) @@ -1360,6 +1360,7 @@ class TestIO < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/ruby/test_io.rb#L1360 def test_readpartial_lock with_pipe do |r, w| s = "" + r.nonblock = false if have_nonblock? t = Thread.new { r.readpartial(5, s) } Thread.pass until t.stop? assert_raise(RuntimeError) { s.clear } Index: test/ruby/test_process.rb =================================================================== --- test/ruby/test_process.rb (revision 65921) +++ test/ruby/test_process.rb (revision 65922) @@ -770,6 +770,15 @@ class TestProcess < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/ruby/test_process.rb#L770 Process.wait pid end } + + # ensure standard FDs we redirect to are blocking for compatibility + with_pipes(3) do |pipes| + src = 'p [STDIN,STDOUT,STDERR].map(&:nonblock?)' + rdr = { 0 => pipes[0][0], 1 => pipes[1][1], 2 => pipes[2][1] } + pid = spawn(RUBY, '-rio/nonblock', '-e', src, rdr) + assert_equal("[false, false, false]\n", pipes[1][0].gets) + Process.wait pid + end end end Index: io.c =================================================================== --- io.c (revision 65921) +++ io.c (revision 65922) @@ -316,6 +316,27 @@ rb_cloexec_dup2(int oldfd, int newfd) https://github.com/ruby/ruby/blob/trunk/io.c#L316 return ret; } +static int +rb_fd_set_nonblock(int fd) +{ +#ifdef _WIN32 + return rb_w32_set_nonblock(fd); +#elif defined(F_GETFL) + int err; + int oflags = fcntl(fd, F_GETFL); + + if (oflags == -1) + return -1; + if (oflags & O_NONBLOCK) + return 0; + oflags |= O_NONBLOCK; + err = fcntl(fd, F_SETFL, oflags); + if (err == -1) + return -1; +#endif + return 0; +} + int rb_cloexec_pipe(int fildes[2]) { @@ -324,7 +345,7 @@ rb_cloexec_pipe(int fildes[2]) https://github.com/ruby/ruby/blob/trunk/io.c#L345 #if defined(HAVE_PIPE2) static int 
try_pipe2 = 1; if (try_pipe2) { - ret = pipe2(fildes, O_CLOEXEC); + ret = pipe2(fildes, O_CLOEXEC | O_NONBLOCK); if (ret != -1) return ret; /* pipe2 is available since Linux 2.6.27, glibc 2.9. */ @@ -350,6 +371,8 @@ rb_cloexec_pipe(int fildes[2]) https://github.com/ruby/ruby/blob/trunk/io.c#L371 #endif rb_maygvl_fd_fix_cloexec(fildes[0]); rb_maygvl_fd_fix_cloexec(fildes[1]); + rb_fd_set_nonblock(fildes[0]); + rb_fd_set_nonblock(fildes[1]); return ret; } @@ -2696,27 +2719,9 @@ read_all(rb_io_t *fptr, long siz, VALUE https://github.com/ruby/ruby/blob/trunk/io.c#L2719 void rb_io_set_nonblock(rb_io_t *fptr) { -#ifdef _WIN32 - if (rb_w32_set_nonblock(fptr->fd) != 0) { + if (rb_fd_set_nonblock(fptr->fd) != 0) { rb_sys_fail_path(fptr->pathv); } -#else - int oflags; -#ifdef F_GETFL - oflags = fcntl(fptr->fd, F_GETFL); - if (oflags == -1) { - rb_sys_fail_path(fptr->pathv); - } -#else - oflags = 0; -#endif - if ((oflags & O_NONBLOCK) == 0) { - oflags |= O_NONBLOCK; - if (fcntl(fptr->fd, F_SETFL, oflags) == -1) { - rb_sys_fail_path(fptr->pathv); - } - } -#endif } struct read_internal_arg { Index: thread.c =================================================================== --- thread.c (revision 65921) +++ thread.c (revision 65922) @@ -4068,17 +4068,19 @@ rb_wait_for_single_fd(int fd, int events https://github.com/ruby/ruby/blob/trunk/thread.c#L4068 int result = 0, lerrno; rb_hrtime_t *to, rel, end = 0; int drained; - rb_thread_t *th = GET_THREAD(); nfds_t nfds; rb_unblock_function_t *ubf; + struct waiting_fd wfd; - RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + wfd.th = GET_THREAD(); + wfd.fd = fd; + RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec); timeout_prepare(&to, &rel, &end, timeout); fds[0].fd = fd; fds[0].events = (short)events; do { fds[0].revents = 0; - fds[1].fd = rb_sigwait_fd_get(th); + fds[1].fd = rb_sigwait_fd_get(wfd.th); if (fds[1].fd >= 0) { fds[1].events = POLLIN; @@ -4092,27 +4094,29 @@ rb_wait_for_single_fd(int fd, int events 
https://github.com/ruby/ruby/blob/trunk/thread.c#L4094 } lerrno = 0; - BLOCKING_REGION(th, { + list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node); + BLOCKING_REGION(wfd.th, { const rb_hrtime_t *sto; struct timespec ts; - sto = sigwait_timeout(th, fds[1].fd, to, &drained); - if (!RUBY_VM_INTERRUPTED(th->ec)) { + sto = sigwait_timeout(wfd.th, fds[1].fd, to, &drained); + if (!RUBY_VM_INTERRUPTED(wfd.th->ec)) { result = ppoll(fds, nfds, rb_hrtime2timespec(&ts, sto), NULL); if (result < 0) lerrno = errno; } - }, ubf, th, TRUE); + }, ubf, wfd.th, TRUE); + list_del(&wfd.wfd_node); if (fds[1].fd >= 0) { if (result > 0 && fds[1].revents) { result--; fds[1].revents = 0; } - (void)check_signals_nogvl(th, fds[1].fd); - rb_sigwait_fd_put(th, fds[1].fd); - rb_sigwait_fd_migrate(th->vm); + (void)check_signals_nogvl(wfd.th, fds[1].fd); + rb_sigwait_fd_put(wfd.th, fds[1].fd); + rb_sigwait_fd_migrate(wfd.th->vm); } - RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + RUBY_VM_CHECK_INTS_BLOCKING(wfd.th->ec); } while (wait_retryable(&result, lerrno, to, end)); if (result < 0) { @@ -4152,6 +4156,7 @@ struct select_args { https://github.com/ruby/ruby/blob/trunk/thread.c#L4156 rb_fdset_t *read; rb_fdset_t *write; rb_fdset_t *except; + struct waiting_fd wfd; struct timeval *tv; }; @@ -4182,6 +4187,7 @@ select_single_cleanup(VALUE ptr) https://github.com/ruby/ruby/blob/trunk/thread.c#L4187 { struct select_args *args = (struct select_args *)ptr; + list_del(&args->wfd.wfd_node); if (args->read) rb_fd_term(args->read); if (args->write) rb_fd_term(args->write); if (args->except) rb_fd_term(args->except); @@ -4202,7 +4208,10 @@ rb_wait_for_single_fd(int fd, int events https://github.com/ruby/ruby/blob/trunk/thread.c#L4208 args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL; args.except = (events & RB_WAITFD_PRI) ? 
init_set_fd(fd, &efds) : NULL; args.tv = tv; + args.wfd.fd = fd; + args.wfd.th = GET_THREAD(); + list_add(&args.wfd.th->vm->waiting_fds, &args.wfd.wfd_node); r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr); if (r == -1) errno = args.as.error; Index: spec/ruby/core/io/write_nonblock_spec.rb =================================================================== --- spec/ruby/core/io/write_nonblock_spec.rb (revision 65921) +++ spec/ruby/core/io/write_nonblock_spec.rb (revision 65922) @@ -76,7 +76,6 @@ describe 'IO#write_nonblock' do https://github.com/ruby/ruby/blob/trunk/spec/ruby/core/io/write_nonblock_spec.rb#L76 platform_is_not :windows do it 'sets the IO in nonblock mode' do require 'io/nonblock' - @write.nonblock?.should == false @write.write_nonblock('a') @write.nonblock?.should == true end Index: spec/ruby/core/io/read_nonblock_spec.rb =================================================================== --- spec/ruby/core/io/read_nonblock_spec.rb (revision 65921) +++ spec/ruby/core/io/read_nonblock_spec.rb (revision 65922) @@ -44,7 +44,6 @@ describe "IO#read_nonblock" do https://github.com/ruby/ruby/blob/trunk/spec/ruby/core/io/read_nonblock_spec.rb#L44 platform_is_not :windows do it 'sets the IO in nonblock mode' do require 'io/nonblock' - @read.nonblock?.should == false @write.write "abc" @read.read_nonblock(1).should == "a" @read.nonblock?.should == true Index: spec/ruby/library/socket/socket/connect_spec.rb =================================================================== --- spec/ruby/library/socket/socket/connect_spec.rb (revision 65921) +++ spec/ruby/library/socket/socket/connect_spec.rb (revision 65922) @@ -34,6 +34,12 @@ describe 'Socket#connect' do https://github.com/ruby/ruby/blob/trunk/spec/ruby/library/socket/socket/connect_spec.rb#L34 lambda { @client.connect(@server.getsockname) + + # A second call needed if non-blocking sockets become default + # XXX honestly I don't expect any real code to care about this spec + # as it's too 
implementation-dependent and checking for connect() + # errors is futile anyways because of TOCTOU + @client.connect(@server.getsockname) }.should raise_error(Errno::EISCONN) end Index: win32/win32.c =================================================================== --- win32/win32.c (revision 65921) +++ win32/win32.c (revision 65922) @@ -4429,11 +4429,11 @@ fcntl(int fd, int cmd, ...) https://github.com/ruby/ruby/blob/trunk/win32/win32.c#L4429 /* License: Ruby's */ int -rb_w32_set_nonblock(int fd) +rb_w32_set_nonblock2(int fd, int nonblock) { SOCKET sock = TO_SOCKET(fd); if (is_socket(sock)) { - return setfl(sock, O_NONBLOCK); + return setfl(sock, nonblock ? O_NONBLOCK : 0); } else if (is_pipe(sock)) { DWORD state; @@ -4441,7 +4441,12 @@ rb_w32_set_nonblock(int fd) https://github.com/ruby/ruby/blob/trunk/win32/win32.c#L4441 errno = map_errno(GetLastError()); return -1; } - state |= PIPE_NOWAIT; + if (nonblock) { + state |= PIPE_NOWAIT; + } + else { + state &= ~PIPE_NOWAIT; + } if (!SetNamedPipeHandleState((HANDLE)sock, &state, NULL, NULL)) { errno = map_errno(GetLastError()); return -1; @@ -4454,6 +4459,12 @@ rb_w32_set_nonblock(int fd) https://github.com/ruby/ruby/blob/trunk/win32/win32.c#L4459 } } +int +rb_w32_set_nonblock(int fd) +{ + return rb_w32_set_nonblock2(fd, TRUE); +} + #ifndef WNOHANG #define WNOHANG -1 #endif Index: process.c =================================================================== --- process.c (revision 65921) +++ process.c (revision 65922) @@ -1474,6 +1474,39 @@ before_exec_non_async_signal_safe(void) https://github.com/ruby/ruby/blob/trunk/process.c#L1474 rb_thread_stop_timer_thread(); } +#define WRITE_CONST(fd, str) (void)(write((fd),(str),sizeof(str)-1)<0) +#ifdef _WIN32 +int rb_w32_set_nonblock2(int fd, int nonblock); +#endif + +static int +set_blocking(int fd) +{ +#ifdef _WIN32 + return rb_w32_set_nonblock2(fd, 0); +#elif defined(F_GETFL) && defined(F_SETFL) + int fl = fcntl(fd, F_GETFL); /* async-signal-safe */ + + /* EBADF ought 
to be possible */ + if (fl == -1) return fl; + if (fl & O_NONBLOCK) { + fl &= ~O_NONBLOCK; + return fcntl(fd, F_SETFL, fl); + } + return 0; +#endif +} + +static void +stdfd_clear_nonblock(void) +{ + /* many programs cannot deal with non-blocking stdin/stdout/stderr */ + int fd; + for (fd = 0; fd < 3; fd++) { + (void)set_blocking(fd); /* can't do much about errors anyhow */ + } +} + static void before_exec(void) { @@ -3445,6 +3478,11 @@ rb_execarg_run_options(const struct rb_e https://github.com/ruby/ruby/blob/trunk/process.c#L3478 rb_execarg_allocate_dup2_tmpbuf(sargp, RARRAY_LEN(ary)); } } + { + int preserve = errno; + stdfd_clear_nonblock(); + errno = preserve; + } return 0; } @@ -3645,6 +3683,12 @@ read_retry(int fd, void *buf, size_t len https://github.com/ruby/ruby/blob/trunk/process.c#L3683 { ssize_t r; + if (set_blocking(fd) != 0) { +#ifndef _WIN32 + rb_async_bug_errno("set_blocking failed reading child error", errno); +#endif + } + do { r = read(fd, buf, len); } while (r < 0 && errno == EINTR); Index: ext/socket/socket.c =================================================================== --- ext/socket/socket.c (revision 65921) +++ ext/socket/socket.c (revision 65922) @@ -177,14 +177,14 @@ rsock_socketpair0(int domain, int type, https://github.com/ruby/ruby/blob/trunk/ext/socket/socket.c#L177 static int cloexec_state = -1; /* <0: unknown, 0: ignored, >0: working */ if (cloexec_state > 0) { /* common path, if SOCK_CLOEXEC is defined */ - ret = socketpair(domain, type|SOCK_CLOEXEC, protocol, sv); + ret = socketpair(domain, type|SOCK_CLOEXEC|SOCK_NONBLOCK, protocol, sv); if (ret == 0 && (sv[0] <= 2 || sv[1] <= 2)) { goto fix_cloexec; /* highly unlikely */ } goto update_max_fd; } else if (cloexec_state < 0) { /* usually runs once only for detection */ - ret = socketpair(domain, type|SOCK_CLOEXEC, protocol, sv); + ret = socketpair(domain, type|SOCK_CLOEXEC|SOCK_NONBLOCK, protocol, sv); if (ret == 0) { cloexec_state = rsock_detect_cloexec(sv[0]); if 
((cloexec_state == 0) || (sv[0] <= 2 || sv[1] <= 2)) @@ -213,6 +213,8 @@ rsock_socketpair0(int domain, int type, https://github.com/ruby/ruby/blob/trunk/ext/socket/socket.c#L213 fix_cloexec: rb_maygvl_fd_fix_cloexec(sv[0]); rb_maygvl_fd_fix_cloexec(sv[1]); + rsock_make_fd_nonblock(sv[0]); + rsock_make_fd_nonblock(sv[1]); update_max_fd: rb_update_max_fd(sv[0]); @@ -231,6 +233,8 @@ rsock_socketpair0(int domain, int type, https://github.com/ruby/ruby/blob/trunk/ext/socket/socket.c#L233 rb_fd_fix_cloexec(sv[0]); rb_fd_fix_cloexec(sv[1]); + rsock_make_fd_nonblock(sv[0]); + rsock_make_fd_nonblock(sv[1]); return ret; } #endif /* !SOCK_CLOEXEC */ Index: ext/socket/rubysocket.h =================================================================== --- ext/socket/rubysocket.h (revision 65921) +++ ext/socket/rubysocket.h (revision 65922) @@ -433,6 +433,8 @@ static inline void rsock_maybe_wait_fd(i https://github.com/ruby/ruby/blob/trunk/ext/socket/rubysocket.h#L433 VALUE rsock_read_nonblock(VALUE sock, VALUE length, VALUE buf, VALUE ex); VALUE rsock_write_nonblock(VALUE sock, VALUE buf, VALUE ex); +void rsock_make_fd_nonblock(int fd); + #if !defined HAVE_INET_NTOP && ! 
defined _WIN32 const char *inet_ntop(int, const void *, char *, size_t); #elif defined __MINGW32__ Index: ext/socket/init.c =================================================================== --- ext/socket/init.c (revision 65921) +++ ext/socket/init.c (revision 65922) @@ -435,7 +435,7 @@ rsock_socket0(int domain, int type, int https://github.com/ruby/ruby/blob/trunk/ext/socket/init.c#L435 static int cloexec_state = -1; /* <0: unknown, 0: ignored, >0: working */ if (cloexec_state > 0) { /* common path, if SOCK_CLOEXEC is defined */ - ret = socket(domain, type|SOCK_CLOEXEC, proto); + ret = socket(domain, type|SOCK_CLOEXEC|SOCK_NONBLOCK, proto); if (ret >= 0) { if (ret <= 2) goto fix_cloexec; @@ -443,7 +443,7 @@ rsock_socket0(int domain, int type, int https://github.com/ruby/ruby/blob/trunk/ext/socket/init.c#L443 } } else if (cloexec_state < 0) { /* usually runs once only for detection */ - ret = socket(domain, type|SOCK_CLOEXEC, proto); + ret = socket(domain, type|SOCK_CLOEXEC|SOCK_NONBLOCK, proto); if (ret >= 0) { cloexec_state = rsock_detect_cloexec(ret); if (cloexec_state == 0 || ret <= 2) @@ -466,6 +466,7 @@ rsock_socket0(int domain, int type, int https://github.com/ruby/ruby/blob/trunk/ext/socket/init.c#L466 return -1; fix_cloexec: rb_maygvl_fd_fix_cloexec(ret); + rsock_make_fd_nonblock(ret); update_max_fd: rb_update_max_fd(ret); @@ -632,8 +633,8 @@ rsock_connect(int fd, const struct socka https://github.com/ruby/ruby/blob/trunk/ext/socket/init.c#L633 return status; } -static void -make_fd_nonblock(int fd) +void +rsock_make_fd_nonblock(int fd) { int flags; #ifdef F_GETFL @@ -659,6 +660,7 @@ cloexec_accept(int socket, struct sockad https://github.com/ruby/ruby/blob/trunk/ext/socket/init.c#L660 #ifdef HAVE_ACCEPT4 static int try_accept4 = 1; #endif + nonblock = 1; /* TODO remove parameter */ if (address_len) len0 = *address_len; #ifdef HAVE_ACCEPT4 if (try_accept4) { @@ -678,7 +680,7 @@ cloexec_accept(int socket, struct sockad 
https://github.com/ruby/ruby/blob/trunk/ext/socket/init.c#L680 rb_maygvl_fd_fix_cloexec(ret); #ifndef SOCK_NONBLOCK if (nonblock) { - make_fd_nonblock(ret); + rsock_make_fd_nonblock(ret); } #endif if (address_len && len0 < *address_len) *address_len = len0; @@ -695,7 +697,7 @@ cloexec_accept(int socket, struct sockad https://github.com/ruby/ruby/blob/trunk/ext/socket/init.c#L697 if (address_len && len0 < *address_len) *address_len = len0; rb_maygvl_fd_fix_cloexec(ret); if (nonblock) { - make_fd_nonblock(ret); + rsock_make_fd_nonblock(ret); } return ret; } -- ML: ruby-changes@q... Info: http://www.atdot.net/~ko1/quickml/