[前][次][番号順一覧][スレッド一覧]

ruby-changes:13974

From: nobu <ko1@a...>
Date: Mon, 16 Nov 2009 01:37:15 +0900 (JST)
Subject: [ruby-changes:13974] Ruby:r25781 (mvm): * thread.c (rb_queue_shift_wait): added.

nobu	2009-11-16 01:36:57 +0900 (Mon, 16 Nov 2009)

  New Revision: 25781

  http://svn.ruby-lang.org/cgi-bin/viewvc.cgi?view=rev&revision=25781

  Log:
    * thread.c (rb_queue_shift_wait): added.
    
    * vm.c (rb_vm_send, rb_vm_recv): new methods.
    
    * vm_core.h (rb_queue_t): added condition variable.
    
    * vm_core.h (rb_vm_t): moved message queue from rb_thread_t.
    
    Index: vm_core.h
    ===================================================================
    --- vm_core.h	(revision 25770)
    +++ vm_core.h	(working copy)
    @@ -265,4 +265,15 @@ void rb_objspace_free(struct rb_objspace
     #endif
     
    +typedef struct rb_queue_element {
    +    struct rb_queue_element *next;
    +    void *value;
    +} rb_queue_element_t;
    +
    +typedef struct rb_queue {
    +    rb_thread_lock_t lock;
    +    rb_thread_cond_t wait;
    +    rb_queue_element_t *head, **tail;
    +} rb_queue_t;
    +
     typedef struct rb_vm_struct {
         VALUE self;
    @@ -311,4 +322,8 @@ typedef struct rb_vm_struct {
         } trap_list[RUBY_NSIG];
     
    +    struct {
    +	rb_queue_t message;
    +    } queue;
    +
         /* hook */
         rb_event_hook_t *event_hooks;
    @@ -397,18 +412,9 @@ struct rb_unblock_callback {
     struct rb_mutex_struct;
     
    -typedef struct rb_queue_element {
    -    struct rb_queue_element *next;
    -    void *value;
    -} rb_queue_element_t;
    -
    -typedef struct rb_queue {
    -    rb_thread_lock_t lock;
    -    rb_queue_element_t *head, **tail;
    -} rb_queue_t;
    -
     void rb_queue_initialize(rb_queue_t *);
     void rb_queue_destroy(rb_queue_t *);
     int rb_queue_push(rb_queue_t *, void *);
     int rb_queue_shift(rb_queue_t *, void **);
    +int rb_queue_shift_wait(rb_queue_t *, void **, const struct timeval *);
     int rb_queue_empty_p(const rb_queue_t *);
     
    @@ -458,5 +464,5 @@ typedef struct rb_thread_struct
     
         struct {
    -	rb_queue_t signal, message;
    +	rb_queue_t signal;
         } queue;
     
    Index: thread.c
    ===================================================================
    --- thread.c	(revision 25778)
    +++ thread.c	(working copy)
    @@ -876,4 +876,7 @@ getclockofday(struct timeval *tp)
     }
     
    +static void add_tv(struct timeval *to, const struct timeval *from);
    +static int subtract_tv(struct timeval *to, const struct timeval *from);
    +
     static void
     sleep_timeval(rb_thread_t *th, struct timeval tv)
    @@ -883,9 +886,5 @@ sleep_timeval(rb_thread_t *th, struct ti
     
         getclockofday(&to);
    -    to.tv_sec += tv.tv_sec;
    -    if ((to.tv_usec += tv.tv_usec) >= 1000000) {
    -	to.tv_sec++;
    -	to.tv_usec -= 1000000;
    -    }
    +    add_tv(&to, &tv);
     
         th->status = THREAD_STOPPED;
    @@ -893,15 +892,10 @@ sleep_timeval(rb_thread_t *th, struct ti
     	native_sleep(th, &tv);
     	RUBY_VM_CHECK_INTS();
    +	tv = to;
     	getclockofday(&tvn);
    -	if (to.tv_sec < tvn.tv_sec) break;
    -	if (to.tv_sec == tvn.tv_sec && to.tv_usec <= tvn.tv_usec) break;
    +	if (!subtract_tv(&tv, &tvn)) break;
     	thread_debug("sleep_timeval: %ld.%.6ld > %ld.%.6ld\n",
     		     (long)to.tv_sec, (long)to.tv_usec,
     		     (long)tvn.tv_sec, (long)tvn.tv_usec);
    -	tv.tv_sec = to.tv_sec - tvn.tv_sec;
    -	if ((tv.tv_usec = to.tv_usec - tvn.tv_usec) < 0) {
    -	    --tv.tv_sec;
    -	    tv.tv_usec += 1000000;
    -	}
         } while (th->status == THREAD_STOPPED);
         th->status = prev_status;
    @@ -1137,4 +1131,5 @@ void
     rb_queue_initialize(rb_queue_t *que)
     {
    +    native_cond_initialize(&que->wait);
         ruby_native_thread_lock_initialize(&que->lock);
         que->head = 0;
    @@ -1155,4 +1150,5 @@ rb_queue_destroy(rb_queue_t *que)
         ruby_native_thread_unlock(&que->lock);
         native_mutex_destroy(&que->lock);
    +    native_cond_destroy(&que->wait);
     }
     
    @@ -1167,8 +1163,45 @@ rb_queue_push(rb_queue_t *que, void *val
         *que->tail = e;
         que->tail = &e->next;
    +    ruby_native_cond_signal(&que->wait);
         ruby_native_thread_unlock(&que->lock);
         return Qtrue;
     }
     
    +#define QUEUE_SHIFT(e, que) (	       \
    +	(e = que->head) != 0 &&	       \
    +	((que->head = e->next) != 0 || \
    +	 (que->tail = &que->head, 1)))
    +
    +int
    +rb_queue_shift_wait(rb_queue_t *que, void **value, const struct timeval *tv)
    +{
    +    rb_queue_element_t *e;
    +    struct timeval to, td, tvn;
    +
    +    if (tv) {
    +	getclockofday(&to);
    +	add_tv(&to, tv);
    +    }
    +    ruby_native_thread_lock(&que->lock);
    +    while (!QUEUE_SHIFT(e, que) &&
    +	   native_cond_timedwait(&que->wait, &que->lock, tv) == ETIMEDOUT) {
    +	if (tv) {
    +	    if (QUEUE_SHIFT(e, que)) break;
    +	    td = to;
    +	    getclockofday(&tvn);
    +	    if (!subtract_tv(&td, &tvn)) break;
    +	    tv = &td;
    +	}
    +    }
    +    ruby_native_thread_unlock(&que->lock);
    +    if (!e) {
    +	errno = ETIMEDOUT;
    +	return FALSE;
    +    }
    +    *value = e->value;
    +    free(e);
    +    return TRUE;
    +}
    +
     int
     rb_queue_shift(rb_queue_t *que, void **value)
    @@ -2479,4 +2512,5 @@ cmp_tv(const struct timeval *a, const st
         return (d != 0) ? d : (a->tv_usec - b->tv_usec);
     }
    +#endif
     
     static int
    @@ -2494,5 +2528,14 @@ subtract_tv(struct timeval *rest, const 
         return 1;
     }
    -#endif
    +
    +static void
    +add_tv(struct timeval *to, const struct timeval *ta)
    +{
    +    to->tv_sec += ta->tv_sec;
    +    if ((to->tv_usec += ta->tv_usec) >= 1000000) {
    +	to->tv_sec++;
    +	to->tv_usec -= 1000000;
    +    }
    +}
     
     static int
    Index: vm.c
    ===================================================================
    --- vm.c	(revision 25779)
    +++ vm.c	(working copy)
    @@ -1504,4 +1504,5 @@ rb_vm_free(void *ptr)
     	ruby_native_cond_signal(&vm->global_vm_waiting);
     	ruby_native_cond_destroy(&vm->global_vm_waiting);
    +	rb_queue_destroy(&vm->queue.message);
     #if defined(ENABLE_VM_OBJSPACE) && ENABLE_VM_OBJSPACE
     	if (objspace) {
    @@ -1538,4 +1539,5 @@ vm_init2(rb_vm_t *vm)
         ruby_native_thread_lock_initialize(&vm->global_vm_lock);
         ruby_native_cond_initialize(&vm->global_vm_waiting);
    +    rb_queue_initialize(&vm->queue.message);
         vm->objspace = rb_objspace_alloc();
         vm->src_encoding_index = -1;
    @@ -1689,5 +1691,4 @@ thread_free(void *ptr)
     #endif
     
    -	rb_queue_destroy(&th->queue.message);
     	rb_queue_destroy(&th->queue.signal);
     
    @@ -1757,5 +1758,4 @@ th_init(rb_thread_t *th, VALUE self)
     
         rb_queue_initialize(&th->queue.signal);
    -    rb_queue_initialize(&th->queue.message);
     
         /* allocate thread stack */
    @@ -2062,4 +2062,46 @@ rb_vm_join(VALUE self)
     }
     
    +VALUE
    +rb_vm_send(VALUE self, VALUE val)
    +{
    +    rb_vm_t *vm;
    +
    +    GetVMPtr(self, vm);
    +    if (!rb_special_const_p(val) && vm->objspace != GET_VM()->objspace) {
    +	rb_raise(rb_eTypeError, "expected special constants");
    +    }
    +    if (!rb_queue_push(&vm->queue.message, (void *)val))
    +	rb_sys_fail(0);
    +    return (VALUE)val;
    +}
    +
    +VALUE
    +rb_vm_recv(VALUE self, const struct timeval *tv)
    +{
    +    rb_vm_t *vm;
    +    void *val;
    +
    +    GetVMPtr(self, vm);
    +    if (!rb_queue_shift_wait(&vm->queue.message, &val, tv))
    +	rb_sys_fail(0);
    +    if (!rb_special_const_p((VALUE)val)) {
    +	rb_raise(rb_eTypeError, "expected special constants");
    +    }
    +    return (VALUE)val;
    +}
    +
    +struct timeval rb_time_interval(VALUE);
    +
    +static VALUE
    +rb_vm_recv_m(int argc, VALUE *argv, VALUE self)
    +{
    +    struct timeval timeout = {0, 0}, *t = 0;
    +    if (rb_scan_args(argc, argv, "01", 0)) {
    +	timeout = rb_time_interval(argv[0]);
    +	t = &timeout;
    +    }
    +    return rb_vm_recv(self, t);
    +}
    +
     void
     Init_VM(void)
    @@ -2080,4 +2122,6 @@ InitVM_VM(void)
         rb_define_method(rb_cRubyVM, "to_s", rb_vm_to_s, 0);
         rb_define_method(rb_cRubyVM, "start", rb_vm_start, 0);
    +    rb_define_method(rb_cRubyVM, "send", rb_vm_send, 1);
    +    rb_define_method(rb_cRubyVM, "recv", rb_vm_recv_m, -1);
         rb_define_method(rb_cRubyVM, "join", rb_vm_join, 0);
         rb_define_singleton_method(rb_cRubyVM, "current", rb_vm_current, 0);

  Modified files:
    branches/mvm/ChangeLog
    branches/mvm/thread.c
    branches/mvm/vm.c
    branches/mvm/vm_core.h

Index: mvm/ChangeLog
===================================================================
--- mvm/ChangeLog	(revision 25780)
+++ mvm/ChangeLog	(revision 25781)
@@ -1,3 +1,13 @@
+Mon Nov 16 01:36:50 2009  Nobuyoshi Nakada  <nobu@r...>
+
+	* thread.c (rb_queue_shift_wait): added.
+
+	* vm.c (rb_vm_send, rb_vm_recv): new methods.
+
+	* vm_core.h (rb_queue_t): added condition variable.
+
+	* vm_core.h (rb_vm_t): moved message queue from rb_thread_t.
+
 Mon Nov 16 01:14:02 2009  Nobuyoshi Nakada  <nobu@r...>
 
 	* vm.c (rb_vm_current): new method RubyVM.current.
Index: mvm/vm_core.h
===================================================================
--- mvm/vm_core.h	(revision 25780)
+++ mvm/vm_core.h	(revision 25781)
@@ -264,6 +264,17 @@
 void rb_objspace_free(struct rb_objspace *);
 #endif
 
+typedef struct rb_queue_element {
+    struct rb_queue_element *next;
+    void *value;
+} rb_queue_element_t;
+
+typedef struct rb_queue {
+    rb_thread_lock_t lock;
+    rb_thread_cond_t wait;
+    rb_queue_element_t *head, **tail;
+} rb_queue_t;
+
 typedef struct rb_vm_struct {
     VALUE self;
 
@@ -310,6 +321,10 @@
 	int safe;
     } trap_list[RUBY_NSIG];
 
+    struct {
+	rb_queue_t message;
+    } queue;
+
     /* hook */
     rb_event_hook_t *event_hooks;
 
@@ -396,20 +411,11 @@
 
 struct rb_mutex_struct;
 
-typedef struct rb_queue_element {
-    struct rb_queue_element *next;
-    void *value;
-} rb_queue_element_t;
-
-typedef struct rb_queue {
-    rb_thread_lock_t lock;
-    rb_queue_element_t *head, **tail;
-} rb_queue_t;
-
 void rb_queue_initialize(rb_queue_t *);
 void rb_queue_destroy(rb_queue_t *);
 int rb_queue_push(rb_queue_t *, void *);
 int rb_queue_shift(rb_queue_t *, void **);
+int rb_queue_shift_wait(rb_queue_t *, void **, const struct timeval *);
 int rb_queue_empty_p(const rb_queue_t *);
 
 typedef struct rb_thread_struct
@@ -457,7 +463,7 @@
     VALUE thrown_errinfo;
 
     struct {
-	rb_queue_t signal, message;
+	rb_queue_t signal;
     } queue;
 
     int interrupt_flag;
Index: mvm/thread.c
===================================================================
--- mvm/thread.c	(revision 25780)
+++ mvm/thread.c	(revision 25781)
@@ -875,6 +875,9 @@
     }
 }
 
