Initial, simple, unit-tested implementation of QMPClient.

This follows the README given earlier, at least in principle, but
doesn't implement the entirety of QMP, by any stretch of the
imagination.

Notable by their absence are error responses, argument validation
(for incoming and outgoing messsages of all types), and any visibility
into qmp_capabilities.

Also missing are integration tests.
This commit is contained in:
Nick Thomas
2011-11-13 18:36:51 +00:00
parent 7777e5cacb
commit 51175ddf56
13 changed files with 833 additions and 1 deletions

35
lib/qmp_client.rb Normal file
View File

@@ -0,0 +1,35 @@
require 'socket'
require 'qmp_client/api'
require 'qmp_client/connectors'
require 'qmp_client/messages'
# This library provides an interface to QEMU's QMP server, allowing you to
# manage and query running virtual machines.
#
# @author Nick Thomas <nick@lupine.me.uk>
module QMPClient
def self.connect_unix(filename, &blk)
sock = UNIXSocket.connect(filename)
connect_socket(sock, &blk)
ensure
sock.close if sock && !sock.closed?
end
def self.connect_tcp(host, port, local_host=nil, local_port=nil, &blk)
sock = TCPSocket.connect(host, port, local_host, local_port)
connect_socket(sock, &blk)
ensure
sock.close if sock && !sock.closed?
end
def self.connect_socket(read_socket, write_socket = nil, &blk)
conn = Connectors::Socket.new(QMPClient::Messages, QMPClient::Messages)
conn.run(read_socket, write_socket) do |rq, wq|
API.run(rq, wq, &blk)
end
end
end

160
lib/qmp_client/api.rb Normal file
View File

@@ -0,0 +1,160 @@
require 'thread'
require 'mutex_m'
require 'qmp_client/messages'
module QMPClient
# This class implements the interface used to interact with a QMP server.
# It expects to be created with a pair of queues for message I/O - which is
# precisely the interface provided by Connectors::Socket#run
#
# @author Nick Thomas <nick@lupine.me.uk>
class API
include Mutex_m
attr_reader :receive_queue
attr_reader :send_queue
# API to a QMP server.
# @param[Queue] receive_q Queue in which received messages will appear
# @param[Queue] send_q Queue to push messages we want to send
# @param[Fixum] thread_pool_size number of threads to run
def initialize(receive_q, send_q)
super()
# Request ID generation
@rid_m = Mutex.new
@rid = 0
@receive_queue = receive_q
@send_queue = send_q
@event_handlers = {}
@callbacks = {}
@greeting_q = Queue.new
end
def self.run(rq, wq, thread_pool_size=10, wait_for_greeting = true, &blk)
new(rq, wq).run(thread_pool_size, wait_for_greeting, &blk)
end
def run(thread_pool_size, wait_for_greeting = true, &blk)
thread_pool = build_thread_pool(@receive_queue, thread_pool_size)
negotiate_capabilities! if wait_for_greeting
yield self
ensure
if thread_pool
thread_pool.size.times { @receive_queue.push(:close) }
thread_pool.each {|t| t.join rescue nil }
end
end
# Takes a query to run, and yields the block when it's complete, or an error
# is raised. Returns as soon as the query is dispatched.
def query(name, &blk)
msg = Messages::Query.new(generate_request_id, name)
send_message(msg, &blk)
end
# Convenience method - synchronous version of query
def sync_query(name)
sync_around(:query, name)
end
# Takes a command to run, and yields the block when it's complete, or an
# error is raised. Returns as soon as the command is dispatched.
def command(name, args = nil, &blk)
msg = Messages::Command.new(generate_request_id, name, args)
send_message(msg, &blk)
end
# Convenience method - synchronous version of command
def sync_command(name, args = nil)
sync_around(:command, name, args)
end
# Registers a block to be run whenever an event is received. We will yield
# the block with the received message.
#
# You can specify only one handler per event.
def on_event(name, &blk)
synchronize do
Kernel.warn("Already registered handler for #{name}, overwriting") if
@event_handlers[name]
@event_handlers[name] = blk
end
end
# Blocks until all outstanding callbacks have been processed.
def wait
raise NotImplementedError.new
end
protected
attr_reader :thread_pool
# Wait for the server greeting and issue the qmp_capabilities command once
# it has been received.
def negotiate_capabilities!
@greeting_q.pop
sync_command('qmp_capabilities')
end
def send_message(msg, &blk)
blk = Proc.new { } unless block_given? && blk
synchronize { @callbacks[msg.request_id] = blk }
send_queue.push(msg)
end
# Convenience method. Synchronous version of send_message.
def sync_send_message(msg)
sync_around(:send_message, msg)
end
def sync_around(method, *args)
rq = Queue.new
send(method, *args) {|result| rq.push(result) }
rq.pop
end
def generate_request_id
"%.8i" % @rid_m.synchronize { @rid += 1 }
end
def build_thread_pool(rq, tp_size)
(1..(tp_size)).collect {|x| read_thread(rq, x)}
end
# @param[Queue] rqueue receive queue to operate on
# @param[Object] x Indentifier for this read thread
def read_thread(rqueue, x)
Thread.new do
Thread.current.abort_on_exception = true # Ensure exceptions bubble out
loop do
msg = rqueue.pop
break if msg == :close
dispatch_message_to_callback(msg)
end
end
end
# Checks the message type, looks to see if there's a callback that needs
# executing, and calls it if so.
def dispatch_message_to_callback(msg)
case msg
when Messages::Event
cb = synchronize { @event_handlers[msg.name] }
cb.call(msg) if cb && cb.respond_to?(:call)
when Messages::Greeting
@greeting_q.push(msg)
else
cb = synchronize { @callbacks.delete(msg.request_id) }
cb.call(msg) if cb && cb.respond_to?(:call)
end
end
end
end

