[前][次][番号順一覧][スレッド一覧]

ruby-changes:70440

From: Samuel <ko1@a...>
Date: Thu, 23 Dec 2021 08:20:23 +0900 (JST)
Subject: [ruby-changes:70440] bed920f073 (master): Add fiber scheduler hooks for `pread`/`pwrite`, and add support to `IO::Buffer`.

https://git.ruby-lang.org/ruby.git/commit/?id=bed920f073

From bed920f0731a1a89a0e5fc7a7428d21be3ffb8a0 Mon Sep 17 00:00:00 2001
From: Samuel Williams <samuel.williams@o...>
Date: Thu, 23 Dec 2021 12:20:09 +1300
Subject: Add fiber scheduler hooks for `pread`/`pwrite`, and add support to
 `IO::Buffer`.

---
 common.mk                      |   1 +
 include/ruby/fiber/scheduler.h |  26 +++++++
 include/ruby/io/buffer.h       |   6 ++
 io_buffer.c                    | 173 +++++++++++++++++++++++++++++++++++++++++
 scheduler.c                    |  27 ++++++-
 test/ruby/test_io_buffer.rb    |  57 ++++++++++++++
 6 files changed, 288 insertions(+), 2 deletions(-)

diff --git a/common.mk b/common.mk
index 8fc0a590f8a..8791069af7c 100644
--- a/common.mk
+++ b/common.mk
@@ -7585,6 +7585,7 @@ io_buffer.$(OBJEXT): {$(VPATH)}backward/2/stdarg.h https://github.com/ruby/ruby/blob/trunk/common.mk#L7585
 io_buffer.$(OBJEXT): {$(VPATH)}config.h
 io_buffer.$(OBJEXT): {$(VPATH)}defines.h
 io_buffer.$(OBJEXT): {$(VPATH)}encoding.h
+io_buffer.$(OBJEXT): {$(VPATH)}fiber/scheduler.h
 io_buffer.$(OBJEXT): {$(VPATH)}intern.h
 io_buffer.$(OBJEXT): {$(VPATH)}internal.h
 io_buffer.$(OBJEXT): {$(VPATH)}internal/anyargs.h
diff --git a/include/ruby/fiber/scheduler.h b/include/ruby/fiber/scheduler.h
index ff587e28c09..a255a1a712c 100644
--- a/include/ruby/fiber/scheduler.h
+++ b/include/ruby/fiber/scheduler.h
@@ -261,6 +261,32 @@ VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t https://github.com/ruby/ruby/blob/trunk/include/ruby/fiber/scheduler.h#L261
  */
 VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length);
 
+/**
+ * Nonblocking read from the passed IO at the specified offset.
+ *
+ * @param[in]   scheduler    Target scheduler.
+ * @param[in]   io           An io object to read from.
+ * @param[out]  buffer       Buffer that receives the data read.
+ * @param[in]   length       Requested number of bytes to read.
+ * @param[in]   offset       The offset in the given IO to read the data from.
+ * @retval      RUBY_Qundef  `scheduler` doesn't have `#io_pread`.
+ * @return      otherwise    What `scheduler.io_pread` returns.
+ */
+VALUE rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, VALUE buffer, size_t length, off_t offset);
+
+/**
+ * Nonblocking write to the passed IO at the specified offset.
+ *
+ * @param[in]   scheduler    Target scheduler.
+ * @param[in]   io           An io object to write to.
+ * @param[in]   buffer       What to write.
+ * @param[in]   length       Number of bytes to write.
+ * @param[in]   offset       The offset in the given IO to write the data to.
+ * @retval      RUBY_Qundef  `scheduler` doesn't have `#io_pwrite`.
+ * @return      otherwise    What `scheduler.io_pwrite` returns.
+ */
+VALUE rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, VALUE buffer, size_t length, off_t offset);
+
 /**
  * Nonblocking read from the passed IO using a native buffer.
  *
diff --git a/include/ruby/io/buffer.h b/include/ruby/io/buffer.h
index 4826a7a76f6..907fec20bb4 100644
--- a/include/ruby/io/buffer.h
+++ b/include/ruby/io/buffer.h
@@ -80,6 +80,12 @@ VALUE rb_io_buffer_transfer(VALUE self); https://github.com/ruby/ruby/blob/trunk/include/ruby/io/buffer.h#L80
 void rb_io_buffer_resize(VALUE self, size_t size);
 void rb_io_buffer_clear(VALUE self, uint8_t value, size_t offset, size_t length);
 
+// Transfer data between an IO::Buffer (`self`) and `io`. The p* variants use an
+// explicit file `offset` instead of the descriptor's current position.
+//
+// Each call first offers the operation to the current fiber scheduler (if any)
+// and returns the scheduler's result unless it is Qundef; otherwise it performs
+// a single blocking system call and returns rb_fiber_scheduler_io_result().
+//
+// `length` must fit within the buffer and is intended as the minimum number of
+// bytes the caller requires: reads may fill up to the whole buffer, writes send
+// `length` bytes. A single system call may still transfer fewer bytes.
+VALUE rb_io_buffer_read(VALUE self, VALUE io, size_t length);
+VALUE rb_io_buffer_pread(VALUE self, VALUE io, size_t length, off_t offset);
+VALUE rb_io_buffer_write(VALUE self, VALUE io, size_t length);
+VALUE rb_io_buffer_pwrite(VALUE self, VALUE io, size_t length, off_t offset);
+
 RBIMPL_SYMBOL_EXPORT_END()
 
 #endif  /* RUBY_IO_BUFFER_T */
