[前][次][番号順一覧][スレッド一覧]

ruby-changes:50752

From: normal <ko1@a...>
Date: Tue, 27 Mar 2018 18:28:43 +0900 (JST)
Subject: [ruby-changes:50752] normal:r62934 (trunk): thread_sync.c: avoid reaching across stacks of dead threads

normal	2018-03-27 18:28:37 +0900 (Tue, 27 Mar 2018)

  New Revision: 62934

  https://svn.ruby-lang.org/cgi-bin/viewvc.cgi?view=revision&revision=62934

  Log:
    thread_sync.c: avoid reaching across stacks of dead threads
    
    rb_ensure is insufficient cleanup for fork and we must
    reinitialize all waitqueues in the child process.
    
    Unfortunately this increases the footprint of ConditionVariable,
    Queue and SizedQueue by 8 bytes on 32-bit (16 bytes on 64-bit).
    
    [ruby-core:86316] [Bug #14634]

  Modified files:
    trunk/test/thread/test_cv.rb
    trunk/test/thread/test_queue.rb
    trunk/thread.c
    trunk/thread_sync.c
Index: test/thread/test_queue.rb
===================================================================
--- test/thread/test_queue.rb	(revision 62933)
+++ test/thread/test_queue.rb	(revision 62934)
@@ -565,4 +565,52 @@ class TestQueue < Test::Unit::TestCase https://github.com/ruby/ruby/blob/trunk/test/thread/test_queue.rb#L565
       puts 'exit'
     INPUT
   end
+
+  def test_fork_while_queue_waiting # forking while threads block on Queue/SizedQueue must leave both usable in child and parent
+    q = Queue.new
+    sq = SizedQueue.new(1)
+    thq = Thread.new { q.pop }
+    thsq = Thread.new { sq.pop }
+    Thread.pass until thq.stop? && thsq.stop? # both queues are empty, so both threads end up stopped in pop
+
+    pid = fork do # each distinct exit status identifies which check failed in the child
+      exit!(1) if q.num_waiting != 0 # the child must not see the parent's waiters
+      exit!(2) if sq.num_waiting != 0
+      exit!(6) unless q.empty?
+      exit!(7) unless sq.empty?
+      q.push :child_q
+      sq.push :child_sq
+      exit!(3) if q.pop != :child_q # push/pop round trip must work inside the child
+      exit!(4) if sq.pop != :child_sq
+      exit!(0)
+    end
+    _, s = Process.waitpid2(pid)
+    assert_predicate s, :success?, 'no segfault [ruby-core:86316] [Bug #14634]'
+
+    q.push :thq # parent: the threads that were waiting before the fork still receive values
+    sq.push :thsq
+    assert_equal :thq, thq.value
+    assert_equal :thsq, thsq.value
+
+    sq.push(1) # second scenario: sq (capacity 1) is now full
+    th = Thread.new { q.pop; sq.pop } # stops in q.pop because q is empty
+    thsq = Thread.new { sq.push(2) } # stops in push because sq is full
+    Thread.pass until th.stop? && thsq.stop?
+    pid = fork do
+      exit!(1) if q.num_waiting != 0
+      exit!(2) if sq.num_waiting != 0
+      exit!(3) unless q.empty?
+      exit!(4) if sq.empty? # the element pushed before the fork must still be there
+      exit!(5) if sq.pop != 1
+      exit!(0)
+    end
+    _, s = Process.waitpid2(pid)
+    assert_predicate s, :success?, 'no segfault [ruby-core:86316] [Bug #14634]'
+
+    assert_predicate thsq, :stop? # the child's pop must not have released the parent's blocked pusher
+    assert_equal 1, sq.pop # makes room, letting thsq finish its push(2)
+    assert_same sq, thsq.value # the thread's value is the result of sq.push(2), expected to be sq itself
+    q.push('restart th') # unblocks th's q.pop so it can go on to sq.pop
+    assert_equal 2, th.value # th's value is the 2 pushed by thsq
+  end if Process.respond_to?(:fork) # the test is defined only when Process responds to fork
 end
Index: test/thread/test_cv.rb
===================================================================
--- test/thread/test_cv.rb	(revision 62933)
+++ test/thread/test_cv.rb	(revision 62934)
@@ -219,4 +219,23 @@ INPUT https://github.com/ruby/ruby/blob/trunk/test/thread/test_cv.rb#L219
       Marshal.dump(condvar)
     end
   end
+
+  def test_condvar_fork # forking while threads wait on a ConditionVariable must not crash the child
+    mutex = Mutex.new
+    condvar = ConditionVariable.new
+    thrs = (1..10).map do
+      Thread.new { mutex.synchronize { condvar.wait(mutex) } }
+    end
+    thrs.each { 3.times { Thread.pass } } # yield repeatedly to give the new threads a chance to reach wait
+    pid = fork do
+      mutex.synchronize { condvar.broadcast } # child: broadcast on the condvar inherited from the parent
+      exit!(0)
+    end
+    _, s = Process.waitpid2(pid)
+    assert_predicate s, :success?, 'no segfault [ruby-core:86316] [Bug #14634]'
+    until thrs.empty? # parent: keep broadcasting until every waiter thread has finished
+      mutex.synchronize { condvar.broadcast }
+      thrs.delete_if { |t| t.join(0.01) } # join(limit) is nil on timeout, so unfinished threads stay listed
+    end
+  end if Process.respond_to?(:fork) # the test is defined only when Process responds to fork
 end
Index: thread.c
===================================================================
--- thread.c	(revision 62933)
+++ thread.c	(revision 62934)
@@ -4216,6 +4216,8 @@ rb_thread_atfork_internal(rb_thread_t *t https://github.com/ruby/ruby/blob/trunk/thread.c#L4216
     }
     rb_vm_living_threads_init(vm);
     rb_vm_living_threads_insert(vm, th);
+    rb_thread_sync_reset_all();
+
     vm->sleeper = 0;
     clear_coverage();
 }