View File

@@ -137,7 +137,7 @@ module QMPClient
messages.each {|msg| receive_queues.each {|q| q.push(msg)}}
r_buf
rescue => err # Any error dispatching means we need to shut up shop
rescue # Any error dispatching means we need to shut up shop
nil
end

View File

@@ -0,0 +1,61 @@
require 'json'
require 'qmp_client/messages/command'
require 'qmp_client/messages/greeting'
require 'qmp_client/messages/event'
require 'qmp_client/messages/query'
require 'qmp_client/messages/reply'
module QMPClient
# Module containing all the Message classes. Also serves as a quick-and-dirty
# serialiser and deserialiser for them.
#
# @author Nick Thomas <nick@lupine.me.uk>
module Messages
# Raised when we're trying to convert raw text into a message, but failing
class ParserError < StandardError
attr_reader :upstream_error
def initialize(message, upstream)
super(message)
@upstream_error = upstream
end
end
# converts raw data into a message class
def self.deserialise(text)
hsh = begin
JSON::parse(text)
rescue => err
raise ParserError.new("Provided text is not valid JSON", err)
end
# ordered by frequency of receipt
return Reply::build(hsh) if Reply::represents?(hsh)
return Event::build(hsh) if Event::represents?(hsh)
return Command::build(hsh) if Command::represents?(hsh)
return Query::build(hsh) if Query::represents?(hsh)
return Greeting::build(hsh) if Greeting::represents?(hsh)
raise ParserError.new("JSON is valid but isn't a QMP message")
end
# See: deserialise.
def self.deserialize(text)
deserialise(text)
end
# converts a message into raw data
def self.serialise(message)
message.to_hash.to_json
end
# See: serialise
def self.serialize(message)
serialise(message)
end
end
end

View File

@@ -0,0 +1,42 @@
require 'qmp_client/messages/message'
module QMPClient
module Messages
class Command < Message
attr_reader :request_id
attr_reader :name
attr_reader :arguments
# Does the hash represent a Command?
def self.represents?(hsh)
super(hsh) &&
(hsh.keys - %w|id execute arguments|).empty? &&
hsh['execute'].is_a?(String) && hsh['execute'] !~ /\Aquery-/ &&
hsh['arguments'].nil? || hsh['arguments'].is_a?(Hash)
end
def ==(other)
other.is_a?(Command) && [:request_id, :name, :arguments].all? do |m|
self.send(m) == other.send(m)
end
end
def self.build(hsh)
new(hsh['id'], hsh['execute'], hsh['arguments'])
end
def initialize(request_id, name, arguments = nil)
@request_id = request_id
@name = name
@arguments = arguments
end
def to_hash
d = {'id' => request_id, 'execute' => name}
d['arguments'] = arguments unless arguments.nil?
d
end
end
end
end

View File

