[前][次][番号順一覧][スレッド一覧]

ruby-changes:26030

From: kosaki <ko1@a...>
Date: Sat, 1 Dec 2012 03:55:19 +0900 (JST)
Subject: [ruby-changes:26030] kosaki:r38087 (trunk): * lib/thread.rb (SizedQueue#pop): rewrite by using ConditionVariable.

kosaki	2012-12-01 03:55:09 +0900 (Sat, 01 Dec 2012)

  New Revision: 38087

  http://svn.ruby-lang.org/cgi-bin/viewvc.cgi?view=rev&revision=38087

  Log:
    * lib/thread.rb (SizedQueue#pop): rewrite by using ConditionVariable.
    * lib/thread.rb (SizedQueue#push): ditto.
    * lib/thread.rb (SizedQueue#max): ditto.
    * lib/thread.rb (Queue#pop): ditto.
    * lib/thread.rb (Queue#push): ditto.
    
    * lib/thread.rb (SizedQueue#num_waiting): adopt the above changes.
    * lib/thread.rb (SizedQueue#initialize): ditto.
    * lib/thread.rb (Queue#num_waiting): ditto.
    * lib/thread.rb (Queue#initialize): ditto.
    * test/thread/test_queue.rb (test_sized_queue_and_wakeup): ditto.

  Modified files:
    trunk/ChangeLog
    trunk/lib/thread.rb
    trunk/test/thread/test_queue.rb

Index: ChangeLog
===================================================================
--- ChangeLog	(revision 38086)
+++ ChangeLog	(revision 38087)
@@ -1,3 +1,17 @@
+Sat Dec  1 03:29:52 2012  KOSAKI Motohiro  <kosaki.motohiro@g...>
+
+	* lib/thread.rb (SizedQueue#pop): rewrite by using ConditionVariable.
+	* lib/thread.rb (SizedQueue#push): ditto.
+	* lib/thread.rb (SizedQueue#max): ditto.
+	* lib/thread.rb (Queue#pop): ditto.
+	* lib/thread.rb (Queue#push): ditto.
+
+	* lib/thread.rb (SizedQueue#num_waiting): adopt the above changes.
+	* lib/thread.rb (SizedQueue#initialize): ditto.
+	* lib/thread.rb (Queue#num_waiting): ditto.
+	* lib/thread.rb (Queue#initialize): ditto.
+	* test/thread/test_queue.rb (test_sized_queue_and_wakeup): ditto.
+
 Sat Dec  1 03:45:47 2012  Koichi Sasada  <ko1@a...>
 
 	* thread.c (Thread.async_interrupt_timing): fix RDoc.
Index: lib/thread.rb
===================================================================
--- lib/thread.rb	(revision 38086)
+++ lib/thread.rb	(revision 38087)
@@ -149,26 +149,23 @@
   #
   def initialize
     @que = []
-    @waiting = []
     @que.taint          # enable tainted communication
-    @waiting.taint
+    @num_waiting = 0
     self.taint
     @mutex = Mutex.new
+    @cond = ConditionVariable.new
   end
 
   #
   # Pushes +obj+ to the queue.
   #
   def push(obj)
-    @mutex.synchronize{
-      @que.push obj
-      begin
-        t = @waiting.shift
-        t.wakeup if t
-      rescue ThreadError
-        retry
+    Thread.async_interrupt_timing(StandardError => :on_blocking) do
+      @mutex.synchronize do
+        @que.push obj
+        @cond.signal
       end
-    }
+    end
   end
 
   #
@@ -187,23 +184,26 @@
   # thread isn't suspended, and an exception is raised.
   #
   def pop(non_block=false)
-    @mutex.synchronize{
-      begin
+    Thread.async_interrupt_timing(StandardError => :on_blocking) do
+      @mutex.synchronize do
         while true
           if @que.empty?
-            raise ThreadError, "queue empty" if non_block
-            # @waiting.include? check is necessary for avoiding a race against
-            # Thread.wakeup [Bug 5195]
-            @waiting.push Thread.current unless @waiting.include?(Thread.current)
-            @mutex.sleep
+            if non_block
+              raise ThreadError, "queue empty"
+            else
+              begin
+                @num_waiting += 1
+                @cond.wait @mutex
+              ensure
+                @num_waiting -= 1
+              end
+            end
           else
             return @que.shift
           end
         end
-      ensure
-        @waiting.delete(Thread.current)
       end
-    }
+    end
   end
 
   #
@@ -246,7 +246,7 @@
   # Returns the number of threads waiting on the queue.
   #
   def num_waiting
-    @waiting.size
+    @num_waiting
   end
 end
 
@@ -263,8 +263,8 @@
   def initialize(max)
     raise ArgumentError, "queue size must be positive" unless max > 0
     @max = max
-    @queue_wait = []
-    @queue_wait.taint           # enable tainted comunication
+    @enque_cond = ConditionVariable.new
+    @num_enqueue_waiting = 0
     super()
   end
 
@@ -280,22 +280,15 @@
   #
   def max=(max)
     raise ArgumentError, "queue size must be positive" unless max > 0
-    diff = nil
-    @mutex.synchronize {
+
+    @mutex.synchronize do
       if max <= @max
         @max = max
       else
         diff = max - @max
         @max = max
-      end
-    }
-    if diff
-      diff.times do
-        begin
-          t = @queue_wait.shift
-          t.run if t
-        rescue ThreadError
-          retry
+        diff.times do
+          @enque_cond.signal
         end
       end
     end
@@ -307,25 +300,22 @@
   # until space becomes available.
   #
   def push(obj)
-    @mutex.synchronize{
-      begin
+    Thread.async_interrupt_timing(RuntimeError => :on_blocking) do
+      @mutex.synchronize do
         while true
           break if @que.length < @max
-          @queue_wait.push Thread.current unless @queue_wait.include?(Thread.current)
-          @mutex.sleep
+          @num_enqueue_waiting += 1
+          begin
+            @enque_cond.wait @mutex
+          ensure
+            @num_enqueue_waiting -= 1
+          end
         end
-      ensure
-        @queue_wait.delete(Thread.current)
-      end
 
-      @que.push obj
-      begin
-        t = @waiting.shift
-        t.wakeup if t
-      rescue ThreadError
-        retry
+        @que.push obj
+        @cond.signal
       end
-    }
+    end
   end
 
   #
@@ -343,16 +333,11 @@
   #
   def pop(*args)
     retval = super
-    @mutex.synchronize {
+    @mutex.synchronize do
       if @que.length < @max
-        begin
-          t = @queue_wait.shift
-          t.wakeup if t
-        rescue ThreadError
-          retry
-        end
+        @enque_cond.signal
       end
-    }
+    end
     retval
   end
 
@@ -370,7 +355,7 @@
   # Returns the number of threads waiting on the queue.
   #
   def num_waiting
-    @waiting.size + @queue_wait.size
+    @num_waiting + @num_enqueue_waiting
   end
 end
 
Index: test/thread/test_queue.rb
===================================================================
--- test/thread/test_queue.rb	(revision 38086)
+++ test/thread/test_queue.rb	(revision 38087)
@@ -69,7 +69,8 @@
     t2 = Thread.start { sq.push(2) }
     sleep 0.1 until t1.stop? && t2.stop?
 
-    queue_wait = sq.instance_eval{ @queue_wait }
+    enque_cond = sq.instance_eval{ @enque_cond }
+    queue_wait = enque_cond.instance_eval { @waiters }
     assert_equal(queue_wait.uniq, queue_wait)
   end
 

--
ML: ruby-changes@q...
Info: http://www.atdot.net/~ko1/quickml/

[前][次][番号順一覧][スレッド一覧]