+static void add_tv(struct timeval *to, const struct timeval *from);
+static int subtract_tv(struct timeval *to, const struct timeval *from);
+
 static void
 sleep_timeval(rb_thread_t *th, struct timeval tv)
 {
@@ -882,27 +885,18 @@
     enum rb_thread_status prev_status = th->status;
 
     getclockofday(&to);
-    to.tv_sec += tv.tv_sec;
-    if ((to.tv_usec += tv.tv_usec) >= 1000000) {
-	to.tv_sec++;
-	to.tv_usec -= 1000000;
-    }
+    add_tv(&to, &tv);
 
     th->status = THREAD_STOPPED;
     do {
 	native_sleep(th, &tv);
 	RUBY_VM_CHECK_INTS();
+	tv = to;
 	getclockofday(&tvn);
-	if (to.tv_sec < tvn.tv_sec) break;
-	if (to.tv_sec == tvn.tv_sec && to.tv_usec <= tvn.tv_usec) break;
+	if (!subtract_tv(&tv, &tvn)) break;
 	thread_debug("sleep_timeval: %ld.%.6ld > %ld.%.6ld\n",
 		     (long)to.tv_sec, (long)to.tv_usec,
 		     (long)tvn.tv_sec, (long)tvn.tv_usec);
-	tv.tv_sec = to.tv_sec - tvn.tv_sec;
-	if ((tv.tv_usec = to.tv_usec - tvn.tv_usec) < 0) {
-	    --tv.tv_sec;
-	    tv.tv_usec += 1000000;
-	}
     } while (th->status == THREAD_STOPPED);
     th->status = prev_status;
 }
