[前][次][番号順一覧][スレッド一覧]

ruby-changes:62925

From: Samuel <ko1@a...>
Date: Mon, 14 Sep 2020 13:44:37 +0900 (JST)
Subject: [ruby-changes:62925] 8eea66a0ca (master): Add support for Queue & SizedQueue.

https://git.ruby-lang.org/ruby.git/commit/?id=8eea66a0ca

From 8eea66a0ca8965ae8319b4c404f61c4d46866f64 Mon Sep 17 00:00:00 2001
From: Samuel Williams <samuel.williams@o...>
Date: Mon, 14 Sep 2020 11:10:02 +1200
Subject: Add support for Queue & SizedQueue.


diff --git a/test/fiber/test_mutex.rb b/test/fiber/test_mutex.rb
index 2103412..1f53ae1 100644
--- a/test/fiber/test_mutex.rb
+++ b/test/fiber/test_mutex.rb
@@ -84,6 +84,37 @@ class TestFiberMutex < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/fiber/test_mutex.rb#L84
     assert signalled > 1
   end
 
+  def test_queue
+    queue = Queue.new
+    processed = 0
+
+    thread = Thread.new do
+      scheduler = Scheduler.new
+      Thread.current.scheduler = scheduler
+
+      Fiber.schedule do
+        3.times do |i|
+          queue << i
+          sleep 0.1
+        end
+
+        queue.close
+      end
+
+      Fiber.schedule do
+        while item = queue.pop
+          processed += 1
+        end
+      end
+
+      scheduler.run
+    end
+
+    thread.join
+
+    assert processed == 3
+  end
+
   def test_mutex_deadlock
     err = /No live threads left. Deadlock\?/
     assert_in_out_err %W[-I#{__dir__} -], <<-RUBY, ['in synchronize'], err, success: false
diff --git a/thread.c b/thread.c
index c4ff5aa..ab574e5 100644
--- a/thread.c
+++ b/thread.c
@@ -1481,8 +1481,13 @@ rb_thread_sleep_interruptible(void) https://github.com/ruby/ruby/blob/trunk/thread.c#L1481
 static void
 rb_thread_sleep_deadly_allow_spurious_wakeup(void)
 {
-    thread_debug("rb_thread_sleep_deadly_allow_spurious_wakeup\n");
-    sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE);
+    VALUE scheduler = rb_thread_current_scheduler();
+    if (scheduler != Qnil) {
+        rb_scheduler_kernel_sleepv(scheduler, 0, NULL);
+    } else {
+        thread_debug("rb_thread_sleep_deadly_allow_spurious_wakeup\n");
+        sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE);
+    }
 }
 
 void
diff --git a/thread_sync.c b/thread_sync.c
index bd60231..c0a6155 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -946,25 +946,29 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L946
     check_array(self, q->que);
 
     while (RARRAY_LEN(q->que) == 0) {
-	if (!should_block) {
-	    rb_raise(rb_eThreadError, "queue empty");
-	}
-	else if (queue_closed_p(self)) {
-	    return queue_closed_result(self, q);
-	}
-	else {
-	    struct queue_waiter qw;
+        if (!should_block) {
+            rb_raise(rb_eThreadError, "queue empty");
+        }
+        else if (queue_closed_p(self)) {
+            return queue_closed_result(self, q);
+        }
+        else {
+            rb_execution_context_t *ec = GET_EC();
+            struct queue_waiter qw;
 
-	    assert(RARRAY_LEN(q->que) == 0);
-	    assert(queue_closed_p(self) == 0);
+            assert(RARRAY_LEN(q->que) == 0);
+            assert(queue_closed_p(self) == 0);
 
-	    qw.w.th = GET_THREAD();
-	    qw.as.q = q;
-	    list_add_tail(queue_waitq(qw.as.q), &qw.w.node);
-	    qw.as.q->num_waiting++;
+            qw.w.self = self;
+            qw.w.th = ec->thread_ptr;
+            qw.w.fiber = ec->fiber_ptr;
 
-	    rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&qw);
-	}
+            qw.as.q = q;
+            list_add_tail(queue_waitq(qw.as.q), &qw.w.node);
+            qw.as.q->num_waiting++;
+
+            rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&qw);
+        }
     }
 
     return rb_ary_shift(q->que);
@@ -1188,27 +1192,31 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1192
     int should_block = szqueue_push_should_block(argc, argv);
 
     while (queue_length(self, &sq->q) >= sq->max) {
-	if (!should_block) {
-	    rb_raise(rb_eThreadError, "queue full");
-	}
-	else if (queue_closed_p(self)) {
+        if (!should_block) {
+            rb_raise(rb_eThreadError, "queue full");
+        }
+        else if (queue_closed_p(self)) {
             break;
-	}
-	else {
-	    struct queue_waiter qw;
-	    struct list_head *pushq = szqueue_pushq(sq);
+        }
+        else {
+            rb_execution_context_t *ec = GET_EC();
+            struct queue_waiter qw;
+            struct list_head *pushq = szqueue_pushq(sq);
 
-	    qw.w.th = GET_THREAD();
-	    qw.as.sq = sq;
-	    list_add_tail(pushq, &qw.w.node);
-	    sq->num_waiting_push++;
+            qw.w.self = self;
+            qw.w.th = ec->thread_ptr;
+            qw.w.fiber = ec->fiber_ptr;
 
-	    rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw);
-	}
+            qw.as.sq = sq;
+            list_add_tail(pushq, &qw.w.node);
+            sq->num_waiting_push++;
+
+            rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw);
+        }
     }
 
     if (queue_closed_p(self)) {
-	raise_closed_queue_error(self);
+        raise_closed_queue_error(self);
     }
 
     return queue_do_push(self, &sq->q, argv[0]);
-- 
cgit v0.10.2


--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/

[前][次][番号順一覧][スレッド一覧]