[前][次][番号順一覧][スレッド一覧]

ruby-changes:64211

From: Koichi <ko1@a...>
Date: Wed, 16 Dec 2020 19:13:03 +0900 (JST)
Subject: [ruby-changes:64211] a9a7f4d8b8 (master): Ractor#receive_if to receive only matched messages

https://git.ruby-lang.org/ruby.git/commit/?id=a9a7f4d8b8

From a9a7f4d8b8ec30abc7a47ce700edc7209ae12279 Mon Sep 17 00:00:00 2001
From: Koichi Sasada <ko1@a...>
Date: Tue, 8 Dec 2020 14:04:18 +0900
Subject: Ractor#receive_if to receive only matched messages

Unlike Ractor.receive, Ractor.receive_if takes a block that acts as a
pattern, so you can choose which message to receive.

[Feature #17378]

diff --git a/bootstraptest/test_ractor.rb b/bootstraptest/test_ractor.rb
index b13ecbe..cde0f92 100644
--- a/bootstraptest/test_ractor.rb
+++ b/bootstraptest/test_ractor.rb
@@ -100,6 +100,64 @@ assert_equal 'ok', %q{ https://github.com/ruby/ruby/blob/trunk/bootstraptest/test_ractor.rb#L100
   r.take
 }
 
+# Ractor#receive_if can filter the message
+assert_equal '[2, 3, 1]', %q{
+  r = Ractor.new Ractor.current do |main|
+    main << 1
+    main << 2
+    main << 3
+  end
+  a = []
+  a << Ractor.receive_if{|msg| msg == 2}
+  a << Ractor.receive_if{|msg| msg == 3}
+  a << Ractor.receive
+}
+
+# Ractor#receive_if with break
+assert_equal '[2, [1, :break], 3]', %q{
+  r = Ractor.new Ractor.current do |main|
+    main << 1
+    main << 2
+    main << 3
+  end
+
+  a = []
+  a << Ractor.receive_if{|msg| msg == 2}
+  a << Ractor.receive_if{|msg| break [msg, :break]}
+  a << Ractor.receive
+}
+
+# Ractor#receive_if can't be called recursively
+assert_equal '[[:e1, 1], [:e2, 2]]', %q{
+  r = Ractor.new Ractor.current do |main|
+    main << 1
+    main << 2
+    main << 3
+  end
+
+  a = []
+  
+  Ractor.receive_if do |msg|
+    begin
+      Ractor.receive
+    rescue Ractor::Error
+      a << [:e1, msg]
+    end
+    true # delete 1 from queue
+  end
+
+  Ractor.receive_if do |msg|
+    begin
+      Ractor.receive_if{}
+    rescue Ractor::Error
+      a << [:e2, msg]
+    end
+    true # delete 2 from queue
+  end
+
+  a # 
+}
+
 ###
 ###
 # Ractor still has several memory corruption so skip huge number of tests
diff --git a/ractor.c b/ractor.c
index f7be7da..d0f5185 100644
--- a/ractor.c
+++ b/ractor.c
@@ -168,13 +168,15 @@ ractor_status_p(rb_ractor_t *r, enum ractor_status status) https://github.com/ruby/ruby/blob/trunk/ractor.c#L168
     return rb_ractor_status_p(r, status);
 }
 
+static struct rb_ractor_basket *ractor_queue_at(struct rb_ractor_queue *rq, int i);
+
 static void
 ractor_queue_mark(struct rb_ractor_queue *rq)
 {
     for (int i=0; i<rq->cnt; i++) {
-        int idx = (rq->start + i) % rq->size;
-        rb_gc_mark(rq->baskets[idx].v);
-        rb_gc_mark(rq->baskets[idx].sender);
+        struct rb_ractor_basket *b = ractor_queue_at(rq, i);
+        rb_gc_mark(b->v);
+        rb_gc_mark(b->sender);
     }
 }
 
@@ -191,6 +193,8 @@ ractor_mark(void *ptr) https://github.com/ruby/ruby/blob/trunk/ractor.c#L193
     rb_gc_mark(r->sync.wait.taken_basket.sender);
     rb_gc_mark(r->sync.wait.yielded_basket.v);
     rb_gc_mark(r->sync.wait.yielded_basket.sender);
