ruby-changes:27843
From: drbrain <ko1@a...>
Date: Sun, 24 Mar 2013 05:10:23 +0900 (JST)
Subject: [ruby-changes:27843] drbrain:r39895 (trunk): * lib/rinda/ring.rb: Add multicast support to Rinda::RingFinger and
drbrain 2013-03-24 05:10:11 +0900 (Sun, 24 Mar 2013) New Revision: 39895 http://svn.ruby-lang.org/cgi-bin/viewvc.cgi?view=rev&revision=39895 Log: * lib/rinda/ring.rb: Add multicast support to Rinda::RingFinger and Rinda::RingServer. [ruby-trunk - Bug #8073] * test/rinda/test_rinda.rb: Test for the above. * NEWS: Update with Rinda multicast support Modified files: trunk/ChangeLog trunk/NEWS trunk/lib/rinda/ring.rb trunk/test/rinda/test_rinda.rb Index: ChangeLog =================================================================== --- ChangeLog (revision 39894) +++ ChangeLog (revision 39895) @@ -1,3 +1,11 @@ https://github.com/ruby/ruby/blob/trunk/ChangeLog#L1 +Sun Mar 24 05:03:36 2013 Eric Hodel <drbrain@s...> + + * lib/rinda/ring.rb: Add multicast support to Rinda::RingFinger and + Rinda::RingServer. [ruby-trunk - Bug #8073] + * test/rinda/test_rinda.rb: Test for the above. + + * NEWS: Update with Rinda multicast support + Sun Mar 24 04:13:27 2013 Eric Hodel <drbrain@s...> * test/rinda/test_rinda.rb: Fixed test failures in r39890 and r39890 Index: lib/rinda/ring.rb =================================================================== --- lib/rinda/ring.rb (revision 39894) +++ lib/rinda/ring.rb (revision 39895) @@ -4,6 +4,7 @@ https://github.com/ruby/ruby/blob/trunk/lib/rinda/ring.rb#L4 require 'drb/drb' require 'rinda/rinda' require 'thread' +require 'ipaddr' module Rinda @@ -27,25 +28,90 @@ module Rinda https://github.com/ruby/ruby/blob/trunk/lib/rinda/ring.rb#L28 include DRbUndumped ## + # Special renewer for the RingServer to allow shutdown + + class Renewer # :nodoc: + include DRbUndumped + + ## + # Set to false to shutdown future requests using this Renewer + + attr_accessor :renew + + def initialize # :nodoc: + @renew = true + end + + def renew # :nodoc: + @renew ? 1 : true + end + end + + ## # Advertises +ts+ on the UDP broadcast address at +port+. - def initialize(ts, port=Ring_PORT) + def initialize(ts, addresses=[Socket::INADDR_ANY], port=Ring_PORT) + @port = port + + if Integer === addresses then + addresses, @port = [Socket::INADDR_ANY], addresses + end + + @renewer = Renewer.new + @ts = ts - @soc = UDPSocket.open - @soc.bind('', port) - @w_service = write_service - @r_service = reply_service + @sockets = addresses.map do |address| + make_socket(address) + end + + @w_services = write_services + @r_service = reply_service end ## - # Creates a thread that picks up UDP packets and passes them to do_write - # for decoding. + # Creates a socket at +address+ - def write_service - Thread.new do - loop do - msg = @soc.recv(1024) - do_write(msg) + def make_socket(address) + addrinfo = Addrinfo.udp(address, @port) + + socket = Socket.new(addrinfo.pfamily, addrinfo.socktype, + addrinfo.protocol) + + if addrinfo.ipv4_multicast? or addrinfo.ipv6_multicast? then + if Socket.const_defined?(:SO_REUSEPORT) then + socket.setsockopt(:SOCKET, :SO_REUSEPORT, true) + else + socket.setsockopt(:SOCKET, :SO_REUSEADDR, true) + end + + if addrinfo.ipv4_multicast? then + mreq = IPAddr.new(addrinfo.ip_address).hton + + IPAddr.new('0.0.0.0').hton + + socket.setsockopt(:IPPROTO_IP, :IP_ADD_MEMBERSHIP, mreq) + else + mreq = IPAddr.new(addrinfo.ip_address).hton + [0].pack('I') + + socket.setsockopt(:IPPROTO_IPV6, :IPV6_JOIN_GROUP, mreq) + end + end + + socket.bind(addrinfo) + + socket + end + + ## + # Creates threads that pick up UDP packets and passes them to do_write for + # decoding. + + def write_services + @sockets.map do |s| + Thread.new(s) do |socket| + loop do + msg = socket.recv(1024) + do_write(msg) + end end end end @@ -80,11 +146,28 @@ module Rinda https://github.com/ruby/ruby/blob/trunk/lib/rinda/ring.rb#L146 # address of the local TupleSpace. def do_reply - tuple = @ts.take([:lookup_ring, DRbObject]) + tuple = @ts.take([:lookup_ring, DRbObject], @renewer) Thread.new { tuple[1].call(@ts) rescue nil} rescue end + ## + # Shuts down the RingServer + + def shutdown + @renewer.renew = false + + @w_services.each do |thread| + thread.kill + end + + @sockets.each do |socket| + socket.close + end + + @r_service.kill + end + end ## @@ -131,6 +214,18 @@ module Rinda https://github.com/ruby/ruby/blob/trunk/lib/rinda/ring.rb#L214 attr_accessor :broadcast_list ## + # Maximum number of hops for sent multicast packets (if using a multicast + # address in the broadcast list). The default is 1 (same as UDP + # broadcast). + + attr_accessor :multicast_hops + + ## + # The interface index to send IPv6 multicast packets from. + + attr_accessor :multicast_interface + + ## # The port that RingFinger will send query packets to. attr_accessor :port @@ -149,6 +244,9 @@ module Rinda https://github.com/ruby/ruby/blob/trunk/lib/rinda/ring.rb#L244 @port = port @primary = nil @rings = [] + + @multicast_hops = 1 + @multicast_interface = 0 end ## @@ -178,15 +276,7 @@ module Rinda https://github.com/ruby/ruby/blob/trunk/lib/rinda/ring.rb#L276 msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout]) @broadcast_list.each do |it| - soc = UDPSocket.open - begin - soc.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true) - soc.send(msg, 0, it, @port) - rescue - nil - ensure - soc.close - end + send_message(it, msg) end sleep(timeout) end @@ -217,6 +307,44 @@ module Rinda https://github.com/ruby/ruby/blob/trunk/lib/rinda/ring.rb#L307 @primary end + ## + # Creates a socket for +address+ with the appropriate multicast options + # for multicast addresses. + + def make_socket(address) # :nodoc: + addrinfo = Addrinfo.udp(address, @port) + + soc = Socket.new(addrinfo.pfamily, addrinfo.socktype, addrinfo.protocol) + + if addrinfo.ipv4_multicast? then + soc.setsockopt(:IPPROTO_IP, :IP_MULTICAST_LOOP, true) + soc.setsockopt(:IPPROTO_IP, :IP_MULTICAST_TTL, + [@multicast_hops].pack('c')) + elsif addrinfo.ipv6_multicast? then + soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_LOOP, true) + soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_HOPS, + [@multicast_hops].pack('I')) + soc.setsockopt(:IPPROTO_IPV6, :IPV6_MULTICAST_IF, + [@multicast_interface].pack('I')) + else + soc.setsockopt(:SOL_SOCKET, :SO_BROADCAST, true) + end + + soc.connect(addrinfo) + + soc + end + + def send_message(address, message) # :nodoc: + soc = make_socket(address) + + soc.send(message, 0) + rescue + nil + ensure + soc.close if soc + end + end ## Index: NEWS =================================================================== --- NEWS (revision 39894) +++ NEWS (revision 39895) @@ -32,6 +32,8 @@ with all sufficient information, see the https://github.com/ruby/ruby/blob/trunk/NEWS#L32 * Net::SMTP * Added Net::SMTP#rset to implement the RSET comamnd +* Rinda::RingServer, Rinda::RingFinger + * Rinda now supports multicast sockets === Stdlib compatibility issues (excluding feature bug fixes) === C API updates Index: test/rinda/test_rinda.rb =================================================================== --- test/rinda/test_rinda.rb (revision 39894) +++ test/rinda/test_rinda.rb (revision 39895) @@ -2,519 +2,620 @@ require 'test/unit' https://github.com/ruby/ruby/blob/trunk/test/rinda/test_rinda.rb#L2 require 'drb/drb' require 'drb/eq' +require 'rinda/ring' require 'rinda/tuplespace' require 'singleton' module Rinda -class MockClock - include Singleton +#class MockClock +# include Singleton +# +# class MyTS < Rinda::TupleSpace +# def keeper_thread +# nil +# end +# end +# +# def initialize +# @now = 2 +# @reso = 1 +# @ts = MyTS.new +# @ts.write([2, :now]) +# @inf = 2**31 - 1 +# end +# +# def now +# @now.to_f +# end +# +# def at(n) +# n +# end +# +# def _forward(n=nil) +# now ,= @ts.take([nil, :now]) +# @now = now + n +# n = @reso if n.nil? +# @ts.write([@now, :now]) +# end +# +# def forward(n) +# while n > 0 +# _forward(@reso) +# n -= @reso +# Thread.pass +# end +# end +# +# def rewind +# now ,= @ts.take([nil, :now]) +# @ts.write([@inf, :now]) +# @ts.take([nil, :now]) +# @now = 2 +# @ts.write([2, :now]) +# end +# +# def sleep(n=nil) +# now ,= @ts.read([nil, :now]) +# @ts.read([(now + n)..@inf, :now]) +# 0 +# end +#end +# +#module Time +# def sleep(n) +# @m.sleep(n) +# end +# module_function :sleep +# +# def at(n) +# n +# end +# module_function :at +# +# def now +# @m ? @m.now : 2 +# end +# module_function :now +# +# def rewind +# @m.rewind +# end +# module_function :rewind +# +# def forward(n) +# @m.forward(n) +# end +# module_function :forward +# +# @m = MockClock.instance +#end +# +#class TupleSpace +# def sleep(n) +# Kernel.sleep(n * 0.01) +# end +#end +# +#module TupleSpaceTestModule +# def sleep(n) +# if Thread.current == Thread.main +# Time.forward(n) +# else +# Time.sleep(n) +# end +# end +# +# def thread_join(th) +# while th.alive? +# Kernel.sleep(0.1) +# sleep(1) +# end +# th.value +# end +# +# def test_00_tuple +# tuple = Rinda::TupleEntry.new([1,2,3]) +# assert(!tuple.canceled?) +# assert(!tuple.expired?) +# assert(tuple.alive?) +# end +# +# def test_00_template +# tmpl = Rinda::Template.new([1,2,3]) +# assert_equal(3, tmpl.size) +# assert_equal(3, tmpl[2]) +# assert(tmpl.match([1,2,3])) +# assert(!tmpl.match([1,nil,3])) +# +# tmpl = Rinda::Template.new([/^rinda/i, nil, :hello]) +# assert_equal(3, tmpl.size) +# assert(tmpl.match(['Rinda', 2, :hello])) +# assert(!tmpl.match(['Rinda', 2, Symbol])) +# assert(!tmpl.match([1, 2, :hello])) +# assert(tmpl.match([/^rinda/i, 2, :hello])) +# +# tmpl = Rinda::Template.new([Symbol]) +# assert_equal(1, tmpl.size) +# assert(tmpl.match([:hello])) +# assert(tmpl.match([Symbol])) +# assert(!tmpl.match(['Symbol'])) +# +# tmpl = Rinda::Template.new({"message"=>String, "name"=>String}) +# assert_equal(2, tmpl.size) +# assert(tmpl.match({"message"=>"Hello", "name"=>"Foo"})) +# assert(!tmpl.match({"message"=>"Hello", "name"=>"Foo", "1"=>2})) +# assert(!tmpl.match({"message"=>"Hi", "name"=>"Foo", "age"=>1})) +# assert(!tmpl.match({"message"=>"Hello", "no_name"=>"Foo"})) +# +# assert_raise(Rinda::InvalidHashTupleKey) do +# Rinda::Template.new({:message=>String, "name"=>String}) +# end +# tmpl = Rinda::Template.new({"name"=>String}) +# assert_equal(1, tmpl.size) +# assert(tmpl.match({"name"=>"Foo"})) +# assert(!tmpl.match({"message"=>"Hello", "name"=>"Foo"})) +# assert(!tmpl.match({"message"=>:symbol, "name"=>"Foo", "1"=>2})) +# assert(!tmpl.match({"message"=>"Hi", "name"=>"Foo", "age"=>1})) +# assert(!tmpl.match({"message"=>"Hello", "no_name"=>"Foo"})) +# +# tmpl = Rinda::Template.new({"message"=>String, "name"=>String}) +# assert_equal(2, tmpl.size) +# assert(tmpl.match({"message"=>"Hello", "name"=>"Foo"})) +# assert(!tmpl.match({"message"=>"Hello", "name"=>"Foo", "1"=>2})) +# assert(!tmpl.match({"message"=>"Hi", "name"=>"Foo", "age"=>1})) +# assert(!tmpl.match({"message"=>"Hello", "no_name"=>"Foo"})) +# +# tmpl = Rinda::Template.new({"message"=>String}) +# assert_equal(1, tmpl.size) +# assert(tmpl.match({"message"=>"Hello"})) +# assert(!tmpl.match({"message"=>"Hello", "name"=>"Foo"})) +# assert(!tmpl.match({"message"=>"Hello", "name"=>"Foo", "1"=>2})) +# assert(!tmpl.match({"message"=>"Hi", "name"=>"Foo", "age"=>1})) +# assert(!tmpl.match({"message"=>"Hello", "no_name"=>"Foo"})) +# +# tmpl = Rinda::Template.new({"message"=>String, "name"=>nil}) +# assert_equal(2, tmpl.size) +# assert(tmpl.match({"message"=>"Hello", "name"=>"Foo"})) +# assert(!tmpl.match({"message"=>"Hello", "name"=>"Foo", "1"=>2})) +# assert(!tmpl.match({"message"=>"Hi", "name"=>"Foo", "age"=>1})) +# assert(!tmpl.match({"message"=>"Hello", "no_name"=>"Foo"})) +# +# assert_raise(Rinda::InvalidHashTupleKey) do +# @ts.write({:message=>String, "name"=>String}) +# end +# +# @ts.write([1, 2, 3]) +# assert_equal([1, 2, 3], @ts.take([1, 2, 3])) +# +# @ts.write({'1'=>1, '2'=>2, '3'=>3}) +# assert_equal({'1'=>1, '2'=>2, '3'=>3}, @ts.take({'1'=>1, '2'=>2, '3'=>3})) +# +# entry = @ts.write(['1'=>1, '2'=>2, '3'=>3]) +# assert_raise(Rinda::RequestExpiredError) do +# assert_equal({'1'=>1, '2'=>2, '3'=>3}, @ts.read({'1'=>1}, 0)) +# end +# entry.cancel +# end +# +# def test_00_DRbObject +# ro = DRbObject.new(nil, "druby://host:1234") +# tmpl = Rinda::DRbObjectTemplate.new +# assert(tmpl === ro) +# +# tmpl = Rinda::DRbObjectTemplate.new("druby://host:1234") +# assert(tmpl === ro) +# +# tmpl = Rinda::DRbObjectTemplate.new("druby://host:12345") +# assert(!(tmpl === ro)) +# +# tmpl = Rinda::DRbObjectTemplate.new(/^druby:\/\/host:/) +# assert(tmpl === ro) +# +# ro = DRbObject.new_with(12345, 1234) +# assert(!(tmpl === ro)) +# +# ro = DRbObject.new_with("druby://foo:12345", 1234) +# assert(!(tmpl === ro)) +# +# tmpl = Rinda::DRbObjectTemplate.new(/^druby:\/\/(foo|bar):/) +# assert(tmpl === ro) +# +# ro = DRbObject.new_with("druby://bar:12345", 1234) +# assert(tmpl === ro) +# +# ro = DRbObject.new_with("druby://baz:12345", 1234) +# assert(!(tmpl === ro)) +# end +# +# def test_inp_rdp +# assert_raise(Rinda::RequestExpiredError) do +# @ts.take([:empty], 0) +# end +# +# assert_raise(Rinda::RequestExpiredError) do +# @ts.read([:empty], 0) +# end +# end +# +# def test_ruby_talk_264062 +# th = Thread.new { @ts.take([:empty], 1) } +# sleep(10) +# assert_raise(Rinda::RequestExpiredError) do +# thread_join(th) +# end +# +# th = Thread.new { @ts.read([:empty], 1) } +# sleep(10) +# assert_raise(Rinda::RequestExpiredError) do +# thread_join(th) +# end +# end +# +# def test_symbol_tuple +# @ts.write([:symbol, :symbol]) +# @ts.write(['string', :string]) +# assert_equal([[:symbol, :symbol]], @ts.read_all([:symbol, nil])) +# assert_equal([[:symbol, :symbol]], @ts.read_all([Symbol, nil])) +# assert_equal([], @ts.read_all([:nil, nil])) +# end +# +# def test_core_01 +# 5.times do +# @ts.write([:req, 2]) +# end +# +# assert_equal([[:req, 2], [:req, 2], [:req, 2], [:req, 2], [:req, 2]], +# @ts.read_all([nil, nil])) +# +# taker = Thread.new(5) do |count| +# s = 0 +# count.times do +# tuple = @ts.take([:req, Integer]) +# assert_equal(2, tuple[1]) +# s += tuple[1] +# end +# @ts.write([:ans, s]) +# s +# end +# +# assert_equal(10, thread_join(taker)) +# assert_equal([:ans, 10], @ts.take([:ans, 10])) +# assert_equal([], @ts.read_all([nil, nil])) +# end +# +# def test_core_02 +# taker = Thread.new(5) do |count| +# s = 0 +# count.times do +# tuple = @ts.take([:req, Integer]) +# assert_equal(2, tuple[1]) +# s += tuple[1] +# end +# @ts.write([:ans, s]) +# s +# end +# +# 5.times do +# @ts.write([:req, 2]) +# end +# +# assert_equal(10, thread_join(taker)) +# assert_equal([:ans, 10], @ts.take([:ans, 10])) +# assert_equal([], @ts.read_all([nil, nil])) +# end +# +# def test_core_03_notify +# notify1 = @ts.notify(nil, [:req, Integer]) +# notify2 = @ts.notify(nil, {"message"=>String, "name"=>String}) +# +# 5.times do +# @ts.write([:req, 2]) +# end +# +# 5.times do +# tuple = @ts.take([:req, Integer]) +# assert_equal(2, tuple[1]) +# end +# +# 5.times do +# assert_equal(['write', [:req, 2]], notify1.pop) +# end +# 5.times do +# assert_equal(['take', [:req, 2]], notify1.pop) +# end +# +# @ts.write({"message"=>"first", "name"=>"3"}) +# @ts.write({"message"=>"second", "name"=>"1"}) +# @ts.write({"message"=>"third", "name"=>"0"}) +# @ts.take({"message"=>"third", "name"=>"0"}) +# @ts.take({"message"=>"first", "name"=>"3"}) +# +# assert_equal(["write", {"message"=>"first", "name"=>"3"}], notify2.pop) +# assert_equal(["write", {"message"=>"second", "name"=>"1"}], notify2.pop) +# assert_equal(["write", {"message"=>"third", "name"=>"0"}], notify2.pop) +# assert_equal(["take", {"message"=>"third", "name"=>"0"}], notify2.pop) +# assert_equal(["take", {"message"=>"first", "name"=>"3"}], notify2.pop) +# end +# +# def test_cancel_01 +# entry = @ts.write([:removeme, 1]) +# assert_equal([[:removeme, 1]], @ts.read_all([nil, nil])) +# entry.cancel +# assert_equal([], @ts.read_all([nil, nil])) +# +# template = nil +# taker = Thread.new do +# @ts.take([:take, nil], 10) do |t| +# template = t +# Thread.new do +# template.cancel +# end +# end +# end +# +# sleep(2) +# +# assert_raise(Rinda::RequestCanceledError) do +# assert_nil(thread_join(taker)) +# end +# +# assert(template.canceled?) +# +# @ts.write([:take, 1]) +# +# assert_equal([[:take, 1]], @ts.read_all([nil, nil])) +# end +# +# def test_cancel_02 +# entry = @ts.write([:removeme, 1]) +# assert_equal([[:removeme, 1]], @ts.read_all([nil, nil])) +# entry.cancel +# assert_equal([], @ts.read_all([nil, nil])) +# +# template = nil +# reader = Thread.new do +# @ts.read([:take, nil], 10) do |t| +# template = t +# Thread.new do +# template.cancel +# end +# end +# end +# +# sleep(2) +# +# assert_raise(Rinda::RequestCanceledError) do +# assert_nil(thread_join(reader)) +# end +# +# assert(template.canceled?) +# +# @ts.write([:take, 1]) +# +# assert_equal([[:take, 1]], @ts.read_all([nil, nil])) +# end +# +# class SimpleRenewer +# def initialize(sec, n = 1) +# @sec = sec +# @n = n +# end +# +# def renew +# return -1 if @n <= 0 +# @n -= 1 +# return @sec +# end +# end +# +# def test_00_renewer +# tuple = Rinda::TupleEntry.new([1,2,3], true) +# assert(!tuple.canceled?) +# assert(tuple.expired?) +# assert(!tuple.alive?) +# +# tuple = Rinda::TupleEntry.new([1,2,3], 1) +# assert(!tuple.canceled?) +# assert(!tuple.expired?) +# assert(tuple.alive?) +# sleep(2) +# assert(tuple.expired?) +# assert(!tuple.alive?) +# +# @renewer = SimpleRenewer.new(1,2) +# tuple = Rinda::TupleEntry.new([1,2,3], @renewer) +# assert(!tuple.canceled?) +# assert(!tuple.expired?) +# assert(tuple.alive?) +# sleep(1) +# assert(!tuple.canceled?) +# assert(!tuple.expired?) +# assert(tuple.alive?) +# sleep(2) +# assert(tuple.expired?) +# assert(!tuple.alive?) +# end +#end +# +#class TupleSpaceTest < Test::Unit::TestCase +# include TupleSpaceTestModule +# +# def setup +# ThreadGroup.new.add(Thread.current) +# @ts = Rinda::TupleSpace.new(1) +# end +# def teardown +# # implementation-dependent +# @ts.instance_eval{@keeper.kill if @keeper} +# end +#end +# +#class TupleSpaceProxyTest < Test::Unit::TestCase +# include TupleSpaceTestModule +# +# def setup +# ThreadGroup.new.add(Thread.current) +# @ts_base = Rinda::TupleSpace.new(1) +# @ts = Rinda::TupleSpaceProxy.new(@ts_base) +# end +# def teardown +# # implementation-dependent +# @ts_base.instance_eval{@keeper.kill if @keeper} +# end +# +# def test_remote_array_and_hash +# # Don't remove ary/hsh local variables. +# # These are necessary to protect objects from GC. +# ary = [1, 2, 3] +# @ts.write(DRbObject.new(ary)) +# assert_equal([1, 2, 3], @ts.take([1, 2, 3], 0)) +# h (... truncated) -- ML: ruby-changes@q... Info: http://www.atdot.net/~ko1/quickml/