@@ -1136,6 +1130,7 @@
 void
 rb_queue_initialize(rb_queue_t *que)
 {
+    native_cond_initialize(&que->wait);
     ruby_native_thread_lock_initialize(&que->lock);
     que->head = 0;
     que->tail = &que->head;
@@ -1154,6 +1149,7 @@
     que->tail = 0;
     ruby_native_thread_unlock(&que->lock);
     native_mutex_destroy(&que->lock);
+    native_cond_destroy(&que->wait);
 }
 
 int
@@ -1166,11 +1162,48 @@
     ruby_native_thread_lock(&que->lock);
     *que->tail = e;
     que->tail = &e->next;
+    ruby_native_cond_signal(&que->wait);
     ruby_native_thread_unlock(&que->lock);
     return Qtrue;
 }
 
+#define QUEUE_SHIFT(e, que) (	       \
+	(e = que->head) != 0 &&	       \
+	((que->head = e->next) != 0 || \
+	 (que->tail = &que->head, 1)))
+
 int
+rb_queue_shift_wait(rb_queue_t *que, void **value, const struct timeval *tv)
+{
+    rb_queue_element_t *e;
+    struct timeval to, td, tvn;
+
+    if (tv) {
+	getclockofday(&to);
+	add_tv(&to, tv);
+    }
+    ruby_native_thread_lock(&que->lock);
+    while (!QUEUE_SHIFT(e, que) &&
+	   native_cond_timedwait(&que->wait, &que->lock, tv) == ETIMEDOUT) {
+	if (tv) {
+	    if (QUEUE_SHIFT(e, que)) break;
+	    td = to;
+	    getclockofday(&tvn);
+	    if (!subtract_tv(&td, &tvn)) break;
+	    tv = &td;
+	}
+    }
+    ruby_native_thread_unlock(&que->lock);
+    if (!e) {
+	errno = ETIMEDOUT;
+	return FALSE;
+    }
+    *value = e->value;
+    free(e);
+    return TRUE;
+}
+
+int
 rb_queue_shift(rb_queue_t *que, void **value)
 {
     rb_queue_element_t *e;
@@ -2478,6 +2511,7 @@
     long d = (a->tv_sec - b->tv_sec);
     return (d != 0) ? d : (a->tv_usec - b->tv_usec);
 }
