require 'eventmachine'
require 'socket'
require 'stringio'
require 'btc_wire_proto'

module EventMachine
  module Protocols

    # Implements the TCP protocol that Bitcoin peers speak to each other. This
    # module is mixed into both incoming and outgoing connections.
    #
    # Documentation is here: https://en.bitcoin.it/wiki/Network
    #
    # @author Nick Thomas
    module BitcoinPeer
      # Raised in case of any weird semantics / invalid syntax
      class ProtocolError < StandardError
      end
      PE = ProtocolError

      BTC = ::BtcWireProto

      # The actor for this peer
      attr_reader :actor

      # The list of methods a valid actor will respond to.
      ACTOR_METHODS = [
        :network_name,   # Returns a symbol telling us which network to be on
        :current_time,   # Time instance specifying the current time
        :sub_version,    # String specifying custom version string
        :current_height, # Number of the newest block known to the actor
        :log,            # log(:level, message) - self-evident
        :node_nonce,     # 32-bit number lets us identify streams to self
        :connection=,    # Called with +self+ to allow actor interaction
        :ready!          # Called when the connection is ready to be used
      ]

      # receive_version and receive_verack implementations differ in client &
      # server, so are implemented there.

      # Simple wrapper around +send_data+ that logs usage
      # @param[BTC::Message] packet Packet to send
      def send_packet(packet)
        data = packet.to_binary_s

        log(:info, "Sending #{packet.cmd_sym} message (#{data.length}b)")
        log(:debug, data.inspect)
        log(:debug, packet.inspect)

        send_data(data)
      end

      # Send a 'version' message to the peer. Every field is taken from the
      # actor or from the socket addresses of this connection.
      def send_version
        m = build_message(:version, {
          :version         => BTC::CURRENT_VERSION(actor.network_name),
          :services        => {:node_network => 1},
          :timestamp       => actor.current_time.to_i,
          :addr_me         => my_netaddr,
          :addr_you        => peer_netaddr,
          :nonce           => actor.node_nonce,
          :sub_version_num => (actor.sub_version || "em-bitcoin"),
          :start_height    => actor.current_height
        })
        send_packet(m)
      end

      # Send a 'verack' message to the peer
      def send_verack
        log(:info, "Sending verack message")
        send_packet(build_message(:verack))
      end

      protected

      # Puts together a message from the details given. Common fields, like
      # magic and checksums, are filled in.
      #
      # @param[Symbol|String] command Command name, e.g. :version
      # @param[Hash] payload_opts Payload fields for the message
      # @param[Hash] header_opts Extra header fields. A caller-supplied
      #   :magic is respected; :command and :payload are always overwritten.
      # @return[BTC::Message] the assembled message
      def build_message(command, payload_opts = {}, header_opts = {})
        # Work on a copy so the caller's hash is never modified.
        opts = header_opts.dup
        opts[:command] = command.to_s
        opts[:magic] ||= BTC::NETWORKS[actor.network_name]
        opts[:payload] = payload_opts

        BTC::Message.new(opts)
      end

      # Forwards a log line to the actor.
      def log(level, data)
        actor.log(level, data)
      end

      # Resets the per-connection state and hands the connection to the actor.
      def init_state!
        @data = ""                # Receive buffer of not-yet-parsed bytes
        @ready = nil              # nil -> :version_sent -> true
        @peer_version = nil       # Version announced by the peer (server side)
        @verack_received = false  # Has the peer's verack arrived? (client side)
        actor.connection = self   # Tell the actor about the connection
      end

      # Checks the current actor object to see if it is valid or not.
      #
      # The actor encapsulates the logic of the application using em-bitcoin. We
      # call various methods on it when we receive events from the wire that the
      # application may find interesting, such as receiving a new block or
      # transaction. In response to these events, or independently, the actor
      # can interact with us via its 'connection' attribute to, for instance,
      # send a block or a transaction. It is likely to want to save received
      # data somewhere so it can be interacted with later.
      #
      # @return[Array[true|false, msg]] Whether the actor is valid, and an
      #   optional message specifying why it's invalid, if it is.
      def valid_actor?
        return [false, "Actor not set"] if @actor.nil?

        ACTOR_METHODS.each do |m|
          return [false, "Actor doesn't implement all #{m}"] unless
            actor.respond_to?(m)
        end

        [true, nil]
      end

      # The peer has given us data. Here, we split the data into packets and
      # hand them off to +receive_packet+
      # @param[String] str String containing the data passed in
      def receive_data(str)
        @data << str
        log(:debug, "Received #{str.size}b. Buffer now #{@data.length}b")

        finished = false
        until finished
          begin
            packet = BTC::Message.read(@data)
            @data.slice!(0, packet.num_bytes) # Remove data from buf
            receive_packet(packet)
          rescue IOError, EOFError # Not enough data
            finished = true
          end
        end

        log(:debug, "Leaving receive_data with #{@data.length} bytes in buffer")
        log(:debug, @data.inspect) if @data.length > 0
      end

      # Checks the passed-in packet for validity and dispatches it if valid.
      # Packets from another network, or without a command, are logged and
      # discarded.
      #
      # NOTE(review): NotImplementedError descends from ScriptError, not
      # StandardError, so a bare +rescue+ further up will not catch it. Confirm
      # this is the intended reaction to a command we have no handler for.
      #
      # @param[BTC::Message] packet Received packet
      # @raise[NotImplementedError] if there is no receive_<command> method
      def receive_packet(packet)
        log(:info, "Received #{packet.cmd_sym} packet (#{packet.num_bytes}b)")
        log(:debug, packet.inspect)

        if packet.network_name != actor.network_name
          log(
            :info, "Received packet from incorrect network: " +
            "#{packet.network_name || packet.magic}, discarding"
          )
          return nil
        end

        if packet.cmd_sym
          m = "receive_#{packet.cmd_sym}"
          if self.respond_to?(m)
            log(:info, "Received #{packet.cmd_sym}")
            send(m, packet)
          else
            raise NotImplementedError.new("#{m} not implemented")
          end
        else
          log(:info, "Received packet with no command, discarding it")
        end
      end

      # Checks whether we can communicate sensibly with a peer of a particular
      # version. At the moment, we're fairly liberal about who we try to talk to
      # @param[Fixnum] their_version Version of the peer under question
      # @return[Boolean] can we communicate with a peer advertising the version?
      def can_talk_version?(their_version)
        their_version >= 31402
      end

      # @return[BTC::NetAddr] wire representation of our own network address
      # @raise[ProtocolError] if the local socket address is unavailable
      def my_netaddr
        netaddr_from(get_sockname, "own")
      end

      # @return[BTC::NetAddr] wire representation of our peer's network address
      # @raise[ProtocolError] if the remote socket address is unavailable
      def peer_netaddr
        netaddr_from(get_peername, "peer")
      end

      # Converts a packed sockaddr into a NetAddr record.
      # FIXME: what does node_network mean here?
      #
      # @param[String|nil] sockaddr Packed socket address, or nil if unknown
      # @param[String] whose Whose address this is; used in the error message
      # @return[BTC::NetAddr]
      # @raise[ProtocolError] if +sockaddr+ is nil or false
      def netaddr_from(sockaddr, whose)
        raise PE.new("Unable to determine #{whose} NetAddr") unless sockaddr
        port, ip = Socket::unpack_sockaddr_in(sockaddr)

        BTC::NetAddr.new(
          :services => {:node_network => 1},
          :ip       => ip,
          :port     => port
        )
      end
    end

    # EventMachine protocol class that handles an *outgoing* connection to
    # another bitcoin peer. Common functionality (p2p!) is held in BitcoinPeer.
    #
    # Initiation: send version.
    #             receive verack, receive version (any order) or conn close
    #             send verack or conn close
    #
    # @author Nick Thomas
    class BitcoinClient < EM::Connection
      include BitcoinPeer

      # @param[Object] actor See the BitcoinPeer#valid_actor? method. If this is
      #   a proc, we execute it and use the return value as the actor.
      # @raise[ArgumentError] if the actor is not valid
      def initialize(actor)
        @actor = actor
        @actor = @actor.call if actor.is_a?(Proc)
        result, msg = valid_actor?
        raise ArgumentError.new("Invalid actor: #{msg}") unless result
        init_state!
      end

      def post_init
      end

      # As soon as the TCP connection is up, we send a version message.
      def connection_completed
        send_version
        @ready = :version_sent
      end

      # We receive this in response to our version message. We don't need to do
      # anything with it though. It may arrive before or after the peer's own
      # version message, but only once and only after we have sent our version.
      # @raise[ProtocolError] if the verack is premature or repeated
      def receive_verack(p)
        if @ready.nil? || @verack_received
          raise PE.new("Received verack inappropriately")
        end
        @verack_received = true

        log(:info, "Peer accepted our version message")
      end

      # The peer sends this after it's received our version. We check that we
      # can communicate with the specified version and either return a verack,
      # or close the connection.
      # @raise[ProtocolError] unless we are waiting for the peer's version
      def receive_version(p)
        raise PE.new("Received version inappropriately") unless
          @ready == :version_sent

        if can_talk_version?(p.payload.version)
          log(:info, "Notifying actor of ready peer (v.#{p.payload.version})")
          send_verack
          @ready = true
          actor.ready!
        else
          log(:info, "Bad peer, can't talk version #{p.payload.version}")
          # unbind is the callback EventMachine runs once a connection has
          # gone away; close_connection is what actually closes it.
          close_connection
        end
      end
    end

    # EventMachine protocol class that handles an *incoming* connection from
    # another bitcoin peer. Common functionality (p2p!) is held in BitcoinPeer
    #
    # Servers wait for the client to initiate the connection by sending a
    # version message.
    #
    # @author Nick Thomas
    class BitcoinServer < EM::Connection
      include BitcoinPeer

      # @return the port of our own side of the connection
      def port
        my_netaddr.port
      end

      # @param[Object] actor See the BitcoinPeer#valid_actor? method. If this is
      #   a proc, we execute it and use the return value as the actor.
      # @raise[ArgumentError] if the actor is not valid
      def initialize(actor)
        @actor = actor
        @actor = @actor.call if actor.is_a?(Proc)
        result, msg = valid_actor?
        raise ArgumentError.new("Invalid actor: #{msg}") unless result
        init_state!
      end

      # We should receive this as the very first message on the wire.
      # If we're prepared to talk to a client with this version, we send a
      # verack message followed by our own version and wait for a verack to be
      # received.
      # @raise[ProtocolError] if this is not the first message received
      def receive_version(p)
        raise PE.new("Received version inappropriately") unless @ready.nil?

        if can_talk_version?(p.payload.version)
          # Remembered so it can be reported once the handshake completes.
          @peer_version = p.payload.version
          log(:info, "Peer version #{p.payload.version}")
          send_verack
          send_version
          @ready = :version_sent
        else
          log(:info, "Bad peer - can't talk version #{p.payload.version}")
          # unbind is the callback EventMachine runs once a connection has
          # gone away; close_connection is what actually closes it.
          close_connection
        end
      end

      # We should only receive this after sending our own version. If the client
      # doesn't like our version, it'll close the connection.
      # @raise[ProtocolError] unless we are waiting for the peer's verack
      def receive_verack(p)
        raise PE.new("Received verack inappropriately") unless
          @ready == :version_sent

        # A verack has no version field in the Bitcoin protocol, so report the
        # version the peer announced in its earlier version message.
        log(:info, "Notifying actor of ready peer (v.#{@peer_version})")
        @ready = true
        actor.ready!
      end
    end
  end
end