diff --git a/io_buffer.c b/io_buffer.c
index 4487cab7736..fa28c59b329 100644
--- a/io_buffer.c
+++ b/io_buffer.c
@@ -8,6 +8,7 @@ https://github.com/ruby/ruby/blob/trunk/io_buffer.c#L8
 
 #include "ruby/io.h"
 #include "ruby/io/buffer.h"
+#include "ruby/fiber/scheduler.h"
 
 #include "internal.h"
 #include "internal/string.h"
@@ -1864,6 +1865,172 @@ size_t io_buffer_default_size(size_t page_size) { https://github.com/ruby/ruby/blob/trunk/io_buffer.c#L1865
     return platform_agnostic_default_size;
 }
 
+/*
+ * Read from `io` into this buffer.
+ *
+ * The current fiber scheduler, if any, gets the first chance via
+ * rb_fiber_scheduler_io_read(); a non-Qundef result is returned as-is.
+ * Otherwise `length` is validated against the buffer and a single blocking
+ * read(2) fills up to the whole buffer; its outcome is reported via
+ * rb_fiber_scheduler_io_result().
+ */
+VALUE
+rb_io_buffer_read(VALUE self, VALUE io, size_t length)
+{
+    VALUE fiber_scheduler = rb_fiber_scheduler_current();
+
+    if (fiber_scheduler != Qnil) {
+        VALUE handled = rb_fiber_scheduler_io_read(fiber_scheduler, io, self, length);
+        if (handled != Qundef) {
+            return handled;
+        }
+    }
+
+    struct rb_io_buffer *buffer = NULL;
+    TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, buffer);
+    io_buffer_validate_range(buffer, 0, length);
+
+    int fd = rb_io_descriptor(io);
+
+    void *destination;
+    size_t capacity;
+    io_buffer_get_bytes_for_writing(buffer, &destination, &capacity);
+
+    // Read into the full writable region, not just `length` bytes.
+    ssize_t transferred = read(fd, destination, capacity);
+    return rb_fiber_scheduler_io_result(transferred, errno);
+}
+
+/*
+ * IO::Buffer#read(io, length): converts `length` from a Ruby Integer and
+ * forwards to rb_io_buffer_read().
+ */
+static VALUE
+io_buffer_read(VALUE self, VALUE io, VALUE length)
+{
+    size_t minimum = RB_NUM2SIZE(length);
+
+    return rb_io_buffer_read(self, io, minimum);
+}
+
+/*
+ * Read from `io` at file position `offset` into this buffer, without relying
+ * on (or, on the emulated path, permanently changing) the descriptor's current
+ * file position.
+ *
+ * The current fiber scheduler, if any, is offered the operation first via
+ * rb_fiber_scheduler_io_pread(); its result is returned unless it is Qundef.
+ * Otherwise `length` is validated against the buffer and a single pread(2)
+ * (or an lseek/read/lseek emulation when pread is unavailable) fills up to
+ * the whole buffer. The outcome is reported via rb_fiber_scheduler_io_result().
+ */
+VALUE
+rb_io_buffer_pread(VALUE self, VALUE io, size_t length, off_t offset)
+{
+    VALUE scheduler = rb_fiber_scheduler_current();
+    if (scheduler != Qnil) {
+        VALUE result = rb_fiber_scheduler_io_pread(scheduler, io, self, length, offset);
+
+        if (result != Qundef) {
+            return result;
+        }
+    }
+
+    struct rb_io_buffer *data = NULL;
+    TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data);
+
+    io_buffer_validate_range(data, 0, length);
+
+    int descriptor = rb_io_descriptor(io);
+
+    void * base;
+    size_t size;
+    io_buffer_get_bytes_for_writing(data, &base, &size);
+
+#if defined(HAVE_PREAD)
+    ssize_t result = pread(descriptor, base, size, offset);
+    int error = errno;
+#else
+    // This emulation is not atomic: the file position is moved and then
+    // restored around read(2). It is only correct while nothing else uses the
+    // descriptor concurrently (the GVL makes this unlikely among Ruby threads).
+    off_t current_offset = lseek(descriptor, 0, SEEK_CUR);
+    if (current_offset == (off_t)-1) {
+        return rb_fiber_scheduler_io_result(-1, errno);
+    }
+
+    if (lseek(descriptor, offset, SEEK_SET) == (off_t)-1) {
+        return rb_fiber_scheduler_io_result(-1, errno);
+    }
+
+    ssize_t result = read(descriptor, base, size);
+    // Capture errno now: POSIX leaves errno unspecified after a successful
+    // call, so the restoring lseek(2) below could otherwise clobber it.
+    int error = errno;
+
+    if (lseek(descriptor, current_offset, SEEK_SET) == (off_t)-1) {
+        return rb_fiber_scheduler_io_result(-1, errno);
+    }
+#endif
+
+    return rb_fiber_scheduler_io_result(result, error);
+}
+
+/*
+ * IO::Buffer#pread(io, length, offset): converts `length` and `offset` from
+ * Ruby Integers and forwards to rb_io_buffer_pread().
+ */
+static VALUE
+io_buffer_pread(VALUE self, VALUE io, VALUE length, VALUE offset)
+{
+    size_t minimum = RB_NUM2SIZE(length);
+    off_t position = NUM2OFFT(offset);
+
+    return rb_io_buffer_pread(self, io, minimum, position);
+}
+
+/*
+ * Write the first `length` bytes of this buffer to `io`.
+ *
+ * The current fiber scheduler, if any, gets the first chance via
+ * rb_fiber_scheduler_io_write(); a non-Qundef result is returned as-is.
+ * Otherwise `length` is validated against the buffer and a single blocking
+ * write(2) is issued; its outcome is reported via rb_fiber_scheduler_io_result().
+ */
+VALUE
+rb_io_buffer_write(VALUE self, VALUE io, size_t length)
+{
+    VALUE fiber_scheduler = rb_fiber_scheduler_current();
+
+    if (fiber_scheduler != Qnil) {
+        VALUE handled = rb_fiber_scheduler_io_write(fiber_scheduler, io, self, length);
+        if (handled != Qundef) {
+            return handled;
+        }
+    }
+
+    struct rb_io_buffer *buffer = NULL;
+    TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, buffer);
+    io_buffer_validate_range(buffer, 0, length);
+
+    int fd = rb_io_descriptor(io);
+
+    const void *source;
+    size_t available; // required out-parameter; the write itself is bounded by `length`
+    io_buffer_get_bytes_for_reading(buffer, &source, &available);
+
+    ssize_t written = write(fd, source, length);
+    return rb_fiber_scheduler_io_result(written, errno);
+}
+
+/*
+ * IO::Buffer#write(io, length): converts `length` from a Ruby Integer and
+ * forwards to rb_io_buffer_write().
+ */
+static VALUE
+io_buffer_write(VALUE self, VALUE io, VALUE length)
+{
+    size_t count = RB_NUM2SIZE(length);
+
+    return rb_io_buffer_write(self, io, count);
+}
+
+/*
+ * Write the first `length` bytes of this buffer to `io` at file position
+ * `offset`, without relying on (or, on the emulated path, permanently
+ * changing) the descriptor's current file position.
+ *
+ * The current fiber scheduler, if any, is offered the operation first via
+ * rb_fiber_scheduler_io_pwrite(); its result is returned unless it is Qundef.
+ * Otherwise `length` is validated against the buffer and a single pwrite(2)
+ * (or an lseek/write/lseek emulation when pwrite is unavailable) is issued.
+ * The outcome is reported via rb_fiber_scheduler_io_result().
+ */
+VALUE
+rb_io_buffer_pwrite(VALUE self, VALUE io, size_t length, off_t offset)
+{
+    VALUE scheduler = rb_fiber_scheduler_current();
+    if (scheduler != Qnil) {
+        // The hook's prototype takes a C `off_t`, so pass the offset through
+        // unconverted (as rb_io_buffer_pread does); a Ruby VALUE here would be
+        // reinterpreted as a bogus file offset.
+        VALUE result = rb_fiber_scheduler_io_pwrite(scheduler, io, self, length, offset);
+
+        if (result != Qundef) {
+            return result;
+        }
+    }
+
+    struct rb_io_buffer *data = NULL;
+    TypedData_Get_Struct(self, struct rb_io_buffer, &rb_io_buffer_type, data);
+
+    io_buffer_validate_range(data, 0, length);
+
+    int descriptor = rb_io_descriptor(io);
+
+    const void * base;
+    size_t size;
+    io_buffer_get_bytes_for_reading(data, &base, &size);
+
+#if defined(HAVE_PWRITE)
+    ssize_t result = pwrite(descriptor, base, length, offset);
+    int error = errno;
+#else
+    // This emulation is not atomic: the file position is moved and then
+    // restored around write(2). It is only correct while nothing else uses the
+    // descriptor concurrently (the GVL makes this unlikely among Ruby threads).
+    off_t current_offset = lseek(descriptor, 0, SEEK_CUR);
+    if (current_offset == (off_t)-1) {
+        return rb_fiber_scheduler_io_result(-1, errno);
+    }
+
+    if (lseek(descriptor, offset, SEEK_SET) == (off_t)-1) {
+        return rb_fiber_scheduler_io_result(-1, errno);
+    }
+
+    ssize_t result = write(descriptor, base, length);
+    // Capture errno now: POSIX leaves errno unspecified after a successful
+    // call, so the restoring lseek(2) below could otherwise clobber it.
+    int error = errno;
+
+    if (lseek(descriptor, current_offset, SEEK_SET) == (off_t)-1) {
+        return rb_fiber_scheduler_io_result(-1, errno);
+    }
+#endif
+
+    return rb_fiber_scheduler_io_result(result, error);
+}
+
+/*
+ * IO::Buffer#pwrite(io, length, offset): converts `length` and `offset` from
+ * Ruby Integers and forwards to rb_io_buffer_pwrite().
+ */
+static VALUE
+io_buffer_pwrite(VALUE self, VALUE io, VALUE length, VALUE offset)
+{
+    size_t count = RB_NUM2SIZE(length);
+    off_t position = NUM2OFFT(offset);
+
+    return rb_io_buffer_pwrite(self, io, count, position);
+}
+
 /*
  *  Document-class: IO::Buffer
  *
@@ -2038,4 +2205,10 @@ Init_IO_Buffer(void) https://github.com/ruby/ruby/blob/trunk/io_buffer.c#L2205
 
     rb_define_method(rb_cIOBuffer, "get_string", io_buffer_get_string, -1);
     rb_define_method(rb_cIOBuffer, "set_string", io_buffer_set_string, -1);
+
+    // IO operations:
+    rb_define_method(rb_cIOBuffer, "read", io_buffer_read, 2);
+    rb_define_method(rb_cIOBuffer, "pread", io_buffer_pread, 3);
+    rb_define_method(rb_cIOBuffer, "write", io_buffer_write, 2);
+    rb_define_method(rb_cIOBuffer, "pwrite", io_buffer_pwrite, 3);
 }
diff --git a/scheduler.c b/scheduler.c
index 51696ab18fc..06658356b17 100644
--- a/scheduler.c
+++ b/scheduler.c
@@ -25,8 +25,8 @@ static ID id_timeout_after; https://github.com/ruby/ruby/blob/trunk/scheduler.c#L25
 static ID id_kernel_sleep;
 static ID id_process_wait;
 
-static ID id_io_read;
 (... truncated)

--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/

[前][次][番号順一覧][スレッド一覧]