+#endif
 
 static int
 subtract_tv(struct timeval *rest, const struct timeval *wait)
@@ -2493,8 +2527,17 @@
     rest->tv_usec -= wait->tv_usec;
     return 1;
 }
-#endif
 
+static void
+add_tv(struct timeval *to, const struct timeval *ta)
+{
+    to->tv_sec += ta->tv_sec;
+    if ((to->tv_usec += ta->tv_usec) >= 1000000) {
+	to->tv_sec++;
+	to->tv_usec -= 1000000;
+    }
+}
+
 static int
 do_select(int n, fd_set *read, fd_set *write, fd_set *except,
 	  struct timeval *timeout)
Index: mvm/vm.c
===================================================================
--- mvm/vm.c	(revision 25780)
+++ mvm/vm.c	(revision 25781)
@@ -1503,6 +1503,7 @@
 	ruby_native_thread_lock_destroy(&vm->global_vm_lock);
 	ruby_native_cond_signal(&vm->global_vm_waiting);
 	ruby_native_cond_destroy(&vm->global_vm_waiting);
+	rb_queue_destroy(&vm->queue.message);
 #if defined(ENABLE_VM_OBJSPACE) && ENABLE_VM_OBJSPACE
 	if (objspace) {
 	    rb_objspace_free(objspace);
@@ -1537,6 +1538,7 @@
     vm->argc = -1;
     ruby_native_thread_lock_initialize(&vm->global_vm_lock);
     ruby_native_cond_initialize(&vm->global_vm_waiting);
+    rb_queue_initialize(&vm->queue.message);
     vm->objspace = rb_objspace_alloc();
     vm->src_encoding_index = -1;
     vm->global_state_version = 1;
@@ -1688,7 +1690,6 @@
 	}
 #endif
 
