diff --git a/config.ini_sample b/config.ini_sample index ddf3589..9ec9cdf 100644 --- a/config.ini_sample +++ b/config.ini_sample @@ -73,10 +73,15 @@ disable_mempool_fee_histogram = false # Parameter for broadcasting unconfirmed transactions # Options are: # * own-node (broadcast using the connected full node) +# * tor (broadcast to random nodes over tor) # * system %s (save transaction to file, and invoke system command # with file path as parameter %s) broadcast_method = own-node +# For tor broadcasting (broadcast_method = tor) configure the tor proxy host and port below +tor_host = localhost +tor_port = 9050 + [watch-only-addresses] #Add individual addresses to this section, for example paper wallets diff --git a/electrumpersonalserver/server/__init__.py b/electrumpersonalserver/server/__init__.py index bf06a8c..9089efe 100644 --- a/electrumpersonalserver/server/__init__.py +++ b/electrumpersonalserver/server/__init__.py @@ -28,3 +28,11 @@ parse_electrum_master_public_key, DeterministicWallet, ) +from electrumpersonalserver.server.socks import ( + socksocket, + setdefaultproxy, + PROXY_TYPE_SOCKS5, +) +from electrumpersonalserver.server.peertopeer import ( + tor_broadcast_tx, +) diff --git a/electrumpersonalserver/server/common.py b/electrumpersonalserver/server/common.py index 1b31a38..091cdec 100644 --- a/electrumpersonalserver/server/common.py +++ b/electrumpersonalserver/server/common.py @@ -5,12 +5,14 @@ from ipaddress import ip_network, ip_address import logging import tempfile +import threading from electrumpersonalserver.server.jsonrpc import JsonRpc, JsonRpcError import electrumpersonalserver.server.hashes as hashes import electrumpersonalserver.server.merkleproof as merkleproof import electrumpersonalserver.server.deterministicwallet as deterministicwallet import electrumpersonalserver.server.transactionmonitor as transactionmonitor +import electrumpersonalserver.server.peertopeer as p2p SERVER_VERSION_NUMBER = "0.1.7" @@ -93,7 +95,7 @@ def on_disconnect(txmonitor): txmonitor.unsubscribe_all_addresses() def handle_query(sock, line, rpc, txmonitor, disable_mempool_fee_histogram, - broadcast_method): + broadcast_method, tor_hostport=None): logger = logging.getLogger('ELECTRUMPERSONALSERVER') logger.debug("=> " + line) try: @@ -244,6 +246,20 @@ def handle_query(sock, line, rpc, txmonitor, disable_mempool_fee_histogram, rpc.call("sendrawtransaction", [txhex]) except JsonRpcError as e: pass + elif broadcast_method == "tor": + # send through tor + TOR_CONNECTIONS = 8 + network = "mainnet" + chaininfo = rpc.call("getblockchaininfo", []) + if chaininfo["chain"] == "test": + network = "testnet" + elif chaininfo["chain"] == "regtest": + network = "regtest" + for i in range(TOR_CONNECTIONS): + t = threading.Thread(target=p2p.tor_broadcast_tx, + args=(txhex, tor_hostport, network, rpc,)) + t.start() + time.sleep(0.1) elif broadcast_method.startswith("system "): with tempfile.NamedTemporaryFile() as fd: system_line = broadcast_method[7:].replace("%s", fd.name) @@ -451,6 +467,9 @@ def run_electrum_server(rpc, txmonitor, config): "disable_mempool_fee_histogram", fallback=False) broadcast_method = config.get("electrum-server", "broadcast_method", fallback="own-node") + tor_host = config.get("electrum-server", "tor_host", fallback="localhost") + tor_port = int(config.get("electrum-server", "tor_port", fallback="9050")) + tor_hostport = (tor_host, tor_port) server_sock = create_server_socket(hostport) server_sock.settimeout(poll_interval_listening) @@ -491,7 +510,7 @@ def run_electrum_server(rpc, txmonitor, config): lb = recv_buffer.find(b'\n') handle_query(sock, line.decode("utf-8"), rpc, txmonitor, disable_mempool_fee_histogram, - broadcast_method) + broadcast_method, tor_hostport) except socket.timeout: on_heartbeat_connected(sock, rpc, txmonitor) except (IOError, EOFError) as e: diff --git a/electrumpersonalserver/server/peertopeer.py b/electrumpersonalserver/server/peertopeer.py new file mode 100644 index 0000000..0c3dda8 --- /dev/null +++ b/electrumpersonalserver/server/peertopeer.py @@ -0,0 +1,356 @@ +#! /usr/bin/env python + +import socket, time +import base64 +from struct import pack, unpack +from datetime import datetime + +import electrumpersonalserver.bitcoin as btc +from electrumpersonalserver.server.socks import socksocket, setdefaultproxy, PROXY_TYPE_SOCKS5 +from electrumpersonalserver.server.jsonrpc import JsonRpcError + +import logging as log + +PROTOCOL_VERSION = 70012 +DEFAULT_USER_AGENT = '/Satoshi:0.18.0/' +NODE_WITNESS = (1 << 3) + +# protocol versions above this also send a relay boolean +RELAY_TX_VERSION = 70001 + +# length of bitcoin p2p packets +HEADER_LENGTH = 24 + +# if no message has been seen for this many seconds, send a ping +KEEPALIVE_INTERVAL = 2 * 60 + +# close connection if keep alive ping isnt responded to in this many seconds +KEEPALIVE_TIMEOUT = 20 * 60 + +def ip_to_hex(ip_str): + # ipv4 only for now + return socket.inet_pton(socket.AF_INET, ip_str) + + +def create_net_addr(hexip, port): # doesnt contain time as in bitcoin wiki + services = 0 + hex = bytes(10) + b'\xFF\xFF' + hexip + return pack('H', port) + + +def create_var_str(s): + return btc.num_to_var_int(len(s)) + s.encode() + + +def read_int(ptr, payload, n, littleendian=True): + data = payload[ptr[0] : ptr[0]+n] + if littleendian: + data = data[::-1] + ret = btc.decode(data, 256) + ptr[0] += n + return ret + + +def read_var_int(ptr, payload): + val = payload[ptr[0]] + ptr[0] += 1 + if val < 253: + return val + return read_int(ptr, payload, 2**(val - 252)) + + +def read_var_str(ptr, payload): + l = read_var_int(ptr, payload) + ret = payload[ptr[0]: ptr[0] + l] + ptr[0] += l + return ret + + +def ip_hex_to_str(ip_hex): + # https://en.wikipedia.org/wiki/IPv6#IPv4-mapped_IPv6_addresses + # https://www.cypherpunk.at/onioncat_trac/wiki/OnionCat + if ip_hex[:14] == '\x00'*10 + '\xff'*2: + # ipv4 mapped ipv6 addr + return socket.inet_ntoa(ip_hex[12:]) + elif ip_hex[:6] == '\xfd\x87\xd8\x7e\xeb\x43': + return base64.b32encode(ip_hex[6:]).lower() + '.onion' + else: + return socket.inet_ntop(socket.AF_INET6, ip_hex) + + +class P2PMessageHandler(object): + def __init__(self): + self.last_message = datetime.now() + self.waiting_for_keepalive = False + self.log = (log if log else + log.getLogger('ELECTRUMPERSONALSERVER')) + + def check_keepalive(self, p2p): + if self.waiting_for_keepalive: + if (datetime.now() - self.last_message).total_seconds() < KEEPALIVE_TIMEOUT: + return + log.info('keepalive timed out, closing') + p2p.sock.close() + else: + if (datetime.now() - self.last_message).total_seconds() < KEEPALIVE_INTERVAL: + return + log.debug('sending keepalive to peer') + self.waiting_for_keepalive = True + p2p.sock.sendall(p2p.create_message('ping', '\x00'*8)) + + def handle_message(self, p2p, command, length, payload): + self.last_message = datetime.now() + self.waiting_for_keepalive = False + ptr = [0] + if command == b'version': + version = read_int(ptr, payload, 4) + services = read_int(ptr, payload, 8) + timestamp = read_int(ptr, payload, 8) + addr_recv_services = read_int(ptr, payload, 8) + addr_recv_ip = payload[ptr[0] : ptr[0]+16] + ptr[0] += 16 + addr_recv_port = read_int(ptr, payload, 2, False) + addr_trans_services = read_int(ptr, payload, 8) + addr_trans_ip = payload[ptr[0] : ptr[0]+16] + ptr[0] += 16 + addr_trans_port = read_int(ptr, payload, 2, False) + ptr[0] += 8 # skip over nonce + user_agent = read_var_str(ptr, payload) + start_height = read_int(ptr, payload, 4) + if version > RELAY_TX_VERSION: + relay = read_int(ptr, payload, 1) != 0 + else: # must check this node accepts unconfirmed transactions for the broadcast + relay = True + log.debug(('peer version message: version=%d services=0x%x' + + ' timestamp=%s user_agent=%s start_height=%d relay=%i' + + ' them=%s:%d us=%s:%d') % (version, + services, str(datetime.fromtimestamp(timestamp)), + user_agent, start_height, relay, ip_hex_to_str(addr_trans_ip) + , addr_trans_port, ip_hex_to_str(addr_recv_ip), addr_recv_port)) + p2p.sock.sendall(p2p.create_message('verack', b'')) + self.on_recv_version(p2p, version, services, timestamp, + addr_recv_services, addr_recv_ip, addr_trans_services, + addr_trans_ip, addr_trans_port, user_agent, start_height, + relay) + elif command == b'verack': + self.on_connected(p2p) + elif command == b'ping': + p2p.sock.sendall(p2p.create_message('pong', payload)) + + # optional override these in a subclass + + def on_recv_version(self, p2p, version, services, timestamp, + addr_recv_services, addr_recv_ip, addr_trans_services, + addr_trans_ip, addr_trans_port, user_agent, start_height, relay): + pass + + def on_connected(self, p2p): + pass + + def on_heartbeat(self, p2p): + pass + + +class P2PProtocol(object): + def __init__(self, p2p_message_handler, remote_hostport, + network, user_agent=DEFAULT_USER_AGENT, + socks5_hostport=("localhost", 9050), connect_timeout=30, heartbeat_interval=15): + self.log = (log if log else + log.getLogger('ELECTRUMPERSONALSERVER')) + self.p2p_message_handler = p2p_message_handler + self.network = network + self.user_agent = user_agent + self.socks5_hostport = socks5_hostport + self.heartbeat_interval = heartbeat_interval + self.connect_timeout = connect_timeout + if self.network == "testnet": + self.magic = 0x0709110b + elif self.network == "regtest": + self.magic = 0xdab5bffa + else: + self.magic = 0xd9b4bef9 + + self.closed = False + + self.remote_hostport = remote_hostport + + def run(self): + services = NODE_WITNESS + st = int(time.time()) + nonce = 0 + start_height = 0 + + netaddr = create_net_addr(ip_to_hex('0.0.0.0'), 0) + version_message = (pack('= HEADER_LENGTH: + net_magic, command, payload_length, checksum = unpack('= 0 and len(recv_buffer) >= payload_length: + payload = recv_buffer[:payload_length] + recv_buffer = recv_buffer[payload_length:] + if btc.bin_dbl_sha256(payload)[:4] == checksum: + self.p2p_message_handler.handle_message(self, command, + payload_length, payload) + else: + log.debug('wrong checksum, dropping message, cmd=' + command + ' payloadlen=' + str(payload_length)) + payload_length = -1 + data_remaining = True + else: + data_remaining = False + except socket.timeout: + self.p2p_message_handler.check_keepalive(self) + self.p2p_message_handler.on_heartbeat(self) + except EOFError as e: + self.closed = True + except IOError as e: + import traceback + log.debug("logging traceback from %s: \n" % + traceback.format_exc()) + self.closed = True + finally: + try: + self.sock.close() + except Exception as _: + pass + + + def close(self): + self.closed = True + + def create_message(self, command, payload): + return (pack(" socket object + + Open a SOCKS enabled socket. The parameters are the same as + those of the standard socket init. In order for SOCKS to work, + you must specify family=AF_INET, type=SOCK_STREAM and proto=0. + """ + + def __init__(self, + family=socket.AF_INET, + type=socket.SOCK_STREAM, + proto=0, + _sock=None): + _orgsocket.__init__(self, family, type, proto, _sock) + if _defaultproxy is not None: + self.__proxy = _defaultproxy + else: + self.__proxy = (None, None, None, None, None, None) + self.__proxysockname = None + self.__proxypeername = None + + def __recvall(self, bytes): + """__recvall(bytes) -> data + Receive EXACTLY the number of bytes requested from the socket. + Blocks until the required number of bytes have been received. + """ + data = b'' + while len(data) < bytes: + data = data + self.recv(bytes - len(data)) + return data + + def setproxy(self, + proxytype=None, + addr=None, + port=None, + rdns=True, + username=None, + password=None): + """setproxy(proxytype, addr[, port[, rdns[, username[, password]]]]) + Sets the proxy to be used. + proxytype - The type of the proxy to be used. Three types + are supported: PROXY_TYPE_SOCKS4 (including socks4a), + PROXY_TYPE_SOCKS5 and PROXY_TYPE_HTTP + addr - The address of the server (IP or DNS). + port - The port of the server. Defaults to 1080 for SOCKS + servers and 8080 for HTTP proxy servers. + rdns - Should DNS queries be preformed on the remote side + (rather than the local side). The default is True. + Note: This has no effect with SOCKS4 servers. + username - Username to authenticate with to the server. + The default is no authentication. + password - Password to authenticate with to the server. + Only relevant when username is also provided. + """ + self.__proxy = (proxytype, addr, port, rdns, username, password) + + def __negotiatesocks5(self, destaddr, destport): + """__negotiatesocks5(self,destaddr,destport) + Negotiates a connection through a SOCKS5 server. + """ + # First we'll send the authentication packages we support. + if (self.__proxy[4] is not None) and (self.__proxy[5] is not None): + # The username/password details were supplied to the + # setproxy method so we support the USERNAME/PASSWORD + # authentication (in addition to the standard none). + self.sendall(b'\x05\x02\x00\x02') + else: + # No username/password were entered, therefore we + # only support connections with no authentication. + self.sendall(b'\x05\x01\x00') + # We'll receive the server's response to determine which + # method was selected + chosenauth = self.__recvall(2) + if chosenauth[0:1] != b"\x05": + self.close() + raise GeneralProxyError((1, _generalerrors[1])) + # Check the chosen authentication method + if chosenauth[1:2] == b"\x00": + # No authentication is required + pass + elif chosenauth[1:2] == b"\x02": + # Okay, we need to perform a basic username/password + # authentication. + self.sendall(b'\x01' + bytes([len(self.__proxy[4])]) + self.__proxy[4].encode() + + bytes([len(self.__proxy[5])]) + self.__proxy[5].encode()) + authstat = self.__recvall(2) + if authstat[0:1] != b"\x01": + # Bad response + self.close() + raise GeneralProxyError((1, _generalerrors[1])) + if authstat[1:2] != b"\x00": + # Authentication failed + self.close() + raise Socks5AuthError((3, _socks5autherrors[3])) + # Authentication succeeded + else: + # Reaching here is always bad + self.close() + if chosenauth[1:2] == b"\xFF": + raise Socks5AuthError((2, _socks5autherrors[2])) + else: + raise GeneralProxyError((1, _generalerrors[1])) + # Now we can request the actual connection + req = b"\x05\x01\x00" + # If the given destination address is an IP address, we'll + # use the IPv4 address request even if remote resolving was specified. + try: + ipaddr = socket.inet_aton(destaddr) + req = req + b"\x01" + ipaddr + except socket.error: + # Well it's not an IP number, so it's probably a DNS name. + if self.__proxy[3]: + # Resolve remotely + ipaddr = None + req = req + b"\x03" + bytes([len(destaddr)]) + destaddr.encode() + else: + # Resolve locally + ipaddr = socket.inet_aton(socket.gethostbyname(destaddr)) + req = req + b"\x01" + ipaddr + req += struct.pack(">H", destport) + self.sendall(req) + # Get the response + resp = self.__recvall(4) + if resp[0:1] != b"\x05": + self.close() + raise GeneralProxyError((1, _generalerrors[1])) + elif resp[1:2] != b"\x00": + # Connection failed + self.close() + raise Socks5Error(_socks5errors[min(9, ord(resp[1:2]))]) + # Get the bound address/port + elif resp[3:4] == b"\x01": + boundaddr = self.__recvall(4) + elif resp[3:4] == b"\x03": + resp = resp + self.recv(1) + boundaddr = self.__recvall(resp[4:5]) + else: + self.close() + raise GeneralProxyError((1, _generalerrors[1])) + boundport = struct.unpack(">H", self.__recvall(2))[0] + self.__proxysockname = (boundaddr, boundport) + if ipaddr is not None: + self.__proxypeername = (socket.inet_ntoa(ipaddr), destport) + else: + self.__proxypeername = (destaddr, destport) + + def getproxysockname(self): + """getsockname() -> address info + Returns the bound IP address and port number at the proxy. + """ + return self.__proxysockname + + def getproxypeername(self): + """getproxypeername() -> address info + Returns the IP and port number of the proxy. + """ + return _orgsocket.getpeername(self) + + def getpeername(self): + """getpeername() -> address info + Returns the IP address and port number of the destination + machine (note: getproxypeername returns the proxy) + """ + return self.__proxypeername + + def __negotiatesocks4(self, destaddr, destport): + """__negotiatesocks4(self,destaddr,destport) + Negotiates a connection through a SOCKS4 server. + """ + # Check if the destination address provided is an IP address + rmtrslv = False + try: + ipaddr = socket.inet_aton(destaddr) + except socket.error: + # It's a DNS name. Check where it should be resolved. + if self.__proxy[3]: + ipaddr = b"\x00\x00\x00\x01" + rmtrslv = True + else: + ipaddr = socket.inet_aton(socket.gethostbyname(destaddr)) + # Construct the request packet + req = b"\x04\x01" + struct.pack(">H", destport) + ipaddr + # The username parameter is considered userid for SOCKS4 + if self.__proxy[4] is not None: + req = req + self.__proxy[4].encode() + req += b"\x00" + # DNS name if remote resolving is required + # NOTE: This is actually an extension to the SOCKS4 protocol + # called SOCKS4A and may not be supported in all cases. + if rmtrslv: + req = req + destaddr + b"\x00" + self.sendall(req) + # Get the response from the server + resp = self.__recvall(8) + if resp[0:1] != b"\x00": + # Bad data + self.close() + raise GeneralProxyError((1, _generalerrors[1])) + if resp[1:2] != b"\x5A": + # Server returned an error + self.close() + if ord(resp[1:2]) in (91, 92, 93): + self.close() + raise Socks4Error((ord(resp[1]), _socks4errors[ord(resp[1:2]) - + 90])) + else: + raise Socks4Error((94, _socks4errors[4])) + # Get the bound address/port + self.__proxysockname = (socket.inet_ntoa(resp[4:]), struct.unpack( + ">H", resp[2:4])[0]) + if rmtrslv is not None: + self.__proxypeername = (socket.inet_ntoa(ipaddr), destport) + else: + self.__proxypeername = (destaddr, destport) + + def __negotiatehttp(self, destaddr, destport): + """__negotiatehttp(self,destaddr,destport) + Negotiates a connection through an HTTP server. + """ + # If we need to resolve locally, we do this now + if not self.__proxy[3]: + addr = socket.gethostbyname(destaddr) + else: + addr = destaddr + self.sendall("CONNECT " + addr + ":" + str(destport) + " HTTP/1.1\r\n" + + "Host: " + destaddr + "\r\n\r\n") + # We read the response until we get the string "\r\n\r\n" + resp = self.recv(1) + while resp.find(b"\r\n\r\n") == -1: + resp = resp + self.recv(1) + # We just need the first line to check if the connection + # was successful + statusline = resp.splitlines()[0].split(b" ", 2) + if statusline[0] not in ("HTTP/1.0", "HTTP/1.1"): + self.close() + raise GeneralProxyError((1, _generalerrors[1])) + try: + statuscode = int(statusline[1]) + except ValueError: + self.close() + raise GeneralProxyError((1, _generalerrors[1])) + if statuscode != 200: + self.close() + raise HTTPError((statuscode, statusline[2])) + self.__proxysockname = ("0.0.0.0", 0) + self.__proxypeername = (addr, destport) + + def connect(self, destpair): + """connect(self,despair) + Connects to the specified destination through a proxy. + destpar - A tuple of the IP/DNS address and the port number. + (identical to socket's connect). + To select the proxy server use setproxy(). + """ + # Do a minimal input check first + if (type(destpair) in + (list, tuple) == False) or (len(destpair) < 2) or ( + type(destpair[0]) != str) or (type(destpair[1]) != int): + raise GeneralProxyError((5, _generalerrors[5])) + if self.__proxy[0] == PROXY_TYPE_SOCKS5: + if self.__proxy[2] is not None: + portnum = self.__proxy[2] + else: + portnum = 1080 + _orgsocket.connect(self, (self.__proxy[1], portnum)) + self.__negotiatesocks5(destpair[0], destpair[1]) + elif self.__proxy[0] == PROXY_TYPE_SOCKS4: + if self.__proxy[2] is not None: + portnum = self.__proxy[2] + else: + portnum = 1080 + _orgsocket.connect(self, (self.__proxy[1], portnum)) + self.__negotiatesocks4(destpair[0], destpair[1]) + elif self.__proxy[0] == PROXY_TYPE_HTTP: + if self.__proxy[2] is not None: + portnum = self.__proxy[2] + else: + portnum = 8080 + _orgsocket.connect(self, (self.__proxy[1], portnum)) + self.__negotiatehttp(destpair[0], destpair[1]) + elif self.__proxy[0] is None: + _orgsocket.connect(self, (destpair[0], destpair[1])) + else: + raise GeneralProxyError((4, _generalerrors[4]))