ruby-changes:26030
From: kosaki <ko1@a...>
Date: Sat, 1 Dec 2012 03:55:19 +0900 (JST)
Subject: [ruby-changes:26030] kosaki:r38087 (trunk): * lib/thread.rb (SizedQueue#pop): rewrite by using ConditionVariable.
kosaki 2012-12-01 03:55:09 +0900 (Sat, 01 Dec 2012) New Revision: 38087 http://svn.ruby-lang.org/cgi-bin/viewvc.cgi?view=rev&revision=38087 Log: * lib/thread.rb (SizedQueue#pop): rewrite by using ConditionVariable. * lib/thread.rb (SizedQueue#push): ditto. * lib/thread.rb (SizedQueue#max): ditto. * lib/thread.rb (Queue#pop): ditto. * lib/thread.rb (Queue#push): ditto. * lib/thread.rb (SizedQueue#num_waiting): adopt the above changes. * lib/thread.rb (SizedQueue#initialize): ditto. * lib/thread.rb (Queue#num_waiting): ditto. * lib/thread.rb (Queue#initialize): ditto. * test/thread/test_queue.rb (test_sized_queue_and_wakeup): ditto. Modified files: trunk/ChangeLog trunk/lib/thread.rb trunk/test/thread/test_queue.rb Index: ChangeLog =================================================================== --- ChangeLog (revision 38086) +++ ChangeLog (revision 38087) @@ -1,3 +1,17 @@ +Sat Dec 1 03:29:52 2012 KOSAKI Motohiro <kosaki.motohiro@g...> + + * lib/thread.rb (SizedQueue#pop): rewrite by using ConditionVariable. + * lib/thread.rb (SizedQueue#push): ditto. + * lib/thread.rb (SizedQueue#max): ditto. + * lib/thread.rb (Queue#pop): ditto. + * lib/thread.rb (Queue#push): ditto. + + * lib/thread.rb (SizedQueue#num_waiting): adopt the above changes. + * lib/thread.rb (SizedQueue#initialize): ditto. + * lib/thread.rb (Queue#num_waiting): ditto. + * lib/thread.rb (Queue#initialize): ditto. + * test/thread/test_queue.rb (test_sized_queue_and_wakeup): ditto. + Sat Dec 1 03:45:47 2012 Koichi Sasada <ko1@a...> * thread.c (Thread.async_interrupt_timing): fix RDoc. Index: lib/thread.rb =================================================================== --- lib/thread.rb (revision 38086) +++ lib/thread.rb (revision 38087) @@ -149,26 +149,23 @@ # def initialize @que = [] - @waiting = [] @que.taint # enable tainted communication - @waiting.taint + @num_waiting = 0 self.taint @mutex = Mutex.new + @cond = ConditionVariable.new end # # Pushes +obj+ to the queue. # def push(obj) - @mutex.synchronize{ - @que.push obj - begin - t = @waiting.shift - t.wakeup if t - rescue ThreadError - retry + Thread.async_interrupt_timing(StandardError => :on_blocking) do + @mutex.synchronize do + @que.push obj + @cond.signal end - } + end end # @@ -187,23 +184,26 @@ # thread isn't suspended, and an exception is raised. # def pop(non_block=false) - @mutex.synchronize{ - begin + Thread.async_interrupt_timing(StandardError => :on_blocking) do + @mutex.synchronize do while true if @que.empty? - raise ThreadError, "queue empty" if non_block - # @waiting.include? check is necessary for avoiding a race against - # Thread.wakeup [Bug 5195] - @waiting.push Thread.current unless @waiting.include?(Thread.current) - @mutex.sleep + if non_block + raise ThreadError, "queue empty" + else + begin + @num_waiting += 1 + @cond.wait @mutex + ensure + @num_waiting -= 1 + end + end else return @que.shift end end - ensure - @waiting.delete(Thread.current) end - } + end end # @@ -246,7 +246,7 @@ # Returns the number of threads waiting on the queue. # def num_waiting - @waiting.size + @num_waiting end end @@ -263,8 +263,8 @@ def initialize(max) raise ArgumentError, "queue size must be positive" unless max > 0 @max = max - @queue_wait = [] - @queue_wait.taint # enable tainted comunication + @enque_cond = ConditionVariable.new + @num_enqueue_waiting = 0 super() end @@ -280,22 +280,15 @@ # def max=(max) raise ArgumentError, "queue size must be positive" unless max > 0 - diff = nil - @mutex.synchronize { + + @mutex.synchronize do if max <= @max @max = max else diff = max - @max @max = max - end - } - if diff - diff.times do - begin - t = @queue_wait.shift - t.run if t - rescue ThreadError - retry + diff.times do + @enque_cond.signal end end end @@ -307,25 +300,22 @@ # until space becomes available. # def push(obj) - @mutex.synchronize{ - begin + Thread.async_interrupt_timing(RuntimeError => :on_blocking) do + @mutex.synchronize do while true break if @que.length < @max - @queue_wait.push Thread.current unless @queue_wait.include?(Thread.current) - @mutex.sleep + @num_enqueue_waiting += 1 + begin + @enque_cond.wait @mutex + ensure + @num_enqueue_waiting -= 1 + end end - ensure - @queue_wait.delete(Thread.current) - end - @que.push obj - begin - t = @waiting.shift - t.wakeup if t - rescue ThreadError - retry + @que.push obj + @cond.signal end - } + end end # @@ -343,16 +333,11 @@ # def pop(*args) retval = super - @mutex.synchronize { + @mutex.synchronize do if @que.length < @max - begin - t = @queue_wait.shift - t.wakeup if t - rescue ThreadError - retry - end + @enque_cond.signal end - } + end retval end @@ -370,7 +355,7 @@ # Returns the number of threads waiting on the queue. # def num_waiting - @waiting.size + @queue_wait.size + @num_waiting + @num_enqueue_waiting end end Index: test/thread/test_queue.rb =================================================================== --- test/thread/test_queue.rb (revision 38086) +++ test/thread/test_queue.rb (revision 38087) @@ -69,7 +69,8 @@ t2 = Thread.start { sq.push(2) } sleep 0.1 until t1.stop? && t2.stop? - queue_wait = sq.instance_eval{ @queue_wait } + enque_cond = sq.instance_eval{ @enque_cond } + queue_wait = enque_cond.instance_eval { @waiters } assert_equal(queue_wait.uniq, queue_wait) end -- ML: ruby-changes@q... Info: http://www.atdot.net/~ko1/quickml/