@@ -0,0 +1,80 @@
require 'qmp_client/messages/message'
module QMPClient
module Messages
class Event < Message
attr_reader :name
attr_reader :time
def timestamp
time_to_hash(time)
end
def timestamp=(hsh)
@time = hash_to_time(hsh)
end
attr_reader :data
def ==(other)
other.is_a?(Event) &&
other.name == self.name &&
other.timestamp == self.timestamp &&
other.data == self.data
end
# Events look like:
# {"event" => event-name, ["data" => {event-specific-data},]
# "timestamp" => {"seconds" => int, "microseconds" => int} }
# FIXME: Make this tidy
def self.represents?(obj)
return false unless super(obj)
keys_valid = (obj.keys - %w|event data timestamp|).empty?
event_valid = obj['event'].is_a?(String)
ts = obj['timestamp']
ts_valid = (
ts.is_a?(Hash) && (ts.keys - %w|seconds microseconds|).empty? &&
ts['seconds'].is_a?(Fixnum) && ts['microseconds'].is_a?(Fixnum)
)
data_valid = obj['data'].nil? || obj['data'].is_a?(Hash)
keys_valid && event_valid && ts_valid && data_valid
end
# build an event from a Hash
def self.build(obj)
new(obj['event'], obj['timestamp'], obj['data'])
end
def initialize(name, timestamp, data=nil)
@name = name
@time = if timestamp.is_a?(Hash)
hash_to_time(timestamp)
else
timestamp
end
@data = data
end
def to_hash
d = {'event' => name, 'timestamp' => time_to_hash(time)}
d['data'] = data if data
d
end
protected
def hash_to_time(hsh)
Time.at(hsh['seconds'], hsh['microseconds'])
end
def time_to_hash(time)
{'seconds' => time.tv_sec, 'microseconds' => time.tv_usec}
end
end
end
end

View File

@@ -0,0 +1,46 @@
require 'qmp_client/messages/message'
module QMPClient
module Messages
class Greeting < Message
attr_reader :qmp_version
attr_reader :qmp_package
attr_reader :capabilities
def self.represents?(hsh)
super(hsh) && hsh['QMP'].is_a?(Hash) &&
hsh['QMP']['version'].is_a?(Hash) &&
hsh['QMP']['package'].is_a?(String) &&
hsh['capabilities'].is_a?(Array)
end
# build an event from a Hash
def self.build(hsh)
new(hsh['QMP']['version'], hsh['QMP']['package'], hsh['capabilities'])
end
def ==(other)
other.is_a?(Greeting) && other.qmp_version == self.qmp_version &&
other.qmp_package == self.qmp_package &&
other.capabilities == self.capabilities
end
def initialize(qmp_version, qmp_package, capabilities)
@qmp_version = qmp_version
@qmp_package = qmp_package
@capabilities = capabilities
end
def to_hash
{
'QMP' => {'version' => qmp_version, 'package' => qmp_package},
'capabilities' => capabilities
}
end
protected
end
end
end

View File

@@ -0,0 +1,21 @@
module QMPClient
module Messages
class Message
# Can we build a Message from the object?
def self.represents?(obj)
obj.is_a?(Hash)
end
def represents?(obj)
self.class.represents?(obj)
end
# Create a new Message.
# @param[Hash] obj source data for the message.
def initialize
raise NotImplementedError.new("Can't instantiate base class")
end
end
end
end

View File

@@ -0,0 +1,41 @@
require 'qmp_client/messages/message'
module QMPClient
module Messages
class Query < Message
attr_reader :request_id
attr_reader :name
def self.represents?(hsh)
super(hsh) &&
(hsh.keys - %w|id execute|).empty? &&
hsh['execute'].is_a?(String) && hsh['execute'] =~ /\Aquery-/
end
def ==(other)
other.is_a?(Query) && self.request_id == other.request_id &&
self.name == other.name
end
def self.build(hsh)
name = if hsh['execute'].is_a?(String)
hsh['execute'].gsub(/\Aquery-/, "")
else
hsh['execute']
end
new(hsh['id'], name)
end
def initialize(request_id, name)
@request_id = request_id
@name = name
end
def to_hash
{'execute' => "query-#{name}", 'id' => request_id}
end
end
end
end

View File

@@ -0,0 +1,34 @@
require 'qmp_client/messages/message'
module QMPClient
module Messages
class Reply < Message
attr_reader :request_id
attr_reader :return_value
def self.represents?(hsh)
super(hsh) &&
(hsh.keys - %w|id return|).empty? && !hsh['return'].nil?
end
def ==(other)
other.is_a?(Reply) && self.request_id == other.request_id &&
self.return_value == other.return_value
end
def self.build(hsh)
new(hsh['id'], hsh['return'])
end
def initialize(request_id, return_value)
@request_id = request_id
@return_value = return_value
end
def to_hash
{'id' => request_id, 'return' => return_value}
end
end
end
end

View File

