[前][次][番号順一覧][スレッド一覧]

ruby-changes:58244

From: Masatoshi <ko1@a...>
Date: Mon, 14 Oct 2019 20:30:48 +0900 (JST)
Subject: [ruby-changes:58244] 8488d5b5b6 (master): Automatically close fds on fork (and GC). The connection pools are maintained at thread scope.

https://git.ruby-lang.org/ruby.git/commit/?id=8488d5b5b6

From 8488d5b5b6cc9205e8e0641c514f1f2e38bf7d1e Mon Sep 17 00:00:00 2001
From: Masatoshi SEKI <m_seki@m...>
Date: Mon, 14 Oct 2019 20:30:22 +0900
Subject: Automatically close fds on fork (and GC). The connection pools are
 maintained at thread scope.


diff --git a/lib/drb/drb.rb b/lib/drb/drb.rb
index 8c2ef63..c90ff8e 100644
--- a/lib/drb/drb.rb
+++ b/lib/drb/drb.rb
@@ -1211,6 +1211,49 @@ module DRb https://github.com/ruby/ruby/blob/trunk/lib/drb/drb.rb#L1211
     end
   end
 
+  class ThreadObject
+    include MonitorMixin
+
+    def initialize(&blk)
+      super()
+      @wait_ev = new_cond
+      @req_ev = new_cond
+      @res_ev = new_cond
+      @status = :wait
+      @req = nil
+      @res = nil
+      @thread = Thread.new(self, &blk)
+    end
+
+    def alive?
+      @thread.alive?
+    end
+
+    def method_missing(msg, *arg, &blk)
+      synchronize do
+        @wait_ev.wait_until { @status == :wait }
+        @req = [msg] + arg
+        @status = :req
+        @req_ev.broadcast
+        @res_ev.wait_until { @status == :res }
+        value = @res
+        @req = @res = nil
+        @status = :wait
+        @wait_ev.broadcast
+        return value
+      end
+    end
+
+    def _execute()
+      synchronize do
+        @req_ev.wait_until { @status == :req }
+        @res = yield(@req)
+        @status = :res
+        @res_ev.signal
+      end
+    end
+  end
+
   # Class handling the connection between a DRbObject and the
   # server the real object lives on.
   #
@@ -1222,31 +1265,45 @@ module DRb https://github.com/ruby/ruby/blob/trunk/lib/drb/drb.rb#L1265
   # not normally need to deal with it directly.
   class DRbConn
     POOL_SIZE = 16  # :nodoc:
-    @mutex = Thread::Mutex.new
-    @pool = []
 
-    def self.open(remote_uri)  # :nodoc:
-      begin
-        conn = nil
-        pid = $$
-
-        @mutex.synchronize do
-          #FIXME
-          new_pool = []
-          @pool.each do |c|
-            if c.pid == pid
-              if conn.nil? and c.uri == remote_uri
-                conn = c if c.alive?
-              else
-                new_pool.push c
+    def self.make_pool
+      ThreadObject.new do |queue|
+        pool = []
+        while true
+          queue._execute do |message|
+            case(message[0])
+            when :take then
+              remote_uri = message[1]
+              conn = nil
+              new_pool = []
+              pool.each do |c|
+                if conn.nil? and c.uri == remote_uri
+                  conn = c if c.alive?
+                else
+                  new_pool.push c
+                end
               end
+              pool = new_pool
+              conn
+            when :store then
+              conn = message[1]
+              pool.unshift(conn)
+              pool.pop.close while pool.size > POOL_SIZE
+              conn
             else
-              c.close
+              nil
             end
           end
-          @pool = new_pool
         end
+      end
+    end
+    @pool_proxy = make_pool
 
+    def self.open(remote_uri)  # :nodoc:
+      begin
+        @pool_proxy = make_pool unless @pool_proxy.alive?
+
+        conn = @pool_proxy.take(remote_uri)
         conn = self.new(remote_uri) unless conn
         succ, result = yield(conn)
         return succ, result
@@ -1254,10 +1311,7 @@ module DRb https://github.com/ruby/ruby/blob/trunk/lib/drb/drb.rb#L1311
       ensure
         if conn
           if succ
-            @mutex.synchronize do
-              @pool.unshift(conn)
-              @pool.pop.close while @pool.size > POOL_SIZE
-            end
+            @pool_proxy.store(conn)
           else
             conn.close
           end
@@ -1267,11 +1321,9 @@ module DRb https://github.com/ruby/ruby/blob/trunk/lib/drb/drb.rb#L1321
 
     def initialize(remote_uri)  # :nodoc:
       @uri = remote_uri
-      @pid = $$
       @protocol = DRbProtocol.open(remote_uri, DRb.config)
     end
     attr_reader :uri  # :nodoc:
-    attr_reader :pid  # :nodoc:
 
     def send_message(ref, msg_id, arg, block)  # :nodoc:
       @protocol.send_request(ref, msg_id, arg, block)
-- 
cgit v0.10.2


--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/

[前][次][番号順一覧][スレッド一覧]