+    rb_gc_mark(r->receiving_mutex);
+
     rb_gc_mark(r->loc);
     rb_gc_mark(r->name);
     rb_gc_mark(r->r_stdin);
@@ -317,33 +321,90 @@ ractor_queue_setup(struct rb_ractor_queue *rq) https://github.com/ruby/ruby/blob/trunk/ractor.c#L321
     rq->baskets = malloc(sizeof(struct rb_ractor_basket) * rq->size);
 }
 
+static struct rb_ractor_basket *
+ractor_queue_at(struct rb_ractor_queue *rq, int i)
+{
+    return &rq->baskets[(rq->start + i) % rq->size];
+}
+
+static void
+ractor_queue_advance(struct rb_ractor_queue *rq)
+{
+    ASSERT_ractor_locking(GET_RACTOR());
+
+    if (rq->reserved_cnt == 0) {
+        rq->cnt--;
+        rq->start = (rq->start + 1) % rq->size;
+        rq->serial++;
+    }
+    else {
+        ractor_queue_at(rq, 0)->type = basket_type_deleted;
+    }
+}
+
+static bool
+ractor_queue_skip_p(struct rb_ractor_queue *rq, int i)
+{
+    struct rb_ractor_basket *b = ractor_queue_at(rq, i);
+    return b->type == basket_type_deleted ||
+           b->type == basket_type_reserved;
+}
+
+static void
+ractor_queue_compact(rb_ractor_t *r, struct rb_ractor_queue *rq)
+{
+    ASSERT_ractor_locking(r);
+
+    while (rq->cnt > 0 && ractor_queue_at(rq, 0)->type == basket_type_deleted) {
+        ractor_queue_advance(rq);
+    }
+}
+
 static bool
 ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq)
 {
     ASSERT_ractor_locking(r);
-    return rq->cnt == 0;
+
+    if (rq->cnt == 0) {
+        return true;
+    }
+
+    ractor_queue_compact(r, rq);
+
+    for (int i=0; i<rq->cnt; i++) {
+        if (!ractor_queue_skip_p(rq, i)) {
+            return false;
+        }
+    }
+
+    return true;
 }
 
 static bool
 ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
 {
-    bool b;
+    bool found = false;
 
     RACTOR_LOCK(r);
     {
         if (!ractor_queue_empty_p(r, rq)) {
-            *basket = rq->baskets[rq->start];
-            rq->cnt--;
-            rq->start = (rq->start + 1) % rq->size;
-            b = true;
-        }
-        else {
-            b = false;
+            for (int i=0; i<rq->cnt; i++) {
+                if (!ractor_queue_skip_p(rq, i)) {
+                    struct rb_ractor_basket *b = ractor_queue_at(rq, i);
+                    *basket = *b;
+
+                    // remove from queue
+                    b->type = basket_type_deleted;
+                    ractor_queue_compact(r, rq);
+                    found = true;
+                    break;
+                }
+            }
         }
     }
     RACTOR_UNLOCK(r);
 
-    return b;
+    return found;
 }
 
 static void
@@ -373,24 +434,29 @@ ractor_basket_clear(struct rb_ractor_basket *b) https://github.com/ruby/ruby/blob/trunk/ractor.c#L434
 static VALUE ractor_reset_belonging(VALUE obj); // in this file
 
 static VALUE