@@ -0,0 +1,146 @@
require 'helper'
require 'qmp_client/api'
module TestQMPClient
class TestAPI < QMPClientTestCase
include ::QMPClient
def setup
super
@read_q = Queue.new
@write_q = Queue.new
@api = QMPClient::API.new(@read_q, @write_q)
end
def written_message
assert_doesnt_time_out(1) { @write_q.pop }
end
def greeting_from_server
@read_q.push(Messages::Greeting.new({}, "", []))
end
def reply_from_server(request_id, return_value)
@read_q.push(Messages::Reply.new(request_id, return_value))
end
def event_from_server(event_name, data=nil)
@read_q.push(Messages::Event.new(event_name, Time.now, data))
end
def with_api(tps=1, &blk)
@api.run(tps, false, &blk)
end
def assert_reply(msg, rid, val = nil)
assert_kind_of(Messages::Reply, msg)
assert_equal(rid, msg.request_id)
assert_equal(val, msg.return_value)
end
def test_run
@api.run(1, false) do |api|
assert_equal(@api, api, "run doesn't yield API")
end
end
def test_wait_for_greeting
running = false
endq = Queue.new
greeting_from_server
t = Thread.new do
@api.run(1, true) do |api|
running = true
end
end
msg = written_message
assert_kind_of(Messages::Command, msg)
assert_equal('qmp_capabilities', msg.name)
reply_from_server(msg.request_id, {})
assert_doesnt_time_out(1, "Waiting for run block to execute") { t.join }
assert(running, "Run block was never executed")
end
def test_query
with_api do |api|
rsp = nil
api.query('foo') {|m| rsp = m }
msg = written_message
assert_kind_of(Messages::Query, msg)
assert(msg.request_id)
assert_equal('foo', msg.name)
reply_from_server(msg.request_id, 'bar')
sleep(0.1) until rsp
assert_reply(rsp, msg.request_id, 'bar')
end
end
def test_sync_query
with_api do |api|
rsp = nil
t = Thread.new { rsp = api.sync_query('foo') }
msg = written_message
assert_kind_of(Messages::Query, msg)
assert(msg.request_id)
assert_equal('foo', msg.name)
reply_from_server(msg.request_id, 'bar')
t.join
assert_reply(rsp, msg.request_id, 'bar')
end
end
def test_command
with_api do |api|
rsp = nil
api.command('foo', {'arg1' => 'val1'}) {|m| rsp = m }
msg = written_message
assert_kind_of(Messages::Command, msg)
assert(msg.request_id)
assert_equal('foo', msg.name)
assert_equal({'arg1' => 'val1'}, msg.arguments)
reply_from_server(msg.request_id, 'bar')
sleep(0.1) until rsp
assert_reply(rsp, msg.request_id, 'bar')
end
end
def test_sync_command
with_api do |api|
rsp = nil
t = Thread.new { rsp = api.sync_command('foo', {'a' => 'b'}) }
msg = written_message
assert(msg.request_id)
assert_equal('foo', msg.name)
assert_equal({'a' => 'b'}, msg.arguments)
reply_from_server(msg.request_id, 'bar')
t.join
assert_reply(rsp, msg.request_id, 'bar')
end
end
def test_on_event
with_api do |api|
rsp = nil
api.on_event("FOO") {|e| rsp = e }
event_from_server('FOO', {'a' => 'b'})
sleep(0.1) until rsp
assert_kind_of(Messages::Event, rsp)
assert_equal('FOO', rsp.name)
assert_kind_of(Time, rsp.time)
assert_equal({'a' => 'b'}, rsp.data)
end
end
def test_wait
skip("Not implemented")
end
end
end

View File

