From d90b5585ef293ad3d076967023a7cb0f9fe03c0f Mon Sep 17 00:00:00 2001 From: Nick Thomas Date: Thu, 9 Jun 2011 22:05:03 +0100 Subject: [PATCH] in-place commit: state machine was a silly idea Replace with the outline of a simpler scheme. We should be able to read messages from the wire now, and we will act on messages via a passed-in instance. Next is to implement send_version and send_verack; and the receive equivalents in BitcoinServer. Then we can build a test rig to have a play. Once that's tested, we can implement a simple message and start on the actor - which will be the first non-trivial bit of sharp-coin proper. We'll be adding features and validations to em-bitcoin & btc_wire_proto as sharp-coin begin to depend on them for correct behaviour, and not before. --- lib/em-bitcoin.rb | 205 ++++++++++++++++++++++++++++------------------ 1 file changed, 124 insertions(+), 81 deletions(-) diff --git a/lib/em-bitcoin.rb b/lib/em-bitcoin.rb index e6d01e8..5c73fdd 100644 --- a/lib/em-bitcoin.rb +++ b/lib/em-bitcoin.rb @@ -7,10 +7,7 @@ module EventMachine # Implements the TCP protocol that Bitcoin peers speak to each other. This # module is mixed into both incoming and outgoing connections. # - # We implement the protocol as a simple(ish!) state machine. When we want - # something doing, we call state(sym, data) to append that to the - # list of things to do. If something is urgent, we can call state! to - # put it at the beginning of the list. + # # # Here is a list of states: # send_ver, recv_ver, verify_ver @@ -21,134 +18,180 @@ module EventMachine # # @author Nick Thomas module BitcoinPeer + # Raised in case of any weird semantics / invalid syntax + class ProtocolError < StandardError + end + PE = ProtocolError + + # The list of methods a valid actor will respond to. + ACTOR_METHODS = [ + :log, # log(:level, message) - self-evident + :connection=, # Called with +self+ to allow actor interaction + :ready! # Called when the connection is ready to be used + ] + + def log(level, data) + @actor.log(level, data) + end protected - # Sets up the variables required to manage the state machine. Should be - # called before you try to push a state - in post_init, say. def init_state! - @stream = StringIO.new("") - @state_m = Mutex.new # Synchronize around @states and @working - @state_m.synchronize do - @states = [] - @current_state = nil - end + @data = "" + @ready = nil + @actor.connection = self # Tell the actor about the connection end - # Checks the current configuration object to see if we have a valid config - # or not. - # @return[Array[true|false, msg]] Whether the config is valid, and an + # Checks the current actor object to see if it is valid or not. + # + # The actor encapsulates the logic of the application using em-bitcoin. We + # call various methods on it when we receive events from the wire that the + # application may find interesting, such as receiving a new block or + # transaction. In response to these events, or independently, the actor + # can interact with us via its 'connection' attribute to, for instance, + # send a block or a transaction. It is likely to want to save received + # data somewhere so it can be interacted with later. + # + # @return[Array[true|false, msg]] Whether the actor is valid, and an # optional message specifying why it's invalid, if it is. - def valid_config? - [false, "configuration check not implemented yet"] - end - - # Grabs the state that we're currently working on. - # @return[Array[Symbol,Object]] state symbol + data - def get_state - @state_m.synchronize do - @current_state ||= @states.shift - @current_state - end - end - - # Call this when we've completed the actions required by the current state - def finished_state - @state_m.synchronize { @current_state = nil } - end - - # Push a state to the end of the state queue. - def state(new_state, data = nil) - @state_m.synchronize { @states.push(new_state, data) } - end - - # Add a state to the start of the state queue. - def state!(new_state, data = nil) - @state_m.synchronize { @states.unshift(new_state, data) } - end + def valid_actor? + return [false, "Actor not set"] if @actor.nil? - # Do some work on the state machine - def state_tick - state, data = get_state - return unless state - if respond_to?(state) - send(state, data) - else - raise NotImplementedError.new("Unknown state: #{state} with #{data}") + ACTOR_METHODS.each do |m| + return [false, "Actor doesn't implement all #{m}"] unless + @actor.respond_to?(m) end + + true end - # Receiving data mostly drives the state machine. Where something wants - # to send data, it gets queued straight onto the I/O/ - def receive_data(io) + # The peer has given us data. Here, we split the data into packets and + # hand them off to +receive_packet+ + # @param[String] str String containing the data passed in + def receive_data(str) + @data << str + log(:debug, "Received #{str.size}b. Buffer now #{@data.length}b") + finished = false + while !finished + begin + packet = BtcWireProto::Message.read(@data) + @data.slice!(0, packet.num_bytes - 1) # Remove data from buffer + if packet.cmd_sym + m = "receive_#{packet.cmd_sym}" + if self.respond_to?(m) + log(:info, "Received #{packet.cmd_sym}") + send(m, packet) + else + raise NotImplementedError.new("#{m} not implemented") + end + else + log(:warn, "Received packet with no command, discarding it") + log(:debug, packet) + end + rescue EOFError + finished = true + end + end + log(:debug, "Leaving receive_data with #{@data.length} bytes in buffer") end - - - - # State machine behaviours now. + + # receive_version and receive_verack implementations differ in client & + # server, so are implemented there. # Send a 'version' message to the peer. - # Next - def send_ver - + def send_version + # TODO end + # Send a 'verack' message to the peer + def send_verack + # TODO + end end # EventMachine protocol class that handles an *outgoing* connection to # another bitcoin peer. Common functionality (p2p!) is held in BitcoinPeer. # - # State machine flow: - # send_ver, recv_verack - # recv_ver, verify_ver, send_verack - # + # Initiation: send version. + # receive verack, receive version (any order) or conn close + # send verack or conn close + # # @author Nick Thomas class BitcoinClient < EM::Connection include BitcoinPeer - # @param[Object] config See the BitcoinPeer#valid_config? - def initialize(config) + # @param[Object] actor See the BitcoinPeer#valid_actor? + def initialize(actor) super - @config = config - result, msg = valid_config? - raise ArgumentError.new("Invalid configuration: #{msg}") unless result + @actor = actor + result, msg = valid_actor? + raise ArgumentError.new("Invalid actor: #{msg}") unless result init_state! end def post_init - state(:send_ver) + end + # As soon as the TCP connection is up, we send a version message. def connection_completed - advance_state + send_version + @ready = :version_sent + end + + # We receive this in response to our version message. We don't need to do + # anything with it though. + def receive_verack(p) + raise PE.new("Received verack inappropriately") unless + @ready == :version_sent + log(:info, "Peer accepted our version message") + end + + # The peer sends this after it's received our version. We check that we + # can communicate with the specified version and either return a verack, + # or close the connection. + def receive_version(p) + raise PE.new("Received version inappropriately") unless + @ready == :version_sent + log(:info, "Peer tells us its version is #{p.payload.version}") + send_verack + @ready = true + @actor.ready! end end # EventMachine protocol class that handles an *incoming* connection from # another bitcoin peer. Common functionality (p2p!) is held in BitcoinPeer # - # State machine flow: - # recv_ver, verify_ver, send_verack - # send_ver, recv_verack + # Servers wait for the client to initiate the connection by sending a + # version message. # # @author Nick Thomas class BitcoinServer < EM::Connection include BitcoinPeer - # @param[Object] config See the BitcoinPeer#valid_config? - def initialize(config) + # @param[Object] actor See the BitcoinPeer#valid_actor? method + def initialize(actor) super - @config = config - result, msg = valid_config? - raise ArgumentError.new("Invalid configuration: #{msg}") unless result + @actor = actor + result, msg = valid_actor? + raise ArgumentError.new("Invalid actor: #{msg}") unless result init_state! end + - def post_init - state(:recv_ver) + # We should receive this as the very first message on the wire + def receive_version(p) + # TODO end + + # We should only receive this after sending our own version + def receive_verack(p) + # TODO + end + end end end