-ractor_basket_accept(struct rb_ractor_basket *b)
+ractor_basket_value(struct rb_ractor_basket *b)
 {
-    VALUE v;
-
     switch (b->type) {
       case basket_type_ref:
-        VM_ASSERT(rb_ractor_shareable_p(b->v));
-        v = b->v;
         break;
       case basket_type_copy:
       case basket_type_move:
       case basket_type_will:
-        v = ractor_reset_belonging(b->v);
+        b->type = basket_type_ref;
+        b->v = ractor_reset_belonging(b->v);
         break;
       default:
         rb_bug("unreachable");
     }
 
+    return b->v;
+}
+
+static VALUE
+ractor_basket_accept(struct rb_ractor_basket *b)
+{
+    VALUE v = ractor_basket_value(b);
+
     if (b->exception) {
         VALUE cause = v;
         VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor.");
@@ -404,12 +470,22 @@ ractor_basket_accept(struct rb_ractor_basket *b) https://github.com/ruby/ruby/blob/trunk/ractor.c#L470
     return v;
 }
 
+static void
+ractor_recursive_receive_if(rb_ractor_t *r)
+{
+    if (r->receiving_mutex && rb_mutex_locked_p(r->receiving_mutex)) {
+        rb_raise(rb_eRactorError, "can not call receive/receive_if recursively");
+    }
+}
+
 static VALUE
 ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *r)
 {
     struct rb_ractor_queue *rq = &r->sync.incoming_queue;
     struct rb_ractor_basket basket;
 
+    ractor_recursive_receive_if(r);
+
     if (ractor_queue_deq(r, rq, &basket) == false) {
         if (r->sync.incoming_port_closed) {
             rb_raise(rb_eRactorClosedError, "The incoming port is already closed");
@@ -616,29 +692,193 @@ ractor_waiting_list_shift(rb_ractor_t *r, struct rb_ractor_waiting_list *wl) https://github.com/ruby/ruby/blob/trunk/ractor.c#L692
     }
 }
 
+static void
+ractor_receive_wait(rb_execution_context_t *ec, rb_ractor_t *cr)
+{
+    VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
+    ractor_recursive_receive_if(cr);
+
+    RACTOR_LOCK(cr);
+    {
+        if (ractor_queue_empty_p(cr, &cr->sync.incoming_queue)) {
+            VM_ASSERT(cr->sync.wait.status == wait_none);
+            cr->sync.wait.status = wait_receiving;
+            cr->sync.wait.wakeup_status = wakeup_none;
+            ractor_sleep(ec, cr);
+            cr->sync.wait.wakeup_status = wakeup_none;
+        }
+    }
+    RACTOR_UNLOCK(cr);
+}
+
 static VALUE
-ractor_receive(rb_execution_context_t *ec, rb_ractor_t *r)
+ractor_receive(rb_execution_context_t *ec, rb_ractor_t *cr)
 {
-    VM_ASSERT(r == rb_ec_ractor_ptr(ec));
+    VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
     VALUE v;
 
-    while ((v = ractor_try_receive(ec, r)) == Qundef) {
-        RACTOR_LOCK(r);
+    while ((v = ractor_try_receive(ec, cr)) == Qundef) {
+        ractor_receive_wait(ec, cr);
+    }
+
+    return v;
+}
+
+#if 0
+// for debug
+static const char *
+basket_type_name(enum rb_ractor_basket_type type)
+{
+    switch (type) {
+#define T(t) case basket_type_##t: return #t
+        T(none);
+        T(ref);
+        T(copy);
+        T(move);
+        T(will);
+        T(deleted);
+        T(reserved);
+      default: rb_bug("unreachable");
+    }
+}
+
+static void
+rq_dump(struct rb_ractor_queue *rq)
+{
+    bool bug = false;
+    for (int i=0; i<rq->cnt; i++) {
+        struct rb_ractor_basket *b = ractor_queue_at(rq, i);
+        fprintf(stderr, "%d (start:%d) type:%s %p %s\n", i, rq->start, basket_type_name(b->type), b, RSTRING_PTR(RARRAY_AREF(b->v, 1)));
+        if (b->type == basket_type_reserved) bug = true;
+    }
+    if (bug) rb_bug("!!");
+}
+#endif
+
+struct receive_block_data {
+    rb_ractor_t *cr;
+    struct rb_ractor_queue *rq;
+    VALUE v;
+    int index;
+    bool success;
+};
+
+static void
+ractor_receive_if_lock(rb_ractor_t *cr)
+{
+    VALUE m = cr->receiving_mutex;
+    if (m == Qfalse) {
+        m = cr->receiving_mutex = rb_mutex_new();
+    }
+    rb_mutex_lock(m);
+}
+
+static VALUE
+receive_if_body(VALUE ptr)
+{
+    struct receive_block_data *data = (struct receive_block_data *)ptr;
+
+    ractor_receive_if_lock(data->cr);
+    VALUE block_result = rb_yield(data->v);
+
+    RACTOR_LOCK_SELF(data->cr);
+    {
+        struct rb_ractor_basket *b = ractor_queue_at(data->rq, data->index);
+        VM_ASSERT(b->type == basket_type_reserved);
+        data->rq->reserved_cnt--;
+
+        if (RTEST(block_result)) {
+ (... truncated)

--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/

[前][次][番号順一覧][スレッド一覧]