ruby-changes:13974
From: nobu <ko1@a...>
Date: Mon, 16 Nov 2009 01:37:15 +0900 (JST)
Subject: [ruby-changes:13974] Ruby:r25781 (mvm): * thread.c (rb_queue_shift_wait): added.
nobu 2009-11-16 01:36:57 +0900 (Mon, 16 Nov 2009) New Revision: 25781 http://svn.ruby-lang.org/cgi-bin/viewvc.cgi?view=rev&revision=25781 Log: * thread.c (rb_queue_shift_wait): added. * vm.c (rb_vm_send, rb_vm_recv): new methods. * vm_core.h (rb_queue_t): added condition variable. * vm_core.h (rb_vm_t): moved message queue from rb_thread_t. Index: vm_core.h =================================================================== --- vm_core.h (revision 25770) +++ vm_core.h (working copy) @@ -265,4 +265,15 @@ void rb_objspace_free(struct rb_objspace #endif +typedef struct rb_queue_element { + struct rb_queue_element *next; + void *value; +} rb_queue_element_t; + +typedef struct rb_queue { + rb_thread_lock_t lock; + rb_thread_cond_t wait; + rb_queue_element_t *head, **tail; +} rb_queue_t; + typedef struct rb_vm_struct { VALUE self; @@ -311,4 +322,8 @@ typedef struct rb_vm_struct { } trap_list[RUBY_NSIG]; + struct { + rb_queue_t message; + } queue; + /* hook */ rb_event_hook_t *event_hooks; @@ -397,18 +412,9 @@ struct rb_unblock_callback { struct rb_mutex_struct; -typedef struct rb_queue_element { - struct rb_queue_element *next; - void *value; -} rb_queue_element_t; - -typedef struct rb_queue { - rb_thread_lock_t lock; - rb_queue_element_t *head, **tail; -} rb_queue_t; - void rb_queue_initialize(rb_queue_t *); void rb_queue_destroy(rb_queue_t *); int rb_queue_push(rb_queue_t *, void *); int rb_queue_shift(rb_queue_t *, void **); +int rb_queue_shift_wait(rb_queue_t *, void **, const struct timeval *); int rb_queue_empty_p(const rb_queue_t *); @@ -458,5 +464,5 @@ typedef struct rb_thread_struct struct { - rb_queue_t signal, message; + rb_queue_t signal; } queue; Index: thread.c =================================================================== --- thread.c (revision 25778) +++ thread.c (working copy) @@ -876,4 +876,7 @@ getclockofday(struct timeval *tp) } +static void add_tv(struct timeval *to, const struct timeval *from); +static int subtract_tv(struct timeval *to, const struct timeval *from); + static void sleep_timeval(rb_thread_t *th, struct timeval tv) @@ -883,9 +886,5 @@ sleep_timeval(rb_thread_t *th, struct ti getclockofday(&to); - to.tv_sec += tv.tv_sec; - if ((to.tv_usec += tv.tv_usec) >= 1000000) { - to.tv_sec++; - to.tv_usec -= 1000000; - } + add_tv(&to, &tv); th->status = THREAD_STOPPED; @@ -893,15 +892,10 @@ sleep_timeval(rb_thread_t *th, struct ti native_sleep(th, &tv); RUBY_VM_CHECK_INTS(); + tv = to; getclockofday(&tvn); - if (to.tv_sec < tvn.tv_sec) break; - if (to.tv_sec == tvn.tv_sec && to.tv_usec <= tvn.tv_usec) break; + if (!subtract_tv(&tv, &tvn)) break; thread_debug("sleep_timeval: %ld.%.6ld > %ld.%.6ld\n", (long)to.tv_sec, (long)to.tv_usec, (long)tvn.tv_sec, (long)tvn.tv_usec); - tv.tv_sec = to.tv_sec - tvn.tv_sec; - if ((tv.tv_usec = to.tv_usec - tvn.tv_usec) < 0) { - --tv.tv_sec; - tv.tv_usec += 1000000; - } } while (th->status == THREAD_STOPPED); th->status = prev_status; @@ -1137,4 +1131,5 @@ void rb_queue_initialize(rb_queue_t *que) { + native_cond_initialize(&que->wait); ruby_native_thread_lock_initialize(&que->lock); que->head = 0; @@ -1155,4 +1150,5 @@ rb_queue_destroy(rb_queue_t *que) ruby_native_thread_unlock(&que->lock); native_mutex_destroy(&que->lock); + native_cond_destroy(&que->wait); } @@ -1167,8 +1163,45 @@ rb_queue_push(rb_queue_t *que, void *val *que->tail = e; que->tail = &e->next; + ruby_native_cond_signal(&que->wait); ruby_native_thread_unlock(&que->lock); return Qtrue; } +#define QUEUE_SHIFT(e, que) ( \ + (e = que->head) != 0 && \ + ((que->head = e->next) != 0 || \ + (que->tail = &que->head, 1))) + +int +rb_queue_shift_wait(rb_queue_t *que, void **value, const struct timeval *tv) +{ + rb_queue_element_t *e; + struct timeval to, td, tvn; + + if (tv) { + getclockofday(&to); + add_tv(&to, tv); + } + ruby_native_thread_lock(&que->lock); + while (!QUEUE_SHIFT(e, que) && + native_cond_timedwait(&que->wait, &que->lock, tv) == ETIMEDOUT) { + if (tv) { + if (QUEUE_SHIFT(e, que)) break; + td = to; + getclockofday(&tvn); + if (!subtract_tv(&td, &tvn)) break; + tv = &td; + } + } + ruby_native_thread_unlock(&que->lock); + if (!e) { + errno = ETIMEDOUT; + return FALSE; + } + *value = e->value; + free(e); + return TRUE; +} + int rb_queue_shift(rb_queue_t *que, void **value) @@ -2479,4 +2512,5 @@ cmp_tv(const struct timeval *a, const st return (d != 0) ? d : (a->tv_usec - b->tv_usec); } +#endif static int @@ -2494,5 +2528,14 @@ subtract_tv(struct timeval *rest, const return 1; } -#endif + +static void +add_tv(struct timeval *to, const struct timeval *ta) +{ + to->tv_sec += ta->tv_sec; + if ((to->tv_usec += ta->tv_usec) >= 1000000) { + to->tv_sec++; + to->tv_usec -= 1000000; + } +} static int Index: vm.c =================================================================== --- vm.c (revision 25779) +++ vm.c (working copy) @@ -1504,4 +1504,5 @@ rb_vm_free(void *ptr) ruby_native_cond_signal(&vm->global_vm_waiting); ruby_native_cond_destroy(&vm->global_vm_waiting); + rb_queue_destroy(&vm->queue.message); #if defined(ENABLE_VM_OBJSPACE) && ENABLE_VM_OBJSPACE if (objspace) { @@ -1538,4 +1539,5 @@ vm_init2(rb_vm_t *vm) ruby_native_thread_lock_initialize(&vm->global_vm_lock); ruby_native_cond_initialize(&vm->global_vm_waiting); + rb_queue_initialize(&vm->queue.message); vm->objspace = rb_objspace_alloc(); vm->src_encoding_index = -1; @@ -1689,5 +1691,4 @@ thread_free(void *ptr) #endif - rb_queue_destroy(&th->queue.message); rb_queue_destroy(&th->queue.signal); @@ -1757,5 +1758,4 @@ th_init(rb_thread_t *th, VALUE self) rb_queue_initialize(&th->queue.signal); - rb_queue_initialize(&th->queue.message); /* allocate thread stack */ @@ -2062,4 +2062,46 @@ rb_vm_join(VALUE self) } +VALUE +rb_vm_send(VALUE self, VALUE val) +{ + rb_vm_t *vm; + + GetVMPtr(self, vm); + if (!rb_special_const_p(val) && vm->objspace != GET_VM()->objspace) { + rb_raise(rb_eTypeError, "expected special constants"); + } + if (!rb_queue_push(&vm->queue.message, (void *)val)) + rb_sys_fail(0); + return (VALUE)val; +} + +VALUE +rb_vm_recv(VALUE self, const struct timeval *tv) +{ + rb_vm_t *vm; + void *val; + + GetVMPtr(self, vm); + if (!rb_queue_shift_wait(&vm->queue.message, &val, tv)) + rb_sys_fail(0); + if (!rb_special_const_p((VALUE)val)) { + rb_raise(rb_eTypeError, "expected special constants"); + } + return (VALUE)val; +} + +struct timeval rb_time_interval(VALUE); + +static VALUE +rb_vm_recv_m(int argc, VALUE *argv, VALUE self) +{ + struct timeval timeout = {0, 0}, *t = 0; + if (rb_scan_args(argc, argv, "01", 0)) { + timeout = rb_time_interval(argv[0]); + t = &timeout; + } + return rb_vm_recv(self, t); +} + void Init_VM(void) @@ -2080,4 +2122,6 @@ InitVM_VM(void) rb_define_method(rb_cRubyVM, "to_s", rb_vm_to_s, 0); rb_define_method(rb_cRubyVM, "start", rb_vm_start, 0); + rb_define_method(rb_cRubyVM, "send", rb_vm_send, 1); + rb_define_method(rb_cRubyVM, "recv", rb_vm_recv_m, -1); rb_define_method(rb_cRubyVM, "join", rb_vm_join, 0); rb_define_singleton_method(rb_cRubyVM, "current", rb_vm_current, 0); Modified files: branches/mvm/ChangeLog branches/mvm/thread.c branches/mvm/vm.c branches/mvm/vm_core.h Index: mvm/ChangeLog =================================================================== --- mvm/ChangeLog (revision 25780) +++ mvm/ChangeLog (revision 25781) @@ -1,3 +1,13 @@ +Mon Nov 16 01:36:50 2009 Nobuyoshi Nakada <nobu@r...> + + * thread.c (rb_queue_shift_wait): added. + + * vm.c (rb_vm_send, rb_vm_recv): new methods. + + * vm_core.h (rb_queue_t): added condition variable. + + * vm_core.h (rb_vm_t): moved message queue from rb_thread_t. + Mon Nov 16 01:14:02 2009 Nobuyoshi Nakada <nobu@r...> * vm.c (rb_vm_current): new method RubyVM.current. Index: mvm/vm_core.h =================================================================== --- mvm/vm_core.h (revision 25780) +++ mvm/vm_core.h (revision 25781) @@ -264,6 +264,17 @@ void rb_objspace_free(struct rb_objspace *); #endif +typedef struct rb_queue_element { + struct rb_queue_element *next; + void *value; +} rb_queue_element_t; + +typedef struct rb_queue { + rb_thread_lock_t lock; + rb_thread_cond_t wait; + rb_queue_element_t *head, **tail; +} rb_queue_t; + typedef struct rb_vm_struct { VALUE self; @@ -310,6 +321,10 @@ int safe; } trap_list[RUBY_NSIG]; + struct { + rb_queue_t message; + } queue; + /* hook */ rb_event_hook_t *event_hooks; @@ -396,20 +411,11 @@ struct rb_mutex_struct; -typedef struct rb_queue_element { - struct rb_queue_element *next; - void *value; -} rb_queue_element_t; - -typedef struct rb_queue { - rb_thread_lock_t lock; - rb_queue_element_t *head, **tail; -} rb_queue_t; - void rb_queue_initialize(rb_queue_t *); void rb_queue_destroy(rb_queue_t *); int rb_queue_push(rb_queue_t *, void *); int rb_queue_shift(rb_queue_t *, void **); +int rb_queue_shift_wait(rb_queue_t *, void **, const struct timeval *); int rb_queue_empty_p(const rb_queue_t *); typedef struct rb_thread_struct @@ -457,7 +463,7 @@ VALUE thrown_errinfo; struct { - rb_queue_t signal, message; + rb_queue_t signal; } queue; int interrupt_flag; Index: mvm/thread.c =================================================================== --- mvm/thread.c (revision 25780) +++ mvm/thread.c (revision 25781) @@ -875,6 +875,9 @@ } } +static void add_tv(struct timeval *to, const struct timeval *from); +static int subtract_tv(struct timeval *to, const struct timeval *from); + static void sleep_timeval(rb_thread_t *th, struct timeval tv) { @@ -882,27 +885,18 @@ enum rb_thread_status prev_status = th->status; getclockofday(&to); - to.tv_sec += tv.tv_sec; - if ((to.tv_usec += tv.tv_usec) >= 1000000) { - to.tv_sec++; - to.tv_usec -= 1000000; - } + add_tv(&to, &tv); th->status = THREAD_STOPPED; do { native_sleep(th, &tv); RUBY_VM_CHECK_INTS(); + tv = to; getclockofday(&tvn); - if (to.tv_sec < tvn.tv_sec) break; - if (to.tv_sec == tvn.tv_sec && to.tv_usec <= tvn.tv_usec) break; + if (!subtract_tv(&tv, &tvn)) break; thread_debug("sleep_timeval: %ld.%.6ld > %ld.%.6ld\n", (long)to.tv_sec, (long)to.tv_usec, (long)tvn.tv_sec, (long)tvn.tv_usec); - tv.tv_sec = to.tv_sec - tvn.tv_sec; - if ((tv.tv_usec = to.tv_usec - tvn.tv_usec) < 0) { - --tv.tv_sec; - tv.tv_usec += 1000000; - } } while (th->status == THREAD_STOPPED); th->status = prev_status; } @@ -1136,6 +1130,7 @@ void rb_queue_initialize(rb_queue_t *que) { + native_cond_initialize(&que->wait); ruby_native_thread_lock_initialize(&que->lock); que->head = 0; que->tail = &que->head; @@ -1154,6 +1149,7 @@ que->tail = 0; ruby_native_thread_unlock(&que->lock); native_mutex_destroy(&que->lock); + native_cond_destroy(&que->wait); } int @@ -1166,11 +1162,48 @@ ruby_native_thread_lock(&que->lock); *que->tail = e; que->tail = &e->next; + ruby_native_cond_signal(&que->wait); ruby_native_thread_unlock(&que->lock); return Qtrue; } +#define QUEUE_SHIFT(e, que) ( \ + (e = que->head) != 0 && \ + ((que->head = e->next) != 0 || \ + (que->tail = &que->head, 1))) + int +rb_queue_shift_wait(rb_queue_t *que, void **value, const struct timeval *tv) +{ + rb_queue_element_t *e; + struct timeval to, td, tvn; + + if (tv) { + getclockofday(&to); + add_tv(&to, tv); + } + ruby_native_thread_lock(&que->lock); + while (!QUEUE_SHIFT(e, que) && + native_cond_timedwait(&que->wait, &que->lock, tv) == ETIMEDOUT) { + if (tv) { + if (QUEUE_SHIFT(e, que)) break; + td = to; + getclockofday(&tvn); + if (!subtract_tv(&td, &tvn)) break; + tv = &td; + } + } + ruby_native_thread_unlock(&que->lock); + if (!e) { + errno = ETIMEDOUT; + return FALSE; + } + *value = e->value; + free(e); + return TRUE; +} + +int rb_queue_shift(rb_queue_t *que, void **value) { rb_queue_element_t *e; @@ -2478,6 +2511,7 @@ long d = (a->tv_sec - b->tv_sec); return (d != 0) ? d : (a->tv_usec - b->tv_usec); } +#endif static int subtract_tv(struct timeval *rest, const struct timeval *wait) @@ -2493,8 +2527,17 @@ rest->tv_usec -= wait->tv_usec; return 1; } -#endif +static void +add_tv(struct timeval *to, const struct timeval *ta) +{ + to->tv_sec += ta->tv_sec; + if ((to->tv_usec += ta->tv_usec) >= 1000000) { + to->tv_sec++; + to->tv_usec -= 1000000; + } +} + static int do_select(int n, fd_set *read, fd_set *write, fd_set *except, struct timeval *timeout) Index: mvm/vm.c =================================================================== --- mvm/vm.c (revision 25780) +++ mvm/vm.c (revision 25781) @@ -1503,6 +1503,7 @@ ruby_native_thread_lock_destroy(&vm->global_vm_lock); ruby_native_cond_signal(&vm->global_vm_waiting); ruby_native_cond_destroy(&vm->global_vm_waiting); + rb_queue_destroy(&vm->queue.message); #if defined(ENABLE_VM_OBJSPACE) && ENABLE_VM_OBJSPACE if (objspace) { rb_objspace_free(objspace); @@ -1537,6 +1538,7 @@ vm->argc = -1; ruby_native_thread_lock_initialize(&vm->global_vm_lock); ruby_native_cond_initialize(&vm->global_vm_waiting); + rb_queue_initialize(&vm->queue.message); vm->objspace = rb_objspace_alloc(); vm->src_encoding_index = -1; vm->global_state_version = 1; @@ -1688,7 +1690,6 @@ } #endif - rb_queue_destroy(&th->queue.message); rb_queue_destroy(&th->queue.signal); if (th->vm && th->vm->main_thread == th) { @@ -1756,7 +1757,6 @@ th->self = self; rb_queue_initialize(&th->queue.signal); - rb_queue_initialize(&th->queue.message); /* allocate thread stack */ th->stack_size = RUBY_VM_THREAD_STACK_SIZE; @@ -2061,6 +2061,48 @@ return INT2NUM(status); } +VALUE +rb_vm_send(VALUE self, VALUE val) +{ + rb_vm_t *vm; + + GetVMPtr(self, vm); + if (!rb_special_const_p(val) && vm->objspace != GET_VM()->objspace) { + rb_raise(rb_eTypeError, "expected special constants"); + } + if (!rb_queue_push(&vm->queue.message, (void *)val)) + rb_sys_fail(0); + return (VALUE)val; +} + +VALUE +rb_vm_recv(VALUE self, const struct timeval *tv) +{ + rb_vm_t *vm; + void *val; + + GetVMPtr(self, vm); + if (!rb_queue_shift_wait(&vm->queue.message, &val, tv)) + rb_sys_fail(0); + if (!rb_special_const_p((VALUE)val)) { + rb_raise(rb_eTypeError, "expected special constants"); + } + return (VALUE)val; +} + +struct timeval rb_time_interval(VALUE); + +static VALUE +rb_vm_recv_m(int argc, VALUE *argv, VALUE self) +{ + struct timeval timeout = {0, 0}, *t = 0; + if (rb_scan_args(argc, argv, "01", 0)) { + timeout = rb_time_interval(argv[0]); + t = &timeout; + } + return rb_vm_recv(self, t); +} + void Init_VM(void) { @@ -2079,6 +2121,8 @@ rb_define_method(rb_cRubyVM, "initialize", rb_vm_initialize, -1); rb_define_method(rb_cRubyVM, "to_s", rb_vm_to_s, 0); rb_define_method(rb_cRubyVM, "start", rb_vm_start, 0); + rb_define_method(rb_cRubyVM, "send", rb_vm_send, 1); + rb_define_method(rb_cRubyVM, "recv", rb_vm_recv_m, -1); rb_define_method(rb_cRubyVM, "join", rb_vm_join, 0); rb_define_singleton_method(rb_cRubyVM, "current", rb_vm_current, 0); -- ML: ruby-changes@q... Info: http://www.atdot.net/~ko1/quickml/