ruby-changes:64211
From: Koichi <ko1@a...>
Date: Wed, 16 Dec 2020 19:13:03 +0900 (JST)
Subject: [ruby-changes:64211] a9a7f4d8b8 (master): Ractor#receive_if to receive only matched messages
https://git.ruby-lang.org/ruby.git/commit/?id=a9a7f4d8b8 From a9a7f4d8b8ec30abc7a47ce700edc7209ae12279 Mon Sep 17 00:00:00 2001 From: Koichi Sasada <ko1@a...> Date: Tue, 8 Dec 2020 14:04:18 +0900 Subject: Ractor#receive_if to receive only matched messages Instead of Ractor.receive, Ractor.receive_if can provide a pattern by a block and you can choose the receiving message. [Feature #17378] diff --git a/bootstraptest/test_ractor.rb b/bootstraptest/test_ractor.rb index b13ecbe..cde0f92 100644 --- a/bootstraptest/test_ractor.rb +++ b/bootstraptest/test_ractor.rb @@ -100,6 +100,64 @@ assert_equal 'ok', %q{ https://github.com/ruby/ruby/blob/trunk/bootstraptest/test_ractor.rb#L100 r.take } +# Ractor#receive_if can filter the message +assert_equal '[2, 3, 1]', %q{ + r = Ractor.new Ractor.current do |main| + main << 1 + main << 2 + main << 3 + end + a = [] + a << Ractor.receive_if{|msg| msg == 2} + a << Ractor.receive_if{|msg| msg == 3} + a << Ractor.receive +} + +# Ractor#receive_if with break +assert_equal '[2, [1, :break], 3]', %q{ + r = Ractor.new Ractor.current do |main| + main << 1 + main << 2 + main << 3 + end + + a = [] + a << Ractor.receive_if{|msg| msg == 2} + a << Ractor.receive_if{|msg| break [msg, :break]} + a << Ractor.receive +} + +# Ractor#receive_if can't be called recursively +assert_equal '[[:e1, 1], [:e2, 2]]', %q{ + r = Ractor.new Ractor.current do |main| + main << 1 + main << 2 + main << 3 + end + + a = [] + + Ractor.receive_if do |msg| + begin + Ractor.receive + rescue Ractor::Error + a << [:e1, msg] + end + true # delete 1 from queue + end + + Ractor.receive_if do |msg| + begin + Ractor.receive_if{} + rescue Ractor::Error + a << [:e2, msg] + end + true # delete 2 from queue + end + + a # +} + ### ### # Ractor still has several memory corruption so skip huge number of tests diff --git a/ractor.c b/ractor.c index f7be7da..d0f5185 100644 --- a/ractor.c +++ b/ractor.c @@ -168,13 +168,15 @@ ractor_status_p(rb_ractor_t *r, enum ractor_status status) https://github.com/ruby/ruby/blob/trunk/ractor.c#L168 return rb_ractor_status_p(r, status); } +static struct rb_ractor_basket *ractor_queue_at(struct rb_ractor_queue *rq, int i); + static void ractor_queue_mark(struct rb_ractor_queue *rq) { for (int i=0; i<rq->cnt; i++) { - int idx = (rq->start + i) % rq->size; - rb_gc_mark(rq->baskets[idx].v); - rb_gc_mark(rq->baskets[idx].sender); + struct rb_ractor_basket *b = ractor_queue_at(rq, i); + rb_gc_mark(b->v); + rb_gc_mark(b->sender); } } @@ -191,6 +193,8 @@ ractor_mark(void *ptr) https://github.com/ruby/ruby/blob/trunk/ractor.c#L193 rb_gc_mark(r->sync.wait.taken_basket.sender); rb_gc_mark(r->sync.wait.yielded_basket.v); rb_gc_mark(r->sync.wait.yielded_basket.sender); + rb_gc_mark(r->receiving_mutex); + rb_gc_mark(r->loc); rb_gc_mark(r->name); rb_gc_mark(r->r_stdin); @@ -317,33 +321,90 @@ ractor_queue_setup(struct rb_ractor_queue *rq) https://github.com/ruby/ruby/blob/trunk/ractor.c#L321 rq->baskets = malloc(sizeof(struct rb_ractor_basket) * rq->size); } +static struct rb_ractor_basket * +ractor_queue_at(struct rb_ractor_queue *rq, int i) +{ + return &rq->baskets[(rq->start + i) % rq->size]; +} + +static void +ractor_queue_advance(struct rb_ractor_queue *rq) +{ + ASSERT_ractor_locking(GET_RACTOR()); + + if (rq->reserved_cnt == 0) { + rq->cnt--; + rq->start = (rq->start + 1) % rq->size; + rq->serial++; + } + else { + ractor_queue_at(rq, 0)->type = basket_type_deleted; + } +} + +static bool +ractor_queue_skip_p(struct rb_ractor_queue *rq, int i) +{ + struct rb_ractor_basket *b = ractor_queue_at(rq, i); + return b->type == basket_type_deleted || + b->type == basket_type_reserved; +} + +static void +ractor_queue_compact(rb_ractor_t *r, struct rb_ractor_queue *rq) +{ + ASSERT_ractor_locking(r); + + while (rq->cnt > 0 && ractor_queue_at(rq, 0)->type == basket_type_deleted) { + ractor_queue_advance(rq); + } +} + static bool ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq) { ASSERT_ractor_locking(r); - return rq->cnt == 0; + + if (rq->cnt == 0) { + return true; + } + + ractor_queue_compact(r, rq); + + for (int i=0; i<rq->cnt; i++) { + if (!ractor_queue_skip_p(rq, i)) { + return false; + } + } + + return true; } static bool ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket) { - bool b; + bool found = false; RACTOR_LOCK(r); { if (!ractor_queue_empty_p(r, rq)) { - *basket = rq->baskets[rq->start]; - rq->cnt--; - rq->start = (rq->start + 1) % rq->size; - b = true; - } - else { - b = false; + for (int i=0; i<rq->cnt; i++) { + if (!ractor_queue_skip_p(rq, i)) { + struct rb_ractor_basket *b = ractor_queue_at(rq, i); + *basket = *b; + + // remove from queue + b->type = basket_type_deleted; + ractor_queue_compact(r, rq); + found = true; + break; + } + } } } RACTOR_UNLOCK(r); - return b; + return found; } static void @@ -373,24 +434,29 @@ ractor_basket_clear(struct rb_ractor_basket *b) https://github.com/ruby/ruby/blob/trunk/ractor.c#L434 static VALUE ractor_reset_belonging(VALUE obj); // in this file static VALUE -ractor_basket_accept(struct rb_ractor_basket *b) +ractor_basket_value(struct rb_ractor_basket *b) { - VALUE v; - switch (b->type) { case basket_type_ref: - VM_ASSERT(rb_ractor_shareable_p(b->v)); - v = b->v; break; case basket_type_copy: case basket_type_move: case basket_type_will: - v = ractor_reset_belonging(b->v); + b->type = basket_type_ref; + b->v = ractor_reset_belonging(b->v); break; default: rb_bug("unreachable"); } + return b->v; +} + +static VALUE +ractor_basket_accept(struct rb_ractor_basket *b) +{ + VALUE v = ractor_basket_value(b); + if (b->exception) { VALUE cause = v; VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor."); @@ -404,12 +470,22 @@ ractor_basket_accept(struct rb_ractor_basket *b) https://github.com/ruby/ruby/blob/trunk/ractor.c#L470 return v; } +static void +ractor_recursive_receive_if(rb_ractor_t *r) +{ + if (r->receiving_mutex && rb_mutex_locked_p(r->receiving_mutex)) { + rb_raise(rb_eRactorError, "can not call receive/receive_if recursively"); + } +} + static VALUE ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *r) { struct rb_ractor_queue *rq = &r->sync.incoming_queue; struct rb_ractor_basket basket; + ractor_recursive_receive_if(r); + if (ractor_queue_deq(r, rq, &basket) == false) { if (r->sync.incoming_port_closed) { rb_raise(rb_eRactorClosedError, "The incoming port is already closed"); @@ -616,29 +692,193 @@ ractor_waiting_list_shift(rb_ractor_t *r, struct rb_ractor_waiting_list *wl) https://github.com/ruby/ruby/blob/trunk/ractor.c#L692 } } +static void +ractor_receive_wait(rb_execution_context_t *ec, rb_ractor_t *cr) +{ + VM_ASSERT(cr == rb_ec_ractor_ptr(ec)); + ractor_recursive_receive_if(cr); + + RACTOR_LOCK(cr); + { + if (ractor_queue_empty_p(cr, &cr->sync.incoming_queue)) { + VM_ASSERT(cr->sync.wait.status == wait_none); + cr->sync.wait.status = wait_receiving; + cr->sync.wait.wakeup_status = wakeup_none; + ractor_sleep(ec, cr); + cr->sync.wait.wakeup_status = wakeup_none; + } + } + RACTOR_UNLOCK(cr); +} + static VALUE -ractor_receive(rb_execution_context_t *ec, rb_ractor_t *r) +ractor_receive(rb_execution_context_t *ec, rb_ractor_t *cr) { - VM_ASSERT(r == rb_ec_ractor_ptr(ec)); + VM_ASSERT(cr == rb_ec_ractor_ptr(ec)); VALUE v; - while ((v = ractor_try_receive(ec, r)) == Qundef) { - RACTOR_LOCK(r); + while ((v = ractor_try_receive(ec, cr)) == Qundef) { + ractor_receive_wait(ec, cr); + } + + return v; +} + +#if 0 +// for debug +static const char * +basket_type_name(enum rb_ractor_basket_type type) +{ + switch (type) { +#define T(t) case basket_type_##t: return #t + T(none); + T(ref); + T(copy); + T(move); + T(will); + T(deleted); + T(reserved); + default: rb_bug("unreachable"); + } +} + +static void +rq_dump(struct rb_ractor_queue *rq) +{ + bool bug = false; + for (int i=0; i<rq->cnt; i++) { + struct rb_ractor_basket *b = ractor_queue_at(rq, i); + fprintf(stderr, "%d (start:%d) type:%s %p %s\n", i, rq->start, basket_type_name(b->type), b, RSTRING_PTR(RARRAY_AREF(b->v, 1))); + if (b->type == basket_type_reserved) bug = true; + } + if (bug) rb_bug("!!"); +} +#endif + +struct receive_block_data { + rb_ractor_t *cr; + struct rb_ractor_queue *rq; + VALUE v; + int index; + bool success; +}; + +static void +ractor_receive_if_lock(rb_ractor_t *cr) +{ + VALUE m = cr->receiving_mutex; + if (m == Qfalse) { + m = cr->receiving_mutex = rb_mutex_new(); + } + rb_mutex_lock(m); +} + +static VALUE +receive_if_body(VALUE ptr) +{ + struct receive_block_data *data = (struct receive_block_data *)ptr; + + ractor_receive_if_lock(data->cr); + VALUE block_result = rb_yield(data->v); + + RACTOR_LOCK_SELF(data->cr); + { + struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index); + VM_ASSERT(b->type == basket_type_reserved); + data->rq->reserved_cnt--; + + if (RTEST(block_result)) { + (... truncated) -- ML: ruby-changes@q... Info: http://www.atdot.net/~ko1/quickml/