in-place commit: state machine was a silly idea
Replace with the outline of a simpler scheme. We should be able to read messages from the wire now, and we will act on messages via a passed-in instance. Next is to implement send_version and send_verack; and the receive equivalents in BitcoinServer. Then we can build a test rig to have a play. Once that's tested, we can implement a simple message and start on the actor - which will be the first non-trivial bit of sharp-coin proper. We'll be adding features and validations to em-bitcoin & btc_wire_proto as sharp-coin begin to depend on them for correct behaviour, and not before.
This commit is contained in:
@@ -7,10 +7,7 @@ module EventMachine
|
|||||||
# Implements the TCP protocol that Bitcoin peers speak to each other. This
|
# Implements the TCP protocol that Bitcoin peers speak to each other. This
|
||||||
# module is mixed into both incoming and outgoing connections.
|
# module is mixed into both incoming and outgoing connections.
|
||||||
#
|
#
|
||||||
# We implement the protocol as a simple(ish!) state machine. When we want
|
#
|
||||||
# something doing, we call state(sym, data) to append that to the
|
|
||||||
# list of things to do. If something is urgent, we can call state! to
|
|
||||||
# put it at the beginning of the list.
|
|
||||||
#
|
#
|
||||||
# Here is a list of states:
|
# Here is a list of states:
|
||||||
# send_ver, recv_ver, verify_ver
|
# send_ver, recv_ver, verify_ver
|
||||||
@@ -21,134 +18,180 @@ module EventMachine
|
|||||||
#
|
#
|
||||||
# @author Nick Thomas <nick@lupine.me.uk>
|
# @author Nick Thomas <nick@lupine.me.uk>
|
||||||
module BitcoinPeer
|
module BitcoinPeer
|
||||||
|
# Raised in case of any weird semantics / invalid syntax
|
||||||
|
class ProtocolError < StandardError
|
||||||
|
end
|
||||||
|
PE = ProtocolError
|
||||||
|
|
||||||
|
# The list of methods a valid actor will respond to.
|
||||||
|
ACTOR_METHODS = [
|
||||||
|
:log, # log(:level, message) - self-evident
|
||||||
|
:connection=, # Called with +self+ to allow actor interaction
|
||||||
|
:ready! # Called when the connection is ready to be used
|
||||||
|
]
|
||||||
|
|
||||||
|
def log(level, data)
|
||||||
|
@actor.log(level, data)
|
||||||
|
end
|
||||||
|
|
||||||
protected
|
protected
|
||||||
|
|
||||||
# Sets up the variables required to manage the state machine. Should be
|
|
||||||
# called before you try to push a state - in post_init, say.
|
|
||||||
def init_state!
|
def init_state!
|
||||||
@stream = StringIO.new("")
|
@data = ""
|
||||||
@state_m = Mutex.new # Synchronize around @states and @working
|
@ready = nil
|
||||||
@state_m.synchronize do
|
@actor.connection = self # Tell the actor about the connection
|
||||||
@states = []
|
|
||||||
@current_state = nil
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
# Checks the current configuration object to see if we have a valid config
|
# Checks the current actor object to see if it is valid or not.
|
||||||
# or not.
|
#
|
||||||
# @return[Array[true|false, msg]] Whether the config is valid, and an
|
# The actor encapsulates the logic of the application using em-bitcoin. We
|
||||||
|
# call various methods on it when we receive events from the wire that the
|
||||||
|
# application may find interesting, such as receiving a new block or
|
||||||
|
# transaction. In response to these events, or independently, the actor
|
||||||
|
# can interact with us via its 'connection' attribute to, for instance,
|
||||||
|
# send a block or a transaction. It is likely to want to save received
|
||||||
|
# data somewhere so it can be interacted with later.
|
||||||
|
#
|
||||||
|
# @return[Array[true|false, msg]] Whether the actor is valid, and an
|
||||||
# optional message specifying why it's invalid, if it is.
|
# optional message specifying why it's invalid, if it is.
|
||||||
def valid_config?
|
def valid_actor?
|
||||||
[false, "configuration check not implemented yet"]
|
return [false, "Actor not set"] if @actor.nil?
|
||||||
end
|
|
||||||
|
|
||||||
# Grabs the state that we're currently working on.
|
ACTOR_METHODS.each do |m|
|
||||||
# @return[Array[Symbol,Object]] state symbol + data
|
return [false, "Actor doesn't implement all #{m}"] unless
|
||||||
def get_state
|
@actor.respond_to?(m)
|
||||||
@state_m.synchronize do
|
|
||||||
@current_state ||= @states.shift
|
|
||||||
@current_state
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
true
|
||||||
end
|
end
|
||||||
|
|
||||||
# Call this when we've completed the actions required by the current state
|
# The peer has given us data. Here, we split the data into packets and
|
||||||
def finished_state
|
# hand them off to +receive_packet+
|
||||||
@state_m.synchronize { @current_state = nil }
|
# @param[String] str String containing the data passed in
|
||||||
end
|
def receive_data(str)
|
||||||
|
@data << str
|
||||||
# Push a state to the end of the state queue.
|
log(:debug, "Received #{str.size}b. Buffer now #{@data.length}b")
|
||||||
def state(new_state, data = nil)
|
finished = false
|
||||||
@state_m.synchronize { @states.push(new_state, data) }
|
while !finished
|
||||||
end
|
begin
|
||||||
|
packet = BtcWireProto::Message.read(@data)
|
||||||
# Add a state to the start of the state queue.
|
@data.slice!(0, packet.num_bytes - 1) # Remove data from buffer
|
||||||
def state!(new_state, data = nil)
|
if packet.cmd_sym
|
||||||
@state_m.synchronize { @states.unshift(new_state, data) }
|
m = "receive_#{packet.cmd_sym}"
|
||||||
end
|
if self.respond_to?(m)
|
||||||
|
log(:info, "Received #{packet.cmd_sym}")
|
||||||
# Do some work on the state machine
|
send(m, packet)
|
||||||
def state_tick
|
else
|
||||||
state, data = get_state
|
raise NotImplementedError.new("#{m} not implemented")
|
||||||
return unless state
|
end
|
||||||
if respond_to?(state)
|
else
|
||||||
send(state, data)
|
log(:warn, "Received packet with no command, discarding it")
|
||||||
else
|
log(:debug, packet)
|
||||||
raise NotImplementedError.new("Unknown state: #{state} with #{data}")
|
end
|
||||||
|
rescue EOFError
|
||||||
|
finished = true
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
log(:debug, "Leaving receive_data with #{@data.length} bytes in buffer")
|
||||||
end
|
end
|
||||||
|
|
||||||
# Receiving data mostly drives the state machine. Where something wants
|
# receive_version and receive_verack implementations differ in client &
|
||||||
# to send data, it gets queued straight onto the I/O/
|
# server, so are implemented there.
|
||||||
def receive_data(io)
|
|
||||||
end
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# State machine behaviours now.
|
|
||||||
|
|
||||||
# Send a 'version' message to the peer.
|
# Send a 'version' message to the peer.
|
||||||
# Next
|
def send_version
|
||||||
def send_ver
|
# TODO
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Send a 'verack' message to the peer
|
||||||
|
def send_verack
|
||||||
|
# TODO
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# EventMachine protocol class that handles an *outgoing* connection to
|
# EventMachine protocol class that handles an *outgoing* connection to
|
||||||
# another bitcoin peer. Common functionality (p2p!) is held in BitcoinPeer.
|
# another bitcoin peer. Common functionality (p2p!) is held in BitcoinPeer.
|
||||||
#
|
#
|
||||||
# State machine flow:
|
# Initiation: send version.
|
||||||
# send_ver, recv_verack
|
# receive verack, receive version (any order) or conn close
|
||||||
# recv_ver, verify_ver, send_verack
|
# send verack or conn close
|
||||||
#
|
#
|
||||||
# @author Nick Thomas <nick@lupine.me.uk>
|
# @author Nick Thomas <nick@lupine.me.uk>
|
||||||
class BitcoinClient < EM::Connection
|
class BitcoinClient < EM::Connection
|
||||||
include BitcoinPeer
|
include BitcoinPeer
|
||||||
|
|
||||||
# @param[Object] config See the BitcoinPeer#valid_config?
|
# @param[Object] actor See the BitcoinPeer#valid_actor?
|
||||||
def initialize(config)
|
def initialize(actor)
|
||||||
super
|
super
|
||||||
@config = config
|
@actor = actor
|
||||||
result, msg = valid_config?
|
result, msg = valid_actor?
|
||||||
raise ArgumentError.new("Invalid configuration: #{msg}") unless result
|
raise ArgumentError.new("Invalid actor: #{msg}") unless result
|
||||||
|
|
||||||
init_state!
|
init_state!
|
||||||
end
|
end
|
||||||
|
|
||||||
def post_init
|
def post_init
|
||||||
state(:send_ver)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# As soon as the TCP connection is up, we send a version message.
|
||||||
def connection_completed
|
def connection_completed
|
||||||
advance_state
|
send_version
|
||||||
|
@ready = :version_sent
|
||||||
|
end
|
||||||
|
|
||||||
|
# We receive this in response to our version message. We don't need to do
|
||||||
|
# anything with it though.
|
||||||
|
def receive_verack(p)
|
||||||
|
raise PE.new("Received verack inappropriately") unless
|
||||||
|
@ready == :version_sent
|
||||||
|
log(:info, "Peer accepted our version message")
|
||||||
|
end
|
||||||
|
|
||||||
|
# The peer sends this after it's received our version. We check that we
|
||||||
|
# can communicate with the specified version and either return a verack,
|
||||||
|
# or close the connection.
|
||||||
|
def receive_version(p)
|
||||||
|
raise PE.new("Received version inappropriately") unless
|
||||||
|
@ready == :version_sent
|
||||||
|
log(:info, "Peer tells us its version is #{p.payload.version}")
|
||||||
|
send_verack
|
||||||
|
@ready = true
|
||||||
|
@actor.ready!
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# EventMachine protocol class that handles an *incoming* connection from
|
# EventMachine protocol class that handles an *incoming* connection from
|
||||||
# another bitcoin peer. Common functionality (p2p!) is held in BitcoinPeer
|
# another bitcoin peer. Common functionality (p2p!) is held in BitcoinPeer
|
||||||
#
|
#
|
||||||
# State machine flow:
|
# Servers wait for the client to initiate the connection by sending a
|
||||||
# recv_ver, verify_ver, send_verack
|
# version message.
|
||||||
# send_ver, recv_verack
|
|
||||||
#
|
#
|
||||||
# @author Nick Thomas <nick@lupine.me.uk>
|
# @author Nick Thomas <nick@lupine.me.uk>
|
||||||
class BitcoinServer < EM::Connection
|
class BitcoinServer < EM::Connection
|
||||||
include BitcoinPeer
|
include BitcoinPeer
|
||||||
|
|
||||||
# @param[Object] config See the BitcoinPeer#valid_config?
|
# @param[Object] actor See the BitcoinPeer#valid_actor? method
|
||||||
def initialize(config)
|
def initialize(actor)
|
||||||
super
|
super
|
||||||
@config = config
|
@actor = actor
|
||||||
result, msg = valid_config?
|
result, msg = valid_actor?
|
||||||
raise ArgumentError.new("Invalid configuration: #{msg}") unless result
|
raise ArgumentError.new("Invalid actor: #{msg}") unless result
|
||||||
|
|
||||||
init_state!
|
init_state!
|
||||||
end
|
end
|
||||||
|
|
||||||
def post_init
|
|
||||||
state(:recv_ver)
|
# We should receive this as the very first message on the wire
|
||||||
|
def receive_version(p)
|
||||||
|
# TODO
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# We should only receive this after sending our own version
|
||||||
|
def receive_verack(p)
|
||||||
|
# TODO
|
||||||
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
Reference in New Issue
Block a user