diff --git a/lib/qmp_client.rb b/lib/qmp_client.rb new file mode 100644 index 0000000..b5f856a --- /dev/null +++ b/lib/qmp_client.rb @@ -0,0 +1,35 @@ +require 'socket' + +require 'qmp_client/api' +require 'qmp_client/connectors' +require 'qmp_client/messages' + +# This library provides an interface to QEMU's QMP server, allowing you to +# manage and query running virtual machines. +# +# @author Nick Thomas +module QMPClient + + def self.connect_unix(filename, &blk) + sock = UNIXSocket.connect(filename) + connect_socket(sock, &blk) + ensure + sock.close if sock && !sock.closed? + end + + def self.connect_tcp(host, port, local_host=nil, local_port=nil, &blk) + sock = TCPSocket.connect(host, port, local_host, local_port) + connect_socket(sock, &blk) + ensure + sock.close if sock && !sock.closed? + end + + def self.connect_socket(read_socket, write_socket = nil, &blk) + conn = Connectors::Socket.new(QMPClient::Messages, QMPClient::Messages) + + conn.run(read_socket, write_socket) do |rq, wq| + API.run(rq, wq, &blk) + end + end + +end \ No newline at end of file diff --git a/lib/qmp_client/api.rb b/lib/qmp_client/api.rb new file mode 100644 index 0000000..d0bffe7 --- /dev/null +++ b/lib/qmp_client/api.rb @@ -0,0 +1,160 @@ +require 'thread' +require 'mutex_m' + +require 'qmp_client/messages' + +module QMPClient + + # This class implements the interface used to interact with a QMP server. + # It expects to be created with a pair of queues for message I/O - which is + # precisely the interface provided by Connectors::Socket#run + # + # @author Nick Thomas + class API + include Mutex_m + attr_reader :receive_queue + attr_reader :send_queue + + # API to a QMP server. + # @param[Queue] receive_q Queue in which received messages will appear + # @param[Queue] send_q Queue to push messages we want to send + # @param[Fixum] thread_pool_size number of threads to run + def initialize(receive_q, send_q) + super() + # Request ID generation + @rid_m = Mutex.new + @rid = 0 + + @receive_queue = receive_q + @send_queue = send_q + + @event_handlers = {} + @callbacks = {} + + @greeting_q = Queue.new + end + + def self.run(rq, wq, thread_pool_size=10, wait_for_greeting = true, &blk) + new(rq, wq).run(thread_pool_size, wait_for_greeting, &blk) + end + + def run(thread_pool_size, wait_for_greeting = true, &blk) + thread_pool = build_thread_pool(@receive_queue, thread_pool_size) + negotiate_capabilities! if wait_for_greeting + yield self + ensure + if thread_pool + thread_pool.size.times { @receive_queue.push(:close) } + thread_pool.each {|t| t.join rescue nil } + end + end + + # Takes a query to run, and yields the block when it's complete, or an error + # is raised. Returns as soon as the query is dispatched. + def query(name, &blk) + msg = Messages::Query.new(generate_request_id, name) + send_message(msg, &blk) + end + + # Convenience method - synchronous version of query + def sync_query(name) + sync_around(:query, name) + end + + # Takes a command to run, and yields the block when it's complete, or an + # error is raised. Returns as soon as the command is dispatched. + def command(name, args = nil, &blk) + msg = Messages::Command.new(generate_request_id, name, args) + send_message(msg, &blk) + end + + # Convenience method - synchronous version of command + def sync_command(name, args = nil) + sync_around(:command, name, args) + end + + # Registers a block to be run whenever an event is received. We will yield + # the block with the received message. + # + # You can specify only one handler per event. + def on_event(name, &blk) + synchronize do + Kernel.warn("Already registered handler for #{name}, overwriting") if + @event_handlers[name] + @event_handlers[name] = blk + end + end + + # Blocks until all outstanding callbacks have been processed. + def wait + raise NotImplementedError.new + end + + protected + + attr_reader :thread_pool + + # Wait for the server greeting and issue the qmp_capabilities command once + # it has been received. + def negotiate_capabilities! + @greeting_q.pop + sync_command('qmp_capabilities') + end + + def send_message(msg, &blk) + blk = Proc.new { } unless block_given? && blk + + synchronize { @callbacks[msg.request_id] = blk } + send_queue.push(msg) + end + + # Convenience method. Synchronous version of send_message. + def sync_send_message(msg) + sync_around(:send_message, msg) + end + + def sync_around(method, *args) + rq = Queue.new + send(method, *args) {|result| rq.push(result) } + rq.pop + end + + def generate_request_id + "%.8i" % @rid_m.synchronize { @rid += 1 } + end + + def build_thread_pool(rq, tp_size) + (1..(tp_size)).collect {|x| read_thread(rq, x)} + end + + # @param[Queue] rqueue receive queue to operate on + # @param[Object] x Indentifier for this read thread + def read_thread(rqueue, x) + Thread.new do + Thread.current.abort_on_exception = true # Ensure exceptions bubble out + loop do + msg = rqueue.pop + break if msg == :close + dispatch_message_to_callback(msg) + end + end + end + + # Checks the message type, looks to see if there's a callback that needs + # executing, and calls it if so. + def dispatch_message_to_callback(msg) + case msg + when Messages::Event + cb = synchronize { @event_handlers[msg.name] } + cb.call(msg) if cb && cb.respond_to?(:call) + when Messages::Greeting + @greeting_q.push(msg) + else + cb = synchronize { @callbacks.delete(msg.request_id) } + cb.call(msg) if cb && cb.respond_to?(:call) + end + end + + end + +end \ No newline at end of file diff --git a/lib/qmp_client/connectors/socket.rb b/lib/qmp_client/connectors/socket.rb index 94dec9b..524cdd3 100644 --- a/lib/qmp_client/connectors/socket.rb +++ b/lib/qmp_client/connectors/socket.rb @@ -137,7 +137,7 @@ module QMPClient messages.each {|msg| receive_queues.each {|q| q.push(msg)}} r_buf - rescue => err # Any error dispatching means we need to shut up shop + rescue # Any error dispatching means we need to shut up shop nil end diff --git a/lib/qmp_client/messages.rb b/lib/qmp_client/messages.rb new file mode 100644 index 0000000..ec59e21 --- /dev/null +++ b/lib/qmp_client/messages.rb @@ -0,0 +1,61 @@ +require 'json' + +require 'qmp_client/messages/command' +require 'qmp_client/messages/greeting' +require 'qmp_client/messages/event' +require 'qmp_client/messages/query' +require 'qmp_client/messages/reply' + +module QMPClient + + # Module containing all the Message classes. Also serves as a quick-and-dirty + # serialiser and deserialiser for them. + # + # @author Nick Thomas + module Messages + + # Raised when we're trying to convert raw text into a message, but failing + class ParserError < StandardError + attr_reader :upstream_error + def initialize(message, upstream) + super(message) + @upstream_error = upstream + end + end + + # converts raw data into a message class + def self.deserialise(text) + hsh = begin + JSON::parse(text) + rescue => err + raise ParserError.new("Provided text is not valid JSON", err) + end + + # ordered by frequency of receipt + return Reply::build(hsh) if Reply::represents?(hsh) + return Event::build(hsh) if Event::represents?(hsh) + return Command::build(hsh) if Command::represents?(hsh) + return Query::build(hsh) if Query::represents?(hsh) + return Greeting::build(hsh) if Greeting::represents?(hsh) + + + raise ParserError.new("JSON is valid but isn't a QMP message") + end + + # See: deserialise. + def self.deserialize(text) + deserialise(text) + end + + # converts a message into raw data + def self.serialise(message) + message.to_hash.to_json + end + + # See: serialise + def self.serialize(message) + serialise(message) + end + + end +end \ No newline at end of file diff --git a/lib/qmp_client/messages/command.rb b/lib/qmp_client/messages/command.rb new file mode 100644 index 0000000..e65a857 --- /dev/null +++ b/lib/qmp_client/messages/command.rb @@ -0,0 +1,42 @@ +require 'qmp_client/messages/message' + +module QMPClient + module Messages + class Command < Message + attr_reader :request_id + attr_reader :name + attr_reader :arguments + + # Does the hash represent a Command? + def self.represents?(hsh) + super(hsh) && + (hsh.keys - %w|id execute arguments|).empty? && + hsh['execute'].is_a?(String) && hsh['execute'] !~ /\Aquery-/ && + hsh['arguments'].nil? || hsh['arguments'].is_a?(Hash) + end + + def ==(other) + other.is_a?(Command) && [:request_id, :name, :arguments].all? do |m| + self.send(m) == other.send(m) + end + end + + def self.build(hsh) + new(hsh['id'], hsh['execute'], hsh['arguments']) + end + + def initialize(request_id, name, arguments = nil) + @request_id = request_id + @name = name + @arguments = arguments + end + + def to_hash + d = {'id' => request_id, 'execute' => name} + d['arguments'] = arguments unless arguments.nil? + d + end + + end + end +end diff --git a/lib/qmp_client/messages/event.rb b/lib/qmp_client/messages/event.rb new file mode 100644 index 0000000..6ecfc6d --- /dev/null +++ b/lib/qmp_client/messages/event.rb @@ -0,0 +1,80 @@ +require 'qmp_client/messages/message' + +module QMPClient + module Messages + class Event < Message + attr_reader :name + attr_reader :time + + def timestamp + time_to_hash(time) + end + def timestamp=(hsh) + @time = hash_to_time(hsh) + end + + attr_reader :data + + def ==(other) + other.is_a?(Event) && + other.name == self.name && + other.timestamp == self.timestamp && + other.data == self.data + end + + # Events look like: + # {"event" => event-name, ["data" => {event-specific-data},] + # "timestamp" => {"seconds" => int, "microseconds" => int} } + # FIXME: Make this tidy + def self.represents?(obj) + return false unless super(obj) + + keys_valid = (obj.keys - %w|event data timestamp|).empty? + event_valid = obj['event'].is_a?(String) + + ts = obj['timestamp'] + ts_valid = ( + ts.is_a?(Hash) && (ts.keys - %w|seconds microseconds|).empty? && + ts['seconds'].is_a?(Fixnum) && ts['microseconds'].is_a?(Fixnum) + ) + data_valid = obj['data'].nil? || obj['data'].is_a?(Hash) + + keys_valid && event_valid && ts_valid && data_valid + end + + # build an event from a Hash + def self.build(obj) + new(obj['event'], obj['timestamp'], obj['data']) + end + + def initialize(name, timestamp, data=nil) + @name = name + + @time = if timestamp.is_a?(Hash) + hash_to_time(timestamp) + else + timestamp + end + + @data = data + end + + def to_hash + d = {'event' => name, 'timestamp' => time_to_hash(time)} + d['data'] = data if data + d + end + + protected + + def hash_to_time(hsh) + Time.at(hsh['seconds'], hsh['microseconds']) + end + + def time_to_hash(time) + {'seconds' => time.tv_sec, 'microseconds' => time.tv_usec} + end + + end + end +end diff --git a/lib/qmp_client/messages/greeting.rb b/lib/qmp_client/messages/greeting.rb new file mode 100644 index 0000000..a8316e2 --- /dev/null +++ b/lib/qmp_client/messages/greeting.rb @@ -0,0 +1,46 @@ +require 'qmp_client/messages/message' + +module QMPClient + module Messages + class Greeting < Message + attr_reader :qmp_version + attr_reader :qmp_package + attr_reader :capabilities + + def self.represents?(hsh) + + super(hsh) && hsh['QMP'].is_a?(Hash) && + hsh['QMP']['version'].is_a?(Hash) && + hsh['QMP']['package'].is_a?(String) && + hsh['capabilities'].is_a?(Array) + end + + # build an event from a Hash + def self.build(hsh) + new(hsh['QMP']['version'], hsh['QMP']['package'], hsh['capabilities']) + end + + def ==(other) + other.is_a?(Greeting) && other.qmp_version == self.qmp_version && + other.qmp_package == self.qmp_package && + other.capabilities == self.capabilities + end + + def initialize(qmp_version, qmp_package, capabilities) + @qmp_version = qmp_version + @qmp_package = qmp_package + @capabilities = capabilities + end + + def to_hash + { + 'QMP' => {'version' => qmp_version, 'package' => qmp_package}, + 'capabilities' => capabilities + } + end + + protected + + end + end +end diff --git a/lib/qmp_client/messages/message.rb b/lib/qmp_client/messages/message.rb new file mode 100644 index 0000000..3aaa5d7 --- /dev/null +++ b/lib/qmp_client/messages/message.rb @@ -0,0 +1,21 @@ +module QMPClient + module Messages + class Message + + # Can we build a Message from the object? + def self.represents?(obj) + obj.is_a?(Hash) + end + + def represents?(obj) + self.class.represents?(obj) + end + # Create a new Message. + # @param[Hash] obj source data for the message. + def initialize + raise NotImplementedError.new("Can't instantiate base class") + end + + end + end +end diff --git a/lib/qmp_client/messages/query.rb b/lib/qmp_client/messages/query.rb new file mode 100644 index 0000000..2fb3469 --- /dev/null +++ b/lib/qmp_client/messages/query.rb @@ -0,0 +1,41 @@ +require 'qmp_client/messages/message' + +module QMPClient + module Messages + class Query < Message + attr_reader :request_id + attr_reader :name + + def self.represents?(hsh) + super(hsh) && + (hsh.keys - %w|id execute|).empty? && + hsh['execute'].is_a?(String) && hsh['execute'] =~ /\Aquery-/ + end + + def ==(other) + other.is_a?(Query) && self.request_id == other.request_id && + self.name == other.name + end + + def self.build(hsh) + name = if hsh['execute'].is_a?(String) + hsh['execute'].gsub(/\Aquery-/, "") + else + hsh['execute'] + end + + new(hsh['id'], name) + end + + def initialize(request_id, name) + @request_id = request_id + @name = name + end + + def to_hash + {'execute' => "query-#{name}", 'id' => request_id} + end + + end + end +end diff --git a/lib/qmp_client/messages/reply.rb b/lib/qmp_client/messages/reply.rb new file mode 100644 index 0000000..c675d2a --- /dev/null +++ b/lib/qmp_client/messages/reply.rb @@ -0,0 +1,34 @@ +require 'qmp_client/messages/message' + +module QMPClient + module Messages + class Reply < Message + attr_reader :request_id + attr_reader :return_value + + def self.represents?(hsh) + super(hsh) && + (hsh.keys - %w|id return|).empty? && !hsh['return'].nil? + end + + def ==(other) + other.is_a?(Reply) && self.request_id == other.request_id && + self.return_value == other.return_value + end + + def self.build(hsh) + new(hsh['id'], hsh['return']) + end + + def initialize(request_id, return_value) + @request_id = request_id + @return_value = return_value + end + + def to_hash + {'id' => request_id, 'return' => return_value} + end + + end + end +end diff --git a/test/unit/qmp_client/test_api.rb b/test/unit/qmp_client/test_api.rb new file mode 100644 index 0000000..a5c203a --- /dev/null +++ b/test/unit/qmp_client/test_api.rb @@ -0,0 +1,146 @@ +require 'helper' +require 'qmp_client/api' + +module TestQMPClient + + class TestAPI < QMPClientTestCase + + include ::QMPClient + + def setup + super + + @read_q = Queue.new + @write_q = Queue.new + @api = QMPClient::API.new(@read_q, @write_q) + end + + def written_message + assert_doesnt_time_out(1) { @write_q.pop } + end + + def greeting_from_server + @read_q.push(Messages::Greeting.new({}, "", [])) + end + + def reply_from_server(request_id, return_value) + @read_q.push(Messages::Reply.new(request_id, return_value)) + end + + def event_from_server(event_name, data=nil) + @read_q.push(Messages::Event.new(event_name, Time.now, data)) + end + + def with_api(tps=1, &blk) + @api.run(tps, false, &blk) + end + + def assert_reply(msg, rid, val = nil) + assert_kind_of(Messages::Reply, msg) + assert_equal(rid, msg.request_id) + assert_equal(val, msg.return_value) + end + + def test_run + @api.run(1, false) do |api| + assert_equal(@api, api, "run doesn't yield API") + end + end + + def test_wait_for_greeting + running = false + endq = Queue.new + + greeting_from_server + t = Thread.new do + @api.run(1, true) do |api| + running = true + end + end + + msg = written_message + assert_kind_of(Messages::Command, msg) + assert_equal('qmp_capabilities', msg.name) + reply_from_server(msg.request_id, {}) + assert_doesnt_time_out(1, "Waiting for run block to execute") { t.join } + assert(running, "Run block was never executed") + end + + def test_query + with_api do |api| + rsp = nil + api.query('foo') {|m| rsp = m } + msg = written_message + assert_kind_of(Messages::Query, msg) + assert(msg.request_id) + assert_equal('foo', msg.name) + reply_from_server(msg.request_id, 'bar') + sleep(0.1) until rsp + assert_reply(rsp, msg.request_id, 'bar') + end + end + + def test_sync_query + with_api do |api| + rsp = nil + t = Thread.new { rsp = api.sync_query('foo') } + + msg = written_message + assert_kind_of(Messages::Query, msg) + assert(msg.request_id) + assert_equal('foo', msg.name) + reply_from_server(msg.request_id, 'bar') + t.join + assert_reply(rsp, msg.request_id, 'bar') + end + end + + def test_command + with_api do |api| + rsp = nil + api.command('foo', {'arg1' => 'val1'}) {|m| rsp = m } + msg = written_message + assert_kind_of(Messages::Command, msg) + assert(msg.request_id) + assert_equal('foo', msg.name) + assert_equal({'arg1' => 'val1'}, msg.arguments) + reply_from_server(msg.request_id, 'bar') + sleep(0.1) until rsp + assert_reply(rsp, msg.request_id, 'bar') + end + end + + def test_sync_command + with_api do |api| + rsp = nil + t = Thread.new { rsp = api.sync_command('foo', {'a' => 'b'}) } + + msg = written_message + assert(msg.request_id) + assert_equal('foo', msg.name) + assert_equal({'a' => 'b'}, msg.arguments) + reply_from_server(msg.request_id, 'bar') + t.join + assert_reply(rsp, msg.request_id, 'bar') + end + end + + def test_on_event + with_api do |api| + rsp = nil + api.on_event("FOO") {|e| rsp = e } + event_from_server('FOO', {'a' => 'b'}) + sleep(0.1) until rsp + assert_kind_of(Messages::Event, rsp) + assert_equal('FOO', rsp.name) + assert_kind_of(Time, rsp.time) + assert_equal({'a' => 'b'}, rsp.data) + end + end + + def test_wait + skip("Not implemented") + end + + end +end \ No newline at end of file diff --git a/test/unit/qmp_client/test_messages.rb b/test/unit/qmp_client/test_messages.rb new file mode 100644 index 0000000..ff9735d --- /dev/null +++ b/test/unit/qmp_client/test_messages.rb @@ -0,0 +1,84 @@ +require 'helper' + +require 'qmp_client/messages' + +module TestQMPClient + module TestMessages + class TestModuleMethods < QMPClientTestCase + include QMPClient + + COMMAND_MSG = Messages::Command.new('00000001', 'foo', {'a' => 'b'}) + COMMAND_HSH = {'execute' => 'foo', 'arguments' => {'a'=>'b'}, 'id' => '00000001'} + COMMAND_TXT = COMMAND_HSH.to_json + + GREETING_MSG = Messages::Greeting.new( + {'qemu' => {'micro' => 50, 'minor' => 13, 'major' => 0}}, + "", [] + ) + GREETING_HSH = { + 'QMP' => { + 'version' => {'qemu' => {'micro' => 50, 'minor' => 13, 'major' => 0}}, + 'package' => "", + }, + 'capabilities' => [] + } + GREETING_TXT = GREETING_HSH.to_json + + ETIME = Time.now + EVENT_MSG = Messages::Event.new('foo', ETIME, {'a' => 'b'}) + EVENT_HSH = { + 'event' => 'foo', + 'timestamp' => {'seconds' => ETIME.tv_sec, 'microseconds' => ETIME.tv_usec}, + 'data' => {'a' => 'b'} + } + EVENT_TXT = EVENT_HSH.to_json + + QUERY_MSG = Messages::Query.new('00000001', 'foo') + QUERY_HSH = { + 'execute' => 'query-foo', + 'id' => '00000001' + } + QUERY_TXT = QUERY_HSH.to_json + + REPLY_MSG = Messages::Reply.new('00000001', {'foo' => 'bar'}) + REPLY_HSH = { + 'id' => '00000001', + 'return' => {'foo' => 'bar'} + } + REPLY_TXT = REPLY_HSH.to_json + + [ + [Messages::Command, COMMAND_MSG, COMMAND_HSH, COMMAND_TXT], + [Messages::Greeting, GREETING_MSG, GREETING_HSH, GREETING_TXT], + [Messages::Event, EVENT_MSG, EVENT_HSH, EVENT_TXT], + [Messages::Query, QUERY_MSG, QUERY_HSH, QUERY_TXT], + [Messages::Reply, REPLY_MSG, REPLY_HSH, REPLY_TXT] + ].each do |kls, msg_instance, msg_hash, msg_json| + + t = kls.to_s.split("::")[-1].downcase + + define_method("test_serialise_#{t}") do + data = Messages::serialise(msg_instance) + assert_kind_of(String, data) + rsp = JSON::parse(data) # can't compare the strings directly + assert_equal(msg_hash, rsp) + end + + define_method("test_deserialise_#{t}") do + created = Messages::deserialise(msg_json) + assert_kind_of(kls, created) + assert_equal(msg_hash, created.to_hash) + assert_equal(msg_instance, created) + end + + define_method("test_#{t}_to_hash_and_build") do + created = kls.build(msg_hash) + assert_equal(msg_hash, created.to_hash) + assert_equal(msg_instance, created) + + end + end + + end + end +end \ No newline at end of file diff --git a/test/unit/test_qmp_client.rb b/test/unit/test_qmp_client.rb new file mode 100644 index 0000000..cc499e3 --- /dev/null +++ b/test/unit/test_qmp_client.rb @@ -0,0 +1,82 @@ +require 'helper' +require 'qmp_client' + +module TestQMPClient + class TestModuleMethods < BaseTestCase + + include QMPClient + + def test_require_loads_main_constants + assert(defined?(::QMPClient), "QMPClient not defined") + assert(defined?(::QMPClient::API), "QMPClient::API not defined") + assert(defined?(::QMPClient::Connectors), "Connectors not defined") + assert(defined?(::QMPClient::Connectors::Socket), "Socket connector not defined") + assert(defined?(::QMPClient::Connectors::WriteProxy), "WriteProxy not defined") + assert(defined?(::QMPClient::Messages), "Messages module not defined") + assert(defined?(::QMPClient::Messages::Query), "Query message not defined") + assert(defined?(::QMPClient::Messages::Command), "Command message not defined") + assert(defined?(::QMPClient::Messages::Event), "Event message not defined") + assert(defined?(::QMPClient::Messages::Reply), "Greeting message not defined") + end + + def expects_socket_run(rio, wio=nil) + rq, wq = [Queue.new, Queue.new] + Connectors::Socket.any_instance.expects(:run).with(rio, wio). + once.yields(rq, wq) + API.expects(:run).with(rq, wq).once.yields("(mock-api)") + end + + def test_connect_tcp + cargs = ["127.0.0.1", 4440, "127.0.0.1", 40000] + mock_sock = mock("(tcp-socket)") + mock_sock.stubs(:closed? => false, :close => true) + TCPSocket.expects(:connect).with(*cargs).once.returns(mock_sock) + expects_socket_run(mock_sock) + + runs_api_block = false + QMPClient::connect_tcp(*cargs) do |api| + runs_api_block = true + end + + assert(runs_api_block, "API not yielded by connect_tcp") + end + + def test_connect_unix + mock_sock = mock("(unix-socket)") + mock_sock.stubs(:closed? => false, :close => true) + UNIXSocket.expects(:connect).with("/tmp/test.sock").once.returns(mock_sock) + + expects_socket_run(mock_sock) + + runs_api_block = false + QMPClient::connect_unix("/tmp/test.sock") do |api| + runs_api_block = true + end + + assert(runs_api_block, "API not yielded by connect_unix") + end + + def test_connect_socket + rs, ws = [mock("(read-socket)"), mock("(write-socket)")] + expects_socket_run(rs, ws) + + runs_api_block = false + QMPClient::connect_socket(rs, ws) do |api| + runs_api_block = true + end + + assert(runs_api_block, "API not yielded by connect_socket") + + # If we pass in just one socket, it's used for both read and write + rw = mock("(rw-socket)") + runs_api_block = false + expects_socket_run(rw) + + QMPClient::connect_socket(rw) do + runs_api_block = true + end + assert(runs_api_block, "API not yielded by connect_socket with one sock") + end + + end +end