From 6dc5c832f83aa554783d297ead52cd0d5d4db195 Mon Sep 17 00:00:00 2001 From: Nick Thomas Date: Sun, 13 Nov 2011 12:00:15 +0000 Subject: [PATCH] Alter the interface presented by Connectors::Socket We now provide a queue-like reader as well as a queue-lke writer. Additional read queues can still be registered. --- Rakefile | 6 ++--- lib/qmp_client/connectors/socket.rb | 9 ++++--- .../unit/qmp_client/connectors/test_socket.rb | 24 +++++++++---------- 3 files changed, 19 insertions(+), 20 deletions(-) diff --git a/Rakefile b/Rakefile index f4494a7..c0ecd0f 100644 --- a/Rakefile +++ b/Rakefile @@ -14,10 +14,8 @@ spec = Gem::Specification.new do |s| s.description = "Library that interfaces with running QEMU processes over QMP" s.author = 'Nicholas Thomas' s.email = 'nick@lupine.me.uk' - # s.executables = ['your_executable_here'] - s.files = %w(LICENSE README Rakefile) + Dir.glob("{bin,lib,spec,test}/**/*") + s.files = %w(LICENSE README Rakefile) + Dir.glob("{bin,lib,test}/**/*") s.require_path = "lib" - s.bindir = "bin" end Rake::GemPackageTask.new(spec) do |p| @@ -52,7 +50,7 @@ SCHEMA_OUTPUT = File.join( ) namespace :dev do - desc "Download the QMP schema filelinux-x86_64/pty netbeans" + desc "Download the QMP schema file" task :fetch_schema do `wget '#{SCHEMA_LOC}' -O '#{SCHEMA_OUTPUT}'` end diff --git a/lib/qmp_client/connectors/socket.rb b/lib/qmp_client/connectors/socket.rb index 551b1b1..94dec9b 100644 --- a/lib/qmp_client/connectors/socket.rb +++ b/lib/qmp_client/connectors/socket.rb @@ -1,5 +1,4 @@ require 'mutex_m' - module QMPClient module Connectors @@ -61,15 +60,18 @@ module QMPClient # block, and stops reading/writing the socket once the block returns. # @param[IO] read_socket Socket to read data from # @param[IO] write_socket Socket to write data to. Use read_socket if nil - # @yields[WriteProxy] writer Push messages to this to get them on the wire + # @yields[Queue, WriteProxy] reader, writer pop / Push messages def run(read_socket, write_socket=nil, &blk) write_socket ||= read_socket + readq = Queue.new + register_receive_queue(readq) write_proxy = WriteProxy.new(write_socket, @serialiser) begin stop_r, stop_w = IO.pipe r_thread = read_thread(read_socket, stop_r) - yield(write_proxy) + yield(readq, write_proxy) ensure + unregister_receive_queue(readq) if defined?(readq) stop_w.write("*") if stop_w # Stop signal for the r_thread stop_w.close if stop_w && !stop_w.closed? stop_r.close if stop_r && !stop_w.closed? @@ -114,6 +116,7 @@ module QMPClient break if buf.nil? # Dispatching raised an error of some sort! end end + end # Converts each message in +buf+ into an instance, and pushes it to each diff --git a/test/unit/qmp_client/connectors/test_socket.rb b/test/unit/qmp_client/connectors/test_socket.rb index beb8e02..2e46359 100644 --- a/test/unit/qmp_client/connectors/test_socket.rb +++ b/test/unit/qmp_client/connectors/test_socket.rb @@ -9,8 +9,6 @@ module TestQMPClient @deserialiser = mock("(deserialiser)") @connector = QMPClient::Connectors::Socket.new(@serialiser, @deserialiser) - @msg_received_q = Queue.new - @connector.register_receive_queue(@msg_received_q) @read_r, @read_w = IO.pipe @write_r, @write_w = IO.pipe @@ -23,8 +21,8 @@ module TestQMPClient def with_connector(reason="Run block", t=1, &blk) assert_doesnt_time_out(t, reason) do - @connector.run(@read_r, @write_w) do |writer| - + @connector.run(@read_r, @write_w) do |reader, writer| + @read_interface = reader @write_interface = writer yield @connector, writer end @@ -50,8 +48,8 @@ module TestQMPClient def assert_client_saw(symbol, discard = 0) assert_doesnt_time_out(1, "checking message #{symbol.inspect} was received from server") do - discard.times { @msg_received_q.pop } - assert_equal(symbol, @msg_received_q.pop) + discard.times { @read_interface.pop } + assert_equal(symbol, @read_interface.pop) end end @@ -73,7 +71,7 @@ module TestQMPClient end def test_writes_are_threadsafe - with_connector do |connector, writer| + with_connector do |reader, writer| (1..5).each {|n| Thread.new { message_from_client(:"Message #{n}")}} data = (1..5).collect {|n| @write_r.gets } @@ -85,17 +83,17 @@ module TestQMPClient end def test_register_unregister_receive_queue - with_connector do |connector, writer| + with_connector do |reader, writer| rq1 = Queue.new # We expect both of these to get both messages rq2 = Queue.new urq = Queue.new # We're going to unregister this one after one msg - connector.register_receive_queue(rq1) - connector.register_receive_queue(rq2) - connector.register_receive_queue(urq) + @connector.register_receive_queue(rq1) + @connector.register_receive_queue(rq2) + @connector.register_receive_queue(urq) message_from_server(:test_1) assert_client_saw(:test_1) - connector.unregister_receive_queue(urq) + @connector.unregister_receive_queue(urq) message_from_server(:test_2) assert_client_saw(:test_2) @@ -115,7 +113,7 @@ module TestQMPClient end def test_run_block_finishes_cleanly_if_socket_reports_error - with_connector do |connector, writer| + with_connector do |reader, writer| Kernel.expects(:select).once.returns([[], [], [@read_r]]) end end