Index: thread_sync.c
===================================================================
--- thread_sync.c	(revision 62933)
+++ thread_sync.c	(revision 62934)
@@ -4,6 +4,14 @@ https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L4
 static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
 static VALUE rb_eClosedQueueError;
 
+/*
+ * Registries of every allocated SizedQueue, Queue and ConditionVariable, kept
+ * at file scope so the forked child can walk them and reinitialize their waitqueues.
+ */
+static LIST_HEAD(szqueue_list); /* entries are linked through rb_szqueue.q.live */
+static LIST_HEAD(queue_list); /* entries are linked through rb_queue.live */
+static LIST_HEAD(condvar_list); /* entries are linked through rb_condvar.live */
+
 /* sync_waiter is always on-stack */
 struct sync_waiter {
     rb_thread_t *th;
@@ -54,6 +62,7 @@ typedef struct rb_mutex_struct { https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L62
 static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
 static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
 static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
+static void rb_thread_sync_reset_all(void);
 #endif
 static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th);
 
@@ -538,7 +547,9 @@ void rb_mutex_allow_trap(VALUE self, int https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L547
 /* Queue */
 
 #define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq)
+#define queue_live(q) UNALIGNED_MEMBER_PTR(q, live)
 PACKED_STRUCT_UNALIGNED(struct rb_queue {
+    struct list_node live;
     struct list_head waitq;
     const VALUE que;
     int num_waiting;
@@ -546,6 +557,7 @@ PACKED_STRUCT_UNALIGNED(struct rb_queue https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L557
 
 #define szqueue_waitq(sq) UNALIGNED_MEMBER_PTR(sq, q.waitq)
 #define szqueue_pushq(sq) UNALIGNED_MEMBER_PTR(sq, pushq)
+#define szqueue_live(sq) UNALIGNED_MEMBER_PTR(sq, q.live)
 PACKED_STRUCT_UNALIGNED(struct rb_szqueue {
     struct rb_queue q;
     int num_waiting_push;
@@ -562,6 +574,14 @@ queue_mark(void *ptr) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L574
     rb_gc_mark(q->que);
 }
 
+static void /* free callback registered in queue_data_type */
+queue_free(void *ptr)
+{
+    struct rb_queue *q = ptr;
+    list_del(queue_live(q)); /* undo the list_add done in queue_alloc before the memory is released */
+    ruby_xfree(ptr);
+}
+
 static size_t
 queue_memsize(const void *ptr)
 {
@@ -570,7 +590,7 @@ queue_memsize(const void *ptr) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L590
 
 static const rb_data_type_t queue_data_type = {
     "queue",
-    {queue_mark, RUBY_TYPED_DEFAULT_FREE, queue_memsize,},
+    {queue_mark, queue_free, queue_memsize,},
     0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
 };
 
@@ -582,6 +602,7 @@ queue_alloc(VALUE klass) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L602
 
     obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q);
     list_head_init(queue_waitq(q));
+    list_add(&queue_list, queue_live(q));
     return obj;
 }
 
@@ -604,6 +625,14 @@ szqueue_mark(void *ptr) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L625
     queue_mark(&sq->q);
 }
 
+static void /* free callback registered in szqueue_data_type */
+szqueue_free(void *ptr)
+{
+    struct rb_szqueue *sq = ptr;
+    list_del(szqueue_live(sq)); /* undo the list_add done in szqueue_alloc before the memory is released */
+    ruby_xfree(ptr);
+}
+
 static size_t
 szqueue_memsize(const void *ptr)
 {
@@ -612,7 +641,7 @@ szqueue_memsize(const void *ptr) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L641
 
 static const rb_data_type_t szqueue_data_type = {
     "sized_queue",
-    {szqueue_mark, RUBY_TYPED_DEFAULT_FREE, szqueue_memsize,},
+    {szqueue_mark, szqueue_free, szqueue_memsize,},
     0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
 };
 
@@ -624,6 +653,7 @@ szqueue_alloc(VALUE klass) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L653
 					&szqueue_data_type, sq);
     list_head_init(szqueue_waitq(sq));
     list_head_init(szqueue_pushq(sq));
+    list_add(&szqueue_list, szqueue_live(sq));
     return obj;
 }
 
@@ -878,7 +908,7 @@ queue_do_pop(VALUE self, struct rb_queue https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L908
 	    list_add_tail(&qw.as.q->waitq, &qw.w.node);
 	    qw.as.q->num_waiting++;
 
-	    rb_ensure(queue_sleep, Qfalse, queue_sleep_done, (VALUE)&qw);
+	    rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&qw);
 	}
     }
 
@@ -1120,7 +1150,7 @@ rb_szqueue_push(int argc, VALUE *argv, V https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1150
 	    list_add_tail(pushq, &qw.w.node);
 	    sq->num_waiting_push++;
 
-	    rb_ensure(queue_sleep, Qfalse, szqueue_sleep_done, (VALUE)&qw);
+	    rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw);
 	}
     }
 
