[前][次][番号順一覧][スレッド一覧]

ruby-changes:71985

From: Samuel <ko1@a...>
Date: Sat, 28 May 2022 12:44:39 +0900 (JST)
Subject: [ruby-changes:71985] ce23cfa518 (master): Make `io_binwrite` atomic.

https://git.ruby-lang.org/ruby.git/commit/?id=ce23cfa518

From ce23cfa5182bb53e8b6555fb6a5b2846cd559922 Mon Sep 17 00:00:00 2001
From: Samuel Williams <samuel.williams@o...>
Date: Sun, 9 Jan 2022 01:47:51 +1300
Subject: Make `io_binwrite` atomic.

---
 io.c | 212 +++++++++++++++++++++++++++++++++++++++----------------------------
 1 file changed, 124 insertions(+), 88 deletions(-)

diff --git a/io.c b/io.c
index dab1d945f9..697d571c4a 100644
--- a/io.c
+++ b/io.c
@@ -1581,71 +1581,111 @@ struct write_arg { https://github.com/ruby/ruby/blob/trunk/io.c#L1581
 
 #ifdef HAVE_WRITEV
 static VALUE
-io_binwrite_string(VALUE arg)
+io_binwrite_string_internal(rb_io_t *fptr, const char *ptr, long length)
 {
-    struct binwrite_arg *p = (struct binwrite_arg *)arg;
-    rb_io_t *fptr = p->fptr;
-    long r;
-
     if (fptr->wbuf.len) {
-	struct iovec iov[2];
+        struct iovec iov[2];
 
-	iov[0].iov_base = fptr->wbuf.ptr+fptr->wbuf.off;
-	iov[0].iov_len = fptr->wbuf.len;
-	iov[1].iov_base = (char *)p->ptr;
-	iov[1].iov_len = p->length;
+        iov[0].iov_base = fptr->wbuf.ptr+fptr->wbuf.off;
+        iov[0].iov_len = fptr->wbuf.len;
+        iov[1].iov_base = (void*)ptr;
+        iov[1].iov_len = length;
 
-	r = rb_writev_internal(fptr, iov, 2);
+        long result = rb_writev_internal(fptr, iov, 2);
 
-        if (r < 0)
-            return r;
+        if (result < 0)
+            return result;
 
-	if (fptr->wbuf.len <= r) {
-	    r -= fptr->wbuf.len;
-	    fptr->wbuf.off = 0;
-	    fptr->wbuf.len = 0;
-	}
-	else {
-	    fptr->wbuf.off += (int)r;
-	    fptr->wbuf.len -= (int)r;
-	    r = 0L;
-	}
+        if (result >= fptr->wbuf.len) {
+            // We wrote more than the internal buffer:
+            result -= fptr->wbuf.len;
+            fptr->wbuf.off = 0;
+            fptr->wbuf.len = 0;
+        }
+        else {
+            // We only wrote less data than the internal buffer:
+            fptr->wbuf.off += (int)result;
+            fptr->wbuf.len -= (int)result;
+
+            return 0L;
+        }
+
+        return result;
     }
     else {
-	r = rb_write_internal(fptr, p->ptr, p->length);
+        return rb_write_internal(fptr, ptr, length);
     }
-
-    return r;
 }
 #else
+static VALUE
+io_binwrite_string_internal(rb_io_t *fptr, const char *ptr, long length)
+{
+    long remaining = length;
+
+    if (fptr->wbuf.len) {
+        if (fptr->wbuf.len+length <= fptr->wbuf.capa) {
+            if (fptr->wbuf.capa < fptr->wbuf.off+fptr->wbuf.len+length) {
+                MEMMOVE(fptr->wbuf.ptr, fptr->wbuf.ptr+fptr->wbuf.off, char, fptr->wbuf.len);
+                fptr->wbuf.off = 0;
+            }
+
+            MEMMOVE(fptr->wbuf.ptr+fptr->wbuf.off+fptr->wbuf.len, ptr, char, length);
+            fptr->wbuf.len += (int)length;
+
+            // We copied the entire incoming data to the internal buffer:
+            remaining = 0;
+        }
+
+        // Flush the internal buffer:
+        if (io_fflush(fptr) < 0) {
+            return -1;
+        }
+
+        // If all the data was buffered, we are done:
+        if (remaining == 0) {
+            return length;
+        }
+    }
+
+    // Otherwise, we should write the data directly:
+    return rb_write_internal(fptr, ptr, length);
+}
+#endif
+
 static VALUE
 io_binwrite_string(VALUE arg)
 {
     struct binwrite_arg *p = (struct binwrite_arg *)arg;
-    rb_io_t *fptr = p->fptr;
-    long l, len;
 
-    l = len = p->length;
+    const char *ptr = p->ptr;
+    long remaining = p->length;
 
-    if (fptr->wbuf.len) {
-	if (fptr->wbuf.len+len <= fptr->wbuf.capa) {
-	    if (fptr->wbuf.capa < fptr->wbuf.off+fptr->wbuf.len+len) {
-		MEMMOVE(fptr->wbuf.ptr, fptr->wbuf.ptr+fptr->wbuf.off, char, fptr->wbuf.len);
-		fptr->wbuf.off = 0;
-	    }
-	    MEMMOVE(fptr->wbuf.ptr+fptr->wbuf.off+fptr->wbuf.len, p->ptr, char, len);
-	    fptr->wbuf.len += (int)len;
-	    l = 0;
-	}
-	if (io_fflush(fptr) < 0)
-	    return -2L; /* fail in fflush */
-	if (l == 0)
-	    return len;
+    while (remaining) {
+        // Write as much as possible:
+        long result = (long)io_binwrite_string_internal(p->fptr, ptr, remaining);
+
+        // Finished:
+        if (result == remaining) {
+            break;
+        }
+
+        if (result >= 0) {
+            ptr += result;
+            remaining -= result;
+            errno = EAGAIN;
+        }
+
+        // Wait for it to become writable:
+        if (rb_io_maybe_wait_writable(errno, p->fptr->self, Qnil)) {
+            rb_io_check_closed(p->fptr);
+        } else {
+            // The error was unrelated to waiting for it to become writable, so we fail:
+            return -1;
+        }
     }
 
-    return rb_write_internal(p->fptr, p->ptr, p->length);
+    return p->length;
 }
-#endif
 
 inline static void
 io_allocate_write_buffer(rb_io_t *fptr, int sync)
@@ -1660,65 +1700,57 @@ io_allocate_write_buffer(rb_io_t *fptr, int sync) https://github.com/ruby/ruby/blob/trunk/io.c#L1700
     }
 }
 