-	rb_queue_destroy(&th->queue.message);
 	rb_queue_destroy(&th->queue.signal);
 
 	if (th->vm && th->vm->main_thread == th) {
@@ -1756,7 +1757,6 @@
     th->self = self;
 
     rb_queue_initialize(&th->queue.signal);
-    rb_queue_initialize(&th->queue.message);
 
     /* allocate thread stack */
     th->stack_size = RUBY_VM_THREAD_STACK_SIZE;
@@ -2061,6 +2061,48 @@
     return INT2NUM(status);
 }
 
+VALUE
+rb_vm_send(VALUE self, VALUE val)
+{
+    rb_vm_t *vm;
+
+    GetVMPtr(self, vm);
+    if (!rb_special_const_p(val) && vm->objspace != GET_VM()->objspace) {
+	rb_raise(rb_eTypeError, "expected special constants");
+    }
+    if (!rb_queue_push(&vm->queue.message, (void *)val))
+	rb_sys_fail(0);
+    return (VALUE)val;
+}
+
+VALUE
+rb_vm_recv(VALUE self, const struct timeval *tv)
+{
+    rb_vm_t *vm;
+    void *val;
+
+    GetVMPtr(self, vm);
+    if (!rb_queue_shift_wait(&vm->queue.message, &val, tv))
+	rb_sys_fail(0);
+    if (!rb_special_const_p((VALUE)val)) {
+	rb_raise(rb_eTypeError, "expected special constants");
+    }
+    return (VALUE)val;
+}
+
+struct timeval rb_time_interval(VALUE);
+
+static VALUE
+rb_vm_recv_m(int argc, VALUE *argv, VALUE self)
+{
+    struct timeval timeout = {0, 0}, *t = 0;
+    if (rb_scan_args(argc, argv, "01", 0)) {
+	timeout = rb_time_interval(argv[0]);
+	t = &timeout;
+    }
+    return rb_vm_recv(self, t);
+}
+
 void
 Init_VM(void)
 {
@@ -2079,6 +2121,8 @@
     rb_define_method(rb_cRubyVM, "initialize", rb_vm_initialize, -1);
     rb_define_method(rb_cRubyVM, "to_s", rb_vm_to_s, 0);
     rb_define_method(rb_cRubyVM, "start", rb_vm_start, 0);
+    rb_define_method(rb_cRubyVM, "send", rb_vm_send, 1);
+    rb_define_method(rb_cRubyVM, "recv", rb_vm_recv_m, -1);
     rb_define_method(rb_cRubyVM, "join", rb_vm_join, 0);
     rb_define_singleton_method(rb_cRubyVM, "current", rb_vm_current, 0);
 

--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/

[前][次][番号順一覧][スレッド一覧]