@@ -0,0 +1,84 @@
require 'helper'
require 'qmp_client/messages'
module TestQMPClient
module TestMessages
class TestModuleMethods < QMPClientTestCase
include QMPClient
COMMAND_MSG = Messages::Command.new('00000001', 'foo', {'a' => 'b'})
COMMAND_HSH = {'execute' => 'foo', 'arguments' => {'a'=>'b'}, 'id' => '00000001'}
COMMAND_TXT = COMMAND_HSH.to_json
GREETING_MSG = Messages::Greeting.new(
{'qemu' => {'micro' => 50, 'minor' => 13, 'major' => 0}},
"", []
)
GREETING_HSH = {
'QMP' => {
'version' => {'qemu' => {'micro' => 50, 'minor' => 13, 'major' => 0}},
'package' => "",
},
'capabilities' => []
}
GREETING_TXT = GREETING_HSH.to_json
ETIME = Time.now
EVENT_MSG = Messages::Event.new('foo', ETIME, {'a' => 'b'})
EVENT_HSH = {
'event' => 'foo',
'timestamp' => {'seconds' => ETIME.tv_sec, 'microseconds' => ETIME.tv_usec},
'data' => {'a' => 'b'}
}
EVENT_TXT = EVENT_HSH.to_json
QUERY_MSG = Messages::Query.new('00000001', 'foo')
QUERY_HSH = {
'execute' => 'query-foo',
'id' => '00000001'
}
QUERY_TXT = QUERY_HSH.to_json
REPLY_MSG = Messages::Reply.new('00000001', {'foo' => 'bar'})
REPLY_HSH = {
'id' => '00000001',
'return' => {'foo' => 'bar'}
}
REPLY_TXT = REPLY_HSH.to_json
[
[Messages::Command, COMMAND_MSG, COMMAND_HSH, COMMAND_TXT],
[Messages::Greeting, GREETING_MSG, GREETING_HSH, GREETING_TXT],
[Messages::Event, EVENT_MSG, EVENT_HSH, EVENT_TXT],
[Messages::Query, QUERY_MSG, QUERY_HSH, QUERY_TXT],
[Messages::Reply, REPLY_MSG, REPLY_HSH, REPLY_TXT]
].each do |kls, msg_instance, msg_hash, msg_json|
t = kls.to_s.split("::")[-1].downcase
define_method("test_serialise_#{t}") do
data = Messages::serialise(msg_instance)
assert_kind_of(String, data)
rsp = JSON::parse(data) # can't compare the strings directly
assert_equal(msg_hash, rsp)
end
define_method("test_deserialise_#{t}") do
created = Messages::deserialise(msg_json)
assert_kind_of(kls, created)
assert_equal(msg_hash, created.to_hash)
assert_equal(msg_instance, created)
end
define_method("test_#{t}_to_hash_and_build") do
created = kls.build(msg_hash)
assert_equal(msg_hash, created.to_hash)
assert_equal(msg_instance, created)
end
end
end
end
end

View File

@@ -0,0 +1,82 @@
require 'helper'
require 'qmp_client'
module TestQMPClient
class TestModuleMethods < BaseTestCase
include QMPClient
def test_require_loads_main_constants
assert(defined?(::QMPClient), "QMPClient not defined")
assert(defined?(::QMPClient::API), "QMPClient::API not defined")
assert(defined?(::QMPClient::Connectors), "Connectors not defined")
assert(defined?(::QMPClient::Connectors::Socket), "Socket connector not defined")
assert(defined?(::QMPClient::Connectors::WriteProxy), "WriteProxy not defined")
assert(defined?(::QMPClient::Messages), "Messages module not defined")
assert(defined?(::QMPClient::Messages::Query), "Query message not defined")
assert(defined?(::QMPClient::Messages::Command), "Command message not defined")
assert(defined?(::QMPClient::Messages::Event), "Event message not defined")
assert(defined?(::QMPClient::Messages::Reply), "Greeting message not defined")
end
def expects_socket_run(rio, wio=nil)
rq, wq = [Queue.new, Queue.new]
Connectors::Socket.any_instance.expects(:run).with(rio, wio).
once.yields(rq, wq)
API.expects(:run).with(rq, wq).once.yields("(mock-api)")
end
def test_connect_tcp
cargs = ["127.0.0.1", 4440, "127.0.0.1", 40000]
mock_sock = mock("(tcp-socket)")
mock_sock.stubs(:closed? => false, :close => true)
TCPSocket.expects(:connect).with(*cargs).once.returns(mock_sock)
expects_socket_run(mock_sock)
runs_api_block = false
QMPClient::connect_tcp(*cargs) do |api|
runs_api_block = true
end
assert(runs_api_block, "API not yielded by connect_tcp")
end
def test_connect_unix
mock_sock = mock("(unix-socket)")
mock_sock.stubs(:closed? => false, :close => true)
UNIXSocket.expects(:connect).with("/tmp/test.sock").once.returns(mock_sock)
expects_socket_run(mock_sock)
runs_api_block = false
QMPClient::connect_unix("/tmp/test.sock") do |api|
runs_api_block = true
end
assert(runs_api_block, "API not yielded by connect_unix")
end
def test_connect_socket
rs, ws = [mock("(read-socket)"), mock("(write-socket)")]
expects_socket_run(rs, ws)
runs_api_block = false
QMPClient::connect_socket(rs, ws) do |api|
runs_api_block = true
end
assert(runs_api_block, "API not yielded by connect_socket")
# If we pass in just one socket, it's used for both read and write
rw = mock("(rw-socket)")
runs_api_block = false
expects_socket_run(rw)
QMPClient::connect_socket(rw) do
runs_api_block = true
end
assert(runs_api_block, "API not yielded by connect_socket with one sock")
end
end
end