[前][次][番号順一覧][スレッド一覧]

ruby-changes:53881

From: normal <ko1@a...>
Date: Fri, 30 Nov 2018 12:56:35 +0900 (JST)
Subject: [ruby-changes:53881] normal:r66100 (trunk): vm_trace.c: workqueue as thread-safe version of postponed_job

normal	2018-11-30 12:56:29 +0900 (Fri, 30 Nov 2018)

  New Revision: 66100

  https://svn.ruby-lang.org/cgi-bin/viewvc.cgi?view=revision&revision=66100

  Log:
    vm_trace.c: workqueue as thread-safe version of postponed_job
    
    postponed_job is safe to use in signal handlers, but is not
    thread-safe for MJIT.  Implement a workqueue for MJIT
    thread-safety.
    
    [Bug #15316]

  Modified files:
    trunk/mjit.c
    trunk/mjit_worker.c
    trunk/thread.c
    trunk/vm_core.h
    trunk/vm_trace.c
Index: vm_core.h
===================================================================
--- vm_core.h	(revision 66099)
+++ vm_core.h	(revision 66100)
@@ -638,12 +638,16 @@ typedef struct rb_vm_struct { https://github.com/ruby/ruby/blob/trunk/vm_core.h#L638
     /* relation table of ensure - rollback for callcc */
     struct st_table *ensure_rollback_table;
 
-    /* postponed_job */
+    /* postponed_job (async-signal-safe, NOT thread-safe) */
     struct rb_postponed_job_struct *postponed_job_buffer;
     int postponed_job_index;
 
     int src_encoding_index;
 
+    /* workqueue (thread-safe, NOT async-signal-safe) */
+    struct list_head workqueue; /* <=> rb_workqueue_job.jnode */
+    rb_nativethread_lock_t workqueue_lock;
+
     VALUE verbose, debug, orig_progname, progname;
     VALUE coverages;
     int coverage_mode;
@@ -1628,6 +1632,7 @@ rb_vm_living_threads_init(rb_vm_t *vm) https://github.com/ruby/ruby/blob/trunk/vm_core.h#L1632
 {
     list_head_init(&vm->waiting_fds);
     list_head_init(&vm->waiting_pids);
+    list_head_init(&vm->workqueue);
     list_head_init(&vm->waiting_grps);
     list_head_init(&vm->living_threads);
     vm->living_thread_num = 0;
Index: thread.c
===================================================================
--- thread.c	(revision 66099)
+++ thread.c	(revision 66100)
@@ -419,6 +419,7 @@ rb_vm_gvl_destroy(rb_vm_t *vm) https://github.com/ruby/ruby/blob/trunk/thread.c#L419
     if (0) {
         /* may be held by running threads */
         rb_native_mutex_destroy(&vm->waitpid_lock);
+        rb_native_mutex_destroy(&vm->workqueue_lock);
     }
 }
 
@@ -4422,6 +4423,7 @@ rb_thread_atfork_internal(rb_thread_t *t https://github.com/ruby/ruby/blob/trunk/thread.c#L4423
 
     /* may be held by MJIT threads in parent */
     rb_native_mutex_initialize(&vm->waitpid_lock);
+    rb_native_mutex_initialize(&vm->workqueue_lock);
 
     /* may be held by any thread in parent */
     rb_native_mutex_initialize(&th->interrupt_lock);
@@ -5183,6 +5185,7 @@ Init_Thread(void) https://github.com/ruby/ruby/blob/trunk/thread.c#L5185
 	    gvl_init(th->vm);
 	    gvl_acquire(th->vm, th);
             rb_native_mutex_initialize(&th->vm->waitpid_lock);
+            rb_native_mutex_initialize(&th->vm->workqueue_lock);
             rb_native_mutex_initialize(&th->interrupt_lock);
 
 	    th->pending_interrupt_queue = rb_ary_tmp_new(0);
Index: mjit_worker.c
===================================================================
--- mjit_worker.c	(revision 66099)
+++ mjit_worker.c	(revision 66100)
@@ -1133,6 +1133,9 @@ static mjit_copy_job_t mjit_copy_job; https://github.com/ruby/ruby/blob/trunk/mjit_worker.c#L1133
 
 static void mjit_copy_job_handler(void *data);
 
+/* defined in vm_trace.c: thread-safe counterpart of rb_postponed_job_register() */
+int rb_workqueue_register(unsigned flags, rb_postponed_job_func_t func, void *data);
+
 /* We're lazily copying cache values from main thread because these cache values
    could be different between ones on enqueue timing and ones on dequeue timing.
    Return TRUE if copy succeeds. */
@@ -1148,7 +1151,7 @@ copy_cache_from_main_thread(mjit_copy_jo https://github.com/ruby/ruby/blob/trunk/mjit_worker.c#L1151
         return job->finish_p;
     }
 
-    if (!rb_postponed_job_register(0, mjit_copy_job_handler, (void *)job))
+    if (!rb_workqueue_register(0, mjit_copy_job_handler, (void *)job))
         return FALSE;
     CRITICAL_SECTION_START(3, "in MJIT copy job wait");
     /* checking `stop_worker_p` too because `RUBY_VM_CHECK_INTS(ec)` may not
Index: mjit.c
===================================================================
--- mjit.c	(revision 66099)
+++ mjit.c	(revision 66100)
@@ -106,20 +106,6 @@ mjit_gc_finish_hook(void) https://github.com/ruby/ruby/blob/trunk/mjit.c#L106
     CRITICAL_SECTION_FINISH(4, "mjit_gc_finish_hook");
 }
 
-/* Wrap critical section to prevent [Bug #15316] */
-void
-mjit_postponed_job_register_start_hook(void)
-{
-    CRITICAL_SECTION_START(4, "mjit_postponed_job_register_start_hook");
-}
-
-/* Unwrap critical section of mjit_postponed_job_register_start_hook() */
-void
-mjit_postponed_job_register_finish_hook(void)
-{
-    CRITICAL_SECTION_FINISH(4, "mjit_postponed_job_register_finish_hook");
-}
-
 /* Iseqs can be garbage collected.  This function should call when it
    happens.  It removes iseq from the unit.  */
 void
Index: vm_trace.c
===================================================================
--- vm_trace.c	(revision 66099)
+++ vm_trace.c	(revision 66100)
@@ -1752,12 +1752,18 @@ typedef struct rb_postponed_job_struct { https://github.com/ruby/ruby/blob/trunk/vm_trace.c#L1752
 #define MAX_POSTPONED_JOB                  1000
 #define MAX_POSTPONED_JOB_SPECIAL_ADDITION   24
 
+struct rb_workqueue_job { /* one job queued by rb_workqueue_register() */
+    struct list_node jnode; /* <=> vm->workqueue, guarded by vm->workqueue_lock */
+    rb_postponed_job_t job; /* func/data pair invoked by rb_postponed_job_flush() */
+};
+
 void
 Init_vm_postponed_job(void)
 {
     rb_vm_t *vm = GET_VM();
     vm->postponed_job_buffer = ALLOC_N(rb_postponed_job_t, MAX_POSTPONED_JOB);
     vm->postponed_job_index = 0;
+    /* vm->workqueue is initialized in rb_vm_living_threads_init() and vm->workqueue_lock in Init_Thread() */
 }
 
 enum postponed_job_register_result {
@@ -1766,7 +1772,7 @@ enum postponed_job_register_result { https://github.com/ruby/ruby/blob/trunk/vm_trace.c#L1772
     PJRR_INTERRUPTED = 2
 };
 
-/* Async-signal-safe, thread-safe against MJIT worker thread */
+/* Async-signal-safe */
 static enum postponed_job_register_result
 postponed_job_register(rb_execution_context_t *ec, rb_vm_t *vm,
                        unsigned int flags, rb_postponed_job_func_t func, void *data, int max, int expected_index)
@@ -1774,13 +1780,11 @@ postponed_job_register(rb_execution_cont https://github.com/ruby/ruby/blob/trunk/vm_trace.c#L1780
     rb_postponed_job_t *pjob;
 
     if (expected_index >= max) return PJRR_FULL; /* failed */
-    if (mjit_enabled) mjit_postponed_job_register_start_hook();
 
     if (ATOMIC_CAS(vm->postponed_job_index, expected_index, expected_index+1) == expected_index) {
         pjob = &vm->postponed_job_buffer[expected_index];
     }
     else {
-        if (mjit_enabled) mjit_postponed_job_register_finish_hook();
         return PJRR_INTERRUPTED;
     }
 
@@ -1789,7 +1793,6 @@ postponed_job_register(rb_execution_cont https://github.com/ruby/ruby/blob/trunk/vm_trace.c#L1793
     pjob->data = data;
 
     RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(ec);
-    if (mjit_enabled) mjit_postponed_job_register_finish_hook();
 
     return PJRR_SUCCESS;
 }
@@ -1842,6 +1845,29 @@ rb_postponed_job_register_one(unsigned i https://github.com/ruby/ruby/blob/trunk/vm_trace.c#L1845
     }
 }
 
+/* Queue func(data) to be run later by rb_postponed_job_flush().  Thread-safe and
+ * callable from a non-Ruby thread (e.g. the MJIT worker); NOT async-signal-safe
+ * (uses malloc and a native mutex).  `flags` is currently unused.
+ * Returns FALSE if allocating the job fails (ENOMEM), TRUE otherwise. */
+int
+rb_workqueue_register(unsigned flags, rb_postponed_job_func_t func, void *data)
+{
+    struct rb_workqueue_job *wq_job = malloc(sizeof(*wq_job)); /* freed in rb_postponed_job_flush() */
+    rb_vm_t *vm = GET_VM();
+
+    if (!wq_job) return FALSE;
+    wq_job->job.func = func;
+    wq_job->job.data = data;
+
+    rb_nativethread_lock_lock(&vm->workqueue_lock);
+    list_add_tail(&vm->workqueue, &wq_job->jnode); /* append; rb_postponed_job_flush() pops from the head */
+    rb_nativethread_lock_unlock(&vm->workqueue_lock);
+
+    RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(GET_EC()); /* NOTE(review): GET_EC() is read on the calling (possibly non-Ruby) thread; confirm it names the EC that should flush */
+
+    return TRUE;
+}
+
 void
 rb_postponed_job_flush(rb_vm_t *vm)
 {
@@ -1849,6 +1875,13 @@ rb_postponed_job_flush(rb_vm_t *vm) https://github.com/ruby/ruby/blob/trunk/vm_trace.c#L1875
     const rb_atomic_t block_mask = POSTPONED_JOB_INTERRUPT_MASK|TRAP_INTERRUPT_MASK;
     volatile rb_atomic_t saved_mask = ec->interrupt_mask & block_mask;
     VALUE volatile saved_errno = ec->errinfo;
+    struct list_head tmp;
+
+    list_head_init(&tmp);
+
+    rb_nativethread_lock_lock(&vm->workqueue_lock);
+    list_append_list(&tmp, &vm->workqueue);
+    rb_nativethread_lock_unlock(&vm->workqueue_lock);
 
     ec->errinfo = Qnil;
     /* mask POSTPONED_JOB dispatch */
@@ -1857,16 +1890,33 @@ rb_postponed_job_flush(rb_vm_t *vm) https://github.com/ruby/ruby/blob/trunk/vm_trace.c#L1890
 	EC_PUSH_TAG(ec);
 	if (EC_EXEC_TAG() == TAG_NONE) {
             int index;
+            struct rb_workqueue_job *wq_job;
+
             while ((index = vm->postponed_job_index) > 0) {
                 if (ATOMIC_CAS(vm->postponed_job_index, index, index-1) == index) {
                     rb_postponed_job_t *pjob = &vm->postponed_job_buffer[index-1];
                     (*pjob->func)(pjob->data);
                 }
 	    }
+            while ((wq_job = list_pop(&tmp, struct rb_workqueue_job, jnode))) {
+                rb_postponed_job_t pjob = wq_job->job;
+
+                free(wq_job);
+                (pjob.func)(pjob.data);
+            }
 	}
 	EC_POP_TAG();
     }
     /* restore POSTPONED_JOB mask */
     ec->interrupt_mask &= ~(saved_mask ^ block_mask);
     ec->errinfo = saved_errno;
+
+    /* don't leak memory if a job threw an exception */
+    if (!list_empty(&tmp)) {
+        rb_nativethread_lock_lock(&vm->workqueue_lock);
+        list_prepend_list(&vm->workqueue, &tmp);
+        rb_nativethread_lock_unlock(&vm->workqueue_lock);
+
+        RUBY_VM_SET_POSTPONED_JOB_INTERRUPT(GET_EC());
+    }
 }

--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/

[前][次][番号順一覧][スレッド一覧]