ruby-changes:62925
From: Samuel <ko1@a...>
Date: Mon, 14 Sep 2020 13:44:37 +0900 (JST)
Subject: [ruby-changes:62925] 8eea66a0ca (master): Add support for Queue & SizedQueue.
https://git.ruby-lang.org/ruby.git/commit/?id=8eea66a0ca From 8eea66a0ca8965ae8319b4c404f61c4d46866f64 Mon Sep 17 00:00:00 2001 From: Samuel Williams <samuel.williams@o...> Date: Mon, 14 Sep 2020 11:10:02 +1200 Subject: Add support for Queue & SizedQueue. diff --git a/test/fiber/test_mutex.rb b/test/fiber/test_mutex.rb index 2103412..1f53ae1 100644 --- a/test/fiber/test_mutex.rb +++ b/test/fiber/test_mutex.rb @@ -84,6 +84,37 @@ class TestFiberMutex < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/fiber/test_mutex.rb#L84 assert signalled > 1 end + def test_queue + queue = Queue.new + processed = 0 + + thread = Thread.new do + scheduler = Scheduler.new + Thread.current.scheduler = scheduler + + Fiber.schedule do + 3.times do |i| + queue << i + sleep 0.1 + end + + queue.close + end + + Fiber.schedule do + while item = queue.pop + processed += 1 + end + end + + scheduler.run + end + + thread.join + + assert processed == 3 + end + def test_mutex_deadlock err = /No live threads left. Deadlock\?/ assert_in_out_err %W[-I#{__dir__} -], <<-RUBY, ['in synchronize'], err, success: false diff --git a/thread.c b/thread.c index c4ff5aa..ab574e5 100644 --- a/thread.c +++ b/thread.c @@ -1481,8 +1481,13 @@ rb_thread_sleep_interruptible(void) https://github.com/ruby/ruby/blob/trunk/thread.c#L1481 static void rb_thread_sleep_deadly_allow_spurious_wakeup(void) { - thread_debug("rb_thread_sleep_deadly_allow_spurious_wakeup\n"); - sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE); + VALUE scheduler = rb_thread_current_scheduler(); + if (scheduler != Qnil) { + rb_scheduler_kernel_sleepv(scheduler, 0, NULL); + } else { + thread_debug("rb_thread_sleep_deadly_allow_spurious_wakeup\n"); + sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE); + } } void diff --git a/thread_sync.c b/thread_sync.c index bd60231..c0a6155 100644 --- a/thread_sync.c +++ b/thread_sync.c @@ -946,25 +946,29 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L946 check_array(self, q->que); while (RARRAY_LEN(q->que) == 0) { - if (!should_block) { - rb_raise(rb_eThreadError, "queue empty"); - } - else if (queue_closed_p(self)) { - return queue_closed_result(self, q); - } - else { - struct queue_waiter qw; + if (!should_block) { + rb_raise(rb_eThreadError, "queue empty"); + } + else if (queue_closed_p(self)) { + return queue_closed_result(self, q); + } + else { + rb_execution_context_t *ec = GET_EC(); + struct queue_waiter qw; - assert(RARRAY_LEN(q->que) == 0); - assert(queue_closed_p(self) == 0); + assert(RARRAY_LEN(q->que) == 0); + assert(queue_closed_p(self) == 0); - qw.w.th = GET_THREAD(); - qw.as.q = q; - list_add_tail(queue_waitq(qw.as.q), &qw.w.node); - qw.as.q->num_waiting++; + qw.w.self = self; + qw.w.th = ec->thread_ptr; + qw.w.fiber = ec->fiber_ptr; - rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&qw); - } + qw.as.q = q; + list_add_tail(queue_waitq(qw.as.q), &qw.w.node); + qw.as.q->num_waiting++; + + rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&qw); + } } return rb_ary_shift(q->que); @@ -1188,27 +1192,31 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1192 int should_block = szqueue_push_should_block(argc, argv); while (queue_length(self, &sq->q) >= sq->max) { - if (!should_block) { - rb_raise(rb_eThreadError, "queue full"); - } - else if (queue_closed_p(self)) { + if (!should_block) { + rb_raise(rb_eThreadError, "queue full"); + } + else if (queue_closed_p(self)) { break; - } - else { - struct queue_waiter qw; - struct list_head *pushq = szqueue_pushq(sq); + } + else { + rb_execution_context_t *ec = GET_EC(); + struct queue_waiter qw; + struct list_head *pushq = szqueue_pushq(sq); - qw.w.th = GET_THREAD(); - qw.as.sq = sq; - list_add_tail(pushq, &qw.w.node); - sq->num_waiting_push++; + qw.w.self = self; + qw.w.th = ec->thread_ptr; + qw.w.fiber = ec->fiber_ptr; - rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw); - } + qw.as.sq = sq; + list_add_tail(pushq, &qw.w.node); + sq->num_waiting_push++; + + rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw); + } } if (queue_closed_p(self)) { - raise_closed_queue_error(self); + raise_closed_queue_error(self); } return queue_do_push(self, &sq->q, argv[0]); -- cgit v0.10.2 -- ML: ruby-changes@q... Info: http://www.atdot.net/~ko1/quickml/