ruby-changes:53881
From: normal <ko1@a...>
Date: Fri, 30 Nov 2018 12:56:35 +0900 (JST)
Subject: [ruby-changes:53881] normal:r66100 (trunk): vm_trace.c: workqueue as thread-safe version of postponed_job
normal 2018-11-30 12:56:29 +0900 (Fri, 30 Nov 2018) New Revision: 66100 https://svn.ruby-lang.org/cgi-bin/viewvc.cgi?view=revision&revision=66100 Log: vm_trace.c: workqueue as thread-safe version of postponed_job postponed_job is safe to use in signal handlers, but is not thread-safe for MJIT. Implement a workqueue for MJIT thread-safety. [Bug #15316] Modified files: trunk/mjit.c trunk/mjit_worker.c trunk/thread.c trunk/vm_core.h trunk/vm_trace.c Index: vm_core.h =================================================================== --- vm_core.h (revision 66099) +++ vm_core.h (revision 66100) @@ -638,12 +638,16 @@ typedef struct rb_vm_struct { https://github.com/ruby/ruby/blob/trunk/vm_core.h#L638 /* relation table of ensure - rollback for callcc */ struct st_table *ensure_rollback_table; - /* postponed_job */ + /* postponed_job (async-signal-safe, NOT thread-safe) */ struct rb_postponed_job_struct *postponed_job_buffer; int postponed_job_index; int src_encoding_index; + /* workqueue (thread-safe, NOT async-signal-safe) */ + struct list_head workqueue; /* <=> rb_workqueue_job.jnode */ + rb_nativethread_lock_t workqueue_lock; + VALUE verbose, debug, orig_progname, progname; VALUE coverages; int coverage_mode; @@ -1628,6 +1632,7 @@ rb_vm_living_threads_init(rb_vm_t *vm) https://github.com/ruby/ruby/blob/trunk/vm_core.h#L1632 { list_head_init(&vm->waiting_fds); list_head_init(&vm->waiting_pids); + list_head_init(&vm->workqueue); list_head_init(&vm->waiting_grps); list_head_init(&vm->living_threads); vm->living_thread_num = 0; Index: thread.c =================================================================== --- thread.c (revision 66099) +++ thread.c (revision 66100) @@ -419,6 +419,7 @@ rb_vm_gvl_destroy(rb_vm_t *vm) https://github.com/ruby/ruby/blob/trunk/thread.c#L419 if (0) { /* may be held by running threads */ rb_native_mutex_destroy(&vm->waitpid_lock); + rb_native_mutex_destroy(&vm->workqueue_lock); } } @@ -4422,6 +4423,7 @@ rb_thread_atfork_internal(rb_thread_t *t https://github.com/ruby/ruby/blob/trunk/thread.c#L4423 /* may be held by MJIT threads in parent */ rb_native_mutex_initialize(&vm->waitpid_lock); + rb_native_mutex_initialize(&vm->workqueue_lock); /* may be held by any thread in parent */ rb_native_mutex_initialize(&th->interrupt_lock); @@ -5183,6 +5185,7 @@ Init_Thread(void) https://github.com/ruby/ruby/blob/trunk/thread.c#L5185 gvl_init(th->vm); gvl_acquire(th->vm, th); rb_native_mutex_initialize(&th->vm->waitpid_lock); + rb_native_mutex_initialize(&th->vm->workqueue_lock); rb_native_mutex_initialize(&th->interrupt_lock); th->pending_interrupt_queue = rb_ary_tmp_new(0); Index: mjit_worker.c =================================================================== --- mjit_worker.c (revision 66099) +++ mjit_worker.c (revision 66100) @@ -1133,6 +1133,9 @@ static mjit_copy_job_t mjit_copy_job; https://github.com/ruby/ruby/blob/trunk/mjit_worker.c#L1133 static void mjit_copy_job_handler(void *data); +/* vm_trace.c */ +int rb_workqueue_register(unsigned flags, rb_postponed_job_func_t , void *); + /* We're lazily copying cache values from main thread because these cache values could be different between ones on enqueue timing and ones on dequeue timing. Return TRUE if copy succeeds. */ @@ -1148,7 +1151,7 @@ copy_cache_from_main_thread(mjit_copy_jo https://github.com/ruby/ruby/blob/trunk/mjit_worker.c#L1151 return job->finish_p; } - if (!rb_postponed_job_register(0, mjit_copy_job_handler, (void *)job)) + if (!rb_workqueue_register(0, mjit_copy_job_handler, (void *)job)) return FALSE; CRITICAL_SECTION_START(3, "in MJIT copy job wait"); /* checking `stop_worker_p` too because `RUBY_VM_CHECK_INTS(ec)` may not Index: mjit.c =================================================================== --- mjit.c (revision 66099) +++ mjit.c (revision 66100) @@ -106,20 +106,6 @@ mjit_gc_finish_hook(void) https://github.com/ruby/ruby/blob/trunk/mjit.c#L106 CRITICAL_SECTION_FINISH(4, "mjit_gc_finish_hook"); } -/* Wrap critical section to prevent [Bug #15316] */ -void -mjit_postponed_job_register_start_hook(void) -{ - CRITICAL_SECTION_START(4, "mjit_postponed_job_register_start_hook"); -} - -/* Unwrap critical section of mjit_postponed_job_register_start_hook() */ -void -mjit_postponed_job_register_finish_hook(void) -{ - CRITICAL_SECTION_FINISH(4, "mjit_postponed_job_register_finish_hook"); -} - /* Iseqs can be garbage collected. This function should call when it happens. It removes iseq from the unit. */ void Index: vm_trace.c =================================================================== --- vm_trace.c (revision 66099) +++ vm_trace.c (revision 66100) @@ -1752,12 +1752,18 @@ typedef struct rb_postponed_job_struct { https://github.com/ruby/ruby/blob/trunk/vm_trace.c#L1752 #define MAX_POSTPONED_JOB 1000 #define MAX_POSTPONED_JOB_SPECIAL_ADDITION 24 +struct rb_workqueue_job { + struct list_node jnode; /* <=> vm->workqueue */ + rb_postponed_job_t job; +}; + void Init_vm_postponed_job(void) { rb_vm_t *vm = GET_VM(); vm->postponed_job_buffer = ALLOC_N(rb_postponed_job_t, MAX_POSTPONED_JOB); vm->postponed_job_index = 0; + /* workqueue is initialized when VM locks are initialized */ } enum postponed_job_register_result { @@ -1766,7 +1772,7 @@ enum postponed_job_register_result { https://github.com/ruby/ruby/blob/trunk/vm_trace.c#L1772 PJRR_INTERRUPTED = 2 }; -/* Async-signal-safe, thread-safe against MJIT worker thread */ +/* Async-signal-safe */ static enum postponed_job_register_result postponed_job_register(rb_execution_context_t *ec, rb_vm_t *vm, unsigned int flags, rb_postponed_job_func_t func, void *data, int max, int expected_index) @@ -1774,13 +1780,11 @@ postponed_job_register(rb_execution_cont https://github.com/ruby/ruby/blob/trunk/vm_trace.c#L1780 rb_postponed_job_t *pjob; if (expected_index >= max) return PJRR_FULL; /* failed */ - if (mjit_enabled) mjit_postponed_job_register_start_hook(); if (ATOMIC_CAS(vm->postponed_job_index, expected_index, expected_index+1) == expected_index) { pjob = &vm->postponed_job_buffer[expected_index]; } else { - if (mjit_enabled) mjit_postponed_job_register_finish_hook(); return PJRR_INTERRUPTED; } @@ -1789,7 +1793,6 @@ postponed_job_register(rb_execution_cont https://github.com/ruby/ruby/blob/trunk/vm_trace.c#L1793 pjob->data = data; RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(ec); - if (mjit_enabled) mjit_postponed_job_register_finish_hook(); return PJRR_SUCCESS; } @@ -1842,6 +1845,29 @@ rb_postponed_job_register_one(unsigned i https://github.com/ruby/ruby/blob/trunk/vm_trace.c#L1845 } } +/* + * thread-safe and called from non-Ruby thread + * returns FALSE on failure (ENOMEM), TRUE otherwise + */ +int +rb_workqueue_register(unsigned flags, rb_postponed_job_func_t func, void *data) +{ + struct rb_workqueue_job *wq_job = malloc(sizeof(*wq_job)); + rb_vm_t *vm = GET_VM(); + + if (!wq_job) return FALSE; + wq_job->job.func = func; + wq_job->job.data = data; + + rb_nativethread_lock_lock(&vm->workqueue_lock); + list_add_tail(&vm->workqueue, &wq_job->jnode); + rb_nativethread_lock_unlock(&vm->workqueue_lock); + + RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(GET_EC()); + + return TRUE; +} + void rb_postponed_job_flush(rb_vm_t *vm) { @@ -1849,6 +1875,13 @@ rb_postponed_job_flush(rb_vm_t *vm) https://github.com/ruby/ruby/blob/trunk/vm_trace.c#L1875 const rb_atomic_t block_mask = POSTPONED_JOB_INTERRUPT_MASK|TRAP_INTERRUPT_MASK; volatile rb_atomic_t saved_mask = ec->interrupt_mask & block_mask; VALUE volatile saved_errno = ec->errinfo; + struct list_head tmp; + + list_head_init(&tmp); + + rb_nativethread_lock_lock(&vm->workqueue_lock); + list_append_list(&tmp, &vm->workqueue); + rb_nativethread_lock_unlock(&vm->workqueue_lock); ec->errinfo = Qnil; /* mask POSTPONED_JOB dispatch */ @@ -1857,16 +1890,33 @@ rb_postponed_job_flush(rb_vm_t *vm) https://github.com/ruby/ruby/blob/trunk/vm_trace.c#L1890 EC_PUSH_TAG(ec); if (EC_EXEC_TAG() == TAG_NONE) { int index; + struct rb_workqueue_job *wq_job; + while ((index = vm->postponed_job_index) > 0) { if (ATOMIC_CAS(vm->postponed_job_index, index, index-1) == index) { rb_postponed_job_t *pjob = &vm->postponed_job_buffer[index-1]; (*pjob->func)(pjob->data); } } + while ((wq_job = list_pop(&tmp, struct rb_workqueue_job, jnode))) { + rb_postponed_job_t pjob = wq_job->job; + + free(wq_job); + (pjob.func)(pjob.data); + } } EC_POP_TAG(); } /* restore POSTPONED_JOB mask */ ec->interrupt_mask &= ~(saved_mask ^ block_mask); ec->errinfo = saved_errno; + + /* don't leak memory if a job threw an exception */ + if (!list_empty(&tmp)) { + rb_nativethread_lock_lock(&vm->workqueue_lock); + list_prepend_list(&vm->workqueue, &tmp); + rb_nativethread_lock_unlock(&vm->workqueue_lock); + + RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(GET_EC()); + } } -- ML: ruby-changes@q... Info: http://www.atdot.net/~ko1/quickml/