[前][次][番号順一覧][スレッド一覧]

ruby-changes:61256

From: Samuel <ko1@a...>
Date: Thu, 14 May 2020 19:11:19 +0900 (JST)
Subject: [ruby-changes:61256] 0e3b0fcdba (master): Thread scheduler for light weight concurrency.

https://git.ruby-lang.org/ruby.git/commit/?id=0e3b0fcdba

From 0e3b0fcdba70cf96a8e0654eb8f50aacb8024bd4 Mon Sep 17 00:00:00 2001
From: Samuel Williams <samuel.williams@o...>
Date: Thu, 14 May 2020 22:10:55 +1200
Subject: Thread scheduler for light weight concurrency.


diff --git a/cont.c b/cont.c
index 1ea9056..a10e058 100644
--- a/cont.c
+++ b/cont.c
@@ -241,12 +241,17 @@ struct rb_fiber_struct { https://github.com/ruby/ruby/blob/trunk/cont.c#L241
      */
     unsigned int transferred : 1;
 
+    /* Whether the fiber is allowed to implicitly yield. */
+    unsigned int blocking : 1;
+
     struct coroutine_context context;
     struct fiber_pool_stack stack;
 };
 
 static struct fiber_pool shared_fiber_pool = {NULL, NULL, 0, 0, 0, 0};
 
+static ID fiber_initialize_keywords[2] = {0};
+
 /*
  * FreeBSD require a first (i.e. addr) argument of mmap(2) is not NULL
  * if MAP_STACK is passed.
@@ -1733,7 +1738,7 @@ fiber_alloc(VALUE klass) https://github.com/ruby/ruby/blob/trunk/cont.c#L1738
 }
 
 static rb_fiber_t*
-fiber_t_alloc(VALUE fiber_value)
+fiber_t_alloc(VALUE fiber_value, unsigned int blocking)
 {
     rb_fiber_t *fiber;
     rb_thread_t *th = GET_THREAD();
@@ -1746,6 +1751,7 @@ fiber_t_alloc(VALUE fiber_value) https://github.com/ruby/ruby/blob/trunk/cont.c#L1751
     fiber = ZALLOC(rb_fiber_t);
     fiber->cont.self = fiber_value;
     fiber->cont.type = FIBER_CONTEXT;
+    fiber->blocking = blocking;
     cont_init(&fiber->cont, th);
 
     fiber->cont.saved_ec.fiber_ptr = fiber;
@@ -1763,9 +1769,9 @@ fiber_t_alloc(VALUE fiber_value) https://github.com/ruby/ruby/blob/trunk/cont.c#L1769
 }
 
 static VALUE
-fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool)
+fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool, unsigned int blocking)
 {
-    rb_fiber_t *fiber = fiber_t_alloc(self);
+    rb_fiber_t *fiber = fiber_t_alloc(self, blocking);
 
     fiber->first_proc = proc;
     fiber->stack.base = NULL;
@@ -1793,17 +1799,66 @@ fiber_prepare_stack(rb_fiber_t *fiber) https://github.com/ruby/ruby/blob/trunk/cont.c#L1799
     sec->local_storage_recursive_hash_for_trace = Qnil;
 }
 
+static struct fiber_pool *
+rb_fiber_pool_default(VALUE pool)
+{
+    return &shared_fiber_pool;
+}
+
+/* :nodoc: */
+static VALUE
+rb_fiber_initialize_kw(int argc, VALUE* argv, VALUE self, int kw_splat)
+{
+    VALUE pool = Qnil;
+    VALUE blocking = Qtrue;
+
+    if (kw_splat != RB_NO_KEYWORDS) {
+      VALUE options = Qnil;
+      VALUE arguments[2] = {Qundef};
+
+      argc = rb_scan_args_kw(kw_splat, argc, argv, ":", &options);
+      rb_get_kwargs(options, fiber_initialize_keywords, 0, 2, arguments);
+
+      blocking = arguments[0];
+      pool = arguments[1];
+    }
+
+    return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking));
+}
+
 /* :nodoc: */
 static VALUE
 rb_fiber_initialize(int argc, VALUE* argv, VALUE self)
 {
-    return fiber_initialize(self, rb_block_proc(), &shared_fiber_pool);
+    return rb_fiber_initialize_kw(argc, argv, self, rb_keyword_given_p());
 }
 
 VALUE
 rb_fiber_new(rb_block_call_func_t func, VALUE obj)
 {
-    return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), &shared_fiber_pool);
+    return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), rb_fiber_pool_default(Qnil), 1);
+}
+
+static VALUE
+rb_f_fiber_kw(int argc, VALUE* argv, int kw_splat)
+{
+    rb_thread_t * th = GET_THREAD();
+    VALUE scheduler = th->scheduler;
+    VALUE fiber = Qnil;
+
+    if (scheduler != Qnil) {
+        fiber = rb_funcall_passing_block_kw(scheduler, rb_intern("fiber"), argc, argv, kw_splat);
+    } else {
+        rb_raise(rb_eRuntimeError, "No scheduler is available!");
+    }
+
+    return fiber;
+}
+
+static VALUE
+rb_f_fiber(int argc, VALUE *argv, VALUE obj)
+{
+    return rb_f_fiber_kw(argc, argv, rb_keyword_given_p());
 }
 
 static void rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt);
