Alter the interface presented by Connectors::Socket
We now provide a queue-like reader as well as a queue-lke writer. Additional read queues can still be registered.
This commit is contained in:
6
Rakefile
6
Rakefile
@@ -14,10 +14,8 @@ spec = Gem::Specification.new do |s|
|
|||||||
s.description = "Library that interfaces with running QEMU processes over QMP"
|
s.description = "Library that interfaces with running QEMU processes over QMP"
|
||||||
s.author = 'Nicholas Thomas'
|
s.author = 'Nicholas Thomas'
|
||||||
s.email = 'nick@lupine.me.uk'
|
s.email = 'nick@lupine.me.uk'
|
||||||
# s.executables = ['your_executable_here']
|
s.files = %w(LICENSE README Rakefile) + Dir.glob("{bin,lib,test}/**/*")
|
||||||
s.files = %w(LICENSE README Rakefile) + Dir.glob("{bin,lib,spec,test}/**/*")
|
|
||||||
s.require_path = "lib"
|
s.require_path = "lib"
|
||||||
s.bindir = "bin"
|
|
||||||
end
|
end
|
||||||
|
|
||||||
Rake::GemPackageTask.new(spec) do |p|
|
Rake::GemPackageTask.new(spec) do |p|
|
||||||
@@ -52,7 +50,7 @@ SCHEMA_OUTPUT = File.join(
|
|||||||
)
|
)
|
||||||
|
|
||||||
namespace :dev do
|
namespace :dev do
|
||||||
desc "Download the QMP schema filelinux-x86_64/pty netbeans"
|
desc "Download the QMP schema file"
|
||||||
task :fetch_schema do
|
task :fetch_schema do
|
||||||
`wget '#{SCHEMA_LOC}' -O '#{SCHEMA_OUTPUT}'`
|
`wget '#{SCHEMA_LOC}' -O '#{SCHEMA_OUTPUT}'`
|
||||||
end
|
end
|
||||||
|
@@ -1,5 +1,4 @@
|
|||||||
require 'mutex_m'
|
require 'mutex_m'
|
||||||
|
|
||||||
module QMPClient
|
module QMPClient
|
||||||
module Connectors
|
module Connectors
|
||||||
|
|
||||||
@@ -61,15 +60,18 @@ module QMPClient
|
|||||||
# block, and stops reading/writing the socket once the block returns.
|
# block, and stops reading/writing the socket once the block returns.
|
||||||
# @param[IO] read_socket Socket to read data from
|
# @param[IO] read_socket Socket to read data from
|
||||||
# @param[IO] write_socket Socket to write data to. Use read_socket if nil
|
# @param[IO] write_socket Socket to write data to. Use read_socket if nil
|
||||||
# @yields[WriteProxy] writer Push messages to this to get them on the wire
|
# @yields[Queue, WriteProxy] reader, writer pop / Push messages
|
||||||
def run(read_socket, write_socket=nil, &blk)
|
def run(read_socket, write_socket=nil, &blk)
|
||||||
write_socket ||= read_socket
|
write_socket ||= read_socket
|
||||||
|
readq = Queue.new
|
||||||
|
register_receive_queue(readq)
|
||||||
write_proxy = WriteProxy.new(write_socket, @serialiser)
|
write_proxy = WriteProxy.new(write_socket, @serialiser)
|
||||||
begin
|
begin
|
||||||
stop_r, stop_w = IO.pipe
|
stop_r, stop_w = IO.pipe
|
||||||
r_thread = read_thread(read_socket, stop_r)
|
r_thread = read_thread(read_socket, stop_r)
|
||||||
yield(write_proxy)
|
yield(readq, write_proxy)
|
||||||
ensure
|
ensure
|
||||||
|
unregister_receive_queue(readq) if defined?(readq)
|
||||||
stop_w.write("*") if stop_w # Stop signal for the r_thread
|
stop_w.write("*") if stop_w # Stop signal for the r_thread
|
||||||
stop_w.close if stop_w && !stop_w.closed?
|
stop_w.close if stop_w && !stop_w.closed?
|
||||||
stop_r.close if stop_r && !stop_w.closed?
|
stop_r.close if stop_r && !stop_w.closed?
|
||||||
@@ -114,6 +116,7 @@ module QMPClient
|
|||||||
break if buf.nil? # Dispatching raised an error of some sort!
|
break if buf.nil? # Dispatching raised an error of some sort!
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
# Converts each message in +buf+ into an instance, and pushes it to each
|
# Converts each message in +buf+ into an instance, and pushes it to each
|
||||||
|
@@ -9,8 +9,6 @@ module TestQMPClient
|
|||||||
@deserialiser = mock("(deserialiser)")
|
@deserialiser = mock("(deserialiser)")
|
||||||
|
|
||||||
@connector = QMPClient::Connectors::Socket.new(@serialiser, @deserialiser)
|
@connector = QMPClient::Connectors::Socket.new(@serialiser, @deserialiser)
|
||||||
@msg_received_q = Queue.new
|
|
||||||
@connector.register_receive_queue(@msg_received_q)
|
|
||||||
|
|
||||||
@read_r, @read_w = IO.pipe
|
@read_r, @read_w = IO.pipe
|
||||||
@write_r, @write_w = IO.pipe
|
@write_r, @write_w = IO.pipe
|
||||||
@@ -23,8 +21,8 @@ module TestQMPClient
|
|||||||
|
|
||||||
def with_connector(reason="Run block", t=1, &blk)
|
def with_connector(reason="Run block", t=1, &blk)
|
||||||
assert_doesnt_time_out(t, reason) do
|
assert_doesnt_time_out(t, reason) do
|
||||||
@connector.run(@read_r, @write_w) do |writer|
|
@connector.run(@read_r, @write_w) do |reader, writer|
|
||||||
|
@read_interface = reader
|
||||||
@write_interface = writer
|
@write_interface = writer
|
||||||
yield @connector, writer
|
yield @connector, writer
|
||||||
end
|
end
|
||||||
@@ -50,8 +48,8 @@ module TestQMPClient
|
|||||||
|
|
||||||
def assert_client_saw(symbol, discard = 0)
|
def assert_client_saw(symbol, discard = 0)
|
||||||
assert_doesnt_time_out(1, "checking message #{symbol.inspect} was received from server") do
|
assert_doesnt_time_out(1, "checking message #{symbol.inspect} was received from server") do
|
||||||
discard.times { @msg_received_q.pop }
|
discard.times { @read_interface.pop }
|
||||||
assert_equal(symbol, @msg_received_q.pop)
|
assert_equal(symbol, @read_interface.pop)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -73,7 +71,7 @@ module TestQMPClient
|
|||||||
end
|
end
|
||||||
|
|
||||||
def test_writes_are_threadsafe
|
def test_writes_are_threadsafe
|
||||||
with_connector do |connector, writer|
|
with_connector do |reader, writer|
|
||||||
(1..5).each {|n| Thread.new { message_from_client(:"Message #{n}")}}
|
(1..5).each {|n| Thread.new { message_from_client(:"Message #{n}")}}
|
||||||
|
|
||||||
data = (1..5).collect {|n| @write_r.gets }
|
data = (1..5).collect {|n| @write_r.gets }
|
||||||
@@ -85,17 +83,17 @@ module TestQMPClient
|
|||||||
end
|
end
|
||||||
|
|
||||||
def test_register_unregister_receive_queue
|
def test_register_unregister_receive_queue
|
||||||
with_connector do |connector, writer|
|
with_connector do |reader, writer|
|
||||||
rq1 = Queue.new # We expect both of these to get both messages
|
rq1 = Queue.new # We expect both of these to get both messages
|
||||||
rq2 = Queue.new
|
rq2 = Queue.new
|
||||||
urq = Queue.new # We're going to unregister this one after one msg
|
urq = Queue.new # We're going to unregister this one after one msg
|
||||||
connector.register_receive_queue(rq1)
|
@connector.register_receive_queue(rq1)
|
||||||
connector.register_receive_queue(rq2)
|
@connector.register_receive_queue(rq2)
|
||||||
connector.register_receive_queue(urq)
|
@connector.register_receive_queue(urq)
|
||||||
|
|
||||||
message_from_server(:test_1)
|
message_from_server(:test_1)
|
||||||
assert_client_saw(:test_1)
|
assert_client_saw(:test_1)
|
||||||
connector.unregister_receive_queue(urq)
|
@connector.unregister_receive_queue(urq)
|
||||||
message_from_server(:test_2)
|
message_from_server(:test_2)
|
||||||
assert_client_saw(:test_2)
|
assert_client_saw(:test_2)
|
||||||
|
|
||||||
@@ -115,7 +113,7 @@ module TestQMPClient
|
|||||||
end
|
end
|
||||||
|
|
||||||
def test_run_block_finishes_cleanly_if_socket_reports_error
|
def test_run_block_finishes_cleanly_if_socket_reports_error
|
||||||
with_connector do |connector, writer|
|
with_connector do |reader, writer|
|
||||||
Kernel.expects(:select).once.returns([[], [], [@read_r]])
|
Kernel.expects(:select).once.returns([[], [], [@read_r]])
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
Reference in New Issue
Block a user