[前][次][番号順一覧][スレッド一覧]

ruby-changes:46690

From: normal <ko1@a...>
Date: Sat, 20 May 2017 03:53:17 +0900 (JST)
Subject: [ruby-changes:46690] normal:r58805 (trunk): thread_sync.c: rewrite the rest using using ccan/list

normal	2017-05-20 03:53:11 +0900 (Sat, 20 May 2017)

  New Revision: 58805

  https://svn.ruby-lang.org/cgi-bin/viewvc.cgi?view=revision&revision=58805

  Log:
    thread_sync.c: rewrite the rest using using ccan/list
    
    The performance improvement increases as the number of waiters
    increases, due to avoiding the O(n) behavior of rb_ary_delete on
    the waiting thread.  Uncontended queues and condition variables
    performance is not altered significantly.
    
    Function entry cost is slightly increased for ConditionVariable,
    since the data pointer is separately allocated and not embedded
    into the RVALUE slot.
    
    [ruby-core:81235] [Feature #13552]
    
    name                  |trunk  |built
    ----------------------|------:|------:
    vm_thread_condvar1    |  0.858|  0.858
    vm_thread_condvar2    |  1.003|  0.804
    vm_thread_queue       |  0.131|  0.129
    vm_thread_sized_queue |  0.265|  0.251
    vm_thread_sized_queue2|  0.892|  0.859
    vm_thread_sized_queue3|  0.879|  0.845
    vm_thread_sized_queue4|  0.599|  0.486
    
    Speedup ratio: compare with the result of `trunk' (greater is better)
    
    name                  |built
    ----------------------|------:
    vm_thread_condvar1    |  0.999
    vm_thread_condvar2    |  1.246
    vm_thread_queue       |  1.020
    vm_thread_sized_queue |  1.057
    vm_thread_sized_queue2|  1.039
    vm_thread_sized_queue3|  1.041
    vm_thread_sized_queue4|  1.233

  Modified files:
    trunk/thread_sync.c
Index: thread_sync.c
===================================================================
--- thread_sync.c	(revision 58804)
+++ thread_sync.c	(revision 58805)
@@ -4,8 +4,6 @@ https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L4
 static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
 static VALUE rb_eClosedQueueError;
 
-/* Mutex */
-
 /* sync_waiter is always on-stack */
 struct sync_waiter {
     rb_thread_t *th;
@@ -14,6 +12,38 @@ struct sync_waiter { https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L12
 
 #define MUTEX_ALLOW_TRAP FL_USER1
 
+static int
+wakeup_one(struct list_head *head)
+{
+    struct sync_waiter *cur = 0, *next = 0;
+
+    list_for_each_safe(head, cur, next, node) {
+	list_del_init(&cur->node);
+	if (cur->th->status != THREAD_KILLED) {
+	    rb_threadptr_interrupt(cur->th);
+	    cur->th->status = THREAD_RUNNABLE;
+	    return TRUE;
+	}
+    }
+    return FALSE;
+}
+
+static void
+wakeup_all(struct list_head *head)
+{
+    struct sync_waiter *cur = 0, *next = 0;
+
+    list_for_each_safe(head, cur, next, node) {
+	list_del_init(&cur->node);
+	if (cur->th->status != THREAD_KILLED) {
+	    rb_threadptr_interrupt(cur->th);
+	    cur->th->status = THREAD_RUNNABLE;
+	}
+    }
+}
+
+/* Mutex */
+
 typedef struct rb_mutex_struct {
     struct rb_thread_struct volatile *th;
     struct rb_mutex_struct *next_mutex;
@@ -491,80 +521,121 @@ void rb_mutex_allow_trap(VALUE self, int https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L521
 
 /* Queue */
 
-enum {
-    QUEUE_QUE,
-    QUEUE_WAITERS,
-    SZQUEUE_WAITERS,
-    SZQUEUE_MAX,
-    END_QUEUE
-};
+PACKED_STRUCT_UNALIGNED(struct rb_queue {
+    struct list_head waitq;
+    const VALUE que;
+    int num_waiting;
+});
+
+PACKED_STRUCT_UNALIGNED(struct rb_szqueue {
+    struct rb_queue q;
+    int num_waiting_push;
+    struct list_head pushq;
+    long max;
+});
 
-#define QUEUE_CLOSED          FL_USER5
+static void
+queue_mark(void *ptr)
+{
+    struct rb_queue *q = ptr;
 
-#define GET_QUEUE_QUE(q)        get_array((q), QUEUE_QUE)
-#define GET_QUEUE_WAITERS(q)    get_array((q), QUEUE_WAITERS)
-#define GET_SZQUEUE_WAITERS(q)  get_array((q), SZQUEUE_WAITERS)
-#define GET_SZQUEUE_MAX(q)      RSTRUCT_GET((q), SZQUEUE_MAX)
-#define GET_SZQUEUE_ULONGMAX(q) NUM2ULONG(GET_SZQUEUE_MAX(q))
+    /* no need to mark threads in waitq, they are on stack */
+    rb_gc_mark(q->que);
+}
 
-static VALUE
-ary_buf_new(void)
+static size_t
+queue_memsize(const void *ptr)
 {
-    return rb_ary_tmp_new(1);
+    return sizeof(struct rb_queue);
 }
 
+static const rb_data_type_t queue_data_type = {
+    "queue",
+    {queue_mark, RUBY_TYPED_DEFAULT_FREE, queue_memsize,},
+    0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
+};
+
 static VALUE
-get_array(VALUE obj, int idx)
+queue_alloc(VALUE klass)
 {
-    VALUE ary = RSTRUCT_GET(obj, idx);
-    if (!RB_TYPE_P(ary, T_ARRAY)) {
-	rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
-    }
-    return ary;
+    VALUE obj;
+    struct rb_queue *q;
+
+    obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q);
+    list_head_init(&q->waitq);
+    return obj;
 }
 
-static void
-wakeup_first_thread(VALUE list)
+static struct rb_queue *
+queue_ptr(VALUE obj)
 {
-    VALUE thread;
+    struct rb_queue *q;
 
-    while (!NIL_P(thread = rb_ary_shift(list))) {
-	if (RTEST(rb_thread_wakeup_alive(thread))) break;
-    }
+    TypedData_Get_Struct(obj, struct rb_queue, &queue_data_type, q);
+    return q;
 }
 
+#define QUEUE_CLOSED          FL_USER5
+
 static void
-wakeup_all_threads(VALUE list)
+szqueue_mark(void *ptr)
 {
-    VALUE thread;
-    long i;
+    struct rb_szqueue *sq = ptr;
 
-    for (i=0; i<RARRAY_LEN(list); i++) {
-	thread = RARRAY_AREF(list, i);
-	rb_thread_wakeup_alive(thread);
-    }
-    rb_ary_clear(list);
+    queue_mark(&sq->q);
+}
+
+static size_t
+szqueue_memsize(const void *ptr)
+{
+    return sizeof(struct rb_szqueue);
 }
 
-static unsigned long
-queue_length(VALUE self)
+static const rb_data_type_t szqueue_data_type = {
+    "sized_queue",
+    {szqueue_mark, RUBY_TYPED_DEFAULT_FREE, szqueue_memsize,},
+    0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
+};
+
+static VALUE
+szqueue_alloc(VALUE klass)
+{
+    struct rb_szqueue *sq;
+    VALUE obj = TypedData_Make_Struct(klass, struct rb_szqueue,
+					&szqueue_data_type, sq);
+    list_head_init(&sq->q.waitq);
+    list_head_init(&sq->pushq);
+    return obj;
+}
+
+static struct rb_szqueue *
+szqueue_ptr(VALUE obj)
+{
+    struct rb_szqueue *sq;
+
+    TypedData_Get_Struct(obj, struct rb_szqueue, &szqueue_data_type, sq);
+    return sq;
+}
+
+static VALUE
+ary_buf_new(void)
 {
-    VALUE que = GET_QUEUE_QUE(self);
-    return RARRAY_LEN(que);
+    return rb_ary_tmp_new(1);
 }
 
-static unsigned long
-queue_num_waiting(VALUE self)
+static VALUE
+check_array(VALUE obj, VALUE ary)
 {
-    VALUE waiters = GET_QUEUE_WAITERS(self);
-    return RARRAY_LEN(waiters);
+    if (!RB_TYPE_P(ary, T_ARRAY)) {
+	rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
+    }
+    return ary;
 }
 
-static unsigned long
-szqueue_num_waiting_producer(VALUE self)
+static long
+queue_length(VALUE self, struct rb_queue *q)
 {
-    VALUE waiters = GET_SZQUEUE_WAITERS(self);
-    return RARRAY_LEN(waiters);
+    return RARRAY_LEN(check_array(self, q->que));
 }
 
 static int
@@ -580,32 +651,12 @@ raise_closed_queue_error(VALUE self) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L651
 }
 
 static VALUE
-queue_closed_result(VALUE self)
+queue_closed_result(VALUE self, struct rb_queue *q)
 {
-    assert(queue_length(self) == 0);
+    assert(queue_length(self, q) == 0);
     return Qnil;
 }
 
-static VALUE
-queue_do_close(VALUE self, int is_szq)
-{
-    if (!queue_closed_p(self)) {
-	FL_SET(self, QUEUE_CLOSED);
-
-	if (queue_num_waiting(self) > 0) {
-	    VALUE waiters = GET_QUEUE_WAITERS(self);
-	    wakeup_all_threads(waiters);
-	}
-
-	if (is_szq && szqueue_num_waiting_producer(self) > 0) {
-	    VALUE waiters = GET_SZQUEUE_WAITERS(self);
-	    wakeup_all_threads(waiters);
-	}
-    }
-
-    return self;
-}
-
 /*
  *  Document-class: Queue
  *
@@ -649,19 +700,20 @@ queue_do_close(VALUE self, int is_szq) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L700
 static VALUE
 rb_queue_initialize(VALUE self)
 {
-    RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
-    RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
+    struct rb_queue *q = queue_ptr(self);
+    RB_OBJ_WRITE(self, &q->que, ary_buf_new());
+    list_head_init(&q->waitq);
     return self;
 }
 
 static VALUE
-queue_do_push(VALUE self, VALUE obj)
+queue_do_push(VALUE self, struct rb_queue *q, VALUE obj)
 {
     if (queue_closed_p(self)) {
 	raise_closed_queue_error(self);
     }
-    rb_ary_push(GET_QUEUE_QUE(self), obj);
-    wakeup_first_thread(GET_QUEUE_WAITERS(self));
+    rb_ary_push(check_array(self, q->que), obj);
+    wakeup_one(&q->waitq);
     return self;
 }
 
@@ -699,7 +751,15 @@ queue_do_push(VALUE self, VALUE obj) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L751
 static VALUE
 rb_queue_close(VALUE self)
 {
-    return queue_do_close(self, FALSE);
+    struct rb_queue *q = queue_ptr(self);
+
+    if (!queue_closed_p(self)) {
+	FL_SET(self, QUEUE_CLOSED);
+
+	wakeup_all(&q->waitq);
+    }
+
+    return self;
 }
 
 /*
@@ -728,52 +788,74 @@ rb_queue_closed_p(VALUE self) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L788
 static VALUE
 rb_queue_push(VALUE self, VALUE obj)
 {
-    return queue_do_push(self, obj);
+    return queue_do_push(self, queue_ptr(self), obj);
 }
 
-struct waiting_delete {
-    VALUE waiting;
-    VALUE th;
+static VALUE
+queue_sleep(VALUE arg)
+{
+    rb_thread_sleep_deadly_allow_spurious_wakeup();
+    return Qnil;
+}
+
+struct queue_waiter {
+    struct sync_waiter w;
+    union {
+	struct rb_queue *q;
+	struct rb_szqueue *sq;
+    } as;
 };
 
 static VALUE
-queue_delete_from_waiting(struct waiting_delete *p)
+queue_sleep_done(VALUE p)
 {
-    rb_ary_delete(p->waiting, p->th);
-    return Qnil;
+    struct queue_waiter *qw = (struct queue_waiter *)p;
+
+    list_del(&qw->w.node);
+    qw->as.q->num_waiting--;
+
+    return Qfalse;
 }
 
 static VALUE
-queue_sleep(VALUE arg)
+szqueue_sleep_done(VALUE p)
 {
-    rb_thread_sleep_deadly_allow_spurious_wakeup();
-    return Qnil;
+    struct queue_waiter *qw = (struct queue_waiter *)p;
+
+    list_del(&qw->w.node);
+    qw->as.sq->num_waiting_push--;
+
+    return Qfalse;
 }
 
 static VALUE
-queue_do_pop(VALUE self, int should_block)
+queue_do_pop(VALUE self, struct rb_queue *q, int should_block)
 {
-    struct waiting_delete args;
-    args.waiting = GET_QUEUE_WAITERS(self);
-    args.th	 = rb_thread_current();
+    check_array(self, q->que);
 
-    while (queue_length(self) == 0) {
+    while (RARRAY_LEN(q->que) == 0) {
 	if (!should_block) {
 	    rb_raise(rb_eThreadError, "queue empty");
 	}
 	else if (queue_closed_p(self)) {
-	    return queue_closed_result(self);
+	    return queue_closed_result(self, q);
 	}
 	else {
-	    assert(queue_length(self) == 0);
+	    struct queue_waiter qw;
+
+	    assert(RARRAY_LEN(q->que) == 0);
 	    assert(queue_closed_p(self) == 0);
 
-	    rb_ary_push(args.waiting, args.th);
-	    rb_ensure(queue_sleep, Qfalse, queue_delete_from_waiting, (VALUE)&args);
+	    qw.w.th = GET_THREAD();
+	    qw.as.q = q;
+	    list_add_tail(&qw.as.q->waitq, &qw.w.node);
+	    qw.as.q->num_waiting++;
+
+	    rb_ensure(queue_sleep, Qfalse, queue_sleep_done, (VALUE)&qw);
 	}
     }
 
-    return rb_ary_shift(GET_QUEUE_QUE(self));
+    return rb_ary_shift(q->que);
 }
 
 static int
@@ -805,7 +887,7 @@ static VALUE https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L887
 rb_queue_pop(int argc, VALUE *argv, VALUE self)
 {
     int should_block = queue_pop_should_block(argc, argv);
-    return queue_do_pop(self, should_block);
+    return queue_do_pop(self, queue_ptr(self), should_block);
 }
 
 /*
@@ -818,7 +900,7 @@ rb_queue_pop(int argc, VALUE *argv, VALU https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L900
 static VALUE
 rb_queue_empty_p(VALUE self)
 {
-    return queue_length(self) == 0 ? Qtrue : Qfalse;
+    return queue_length(self, queue_ptr(self)) == 0 ? Qtrue : Qfalse;
 }
 
 /*
@@ -830,7 +912,9 @@ rb_queue_empty_p(VALUE self) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L912
 static VALUE
 rb_queue_clear(VALUE self)
 {
-    rb_ary_clear(GET_QUEUE_QUE(self));
+    struct rb_queue *q = queue_ptr(self);
+
+    rb_ary_clear(check_array(self, q->que));
     return self;
 }
 
@@ -846,8 +930,7 @@ rb_queue_clear(VALUE self) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L930
 static VALUE
 rb_queue_length(VALUE self)
 {
-    unsigned long len = queue_length(self);
-    return ULONG2NUM(len);
+    return LONG2NUM(queue_length(self, queue_ptr(self)));
 }
 
 /*
@@ -859,8 +942,9 @@ rb_queue_length(VALUE self) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L942
 static VALUE
 rb_queue_num_waiting(VALUE self)
 {
-    unsigned long len = queue_num_waiting(self);
-    return ULONG2NUM(len);
+    struct rb_queue *q = queue_ptr(self);
+
+    return INT2NUM(q->num_waiting);
 }
 
 /*
@@ -883,16 +967,17 @@ static VALUE https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L967
 rb_szqueue_initialize(VALUE self, VALUE vmax)
 {
     long max;
+    struct rb_szqueue *sq = szqueue_ptr(self);
 
     max = NUM2LONG(vmax);
     if (max <= 0) {
 	rb_raise(rb_eArgError, "queue size must be positive");
     }
 
-    RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
-    RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
-    RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new());
-    RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
+    RB_OBJ_WRITE(self, &sq->q.que, ary_buf_new());
+    list_head_init(&sq->q.waitq);
+    list_head_init(&sq->pushq);
+    sq->max = max;
 
     return self;
 }
@@ -912,7 +997,14 @@ rb_szqueue_initialize(VALUE self, VALUE https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L997
 static VALUE
 rb_szqueue_close(VALUE self)
 {
-    return queue_do_close(self, TRUE);
+    if (!queue_closed_p(self)) {
+	struct rb_szqueue *sq = szqueue_ptr(self);
+
+	FL_SET(self, QUEUE_CLOSED);
+	wakeup_all(&sq->q.waitq);
+	wakeup_all(&sq->pushq);
+    }
+    return self;
 }
 
 /*
@@ -924,7 +1016,7 @@ rb_szqueue_close(VALUE self) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1016
 static VALUE
 rb_szqueue_max_get(VALUE self)
 {
-    return GET_SZQUEUE_MAX(self);
+    return LONG2NUM(szqueue_ptr(self)->max);
 }
 
 /*
@@ -937,18 +1029,19 @@ rb_szqueue_max_get(VALUE self) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1029
 static VALUE
 rb_szqueue_max_set(VALUE self, VALUE vmax)
 {
-    long max = NUM2LONG(vmax), diff = 0;
-    VALUE t;
+    long max = NUM2LONG(vmax);
+    long diff = 0;
+    struct rb_szqueue *sq = szqueue_ptr(self);
 
     if (max <= 0) {
 	rb_raise(rb_eArgError, "queue size must be positive");
     }
-    if ((unsigned long)max > GET_SZQUEUE_ULONGMAX(self)) {
-	diff = max - GET_SZQUEUE_ULONGMAX(self);
+    if (max > sq->max) {
+	diff = max - sq->max;
     }
-    RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
-    while (diff-- > 0 && !NIL_P(t = rb_ary_shift(GET_SZQUEUE_WAITERS(self)))) {
-	rb_thread_wakeup_alive(t);
+    sq->max = max;
+    while (diff-- > 0 && wakeup_one(&sq->pushq)) {
+	/* keep waking more up */
     }
     return vmax;
 }
@@ -981,12 +1074,10 @@ szqueue_push_should_block(int argc, cons https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1074
 static VALUE
 rb_szqueue_push(int argc, VALUE *argv, VALUE self)
 {
-    struct waiting_delete args;
+    struct rb_szqueue *sq = szqueue_ptr(self);
     int should_block = szqueue_push_should_block(argc, argv);
-    args.waiting = GET_SZQUEUE_WAITERS(self);
-    args.th      = rb_thread_current();
 
-    while (queue_length(self) >= GET_SZQUEUE_ULONGMAX(self)) {
+    while (queue_length(self, &sq->q) >= sq->max) {
 	if (!should_block) {
 	    rb_raise(rb_eThreadError, "queue full");
 	}
@@ -994,8 +1085,14 @@ rb_szqueue_push(int argc, VALUE *argv, V https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1085
 	    goto closed;
 	}
 	else {
-	    rb_ary_push(args.waiting, args.th);
-	    rb_ensure(queue_sleep, Qfalse, queue_delete_from_waiting, (VALUE)&args);
+	    struct queue_waiter qw;
+
+	    qw.w.th = GET_THREAD();
+	    qw.as.sq = sq;
+	    list_add_tail(&sq->pushq, &qw.w.node);
+	    sq->num_waiting_push++;
+
+	    rb_ensure(queue_sleep, Qfalse, szqueue_sleep_done, (VALUE)&qw);
 	}
     }
 
@@ -1004,16 +1101,17 @@ rb_szqueue_push(int argc, VALUE *argv, V https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1101
 	raise_closed_queue_error(self);
     }
 
-    return queue_do_push(self, argv[0]);
+    return queue_do_push(self, &sq->q, argv[0]);
 }
 
 static VALUE
 szqueue_do_pop(VALUE self, int should_block)
 {
-    VALUE retval = queue_do_pop(self, should_block);
+    struct rb_szqueue *sq = szqueue_ptr(self);
+    VALUE retval = queue_do_pop(self, &sq->q, should_block);
 
-    if (queue_length(self) < GET_SZQUEUE_ULONGMAX(self)) {
-	wakeup_first_thread(GET_SZQUEUE_WAITERS(self));
+    if (queue_length(self, &sq->q) < sq->max) {
+	wakeup_one(&sq->pushq);
     }
 
     return retval;
@@ -1049,11 +1147,21 @@ rb_szqueue_pop(int argc, VALUE *argv, VA https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1147
 static VALUE
 rb_szqueue_clear(VALUE self)
 {
-    rb_ary_clear(GET_QUEUE_QUE(self));
-    wakeup_all_threads(GET_SZQUEUE_WAITERS(self));
+    struct rb_szqueue *sq = szqueue_ptr(self);
+
+    rb_ary_clear(check_array(self, sq->q.que));
+    wakeup_all(&sq->pushq);
     return self;
 }
 
+static VALUE
+rb_szqueue_length(VALUE self)
+{
+    struct rb_szqueue *sq = szqueue_ptr(self);
+
+    return LONG2NUM(queue_length(self, &sq->q));
+}
+
 /*
  * Document-method: SizedQueue#num_waiting
  *
@@ -1063,18 +1171,32 @@ rb_szqueue_clear(VALUE self) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1171
 static VALUE
 rb_szqueue_num_waiting(VALUE self)
 {
-    long len = queue_num_waiting(self) + szqueue_num_waiting_producer(self);
-    return ULONG2NUM(len);
+    struct rb_szqueue *sq = szqueue_ptr(self);
+
+    return INT2NUM(sq->q.num_waiting + sq->num_waiting_push);
 }
 
-/* ConditionalVariable */
+/*
+ * Document-method: SizedQueue#empty?
+ * call-seq: empty?
+ *
+ * Returns +true+ if the queue is empty.
+ */
 
-enum {
-    CONDVAR_WAITERS,
-    END_CONDVAR
-};
+static VALUE
+rb_szqueue_empty_p(VALUE self)
+{
+    struct rb_szqueue *sq = szqueue_ptr(self);
 
-#define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS)
+    return queue_length(self, &sq->q) == 0 ? Qtrue : Qfalse;
+}
+
+
+/* ConditionalVariable */
+/* TODO: maybe this can be IMEMO */
+struct rb_condvar {
+    struct list_head waitq;
+};
 
 /*
  *  Document-class: ConditionVariable
@@ -1106,6 +1228,40 @@ enum { https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1228
  *    }
  */
 
+static size_t
+condvar_memsize(const void *ptr)
+{
+    return sizeof(struct rb_condvar);
+}
+
+static const rb_data_type_t cv_data_type = {
+    "condvar",
+    {0, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,},
+    0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
+};
+
+static struct rb_condvar *
+condvar_ptr(VALUE self)
+{
+    struct rb_condvar *cv;
+
+    TypedData_Get_Struct(self, struct rb_condvar, &cv_data_type, cv);
+
+    return cv;
+}
+
+static VALUE
+condvar_alloc(VALUE klass)
+{
+    struct rb_condvar *cv;
+    VALUE obj;
+
+    obj = TypedData_Make_Struct(klass, struct rb_condvar, &cv_data_type, cv);
+    list_head_init(&cv->waitq);
+
+    return obj;
+}
+
 /*
  * Document-method: ConditionVariable::new
  *
@@ -1115,7 +1271,8 @@ enum { https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1271
 static VALUE
 rb_condvar_initialize(VALUE self)
 {
-    RSTRUCT_SET(self, CONDVAR_WAITERS, ary_buf_new());
+    struct rb_condvar *cv = condvar_ptr(self);;
+    list_head_init(&cv->waitq);
     return self;
 }
 
@@ -1134,9 +1291,11 @@ do_sleep(VALUE args) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1291
 }
 
 static VALUE
-delete_current_thread(VALUE ary)
+delete_from_waitq(struct sync_waiter *w)
 {
-    return rb_ary_delete(ary, rb_thread_current());
+    list_del(&w->node);
+
+    return Qnil;
 }
 
 /*
@@ -1152,16 +1311,18 @@ delete_current_thread(VALUE ary) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1311
 static VALUE
 rb_condvar_wait(int argc, VALUE *argv, VALUE self)
 {
-    VALUE waiters = GET_CONDVAR_WAITERS(self);
+    struct rb_condvar *cv = condvar_ptr(self);
     VALUE mutex, timeout;
     struct sleep_call args;
+    struct sync_waiter w;
 
     rb_scan_args(argc, argv, "11", &mutex, &timeout);
 
     args.mutex   = mutex;
     args.timeout = timeout;
-    rb_ary_push(waiters, rb_thread_current());
-    rb_ensure(do_sleep, (VALUE)&args, delete_current_thread, waiters);
+    w.th = GET_THREAD();
+    list_add_tail(&cv->waitq, &w.node);
+    rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&w);
 
     return self;
 }
@@ -1175,7 +1336,8 @@ rb_condvar_wait(int argc, VALUE *argv, V https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1336
 static VALUE
 rb_condvar_signal(VALUE self)
 {
-    wakeup_first_thread(GET_CONDVAR_WAITERS(self));
+    struct rb_condvar *cv = condvar_ptr(self);
+    wakeup_one(&cv->waitq);
     return self;
 }
 
@@ -1188,7 +1350,8 @@ rb_condvar_signal(VALUE self) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1350
 static VALUE
 rb_condvar_broadcast(VALUE self)
 {
-    wakeup_all_threads(GET_CONDVAR_WAITERS(self));
+    struct rb_condvar *cv = condvar_ptr(self);
+    wakeup_all(&cv->waitq);
     return self;
 }
 
@@ -1228,10 +1391,8 @@ Init_thread_sync(void) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1391
     rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0);
 
     /* Queue */
-    rb_cQueue = rb_struct_define_without_accessor_under(
-	rb_cThread,
-	"Queue", rb_cObject, rb_st (... truncated)

--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/

[前][次][番号順一覧][スレッド一覧]