ruby-changes:46690
From: normal <ko1@a...>
Date: Sat, 20 May 2017 03:53:17 +0900 (JST)
Subject: [ruby-changes:46690] normal:r58805 (trunk): thread_sync.c: rewrite the rest using using ccan/list
normal 2017-05-20 03:53:11 +0900 (Sat, 20 May 2017) New Revision: 58805 https://svn.ruby-lang.org/cgi-bin/viewvc.cgi?view=revision&revision=58805 Log: thread_sync.c: rewrite the rest using using ccan/list The performance improvement increases as the number of waiters increases, due to avoiding the O(n) behavior of rb_ary_delete on the waiting thread. Uncontended queues and condition variables performance is not altered significantly. Function entry cost is slightly increased for ConditionVariable, since the data pointer is separately allocated and not embedded into the RVALUE slot. [ruby-core:81235] [Feature #13552] name |trunk |built ----------------------|------:|------: vm_thread_condvar1 | 0.858| 0.858 vm_thread_condvar2 | 1.003| 0.804 vm_thread_queue | 0.131| 0.129 vm_thread_sized_queue | 0.265| 0.251 vm_thread_sized_queue2| 0.892| 0.859 vm_thread_sized_queue3| 0.879| 0.845 vm_thread_sized_queue4| 0.599| 0.486 Speedup ratio: compare with the result of `trunk' (greater is better) name |built ----------------------|------: vm_thread_condvar1 | 0.999 vm_thread_condvar2 | 1.246 vm_thread_queue | 1.020 vm_thread_sized_queue | 1.057 vm_thread_sized_queue2| 1.039 vm_thread_sized_queue3| 1.041 vm_thread_sized_queue4| 1.233 Modified files: trunk/thread_sync.c Index: thread_sync.c =================================================================== --- thread_sync.c (revision 58804) +++ thread_sync.c (revision 58805) @@ -4,8 +4,6 @@ https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L4 static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable; static VALUE rb_eClosedQueueError; -/* Mutex */ - /* sync_waiter is always on-stack */ struct sync_waiter { rb_thread_t *th; @@ -14,6 +12,38 @@ struct sync_waiter { https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L12 #define MUTEX_ALLOW_TRAP FL_USER1 +static int +wakeup_one(struct list_head *head) +{ + struct sync_waiter *cur = 0, *next = 0; + + list_for_each_safe(head, cur, next, node) { + list_del_init(&cur->node); + if (cur->th->status != THREAD_KILLED) { + rb_threadptr_interrupt(cur->th); + cur->th->status = THREAD_RUNNABLE; + return TRUE; + } + } + return FALSE; +} + +static void +wakeup_all(struct list_head *head) +{ + struct sync_waiter *cur = 0, *next = 0; + + list_for_each_safe(head, cur, next, node) { + list_del_init(&cur->node); + if (cur->th->status != THREAD_KILLED) { + rb_threadptr_interrupt(cur->th); + cur->th->status = THREAD_RUNNABLE; + } + } +} + +/* Mutex */ + typedef struct rb_mutex_struct { struct rb_thread_struct volatile *th; struct rb_mutex_struct *next_mutex; @@ -491,80 +521,121 @@ void rb_mutex_allow_trap(VALUE self, int https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L521 /* Queue */ -enum { - QUEUE_QUE, - QUEUE_WAITERS, - SZQUEUE_WAITERS, - SZQUEUE_MAX, - END_QUEUE -}; +PACKED_STRUCT_UNALIGNED(struct rb_queue { + struct list_head waitq; + const VALUE que; + int num_waiting; +}); + +PACKED_STRUCT_UNALIGNED(struct rb_szqueue { + struct rb_queue q; + int num_waiting_push; + struct list_head pushq; + long max; +}); -#define QUEUE_CLOSED FL_USER5 +static void +queue_mark(void *ptr) +{ + struct rb_queue *q = ptr; -#define GET_QUEUE_QUE(q) get_array((q), QUEUE_QUE) -#define GET_QUEUE_WAITERS(q) get_array((q), QUEUE_WAITERS) -#define GET_SZQUEUE_WAITERS(q) get_array((q), SZQUEUE_WAITERS) -#define GET_SZQUEUE_MAX(q) RSTRUCT_GET((q), SZQUEUE_MAX) -#define GET_SZQUEUE_ULONGMAX(q) NUM2ULONG(GET_SZQUEUE_MAX(q)) + /* no need to mark threads in waitq, they are on stack */ + rb_gc_mark(q->que); +} -static VALUE -ary_buf_new(void) +static size_t +queue_memsize(const void *ptr) { - return rb_ary_tmp_new(1); + return sizeof(struct rb_queue); } +static const rb_data_type_t queue_data_type = { + "queue", + {queue_mark, RUBY_TYPED_DEFAULT_FREE, queue_memsize,}, + 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED +}; + static VALUE -get_array(VALUE obj, int idx) +queue_alloc(VALUE klass) { - VALUE ary = RSTRUCT_GET(obj, idx); - if (!RB_TYPE_P(ary, T_ARRAY)) { - rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj); - } - return ary; + VALUE obj; + struct rb_queue *q; + + obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q); + list_head_init(&q->waitq); + return obj; } -static void -wakeup_first_thread(VALUE list) +static struct rb_queue * +queue_ptr(VALUE obj) { - VALUE thread; + struct rb_queue *q; - while (!NIL_P(thread = rb_ary_shift(list))) { - if (RTEST(rb_thread_wakeup_alive(thread))) break; - } + TypedData_Get_Struct(obj, struct rb_queue, &queue_data_type, q); + return q; } +#define QUEUE_CLOSED FL_USER5 + static void -wakeup_all_threads(VALUE list) +szqueue_mark(void *ptr) { - VALUE thread; - long i; + struct rb_szqueue *sq = ptr; - for (i=0; i<RARRAY_LEN(list); i++) { - thread = RARRAY_AREF(list, i); - rb_thread_wakeup_alive(thread); - } - rb_ary_clear(list); + queue_mark(&sq->q); +} + +static size_t +szqueue_memsize(const void *ptr) +{ + return sizeof(struct rb_szqueue); } -static unsigned long -queue_length(VALUE self) +static const rb_data_type_t szqueue_data_type = { + "sized_queue", + {szqueue_mark, RUBY_TYPED_DEFAULT_FREE, szqueue_memsize,}, + 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED +}; + +static VALUE +szqueue_alloc(VALUE klass) +{ + struct rb_szqueue *sq; + VALUE obj = TypedData_Make_Struct(klass, struct rb_szqueue, + &szqueue_data_type, sq); + list_head_init(&sq->q.waitq); + list_head_init(&sq->pushq); + return obj; +} + +static struct rb_szqueue * +szqueue_ptr(VALUE obj) +{ + struct rb_szqueue *sq; + + TypedData_Get_Struct(obj, struct rb_szqueue, &szqueue_data_type, sq); + return sq; +} + +static VALUE +ary_buf_new(void) { - VALUE que = GET_QUEUE_QUE(self); - return RARRAY_LEN(que); + return rb_ary_tmp_new(1); } -static unsigned long -queue_num_waiting(VALUE self) +static VALUE +check_array(VALUE obj, VALUE ary) { - VALUE waiters = GET_QUEUE_WAITERS(self); - return RARRAY_LEN(waiters); + if (!RB_TYPE_P(ary, T_ARRAY)) { + rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj); + } + return ary; } -static unsigned long -szqueue_num_waiting_producer(VALUE self) +static long +queue_length(VALUE self, struct rb_queue *q) { - VALUE waiters = GET_SZQUEUE_WAITERS(self); - return RARRAY_LEN(waiters); + return RARRAY_LEN(check_array(self, q->que)); } static int @@ -580,32 +651,12 @@ raise_closed_queue_error(VALUE self) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L651 } static VALUE -queue_closed_result(VALUE self) +queue_closed_result(VALUE self, struct rb_queue *q) { - assert(queue_length(self) == 0); + assert(queue_length(self, q) == 0); return Qnil; } -static VALUE -queue_do_close(VALUE self, int is_szq) -{ - if (!queue_closed_p(self)) { - FL_SET(self, QUEUE_CLOSED); - - if (queue_num_waiting(self) > 0) { - VALUE waiters = GET_QUEUE_WAITERS(self); - wakeup_all_threads(waiters); - } - - if (is_szq && szqueue_num_waiting_producer(self) > 0) { - VALUE waiters = GET_SZQUEUE_WAITERS(self); - wakeup_all_threads(waiters); - } - } - - return self; -} - /* * Document-class: Queue * @@ -649,19 +700,20 @@ queue_do_close(VALUE self, int is_szq) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L700 static VALUE rb_queue_initialize(VALUE self) { - RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new()); - RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new()); + struct rb_queue *q = queue_ptr(self); + RB_OBJ_WRITE(self, &q->que, ary_buf_new()); + list_head_init(&q->waitq); return self; } static VALUE -queue_do_push(VALUE self, VALUE obj) +queue_do_push(VALUE self, struct rb_queue *q, VALUE obj) { if (queue_closed_p(self)) { raise_closed_queue_error(self); } - rb_ary_push(GET_QUEUE_QUE(self), obj); - wakeup_first_thread(GET_QUEUE_WAITERS(self)); + rb_ary_push(check_array(self, q->que), obj); + wakeup_one(&q->waitq); return self; } @@ -699,7 +751,15 @@ queue_do_push(VALUE self, VALUE obj) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L751 static VALUE rb_queue_close(VALUE self) { - return queue_do_close(self, FALSE); + struct rb_queue *q = queue_ptr(self); + + if (!queue_closed_p(self)) { + FL_SET(self, QUEUE_CLOSED); + + wakeup_all(&q->waitq); + } + + return self; } /* @@ -728,52 +788,74 @@ rb_queue_closed_p(VALUE self) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L788 static VALUE rb_queue_push(VALUE self, VALUE obj) { - return queue_do_push(self, obj); + return queue_do_push(self, queue_ptr(self), obj); } -struct waiting_delete { - VALUE waiting; - VALUE th; +static VALUE +queue_sleep(VALUE arg) +{ + rb_thread_sleep_deadly_allow_spurious_wakeup(); + return Qnil; +} + +struct queue_waiter { + struct sync_waiter w; + union { + struct rb_queue *q; + struct rb_szqueue *sq; + } as; }; static VALUE -queue_delete_from_waiting(struct waiting_delete *p) +queue_sleep_done(VALUE p) { - rb_ary_delete(p->waiting, p->th); - return Qnil; + struct queue_waiter *qw = (struct queue_waiter *)p; + + list_del(&qw->w.node); + qw->as.q->num_waiting--; + + return Qfalse; } static VALUE -queue_sleep(VALUE arg) +szqueue_sleep_done(VALUE p) { - rb_thread_sleep_deadly_allow_spurious_wakeup(); - return Qnil; + struct queue_waiter *qw = (struct queue_waiter *)p; + + list_del(&qw->w.node); + qw->as.sq->num_waiting_push--; + + return Qfalse; } static VALUE -queue_do_pop(VALUE self, int should_block) +queue_do_pop(VALUE self, struct rb_queue *q, int should_block) { - struct waiting_delete args; - args.waiting = GET_QUEUE_WAITERS(self); - args.th = rb_thread_current(); + check_array(self, q->que); - while (queue_length(self) == 0) { + while (RARRAY_LEN(q->que) == 0) { if (!should_block) { rb_raise(rb_eThreadError, "queue empty"); } else if (queue_closed_p(self)) { - return queue_closed_result(self); + return queue_closed_result(self, q); } else { - assert(queue_length(self) == 0); + struct queue_waiter qw; + + assert(RARRAY_LEN(q->que) == 0); assert(queue_closed_p(self) == 0); - rb_ary_push(args.waiting, args.th); - rb_ensure(queue_sleep, Qfalse, queue_delete_from_waiting, (VALUE)&args); + qw.w.th = GET_THREAD(); + qw.as.q = q; + list_add_tail(&qw.as.q->waitq, &qw.w.node); + qw.as.q->num_waiting++; + + rb_ensure(queue_sleep, Qfalse, queue_sleep_done, (VALUE)&qw); } } - return rb_ary_shift(GET_QUEUE_QUE(self)); + return rb_ary_shift(q->que); } static int @@ -805,7 +887,7 @@ static VALUE https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L887 rb_queue_pop(int argc, VALUE *argv, VALUE self) { int should_block = queue_pop_should_block(argc, argv); - return queue_do_pop(self, should_block); + return queue_do_pop(self, queue_ptr(self), should_block); } /* @@ -818,7 +900,7 @@ rb_queue_pop(int argc, VALUE *argv, VALU https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L900 static VALUE rb_queue_empty_p(VALUE self) { - return queue_length(self) == 0 ? Qtrue : Qfalse; + return queue_length(self, queue_ptr(self)) == 0 ? Qtrue : Qfalse; } /* @@ -830,7 +912,9 @@ rb_queue_empty_p(VALUE self) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L912 static VALUE rb_queue_clear(VALUE self) { - rb_ary_clear(GET_QUEUE_QUE(self)); + struct rb_queue *q = queue_ptr(self); + + rb_ary_clear(check_array(self, q->que)); return self; } @@ -846,8 +930,7 @@ rb_queue_clear(VALUE self) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L930 static VALUE rb_queue_length(VALUE self) { - unsigned long len = queue_length(self); - return ULONG2NUM(len); + return LONG2NUM(queue_length(self, queue_ptr(self))); } /* @@ -859,8 +942,9 @@ rb_queue_length(VALUE self) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L942 static VALUE rb_queue_num_waiting(VALUE self) { - unsigned long len = queue_num_waiting(self); - return ULONG2NUM(len); + struct rb_queue *q = queue_ptr(self); + + return INT2NUM(q->num_waiting); } /* @@ -883,16 +967,17 @@ static VALUE https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L967 rb_szqueue_initialize(VALUE self, VALUE vmax) { long max; + struct rb_szqueue *sq = szqueue_ptr(self); max = NUM2LONG(vmax); if (max <= 0) { rb_raise(rb_eArgError, "queue size must be positive"); } - RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new()); - RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new()); - RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new()); - RSTRUCT_SET(self, SZQUEUE_MAX, vmax); + RB_OBJ_WRITE(self, &sq->q.que, ary_buf_new()); + list_head_init(&sq->q.waitq); + list_head_init(&sq->pushq); + sq->max = max; return self; } @@ -912,7 +997,14 @@ rb_szqueue_initialize(VALUE self, VALUE https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L997 static VALUE rb_szqueue_close(VALUE self) { - return queue_do_close(self, TRUE); + if (!queue_closed_p(self)) { + struct rb_szqueue *sq = szqueue_ptr(self); + + FL_SET(self, QUEUE_CLOSED); + wakeup_all(&sq->q.waitq); + wakeup_all(&sq->pushq); + } + return self; } /* @@ -924,7 +1016,7 @@ rb_szqueue_close(VALUE self) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1016 static VALUE rb_szqueue_max_get(VALUE self) { - return GET_SZQUEUE_MAX(self); + return LONG2NUM(szqueue_ptr(self)->max); } /* @@ -937,18 +1029,19 @@ rb_szqueue_max_get(VALUE self) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1029 static VALUE rb_szqueue_max_set(VALUE self, VALUE vmax) { - long max = NUM2LONG(vmax), diff = 0; - VALUE t; + long max = NUM2LONG(vmax); + long diff = 0; + struct rb_szqueue *sq = szqueue_ptr(self); if (max <= 0) { rb_raise(rb_eArgError, "queue size must be positive"); } - if ((unsigned long)max > GET_SZQUEUE_ULONGMAX(self)) { - diff = max - GET_SZQUEUE_ULONGMAX(self); + if (max > sq->max) { + diff = max - sq->max; } - RSTRUCT_SET(self, SZQUEUE_MAX, vmax); - while (diff-- > 0 && !NIL_P(t = rb_ary_shift(GET_SZQUEUE_WAITERS(self)))) { - rb_thread_wakeup_alive(t); + sq->max = max; + while (diff-- > 0 && wakeup_one(&sq->pushq)) { + /* keep waking more up */ } return vmax; } @@ -981,12 +1074,10 @@ szqueue_push_should_block(int argc, cons https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1074 static VALUE rb_szqueue_push(int argc, VALUE *argv, VALUE self) { - struct waiting_delete args; + struct rb_szqueue *sq = szqueue_ptr(self); int should_block = szqueue_push_should_block(argc, argv); - args.waiting = GET_SZQUEUE_WAITERS(self); - args.th = rb_thread_current(); - while (queue_length(self) >= GET_SZQUEUE_ULONGMAX(self)) { + while (queue_length(self, &sq->q) >= sq->max) { if (!should_block) { rb_raise(rb_eThreadError, "queue full"); } @@ -994,8 +1085,14 @@ rb_szqueue_push(int argc, VALUE *argv, V https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1085 goto closed; } else { - rb_ary_push(args.waiting, args.th); - rb_ensure(queue_sleep, Qfalse, queue_delete_from_waiting, (VALUE)&args); + struct queue_waiter qw; + + qw.w.th = GET_THREAD(); + qw.as.sq = sq; + list_add_tail(&sq->pushq, &qw.w.node); + sq->num_waiting_push++; + + rb_ensure(queue_sleep, Qfalse, szqueue_sleep_done, (VALUE)&qw); } } @@ -1004,16 +1101,17 @@ rb_szqueue_push(int argc, VALUE *argv, V https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1101 raise_closed_queue_error(self); } - return queue_do_push(self, argv[0]); + return queue_do_push(self, &sq->q, argv[0]); } static VALUE szqueue_do_pop(VALUE self, int should_block) { - VALUE retval = queue_do_pop(self, should_block); + struct rb_szqueue *sq = szqueue_ptr(self); + VALUE retval = queue_do_pop(self, &sq->q, should_block); - if (queue_length(self) < GET_SZQUEUE_ULONGMAX(self)) { - wakeup_first_thread(GET_SZQUEUE_WAITERS(self)); + if (queue_length(self, &sq->q) < sq->max) { + wakeup_one(&sq->pushq); } return retval; @@ -1049,11 +1147,21 @@ rb_szqueue_pop(int argc, VALUE *argv, VA https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1147 static VALUE rb_szqueue_clear(VALUE self) { - rb_ary_clear(GET_QUEUE_QUE(self)); - wakeup_all_threads(GET_SZQUEUE_WAITERS(self)); + struct rb_szqueue *sq = szqueue_ptr(self); + + rb_ary_clear(check_array(self, sq->q.que)); + wakeup_all(&sq->pushq); return self; } +static VALUE +rb_szqueue_length(VALUE self) +{ + struct rb_szqueue *sq = szqueue_ptr(self); + + return LONG2NUM(queue_length(self, &sq->q)); +} + /* * Document-method: SizedQueue#num_waiting * @@ -1063,18 +1171,32 @@ rb_szqueue_clear(VALUE self) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1171 static VALUE rb_szqueue_num_waiting(VALUE self) { - long len = queue_num_waiting(self) + szqueue_num_waiting_producer(self); - return ULONG2NUM(len); + struct rb_szqueue *sq = szqueue_ptr(self); + + return INT2NUM(sq->q.num_waiting + sq->num_waiting_push); } -/* ConditionalVariable */ +/* + * Document-method: SizedQueue#empty? + * call-seq: empty? + * + * Returns +true+ if the queue is empty. + */ -enum { - CONDVAR_WAITERS, - END_CONDVAR -}; +static VALUE +rb_szqueue_empty_p(VALUE self) +{ + struct rb_szqueue *sq = szqueue_ptr(self); -#define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS) + return queue_length(self, &sq->q) == 0 ? Qtrue : Qfalse; +} + + +/* ConditionalVariable */ +/* TODO: maybe this can be IMEMO */ +struct rb_condvar { + struct list_head waitq; +}; /* * Document-class: ConditionVariable @@ -1106,6 +1228,40 @@ enum { https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1228 * } */ +static size_t +condvar_memsize(const void *ptr) +{ + return sizeof(struct rb_condvar); +} + +static const rb_data_type_t cv_data_type = { + "condvar", + {0, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,}, + 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED +}; + +static struct rb_condvar * +condvar_ptr(VALUE self) +{ + struct rb_condvar *cv; + + TypedData_Get_Struct(self, struct rb_condvar, &cv_data_type, cv); + + return cv; +} + +static VALUE +condvar_alloc(VALUE klass) +{ + struct rb_condvar *cv; + VALUE obj; + + obj = TypedData_Make_Struct(klass, struct rb_condvar, &cv_data_type, cv); + list_head_init(&cv->waitq); + + return obj; +} + /* * Document-method: ConditionVariable::new * @@ -1115,7 +1271,8 @@ enum { https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1271 static VALUE rb_condvar_initialize(VALUE self) { - RSTRUCT_SET(self, CONDVAR_WAITERS, ary_buf_new()); + struct rb_condvar *cv = condvar_ptr(self);; + list_head_init(&cv->waitq); return self; } @@ -1134,9 +1291,11 @@ do_sleep(VALUE args) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1291 } static VALUE -delete_current_thread(VALUE ary) +delete_from_waitq(struct sync_waiter *w) { - return rb_ary_delete(ary, rb_thread_current()); + list_del(&w->node); + + return Qnil; } /* @@ -1152,16 +1311,18 @@ delete_current_thread(VALUE ary) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1311 static VALUE rb_condvar_wait(int argc, VALUE *argv, VALUE self) { - VALUE waiters = GET_CONDVAR_WAITERS(self); + struct rb_condvar *cv = condvar_ptr(self); VALUE mutex, timeout; struct sleep_call args; + struct sync_waiter w; rb_scan_args(argc, argv, "11", &mutex, &timeout); args.mutex = mutex; args.timeout = timeout; - rb_ary_push(waiters, rb_thread_current()); - rb_ensure(do_sleep, (VALUE)&args, delete_current_thread, waiters); + w.th = GET_THREAD(); + list_add_tail(&cv->waitq, &w.node); + rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&w); return self; } @@ -1175,7 +1336,8 @@ rb_condvar_wait(int argc, VALUE *argv, V https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1336 static VALUE rb_condvar_signal(VALUE self) { - wakeup_first_thread(GET_CONDVAR_WAITERS(self)); + struct rb_condvar *cv = condvar_ptr(self); + wakeup_one(&cv->waitq); return self; } @@ -1188,7 +1350,8 @@ rb_condvar_signal(VALUE self) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1350 static VALUE rb_condvar_broadcast(VALUE self) { - wakeup_all_threads(GET_CONDVAR_WAITERS(self)); + struct rb_condvar *cv = condvar_ptr(self); + wakeup_all(&cv->waitq); return self; } @@ -1228,10 +1391,8 @@ Init_thread_sync(void) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1391 rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0); /* Queue */ - rb_cQueue = rb_struct_define_without_accessor_under( - rb_cThread, - "Queue", rb_cObject, rb_st (... truncated) -- ML: ruby-changes@q... Info: http://www.atdot.net/~ko1/quickml/