@@ -1820,6 +1875,10 @@ rb_fiber_start(void) https://github.com/ruby/ruby/blob/trunk/cont.c#L1875
     VM_ASSERT(th->ec == ruby_current_execution_context_ptr);
     VM_ASSERT(FIBER_RESUMED_P(fiber));
 
+    if (fiber->blocking) {
+        th->blocking += 1;
+    }
+
     EC_PUSH_TAG(th->ec);
     if ((state = EC_EXEC_TAG()) == TAG_NONE) {
         rb_context_t *cont = &VAR_FROM_MEMORY(fiber)->cont;
@@ -1892,6 +1951,7 @@ rb_threadptr_root_fiber_setup(rb_thread_t *th) https://github.com/ruby/ruby/blob/trunk/cont.c#L1951
     fiber->cont.type = FIBER_CONTEXT;
     fiber->cont.saved_ec.fiber_ptr = fiber;
     fiber->cont.saved_ec.thread_ptr = th;
+    fiber->blocking = 1;
     fiber_status_set(fiber, FIBER_RESUMED); /* skip CREATED */
     th->ec = &fiber->cont.saved_ec;
 }
@@ -2044,11 +2104,15 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int is_resume, int https://github.com/ruby/ruby/blob/trunk/cont.c#L2104
         }
     }
 
+    VM_ASSERT(FIBER_RUNNABLE_P(fiber));
+
     if (is_resume) {
         fiber->prev = fiber_current();
     }
 
-    VM_ASSERT(FIBER_RUNNABLE_P(fiber));
+    if (fiber_current()->blocking) {
+        th->blocking -= 1;
+    }
 
     cont->argc = argc;
     cont->kw_splat = kw_splat;
@@ -2060,6 +2124,10 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int is_resume, int https://github.com/ruby/ruby/blob/trunk/cont.c#L2124
         fiber_stack_release(fiber);
     }
 
+    if (fiber_current()->blocking) {
+        th->blocking += 1;
+    }
+
     RUBY_VM_CHECK_INTS(th->ec);
 
     EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_FIBER_SWITCH, th->self, 0, 0, 0, Qnil);
@@ -2073,6 +2141,12 @@ rb_fiber_transfer(VALUE fiber_value, int argc, const VALUE *argv) https://github.com/ruby/ruby/blob/trunk/cont.c#L2141
     return fiber_switch(fiber_ptr(fiber_value), argc, argv, 0, RB_NO_KEYWORDS);
 }
 
