Project framework + socket connector

The socket connector is responsible for reading from a socket and
converting the lines that were read to messages, using a passed-in
deserialiser. It also provides an interface to push messages onto
the socket, via a serialiser which is also passed in.
This commit is contained in:
Nick Thomas
2011-11-12 21:45:14 +00:00
commit 1688e55459
6 changed files with 380 additions and 0 deletions

View File

@@ -0,0 +1,17 @@
require 'qmp_client/connectors/socket'
module QMPClient
# The point of the connectors is to provide a unified interface to the varied
# interfaces a QMP server can have. They are:
# 1) TCP/UNIX socket
# 2) STDIN+STDOUT / arbitrary fds
# 3) mock interface (for tests)
# 4) Anything else that might pop up!
#
# @author Nick Thomas <nick@lupine.me.uk>
module Connectors
end
end

View File

@@ -0,0 +1,143 @@
require 'mutex_m'
module QMPClient
module Connectors
# We yield an instance of this call in the run method of the connectors -
# if someone wants to send a message, they call push on it with the message.
#
# This blocks until the data is on the wire, which isn't necessarily a bad
# this.
#
# @author Nick Thomas <nick@lupine.me.uk>
class WriteProxy
include Mutex_m
def initialize(w_socket, serialiser)
super()
@w_socket = w_socket
@serialiser = serialiser
end
def push(msg)
synchronize do
@w_socket.puts(@serialiser.serialise(msg))
@w_socket.flush
end
end
end
# Handles a subclass of BasicSocket, proactively reading data from it and
# converting it into a stream of Messages, and writing serialised Messages
# to it on request.
#
# @author Nick Thomas <nick@lupine.me.uk>
class Socket
# @param[#serialise] serialiser Object to convert message instances to
# the wire format
# @param[#deserialise] deserialiser Object to convert wire data to message
# instances.
def initialize(serialiser, deserialiser)
@serialiser = serialiser
@deserialiser = deserialiser
@receive_queues = []
end
# Call this with an object responding to +push+ and every time a message
# is received, we will pass it on. Multiple queues can be registered.
# Messages are pushed onto the queue as hashes.
def register_receive_queue(q)
@receive_queues.push(q)
end
def unregister_receive_queue(q)
@receive_queues.delete(q)
end
# Starts reading from / writing to the socket as appropriate, yields the
# block, and stops reading/writing the socket once the block returns.
# @param[IO] read_socket Socket to read data from
# @param[IO] write_socket Socket to write data to. Use read_socket if nil
# @yields[WriteProxy] writer Push messages to this to get them on the wire
def run(read_socket, write_socket=nil, &blk)
write_socket ||= read_socket
write_proxy = WriteProxy.new(write_socket, @serialiser)
begin
stop_r, stop_w = IO.pipe
r_thread = read_thread(read_socket, stop_r)
yield(write_proxy)
ensure
stop_w.write("*") if stop_w # Stop signal for the r_thread
stop_w.close if stop_w && !stop_w.closed?
stop_r.close if stop_r && !stop_w.closed?
r_thread.join if r_thread && r_thread.alive?
end
end
protected
attr_reader :deserialiser
attr_reader :serialiser
attr_reader :receive_queues
# Reads from r_sock and turns the data into messages (which get pushed
# out via +register_receive_queue+) until either the socket raises an
# error, or the stop socket becomes readable.
def read_thread(r_sock, stop_socket)
Thread.new do
buf = ""
socks = [r_sock, stop_socket]
loop do
readables, _, errored = begin
Kernel.select(socks, nil, socks)
rescue IOError # socket closed between run and this call
break
end
# exit conditions
break if readables.include?(stop_socket)
break unless errored.empty?
# Should never happen, but anything's possible
next unless readables.include?(r_sock)
# Pull the data from the socket, taking care of errors
begin
buf << r_sock.read_nonblock(8192)
rescue IO::WaitReadable
next # socket isn't ready to read
# TODO: other possible errors
end
buf = dispatch_messages_from_buffer(buf)
break if buf.nil? # Dispatching raised an error of some sort!
end
end
end
# Converts each message in +buf+ into an instance, and pushes it to each
# of the receive queues.
# @param[String] buf String buffer
# @return[nil,String] Either the unparsed component of the data, or nil
# if an error has occured
def dispatch_messages_from_buffer(buf)
lines = buf.split("\n")
r_buf = if buf[-1..-1] == "\n" # Every element is a message
""
else # The last element is a partial message, so put it back
lines.pop
end
messages = lines.collect {|line| deserialiser.deserialise(line) }
messages.each {|msg| receive_queues.each {|q| q.push(msg)}}
r_buf
rescue => err # Any error dispatching means we need to shut up shop
nil
end
end
end
end