@@ -1233,6 +1263,7 @@ rb_szqueue_empty_p(VALUE self) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1263
 /* TODO: maybe this can be IMEMO */
 struct rb_condvar {
     struct list_head waitq; /* initialized in condvar_alloc; reinitialized in a forked child */
+    struct list_node live; /* membership in condvar_list, from condvar_alloc until condvar_free */
 };
 
 /*
@@ -1263,6 +1294,14 @@ struct rb_condvar { https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1294
  *    }
  */
 
+static void /* free callback registered in cv_data_type */
+condvar_free(void *ptr)
+{
+    struct rb_condvar *cv = ptr;
+    list_del(&cv->live); /* undo the list_add done in condvar_alloc before the memory is released */
+    ruby_xfree(ptr);
+}
+
 static size_t
 condvar_memsize(const void *ptr)
 {
@@ -1271,7 +1310,7 @@ condvar_memsize(const void *ptr) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1310
 
 static const rb_data_type_t cv_data_type = {
     "condvar",
-    {0, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,},
+    {0, condvar_free, condvar_memsize,},
     0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
 };
 
@@ -1293,6 +1332,7 @@ condvar_alloc(VALUE klass) https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1332
 
     obj = TypedData_Make_Struct(klass, struct rb_condvar, &cv_data_type, cv);
     list_head_init(&cv->waitq);
+    list_add(&condvar_list, &cv->live);
 
     return obj;
 }
@@ -1406,6 +1446,31 @@ define_thread_class(VALUE outer, const c https://github.com/ruby/ruby/blob/trunk/thread_sync.c#L1446
     return klass;
 }
 
+#if defined(HAVE_WORKING_FORK)
+/* Forked child only: empty every waitqueue and zero the waiter counts, since we must not reference stacks of dead threads. */
+static void
+rb_thread_sync_reset_all(void)
+{
+    struct rb_queue *q = 0; /* iteration cursors for the three registries */
+    struct rb_szqueue *sq = 0;
+    struct rb_condvar *cv = 0;
+
+    list_for_each(&queue_list, q, live) { /* every Queue registered by queue_alloc */
+        list_head_init(queue_waitq(q)); /* back to the empty state queue_alloc sets up */
+        q->num_waiting = 0;
+    }
+    list_for_each(&szqueue_list, sq, q.live) { /* every SizedQueue registered by szqueue_alloc */
+        list_head_init(szqueue_waitq(sq)); /* waitq of the embedded rb_queue */
+        list_head_init(szqueue_pushq(sq)); /* the SizedQueue's second list, pushq */
+        sq->num_waiting_push = 0;
+        sq->q.num_waiting = 0;
+    }
+    list_for_each(&condvar_list, cv, live) { /* every ConditionVariable registered by condvar_alloc */
+        list_head_init(&cv->waitq); /* rb_condvar keeps no waiter count, so only the list is reset */
+    }
+}
+#endif
+
 static void
 Init_thread_sync(void)
 {

--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/

[前][次][番号順一覧][スレッド一覧]