+VALUE
+rb_fiber_blocking_p(VALUE fiber)
+{
+    return (fiber_ptr(fiber)->blocking == 0) ? Qfalse : Qtrue;
+}
+
 void
 rb_fiber_close(rb_fiber_t *fiber)
 {
@@ -2442,6 +2516,9 @@ Init_Cont(void) https://github.com/ruby/ruby/blob/trunk/cont.c#L2516
 
     fiber_pool_initialize(&shared_fiber_pool, stack_size, FIBER_POOL_INITIAL_SIZE, vm_stack_size);
 
+    fiber_initialize_keywords[0] = rb_intern_const("blocking");
+    fiber_initialize_keywords[1] = rb_intern_const("pool");
+
     char * fiber_shared_fiber_pool_free_stacks = getenv("RUBY_SHARED_FIBER_POOL_FREE_STACKS");
     if (fiber_shared_fiber_pool_free_stacks) {
         shared_fiber_pool.free_stacks = atoi(fiber_shared_fiber_pool_free_stacks);
@@ -2452,11 +2529,14 @@ Init_Cont(void) https://github.com/ruby/ruby/blob/trunk/cont.c#L2529
     rb_eFiberError = rb_define_class("FiberError", rb_eStandardError);
     rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1);
     rb_define_method(rb_cFiber, "initialize", rb_fiber_initialize, -1);
+    rb_define_method(rb_cFiber, "blocking?", rb_fiber_blocking_p, 0);
     rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1);
     rb_define_method(rb_cFiber, "raise", rb_fiber_raise, -1);
     rb_define_method(rb_cFiber, "to_s", fiber_to_s, 0);
     rb_define_alias(rb_cFiber, "inspect", "to_s");
 
+    rb_define_global_function("Fiber", rb_f_fiber, -1);
+
 #ifdef RB_EXPERIMENTAL_FIBER_POOL
     rb_cFiberPool = rb_define_class("Pool", rb_cFiber);
     rb_define_alloc_func(rb_cFiberPool, fiber_pool_alloc);
diff --git a/doc/fiber.rdoc b/doc/fiber.rdoc
new file mode 100644
index 0000000..d3c19a0
--- /dev/null
+++ b/doc/fiber.rdoc
@@ -0,0 +1,137 @@ https://github.com/ruby/ruby/blob/trunk/doc/fiber.rdoc#L1
+= Fiber
+
+Fiber is a flow-control primitive which enable cooperative scheduling. This is
+in contrast to threads which can be preemptively scheduled at any time. While
+having a similar memory profiles, the cost of context switching fibers can be
+significantly less than threads as it does not involve a system call.
+
+== Design
+
+=== Scheduler
+
+The per-thread fiber scheduler interface is used to intercept blocking
+operations. A typical implementation would be a wrapper for a gem like
+EventMachine or Async. This design provides separation of concerns between the
+event loop implementation and application code. It also allows for layered
+schedulers which can perform instrumentation.
+
+  class Scheduler
+    # Wait for the given file descriptor to become readable.
+    def wait_readable(io)
+    end
+
+    # Wait for the given file descriptor to become writable.
+    def wait_writable(io)
+    end
+
+    # Wait for the given file descriptor to match the specified events within
+    # the specified timeout.
+    # @param event [Integer] a bit mask of +IO::WAIT_READABLE+,
+    #   `IO::WAIT_WRITABLE` and `IO::WAIT_PRIORITY`.
+    # @param timeout [#to_f] the amount of time to wait for the event.
+    def wait_any(io, events, timeout)
+    end
+
+    # Sleep the current task for the specified duration, or forever if not
+    # specified.
+    # @param duration [#to_f] the amount of time to sleep.
+    def wait_sleep(duration = nil)
+    end
+
+    # The Ruby virtual machine is going to enter a system level blocking
+    # operation.
+    def enter_blocking_region
+    end
+
+    # The Ruby virtual machine has completed the system level blocking
+    # operation.
+    def exit_blocking_region
+    end
+
+    # Intercept the creation of a non-blocking fiber.
+    def fiber(&block)
+      Fiber.new(blocking: false, &block)
+    end
+
+    # Invoked when the thread exits.
+    def run
+      # Implement event loop here.
+    end
+  end
+
+On CRuby, the following extra methods need to be implemented to handle the
+public C interface:
+
+  class Scheduler
+    # Wrapper for rb_wait_readable(int) C function.
+    def wait_readable_fd(fd)
+      wait_readable(::IO.from_fd(fd, autoclose: false))
+    end
+
+    # Wrapper for rb_wait_readable(int) C function.
+    def wait_writable_fd(fd)
+      wait_writable(::IO.from_fd(fd, autoclose: false))
+    end
+
+    # Wrapper for rb_wait_for_single_fd(int) C function.
+     (... truncated)

--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/

[前][次][番号順一覧][スレッド一覧]