commit 1688e5545988f724e4a86b7d80c8280d5a1b81e7 Author: Nick Thomas Date: Sat Nov 12 21:45:14 2011 +0000 Project framework + socket connector The socket connector is responsible for reading from a socket and converting the lines that were read to messages, using a passed-in deserialiser. It also provides an interface to push messages onto the socket, via a serialiser which is also passed in. diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..14bc68c --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/nbproject/private/ \ No newline at end of file diff --git a/Rakefile b/Rakefile new file mode 100644 index 0000000..f4494a7 --- /dev/null +++ b/Rakefile @@ -0,0 +1,59 @@ +require 'rubygems' +require 'rake' +require 'rake/clean' +require 'rake/gempackagetask' +require 'rake/rdoctask' +require 'rake/testtask' + +spec = Gem::Specification.new do |s| + s.name = 'qmp_client' + s.version = '0.0.1' + s.has_rdoc = true + s.extra_rdoc_files = ['README', 'LICENSE'] + s.summary = 'QMP client library ' + s.description = "Library that interfaces with running QEMU processes over QMP" + s.author = 'Nicholas Thomas' + s.email = 'nick@lupine.me.uk' + # s.executables = ['your_executable_here'] + s.files = %w(LICENSE README Rakefile) + Dir.glob("{bin,lib,spec,test}/**/*") + s.require_path = "lib" + s.bindir = "bin" +end + +Rake::GemPackageTask.new(spec) do |p| + p.gem_spec = spec + p.need_tar = true + p.need_zip = true +end + +Rake::RDocTask.new do |rdoc| + files =['README', 'LICENSE', 'lib/**/*.rb'] + rdoc.rdoc_files.add(files) + rdoc.main = "README" # page to start on + rdoc.title = "qmp-client Docs" + rdoc.rdoc_dir = 'doc/rdoc' # rdoc output folder + rdoc.options << '--line-numbers' +end + +Rake::TestTask.new do |t| + t.libs << ["test/unit"] + t.test_files = FileList['test/unit/**/test_*.rb'] + t.options="-v" + t.name = "test:unit" +end + +SCHEMA_HOST = 'http://git.kernel.org/' +SCHEMA_QUERY = 'p=virt/kvm/qemu-kvm.git;a=blob_plain;f=qapi-schema.json' + +SCHEMA_LOC = [SCHEMA_HOST, SCHEMA_QUERY].join('?') +SCHEMA_OUTPUT = File.join( + Rake::application.original_dir, + 'lib', 'qmp_client', 'qapi-schema.json' +) + +namespace :dev do + desc "Download the QMP schema filelinux-x86_64/pty netbeans" + task :fetch_schema do + `wget '#{SCHEMA_LOC}' -O '#{SCHEMA_OUTPUT}'` + end +end diff --git a/lib/qmp_client/connectors.rb b/lib/qmp_client/connectors.rb new file mode 100644 index 0000000..c86ee6d --- /dev/null +++ b/lib/qmp_client/connectors.rb @@ -0,0 +1,17 @@ +require 'qmp_client/connectors/socket' + +module QMPClient + + # The point of the connectors is to provide a unified interface to the varied + # interfaces a QMP server can have. They are: + # 1) TCP/UNIX socket + # 2) STDIN+STDOUT / arbitrary fds + # 3) mock interface (for tests) + # 4) Anything else that might pop up! + # + # @author Nick Thomas + module Connectors + + end + +end \ No newline at end of file diff --git a/lib/qmp_client/connectors/socket.rb b/lib/qmp_client/connectors/socket.rb new file mode 100644 index 0000000..551b1b1 --- /dev/null +++ b/lib/qmp_client/connectors/socket.rb @@ -0,0 +1,143 @@ +require 'mutex_m' + +module QMPClient + module Connectors + + # We yield an instance of this call in the run method of the connectors - + # if someone wants to send a message, they call push on it with the message. + # + # This blocks until the data is on the wire, which isn't necessarily a bad + # this. + # + # @author Nick Thomas + class WriteProxy + include Mutex_m + + def initialize(w_socket, serialiser) + super() + @w_socket = w_socket + @serialiser = serialiser + end + + def push(msg) + synchronize do + @w_socket.puts(@serialiser.serialise(msg)) + @w_socket.flush + end + end + end + + + # Handles a subclass of BasicSocket, proactively reading data from it and + # converting it into a stream of Messages, and writing serialised Messages + # to it on request. + # + # @author Nick Thomas + class Socket + + # @param[#serialise] serialiser Object to convert message instances to + # the wire format + # @param[#deserialise] deserialiser Object to convert wire data to message + # instances. + def initialize(serialiser, deserialiser) + @serialiser = serialiser + @deserialiser = deserialiser + + @receive_queues = [] + end + + # Call this with an object responding to +push+ and every time a message + # is received, we will pass it on. Multiple queues can be registered. + # Messages are pushed onto the queue as hashes. + def register_receive_queue(q) + @receive_queues.push(q) + end + + def unregister_receive_queue(q) + @receive_queues.delete(q) + end + + # Starts reading from / writing to the socket as appropriate, yields the + # block, and stops reading/writing the socket once the block returns. + # @param[IO] read_socket Socket to read data from + # @param[IO] write_socket Socket to write data to. Use read_socket if nil + # @yields[WriteProxy] writer Push messages to this to get them on the wire + def run(read_socket, write_socket=nil, &blk) + write_socket ||= read_socket + write_proxy = WriteProxy.new(write_socket, @serialiser) + begin + stop_r, stop_w = IO.pipe + r_thread = read_thread(read_socket, stop_r) + yield(write_proxy) + ensure + stop_w.write("*") if stop_w # Stop signal for the r_thread + stop_w.close if stop_w && !stop_w.closed? + stop_r.close if stop_r && !stop_w.closed? + r_thread.join if r_thread && r_thread.alive? + end + end + + protected + attr_reader :deserialiser + attr_reader :serialiser + attr_reader :receive_queues + + # Reads from r_sock and turns the data into messages (which get pushed + # out via +register_receive_queue+) until either the socket raises an + # error, or the stop socket becomes readable. + def read_thread(r_sock, stop_socket) + Thread.new do + buf = "" + socks = [r_sock, stop_socket] + loop do + readables, _, errored = begin + Kernel.select(socks, nil, socks) + rescue IOError # socket closed between run and this call + break + end + # exit conditions + break if readables.include?(stop_socket) + break unless errored.empty? + + # Should never happen, but anything's possible + next unless readables.include?(r_sock) + + # Pull the data from the socket, taking care of errors + begin + buf << r_sock.read_nonblock(8192) + rescue IO::WaitReadable + next # socket isn't ready to read + # TODO: other possible errors + end + + buf = dispatch_messages_from_buffer(buf) + break if buf.nil? # Dispatching raised an error of some sort! + end + end + end + + # Converts each message in +buf+ into an instance, and pushes it to each + # of the receive queues. + # @param[String] buf String buffer + # @return[nil,String] Either the unparsed component of the data, or nil + # if an error has occured + def dispatch_messages_from_buffer(buf) + lines = buf.split("\n") + + r_buf = if buf[-1..-1] == "\n" # Every element is a message + "" + else # The last element is a partial message, so put it back + lines.pop + end + + messages = lines.collect {|line| deserialiser.deserialise(line) } + messages.each {|msg| receive_queues.each {|q| q.push(msg)}} + + r_buf + rescue => err # Any error dispatching means we need to shut up shop + nil + end + + end + end +end diff --git a/test/unit/helper.rb b/test/unit/helper.rb new file mode 100644 index 0000000..b97bd73 --- /dev/null +++ b/test/unit/helper.rb @@ -0,0 +1,35 @@ +ROOT = File.expand_path(File.join(File.basename(__FILE__), '..')) + +$: << File.join(ROOT, 'lib') + +require 'minitest/unit' +require 'minitest/autorun' +require 'mocha' + +require 'timeout' # assert_doesnt_time_out + +module EnvHelpers + def silence_warnings + old = $VERBOSE + $VERBOSE = nil + yield + ensure + $VERBOSE = old + end +end + +class BaseTestCase < MiniTest::Unit::TestCase + include EnvHelpers + + def assert_doesnt_time_out(n, reason = "", &blk) + + Timeout::timeout(n, &blk) + rescue TimeoutError => err + r_full = [reason, "(timed out after #{n}s)", "\n" + err.backtrace.join("\n")].join(" ") + raise MiniTest::Assertion.new(r_full) + end +end + +class QMPClientTestCase < BaseTestCase + +end \ No newline at end of file diff --git a/test/unit/qmp_client/connectors/test_socket.rb b/test/unit/qmp_client/connectors/test_socket.rb new file mode 100644 index 0000000..beb8e02 --- /dev/null +++ b/test/unit/qmp_client/connectors/test_socket.rb @@ -0,0 +1,125 @@ +require 'helper' +require 'qmp_client/connectors/socket' + +module TestQMPClient + module TestConnectors + class TestSocket < QMPClientTestCase + def setup + @serialiser = mock("(serialiser)") + @deserialiser = mock("(deserialiser)") + + @connector = QMPClient::Connectors::Socket.new(@serialiser, @deserialiser) + @msg_received_q = Queue.new + @connector.register_receive_queue(@msg_received_q) + + @read_r, @read_w = IO.pipe + @write_r, @write_w = IO.pipe + end + + def teardown + [@read_r, @read_w, @write_r, @write_w].each {|s| s.close unless s.closed? } + super + end + + def with_connector(reason="Run block", t=1, &blk) + assert_doesnt_time_out(t, reason) do + @connector.run(@read_r, @write_w) do |writer| + + @write_interface = writer + yield @connector, writer + end + end + end + + def message_from_server(symbol) + @deserialiser.expects(:deserialise).with(symbol.to_s).once. + returns(symbol) + @read_w.puts symbol.to_s + end + + def assert_server_saw(symbol) + assert_doesnt_time_out(1, "Checking message #{symbol.inspect} was sent to server") do + assert_equal(symbol.to_s + "\n", @write_r.gets) + end + end + + def message_from_client(symbol) + @serialiser.expects(:serialise).with(symbol).once.returns(symbol.to_s) + @write_interface.push(symbol) + end + + def assert_client_saw(symbol, discard = 0) + assert_doesnt_time_out(1, "checking message #{symbol.inspect} was received from server") do + discard.times { @msg_received_q.pop } + assert_equal(symbol, @msg_received_q.pop) + end + end + + + def test_run_fires_up_read_and_write_sides_and_operates_correctly + with_connector do + message_from_client(:test_1) + assert_server_saw(:test_1) + + message_from_server(:test_2) + assert_client_saw(:test_2) + end + + # After shutting down like this, all our sockets should be open + refute(@read_r.closed?, "read side of read socket closed") + refute(@read_w.closed?, "write side of read socket closed") + refute(@write_r.closed?, "read side of write socket closed") + refute(@write_w.closed?, "write side of write socket closed") + end + + def test_writes_are_threadsafe + with_connector do |connector, writer| + (1..5).each {|n| Thread.new { message_from_client(:"Message #{n}")}} + + data = (1..5).collect {|n| @write_r.gets } + assert_equal( + ["Message 1\n", "Message 2\n", "Message 3\n", "Message 4\n", "Message 5\n"], + data.sort + ) + end + end + + def test_register_unregister_receive_queue + with_connector do |connector, writer| + rq1 = Queue.new # We expect both of these to get both messages + rq2 = Queue.new + urq = Queue.new # We're going to unregister this one after one msg + connector.register_receive_queue(rq1) + connector.register_receive_queue(rq2) + connector.register_receive_queue(urq) + + message_from_server(:test_1) + assert_client_saw(:test_1) + connector.unregister_receive_queue(urq) + message_from_server(:test_2) + assert_client_saw(:test_2) + + [rq1, rq2].each do |q| + assert_equal(2, q.size, "receive q missed a message") + assert_equal(:test_1, q.pop) + assert_equal(:test_2, q.pop) + end + assert_equal(1, urq.size, "Unregister queue has wrong # of messages") + assert_equal(:test_1, urq.pop, "Unregister queue has wrong message") + end + end + + # This closes the socket before Kernel#select can get to it + def test_closing_socket_finishes_run_block_cleanly + with_connector { @read_r.close } + end + + def test_run_block_finishes_cleanly_if_socket_reports_error + with_connector do |connector, writer| + Kernel.expects(:select).once.returns([[], [], [@read_r]]) + end + end + + end + end +end