ruby-changes:61256
From: Samuel <ko1@a...>
Date: Thu, 14 May 2020 19:11:19 +0900 (JST)
Subject: [ruby-changes:61256] 0e3b0fcdba (master): Thread scheduler for light weight concurrency.
https://git.ruby-lang.org/ruby.git/commit/?id=0e3b0fcdba From 0e3b0fcdba70cf96a8e0654eb8f50aacb8024bd4 Mon Sep 17 00:00:00 2001 From: Samuel Williams <samuel.williams@o...> Date: Thu, 14 May 2020 22:10:55 +1200 Subject: Thread scheduler for light weight concurrency. diff --git a/cont.c b/cont.c index 1ea9056..a10e058 100644 --- a/cont.c +++ b/cont.c @@ -241,12 +241,17 @@ struct rb_fiber_struct { https://github.com/ruby/ruby/blob/trunk/cont.c#L241 */ unsigned int transferred : 1; + /* Whether the fiber is allowed to implicitly yield. */ + unsigned int blocking : 1; + struct coroutine_context context; struct fiber_pool_stack stack; }; static struct fiber_pool shared_fiber_pool = {NULL, NULL, 0, 0, 0, 0}; +static ID fiber_initialize_keywords[2] = {0}; + /* * FreeBSD require a first (i.e. addr) argument of mmap(2) is not NULL * if MAP_STACK is passed. @@ -1733,7 +1738,7 @@ fiber_alloc(VALUE klass) https://github.com/ruby/ruby/blob/trunk/cont.c#L1738 } static rb_fiber_t* -fiber_t_alloc(VALUE fiber_value) +fiber_t_alloc(VALUE fiber_value, unsigned int blocking) { rb_fiber_t *fiber; rb_thread_t *th = GET_THREAD(); @@ -1746,6 +1751,7 @@ fiber_t_alloc(VALUE fiber_value) https://github.com/ruby/ruby/blob/trunk/cont.c#L1751 fiber = ZALLOC(rb_fiber_t); fiber->cont.self = fiber_value; fiber->cont.type = FIBER_CONTEXT; + fiber->blocking = blocking; cont_init(&fiber->cont, th); fiber->cont.saved_ec.fiber_ptr = fiber; @@ -1763,9 +1769,9 @@ fiber_t_alloc(VALUE fiber_value) https://github.com/ruby/ruby/blob/trunk/cont.c#L1769 } static VALUE -fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool) +fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool, unsigned int blocking) { - rb_fiber_t *fiber = fiber_t_alloc(self); + rb_fiber_t *fiber = fiber_t_alloc(self, blocking); fiber->first_proc = proc; fiber->stack.base = NULL; @@ -1793,17 +1799,66 @@ fiber_prepare_stack(rb_fiber_t *fiber) https://github.com/ruby/ruby/blob/trunk/cont.c#L1799 
sec->local_storage_recursive_hash_for_trace = Qnil; } +static struct fiber_pool * +rb_fiber_pool_default(VALUE pool) +{ + return &shared_fiber_pool; +} + +/* :nodoc: */ +static VALUE +rb_fiber_initialize_kw(int argc, VALUE* argv, VALUE self, int kw_splat) +{ + VALUE pool = Qnil; + VALUE blocking = Qtrue; + + if (kw_splat != RB_NO_KEYWORDS) { + VALUE options = Qnil; + VALUE arguments[2] = {Qundef}; + + argc = rb_scan_args_kw(kw_splat, argc, argv, ":", &options); + rb_get_kwargs(options, fiber_initialize_keywords, 0, 2, arguments); + + blocking = arguments[0]; + pool = arguments[1]; + } + + return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking)); +} + /* :nodoc: */ static VALUE rb_fiber_initialize(int argc, VALUE* argv, VALUE self) { - return fiber_initialize(self, rb_block_proc(), &shared_fiber_pool); + return rb_fiber_initialize_kw(argc, argv, self, rb_keyword_given_p()); } VALUE rb_fiber_new(rb_block_call_func_t func, VALUE obj) { - return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), &shared_fiber_pool); + return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), rb_fiber_pool_default(Qnil), 1); +} + +static VALUE +rb_f_fiber_kw(int argc, VALUE* argv, int kw_splat) +{ + rb_thread_t * th = GET_THREAD(); + VALUE scheduler = th->scheduler; + VALUE fiber = Qnil; + + if (scheduler != Qnil) { + fiber = rb_funcall_passing_block_kw(scheduler, rb_intern("fiber"), argc, argv, kw_splat); + } else { + rb_raise(rb_eRuntimeError, "No scheduler is available!"); + } + + return fiber; +} + +static VALUE +rb_f_fiber(int argc, VALUE *argv, VALUE obj) +{ + return rb_f_fiber_kw(argc, argv, rb_keyword_given_p()); } static void rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt); @@ -1820,6 +1875,10 @@ rb_fiber_start(void) https://github.com/ruby/ruby/blob/trunk/cont.c#L1875 VM_ASSERT(th->ec == ruby_current_execution_context_ptr); VM_ASSERT(FIBER_RESUMED_P(fiber)); + if (fiber->blocking) { + th->blocking += 1; 
+ } + EC_PUSH_TAG(th->ec); if ((state = EC_EXEC_TAG()) == TAG_NONE) { rb_context_t *cont = &VAR_FROM_MEMORY(fiber)->cont; @@ -1892,6 +1951,7 @@ rb_threadptr_root_fiber_setup(rb_thread_t *th) https://github.com/ruby/ruby/blob/trunk/cont.c#L1951 fiber->cont.type = FIBER_CONTEXT; fiber->cont.saved_ec.fiber_ptr = fiber; fiber->cont.saved_ec.thread_ptr = th; + fiber->blocking = 1; fiber_status_set(fiber, FIBER_RESUMED); /* skip CREATED */ th->ec = &fiber->cont.saved_ec; } @@ -2044,11 +2104,15 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int is_resume, int https://github.com/ruby/ruby/blob/trunk/cont.c#L2104 } } + VM_ASSERT(FIBER_RUNNABLE_P(fiber)); + if (is_resume) { fiber->prev = fiber_current(); } - VM_ASSERT(FIBER_RUNNABLE_P(fiber)); + if (fiber_current()->blocking) { + th->blocking -= 1; + } cont->argc = argc; cont->kw_splat = kw_splat; @@ -2060,6 +2124,10 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int is_resume, int https://github.com/ruby/ruby/blob/trunk/cont.c#L2124 fiber_stack_release(fiber); } + if (fiber_current()->blocking) { + th->blocking += 1; + } + RUBY_VM_CHECK_INTS(th->ec); EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_FIBER_SWITCH, th->self, 0, 0, 0, Qnil); @@ -2073,6 +2141,12 @@ rb_fiber_transfer(VALUE fiber_value, int argc, const VALUE *argv) https://github.com/ruby/ruby/blob/trunk/cont.c#L2141 return fiber_switch(fiber_ptr(fiber_value), argc, argv, 0, RB_NO_KEYWORDS); } +VALUE +rb_fiber_blocking_p(VALUE fiber) +{ + return (fiber_ptr(fiber)->blocking == 0) ? 
Qfalse : Qtrue; +} + void rb_fiber_close(rb_fiber_t *fiber) { @@ -2442,6 +2516,9 @@ Init_Cont(void) https://github.com/ruby/ruby/blob/trunk/cont.c#L2516 fiber_pool_initialize(&shared_fiber_pool, stack_size, FIBER_POOL_INITIAL_SIZE, vm_stack_size); + fiber_initialize_keywords[0] = rb_intern_const("blocking"); + fiber_initialize_keywords[1] = rb_intern_const("pool"); + char * fiber_shared_fiber_pool_free_stacks = getenv("RUBY_SHARED_FIBER_POOL_FREE_STACKS"); if (fiber_shared_fiber_pool_free_stacks) { shared_fiber_pool.free_stacks = atoi(fiber_shared_fiber_pool_free_stacks); @@ -2452,11 +2529,14 @@ Init_Cont(void) https://github.com/ruby/ruby/blob/trunk/cont.c#L2529 rb_eFiberError = rb_define_class("FiberError", rb_eStandardError); rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1); rb_define_method(rb_cFiber, "initialize", rb_fiber_initialize, -1); + rb_define_method(rb_cFiber, "blocking?", rb_fiber_blocking_p, 0); rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1); rb_define_method(rb_cFiber, "raise", rb_fiber_raise, -1); rb_define_method(rb_cFiber, "to_s", fiber_to_s, 0); rb_define_alias(rb_cFiber, "inspect", "to_s"); + rb_define_global_function("Fiber", rb_f_fiber, -1); + #ifdef RB_EXPERIMENTAL_FIBER_POOL rb_cFiberPool = rb_define_class("Pool", rb_cFiber); rb_define_alloc_func(rb_cFiberPool, fiber_pool_alloc); diff --git a/doc/fiber.rdoc b/doc/fiber.rdoc new file mode 100644 index 0000000..d3c19a0 --- /dev/null +++ b/doc/fiber.rdoc @@ -0,0 +1,137 @@ https://github.com/ruby/ruby/blob/trunk/doc/fiber.rdoc#L1 += Fiber + +Fiber is a flow-control primitive which enables cooperative scheduling. This is +in contrast to threads, which can be preemptively scheduled at any time. While +having similar memory profiles, the cost of context switching fibers can be +significantly less than that of threads, as it does not involve a system call. + +== Design + +=== Scheduler + +The per-thread fiber scheduler interface is used to intercept blocking +operations.
A typical implementation would be a wrapper for a gem like +EventMachine or Async. This design provides separation of concerns between the +event loop implementation and application code. It also allows for layered +schedulers which can perform instrumentation. + + class Scheduler + # Wait for the given file descriptor to become readable. + def wait_readable(io) + end + + # Wait for the given file descriptor to become writable. + def wait_writable(io) + end + + # Wait for the given file descriptor to match the specified events within + # the specified timeout. + # @param events [Integer] a bit mask of +IO::WAIT_READABLE+, + # +IO::WAIT_WRITABLE+ and +IO::WAIT_PRIORITY+. + # @param timeout [#to_f] the amount of time to wait for the event. + def wait_any(io, events, timeout) + end + + # Sleep the current task for the specified duration, or forever if not + # specified. + # @param duration [#to_f] the amount of time to sleep. + def wait_sleep(duration = nil) + end + + # The Ruby virtual machine is going to enter a system-level blocking + # operation. + def enter_blocking_region + end + + # The Ruby virtual machine has completed the system-level blocking + # operation. + def exit_blocking_region + end + + # Intercept the creation of a non-blocking fiber. + def fiber(&block) + Fiber.new(blocking: false, &block) + end + + # Invoked when the thread exits. + def run + # Implement event loop here. + end + end + +On CRuby, the following extra methods need to be implemented to handle the +public C interface: + + class Scheduler + # Wrapper for rb_wait_readable(int) C function. + def wait_readable_fd(fd) + wait_readable(::IO.for_fd(fd, autoclose: false)) + end + + # Wrapper for rb_wait_writable(int) C function. + def wait_writable_fd(fd) + wait_writable(::IO.for_fd(fd, autoclose: false)) + end + + # Wrapper for rb_wait_for_single_fd(int) C function. + (... truncated) -- ML: ruby-changes@q... Info: http://www.atdot.net/~ko1/quickml/