+static inline int
+io_binwrite_requires_flush_write(rb_io_t *fptr, long len, int nosync)
+{
+    // If the requested operation was synchronous and the output mode is synchronus or a TTY:
+    if (!nosync && (fptr->mode & (FMODE_SYNC|FMODE_TTY)))
+        return 1;
+
+    // If the amount of data we want to write exceeds the internal buffer:
+    if (fptr->wbuf.ptr && fptr->wbuf.capa <= fptr->wbuf.len + len)
+        return 1;
+
+    // Otherwise, we can append to the internal buffer:
+    return 0;
+}
+
 static long
 io_binwrite(VALUE str, const char *ptr, long len, rb_io_t *fptr, int nosync)
 {
-    long n, r, offset = 0;
+    if (len <= 0) return len;
 
-    /* don't write anything if current thread has a pending interrupt. */
+    // Don't write anything if current thread has a pending interrupt:
     rb_thread_check_ints();
 
-    if ((n = len) <= 0) return n;
-
     io_allocate_write_buffer(fptr, !nosync);
 
-    if ((!nosync && (fptr->mode & (FMODE_SYNC|FMODE_TTY))) ||
-        (fptr->wbuf.ptr && fptr->wbuf.capa <= fptr->wbuf.len + len)) {
+    if (io_binwrite_requires_flush_write(fptr, len, nosync)) {
         struct binwrite_arg arg;
 
         arg.fptr = fptr;
         arg.str = str;
-      retry:
-        arg.ptr = ptr + offset;
-        arg.length = n;
+        arg.ptr = ptr;
+        arg.length = len;
 
         if (fptr->write_lock) {
-            r = rb_mutex_synchronize(fptr->write_lock, io_binwrite_string, (VALUE)&arg);
+            return rb_mutex_synchronize(fptr->write_lock, io_binwrite_string, (VALUE)&arg);
         }
         else {
-            r = io_binwrite_string((VALUE)&arg);
+            return io_binwrite_string((VALUE)&arg);
         }
-
-        /* xxx: other threads may modify given string. */
-        if (r == n) return len;
-        if (0 <= r) {
-            offset += r;
-            n -= r;
-            errno = EAGAIN;
+    } else {
+        if (fptr->wbuf.off) {
+            if (fptr->wbuf.len)
+                MEMMOVE(fptr->wbuf.ptr, fptr->wbuf.ptr+fptr->wbuf.off, char, fptr->wbuf.len);
+            fptr->wbuf.off = 0;
         }
 
-        if (r == -2L)
-            return -1L;
-        if (rb_io_maybe_wait_writable(errno, fptr->self, Qnil)) {
-            rb_io_check_closed(fptr);
-
-            if (offset < len)
-                goto retry;
-        }
-
-        return -1L;
-    }
+        MEMMOVE(fptr->wbuf.ptr+fptr->wbuf.off+fptr->wbuf.len, ptr, char, len);
+        fptr->wbuf.len += (int)len;
 
-    if (fptr->wbuf.off) {
-        if (fptr->wbuf.len)
-            MEMMOVE(fptr->wbuf.ptr, fptr->wbuf.ptr+fptr->wbuf.off, char, fptr->wbuf.len);
-        fptr->wbuf.off = 0;
+        return len;
     }
-
-    MEMMOVE(fptr->wbuf.ptr+fptr->wbuf.off+fptr->wbuf.len, ptr+offset, char, len);
-    fptr->wbuf.len += (int)len;
-
-    return len;
 }
 
 # define MODE_BTMODE(a,b,c) ((fmode & FMODE_BINMODE) ? (b) : \
@@ -1792,15 +1824,17 @@ io_fwrite(VALUE str, rb_io_t *fptr, int nosync) https://github.com/ruby/ruby/blob/trunk/io.c#L1824
     VALUE tmp;
     long n, len;
     const char *ptr;
+
 #ifdef _WIN32
     if (fptr->mode & FMODE_TTY) {
-	long len = rb_w32_write_console(str, fptr->fd);
-	if (len > 0) return len;
+        long len = rb_w32_write_console(str, fptr->fd);
+        if (len > 0) return len;
     }
 #endif
+
     str = do_writeconv(str, fptr, &converted);
     if (converted)
-	OBJ_FREEZE(str);
+        OBJ_FREEZE(str);
 
     tmp = rb_str_tmp_frozen_acquire(str);
     RSTRING_GETMEM(tmp, ptr, len);
@@ -1830,10 +1864,12 @@ io_write(VALUE io, VALUE str, int nosync) https://github.com/ruby/ruby/blob/trunk/io.c#L1864
     io = GetWriteIO(io);
     str = rb_obj_as_string(str);
     tmp = rb_io_check_io(io);
+
     if (NIL_P(tmp)) {
-	/* port is not IO, call write method for it. */
-	return rb_funcall(io, id_write, 1, str);
+        /* port is not IO, call write method for it. */
+        return rb_funcall(io, id_write, 1, str);
     }
+
     io = tmp;
     if (RSTRING_LEN(str) == 0) return INT2FIX(0);
 
-- 
cgit v1.2.1


--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/

[前][次][番号順一覧][スレッド一覧]