From fa82989a2ec69700831b5102063db6db469dfbf9 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Wed, 23 Feb 2022 17:40:31 +0100 Subject: [PATCH] Preliminary I2P Interface support --- RNS/Interfaces/I2PInterface.py | 495 +++++++++++++++++++++++++++++++ RNS/Reticulum.py | 19 ++ RNS/vendor/i2plib/__init__.py | 25 ++ RNS/vendor/i2plib/__version__.py | 8 + RNS/vendor/i2plib/aiosam.py | 258 ++++++++++++++++ RNS/vendor/i2plib/exceptions.py | 44 +++ RNS/vendor/i2plib/log.py | 5 + RNS/vendor/i2plib/sam.py | 147 +++++++++ RNS/vendor/i2plib/tunnel.py | 203 +++++++++++++ RNS/vendor/i2plib/utils.py | 42 +++ setup.py | 2 +- 11 files changed, 1247 insertions(+), 1 deletion(-) create mode 100644 RNS/Interfaces/I2PInterface.py create mode 100644 RNS/vendor/i2plib/__init__.py create mode 100644 RNS/vendor/i2plib/__version__.py create mode 100644 RNS/vendor/i2plib/aiosam.py create mode 100644 RNS/vendor/i2plib/exceptions.py create mode 100644 RNS/vendor/i2plib/log.py create mode 100644 RNS/vendor/i2plib/sam.py create mode 100644 RNS/vendor/i2plib/tunnel.py create mode 100644 RNS/vendor/i2plib/utils.py diff --git a/RNS/Interfaces/I2PInterface.py b/RNS/Interfaces/I2PInterface.py new file mode 100644 index 0000000..97094a5 --- /dev/null +++ b/RNS/Interfaces/I2PInterface.py @@ -0,0 +1,495 @@ +from .Interface import Interface +import socketserver +import threading +import platform +import socket +import time +import sys +import os +import RNS +import asyncio + +class HDLC(): + FLAG = 0x7E + ESC = 0x7D + ESC_MASK = 0x20 + + @staticmethod + def escape(data): + data = data.replace(bytes([HDLC.ESC]), bytes([HDLC.ESC, HDLC.ESC^HDLC.ESC_MASK])) + data = data.replace(bytes([HDLC.FLAG]), bytes([HDLC.ESC, HDLC.FLAG^HDLC.ESC_MASK])) + return data + +class KISS(): + FEND = 0xC0 + FESC = 0xDB + TFEND = 0xDC + TFESC = 0xDD + CMD_DATA = 0x00 + CMD_UNKNOWN = 0xFE + + @staticmethod + def escape(data): + data = data.replace(bytes([0xdb]), bytes([0xdb, 0xdd])) + data = data.replace(bytes([0xc0]), bytes([0xdb, 0xdc])) + return data + +class ThreadingI2PServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + pass + +class I2PInterfacePeer(Interface): + RECONNECT_WAIT = 5 + RECONNECT_MAX_TRIES = None + + # TCP socket options + TCP_USER_TIMEOUT = 20 + TCP_PROBE_AFTER = 5 + TCP_PROBE_INTERVAL = 3 + TCP_PROBES = 5 + + I2P_USER_TIMEOUT = 40 + I2P_PROBE_AFTER = 10 + I2P_PROBE_INTERVAL = 5 + I2P_PROBES = 6 + + def __init__(self, owner, name, target_i2p_dest=None, connected_socket=None, max_reconnect_tries=None): + self.rxb = 0 + self.txb = 0 + + self.IN = True + self.OUT = False + self.socket = None + self.parent_interface = None + self.name = name + self.initiator = False + self.reconnecting = False + self.never_connected = True + self.owner = owner + self.writing = False + self.online = False + self.detached = False + self.kiss_framing = False + self.i2p_tunneled = True + self.i2p_dest = None + + i2plib = I2PInterfacePeer.i2plib + i2plib.utils = I2PInterfacePeer.utils + + if max_reconnect_tries == None: + self.max_reconnect_tries = I2PInterfacePeer.RECONNECT_MAX_TRIES + else: + self.max_reconnect_tries = max_reconnect_tries + + if connected_socket != None: + self.receives = True + self.target_ip = None + self.target_port = None + self.socket = connected_socket + + if platform.system() == "Linux": + self.set_timeouts_linux() + elif platform.system() == "Darwin": + self.set_timeouts_osx() + + elif target_i2p_dest != None: + self.receives = True + self.initiator = True + + self.sam_address = i2plib.get_sam_address() + self.aio_loop = I2PInterfacePeer.aio_loop + + self.bind_ip = "127.0.0.1" + self.bind_port = i2plib.utils.get_free_port() + + self.i2p_dest = i2plib.Destination(data=target_i2p_dest) + + RNS.log("Bringing up I2P tunnel to "+str(self)+", this may take a while...", RNS.LOG_INFO) + try: + tunnel = i2plib.ClientTunnel(self.i2p_dest, (self.bind_ip, self.bind_port), sam_address=self.sam_address) + self.aio_loop.run_until_complete(tunnel.run()) + except Exception as e: + raise e + raise IOError("Could not connect to I2P SAM API while configuring "+str(self)+". Check that I2P is running and SAM is enabled.") + + RNS.log(str(self)+ " tunnel setup complete", RNS.LOG_VERBOSE) + + self.target_ip = self.bind_ip + self.target_port = self.bind_port + + if not self.connect(initial=True): + # TODO: Remove + RNS.log("Initial TCP attempt failed, trying reconnects...") + thread = threading.Thread(target=self.reconnect) + thread.setDaemon(True) + thread.start() + else: + # TODO: Remove + RNS.log("Initial TCP attempt OK, entering read loop") + thread = threading.Thread(target=self.read_loop) + thread.setDaemon(True) + thread.start() + if not self.kiss_framing: + self.wants_tunnel = True + + + def set_timeouts_linux(self): + if not self.i2p_tunneled: + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, int(I2PInterfacePeer.TCP_USER_TIMEOUT * 1000)) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, int(I2PInterfacePeer.TCP_PROBE_AFTER)) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, int(I2PInterfacePeer.TCP_PROBE_INTERVAL)) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, int(I2PInterfacePeer.TCP_PROBES)) + else: + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, int(I2PInterfacePeer.I2P_USER_TIMEOUT * 1000)) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, int(I2PInterfacePeer.I2P_PROBE_AFTER)) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, int(I2PInterfacePeer.I2P_PROBE_INTERVAL)) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, int(I2PInterfacePeer.I2P_PROBES)) + + def set_timeouts_osx(self): + if hasattr(socket, "TCP_KEEPALIVE"): + TCP_KEEPIDLE = socket.TCP_KEEPALIVE + else: + TCP_KEEPIDLE = 0x10 + + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + + if not self.i2p_tunneled: + self.socket.setsockopt(socket.IPPROTO_TCP, TCP_KEEPIDLE, int(I2PInterfacePeer.TCP_PROBE_AFTER)) + else: + self.socket.setsockopt(socket.IPPROTO_TCP, TCP_KEEPIDLE, int(I2PInterfacePeer.I2P_PROBE_AFTER)) + + def detach(self): + if self.socket != None: + if hasattr(self.socket, "close"): + if callable(self.socket.close): + RNS.log("Detaching "+str(self), RNS.LOG_DEBUG) + self.detached = True + + try: + self.socket.shutdown(socket.SHUT_RDWR) + except Exception as e: + RNS.log("Error while shutting down socket for "+str(self)+": "+str(e)) + + try: + self.socket.close() + except Exception as e: + RNS.log("Error while closing socket for "+str(self)+": "+str(e)) + + self.socket = None + + def connect(self, initial=False): + try: + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.connect((self.target_ip, self.target_port)) + self.online = True + + except Exception as e: + if initial: + RNS.log("Initial connection for "+str(self)+" could not be established: "+str(e), RNS.LOG_ERROR) + RNS.log("Leaving unconnected and retrying connection in "+str(I2PInterfacePeer.RECONNECT_WAIT)+" seconds.", RNS.LOG_ERROR) + return False + + else: + raise e + + if platform.system() == "Linux": + self.set_timeouts_linux() + elif platform.system() == "Darwin": + self.set_timeouts_osx() + + self.online = True + self.writing = False + self.never_connected = False + + return True + + + def reconnect(self): + if self.initiator: + if not self.reconnecting: + self.reconnecting = True + attempts = 0 + while not self.online: + time.sleep(I2PInterfacePeer.RECONNECT_WAIT) + attempts += 1 + + if self.max_reconnect_tries != None and attempts > self.max_reconnect_tries: + RNS.log("Max reconnection attempts reached for "+str(self), RNS.LOG_ERROR) + self.teardown() + break + + try: + self.connect() + + except Exception as e: + RNS.log("Connection attempt for "+str(self)+" failed: "+str(e), RNS.LOG_DEBUG) + + if not self.never_connected: + RNS.log("Reconnected TCP socket for "+str(self)+".", RNS.LOG_INFO) + + self.reconnecting = False + thread = threading.Thread(target=self.read_loop) + thread.setDaemon(True) + thread.start() + if not self.kiss_framing: + RNS.Transport.synthesize_tunnel(self) + + else: + RNS.log("Attempt to reconnect on a non-initiator TCP interface. This should not happen.", RNS.LOG_ERROR) + raise IOError("Attempt to reconnect on a non-initiator TCP interface") + + def processIncoming(self, data): + self.rxb += len(data) + if hasattr(self, "parent_interface") and self.parent_interface != None: + self.parent_interface.rxb += len(data) + + self.owner.inbound(data, self) + + def processOutgoing(self, data): + if self.online: + while self.writing: + time.sleep(0.01) + + try: + self.writing = True + + if self.kiss_framing: + data = bytes([KISS.FEND])+bytes([KISS.CMD_DATA])+KISS.escape(data)+bytes([KISS.FEND]) + else: + data = bytes([HDLC.FLAG])+HDLC.escape(data)+bytes([HDLC.FLAG]) + + self.socket.sendall(data) + self.writing = False + self.txb += len(data) + if hasattr(self, "parent_interface") and self.parent_interface != None: + self.parent_interface.txb += len(data) + + except Exception as e: + RNS.log("Exception occurred while transmitting via "+str(self)+", tearing down interface", RNS.LOG_ERROR) + RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) + self.teardown() + + + def read_loop(self): + try: + in_frame = False + escape = False + data_buffer = b"" + command = KISS.CMD_UNKNOWN + + while True: + data_in = self.socket.recv(4096) + if len(data_in) > 0: + pointer = 0 + while pointer < len(data_in): + byte = data_in[pointer] + pointer += 1 + + if self.kiss_framing: + # Read loop for KISS framing + if (in_frame and byte == KISS.FEND and command == KISS.CMD_DATA): + in_frame = False + self.processIncoming(data_buffer) + elif (byte == KISS.FEND): + in_frame = True + command = KISS.CMD_UNKNOWN + data_buffer = b"" + elif (in_frame and len(data_buffer) < RNS.Reticulum.MTU): + if (len(data_buffer) == 0 and command == KISS.CMD_UNKNOWN): + # We only support one HDLC port for now, so + # strip off the port nibble + byte = byte & 0x0F + command = byte + elif (command == KISS.CMD_DATA): + if (byte == KISS.FESC): + escape = True + else: + if (escape): + if (byte == KISS.TFEND): + byte = KISS.FEND + if (byte == KISS.TFESC): + byte = KISS.FESC + escape = False + data_buffer = data_buffer+bytes([byte]) + + else: + # Read loop for HDLC framing + if (in_frame and byte == HDLC.FLAG): + in_frame = False + self.processIncoming(data_buffer) + elif (byte == HDLC.FLAG): + in_frame = True + data_buffer = b"" + elif (in_frame and len(data_buffer) < RNS.Reticulum.MTU): + if (byte == HDLC.ESC): + escape = True + else: + if (escape): + if (byte == HDLC.FLAG ^ HDLC.ESC_MASK): + byte = HDLC.FLAG + if (byte == HDLC.ESC ^ HDLC.ESC_MASK): + byte = HDLC.ESC + escape = False + data_buffer = data_buffer+bytes([byte]) + else: + self.online = False + if self.initiator and not self.detached: + RNS.log("TCP socket for "+str(self)+" was closed, attempting to reconnect...", RNS.LOG_WARNING) + self.reconnect() + else: + RNS.log("TCP socket for remote client "+str(self)+" was closed.", RNS.LOG_VERBOSE) + self.teardown() + + break + + + except Exception as e: + self.online = False + RNS.log("An interface error occurred for "+str(self)+", the contained exception was: "+str(e), RNS.LOG_WARNING) + + if self.initiator: + RNS.log("Attempting to reconnect...", RNS.LOG_WARNING) + self.reconnect() + else: + self.teardown() + + def teardown(self): + if self.initiator and not self.detached: + RNS.log("The interface "+str(self)+" experienced an unrecoverable error and is being torn down. Restart Reticulum to attempt to open this interface again.", RNS.LOG_ERROR) + if RNS.Reticulum.panic_on_interface_error: + RNS.panic() + + else: + RNS.log("The interface "+str(self)+" is being torn down.", RNS.LOG_VERBOSE) + + self.online = False + self.OUT = False + self.IN = False + + if hasattr(self, "parent_interface") and self.parent_interface != None: + self.parent_interface.clients -= 1 + + if self in RNS.Transport.interfaces: + if not self.initiator: + RNS.Transport.interfaces.remove(self) + + + def __str__(self): + return "I2PInterfacePeer["+str(self.name)+"]" + + +class I2PInterface(Interface): + + def __init__(self, owner, name, rns_storagepath, peers): + import RNS.vendor.i2plib as i2plib + import RNS.vendor.i2plib.utils + + self.rxb = 0 + self.txb = 0 + self.online = False + self.clients = 0 + self.storagepath = rns_storagepath+"/i2p" + + self.IN = True + self.OUT = False + self.name = name + self.b32a = None + + self.i2p_tunneled = True + + self.receives = True + self.bind_ip = "127.0.0.1" + self.sam_address = i2plib.get_sam_address() + self.bind_port = i2plib.utils.get_free_port() + self.aio_loop = asyncio.get_event_loop() + + I2PInterfacePeer.i2plib = i2plib + I2PInterfacePeer.utils = RNS.vendor.i2plib.utils + I2PInterfacePeer.aio_loop = self.aio_loop + I2PInterfacePeer.sam_address = self.sam_address + + if not os.path.isdir(self.storagepath): + os.makedirs(self.storagepath) + + self.i2p_dest = None + self.i2p_b32 = None + self.i2p_dest_hash = RNS.Identity.full_hash(RNS.Identity.full_hash(self.name.encode("utf-8"))) + self.i2p_keyfile = self.storagepath+"/"+RNS.hexrep(self.i2p_dest_hash, delimit=False)+".i2p" + + if not os.path.isfile(self.i2p_keyfile): + self.i2p_dest = self.aio_loop.run_until_complete(i2plib.new_destination(sam_address=self.sam_address, loop=self.aio_loop)) + key_file = open(self.i2p_keyfile, "w") + key_file.write(self.i2p_dest.private_key.base64) + key_file.close() + + else: + key_file = open(self.i2p_keyfile, "r") + prvd = key_file.read() + key_file.close() + self.i2p_dest = i2plib.Destination(data=prvd, has_private_key=True) + + self.i2p_b32 = self.i2p_dest.base32 + + def handlerFactory(callback): + def createHandler(*args, **keys): + return I2PInterfaceHandler(callback, *args, **keys) + return createHandler + + self.owner = owner + address = (self.bind_ip, self.bind_port) + + ThreadingI2PServer.allow_reuse_address = True + self.server = ThreadingI2PServer(address, handlerFactory(self.incoming_connection)) + + thread = threading.Thread(target=self.server.serve_forever) + thread.setDaemon(True) + thread.start() + + # TODO: Remove + RNS.log("Started TCP server for I2P on "+str(address)+" "+str(self.server)) + + RNS.log("Bringing up I2P tunnel for "+str(self)+", this may take a while...", RNS.LOG_INFO) + try: + tunnel = i2plib.ServerTunnel((self.bind_ip, self.bind_port), loop=self.aio_loop, destination=self.i2p_dest, sam_address=self.sam_address) + self.aio_loop.run_until_complete(tunnel.run()) + except Exception as e: + raise IOError("Could not connect to I2P SAM API while configuring "+str(self)+". Check that I2P is running and SAM is enabled.") + + RNS.log(str(self)+ " tunnel setup complete, instance reachable at: "+str(self.i2p_dest.base32), RNS.LOG_VERBOSE) + + if peers != None: + for peer_addr in peers: + RNS.log("Establishing I2P tunnel to "+str(peer_addr), RNS.LOG_VERBOSE) + interface_name = peer_addr + peer_interface = I2PInterfacePeer(self, interface_name, peer_addr) + + self.online = True + + + def incoming_connection(self, handler): + RNS.log("Accepting incoming I2P connection", RNS.LOG_VERBOSE) + interface_name = "Connected peer on "+self.name + spawned_interface = I2PInterfacePeer(self.owner, interface_name, connected_socket=handler.request) + spawned_interface.OUT = self.OUT + spawned_interface.IN = self.IN + spawned_interface.parent_interface = self + spawned_interface.online = True + RNS.log("Spawned new I2PInterface Peer: "+str(spawned_interface), RNS.LOG_VERBOSE) + RNS.Transport.interfaces.append(spawned_interface) + self.clients += 1 + spawned_interface.read_loop() + + def processOutgoing(self, data): + pass + + def __str__(self): + return "I2PInterface["+self.name+"]" + +class I2PInterfaceHandler(socketserver.BaseRequestHandler): + def __init__(self, callback, *args, **keys): + self.callback = callback + socketserver.BaseRequestHandler.__init__(self, *args, **keys) + + def handle(self): + self.callback(handler=self) diff --git a/RNS/Reticulum.py b/RNS/Reticulum.py index 3f5a95c..89bd471 100755 --- a/RNS/Reticulum.py +++ b/RNS/Reticulum.py @@ -6,6 +6,7 @@ if get_platform() == "android": from .Interfaces import AutoInterface from .Interfaces import TCPInterface from .Interfaces import UDPInterface + from .Interfaces import I2PInterface else: from .Interfaces import * @@ -396,6 +397,24 @@ class Reticulum: RNS.Transport.interfaces.append(interface) + if c["type"] == "I2PInterface": + i2p_peers = c.as_list("peers") if "peers" in c else None + + interface = I2PInterface.I2PInterface( + RNS.Transport, + name, + Reticulum.storagepath, + i2p_peers + ) + + if "outgoing" in c and c.as_bool("outgoing") == True: + interface.OUT = True + else: + interface.OUT = False + + RNS.Transport.interfaces.append(interface) + + if c["type"] == "SerialInterface": port = c["port"] if "port" in c else None speed = int(c["speed"]) if "speed" in c else 9600 diff --git a/RNS/vendor/i2plib/__init__.py b/RNS/vendor/i2plib/__init__.py new file mode 100644 index 0000000..ee6de01 --- /dev/null +++ b/RNS/vendor/i2plib/__init__.py @@ -0,0 +1,25 @@ +""" +A modern asynchronous library for building I2P applications. +""" + +from .__version__ import ( + __title__, __description__, __url__, __version__, + __author__, __author_email__, __license__, __copyright__ +) + +from .sam import Destination, PrivateKey + +from .aiosam import ( + get_sam_socket, dest_lookup, new_destination, + create_session, stream_connect, stream_accept, + Session, StreamConnection, StreamAcceptor +) + +from .tunnel import ClientTunnel, ServerTunnel + +from .utils import get_sam_address + +from .exceptions import ( + CantReachPeer, DuplicatedDest, DuplicatedId, I2PError, + InvalidId, InvalidKey, KeyNotFound, PeerNotFound, Timeout, +) diff --git a/RNS/vendor/i2plib/__version__.py b/RNS/vendor/i2plib/__version__.py new file mode 100644 index 0000000..9c97f87 --- /dev/null +++ b/RNS/vendor/i2plib/__version__.py @@ -0,0 +1,8 @@ +__title__ = 'i2plib' +__description__ = 'A modern asynchronous library for building I2P applications.' +__url__ = 'https://github.com/l-n-s/i2plib' +__version__ = '0.0.14' +__author__ = 'Viktor Villainov' +__author_email__ = 'supervillain@riseup.net' +__license__ = 'MIT' +__copyright__ = 'Copyright 2018 Viktor Villainov' diff --git a/RNS/vendor/i2plib/aiosam.py b/RNS/vendor/i2plib/aiosam.py new file mode 100644 index 0000000..78277db --- /dev/null +++ b/RNS/vendor/i2plib/aiosam.py @@ -0,0 +1,258 @@ +import asyncio + +from . import sam +from . import exceptions +from . import utils +from .log import logger + +def parse_reply(data): + if not data: + raise ConnectionAbortedError("Empty response: SAM API went offline") + + try: + msg = sam.Message(data.decode().strip()) + logger.debug("SAM reply: "+str(msg)) + except: + raise ConnectionAbortedError("Invalid SAM response") + + return msg + + +async def get_sam_socket(sam_address=sam.DEFAULT_ADDRESS, loop=None): + """A couroutine used to create a new SAM socket. + + :param sam_address: (optional) SAM API address + :param loop: (optional) event loop instance + :return: A (reader, writer) pair + """ + reader, writer = await asyncio.open_connection(*sam_address, loop=loop) + writer.write(sam.hello("3.1", "3.1")) + reply = parse_reply(await reader.readline()) + if reply.ok: + return (reader, writer) + else: + writer.close() + raise exceptions.SAM_EXCEPTIONS[reply["RESULT"]]() + +async def dest_lookup(domain, sam_address=sam.DEFAULT_ADDRESS, + loop=None): + """A coroutine used to lookup a full I2P destination by .i2p domain or + .b32.i2p address. + + :param domain: Address to be resolved, can be a .i2p domain or a .b32.i2p + address. + :param sam_address: (optional) SAM API address + :param loop: (optional) Event loop instance + :return: An instance of :class:`Destination` + """ + reader, writer = await get_sam_socket(sam_address, loop) + writer.write(sam.naming_lookup(domain)) + reply = parse_reply(await reader.readline()) + writer.close() + if reply.ok: + return sam.Destination(reply["VALUE"]) + else: + raise exceptions.SAM_EXCEPTIONS[reply["RESULT"]]() + +async def new_destination(sam_address=sam.DEFAULT_ADDRESS, loop=None, + sig_type=sam.Destination.default_sig_type): + """A coroutine used to generate a new destination with a private key of a + chosen signature type. + + :param sam_address: (optional) SAM API address + :param loop: (optional) Event loop instance + :param sig_type: (optional) Signature type + :return: An instance of :class:`Destination` + """ + reader, writer = await get_sam_socket(sam_address, loop) + writer.write(sam.dest_generate(sig_type)) + reply = parse_reply(await reader.readline()) + writer.close() + return sam.Destination(reply["PRIV"], has_private_key=True) + +async def create_session(session_name, sam_address=sam.DEFAULT_ADDRESS, + loop=None, style="STREAM", + signature_type=sam.Destination.default_sig_type, + destination=None, options={}): + """A coroutine used to create a new SAM session. + + :param session_name: Session nick name + :param sam_address: (optional) SAM API address + :param loop: (optional) Event loop instance + :param style: (optional) Session style, can be STREAM, DATAGRAM, RAW + :param signature_type: (optional) If the destination is TRANSIENT, this + signature type is used + :param destination: (optional) Destination to use in this session. Can be + a base64 encoded string, :class:`Destination` + instance or None. TRANSIENT destination is used when it + is None. + :param options: (optional) A dict object with i2cp options + :return: A (reader, writer) pair + """ + logger.debug("Creating session {}".format(session_name)) + if destination: + if type(destination) == sam.Destination: + destination = destination + else: + destination = sam.Destination( + destination, has_private_key=True) + + dest_string = destination.private_key.base64 + else: + dest_string = sam.TRANSIENT_DESTINATION + + options = " ".join(["{}={}".format(k, v) for k, v in options.items()]) + + reader, writer = await get_sam_socket(sam_address, loop) + writer.write(sam.session_create( + style, session_name, dest_string, options)) + + reply = parse_reply(await reader.readline()) + if reply.ok: + if not destination: + destination = sam.Destination( + reply["DESTINATION"], has_private_key=True) + logger.debug(destination.base32) + logger.debug("Session created {}".format(session_name)) + return (reader, writer) + else: + writer.close() + raise exceptions.SAM_EXCEPTIONS[reply["RESULT"]]() + +async def stream_connect(session_name, destination, + sam_address=sam.DEFAULT_ADDRESS, loop=None): + """A coroutine used to connect to a remote I2P destination. + + :param session_name: Session nick name + :param destination: I2P destination to connect to + :param sam_address: (optional) SAM API address + :param loop: (optional) Event loop instance + :return: A (reader, writer) pair + """ + logger.debug("Connecting stream {}".format(session_name)) + if isinstance(destination, str) and not destination.endswith(".i2p"): + destination = sam.Destination(destination) + elif isinstance(destination, str): + destination = await dest_lookup(destination, sam_address, loop) + + reader, writer = await get_sam_socket(sam_address, loop) + writer.write(sam.stream_connect(session_name, destination.base64, + silent="false")) + reply = parse_reply(await reader.readline()) + if reply.ok: + logger.debug("Stream connected {}".format(session_name)) + return (reader, writer) + else: + writer.close() + raise exceptions.SAM_EXCEPTIONS[reply["RESULT"]]() + +async def stream_accept(session_name, sam_address=sam.DEFAULT_ADDRESS, + loop=None): + """A coroutine used to accept a connection from the I2P network. + + :param session_name: Session nick name + :param sam_address: (optional) SAM API address + :param loop: (optional) Event loop instance + :return: A (reader, writer) pair + """ + reader, writer = await get_sam_socket(sam_address, loop) + writer.write(sam.stream_accept(session_name, silent="false")) + reply = parse_reply(await reader.readline()) + if reply.ok: + return (reader, writer) + else: + writer.close() + raise exceptions.SAM_EXCEPTIONS[reply["RESULT"]]() + +### Context managers + +class Session: + """Async SAM session context manager. + + :param session_name: Session nick name + :param sam_address: (optional) SAM API address + :param loop: (optional) Event loop instance + :param style: (optional) Session style, can be STREAM, DATAGRAM, RAW + :param signature_type: (optional) If the destination is TRANSIENT, this + signature type is used + :param destination: (optional) Destination to use in this session. Can be + a base64 encoded string, :class:`Destination` + instance or None. TRANSIENT destination is used when it + is None. + :param options: (optional) A dict object with i2cp options + :return: :class:`Session` object + """ + def __init__(self, session_name, sam_address=sam.DEFAULT_ADDRESS, + loop=None, style="STREAM", + signature_type=sam.Destination.default_sig_type, + destination=None, options={}): + self.session_name = session_name + self.sam_address = sam_address + self.loop = loop + self.style = style + self.signature_type = signature_type + self.destination = destination + self.options = options + + async def __aenter__(self): + self.reader, self.writer = await create_session(self.session_name, + sam_address=self.sam_address, loop=self.loop, style=self.style, + signature_type=self.signature_type, + destination=self.destination, options=self.options) + return self + + async def __aexit__(self, exc_type, exc, tb): + ### TODO handle exceptions + self.writer.close() + +class StreamConnection: + """Async stream connection context manager. + + :param session_name: Session nick name + :param destination: I2P destination to connect to + :param sam_address: (optional) SAM API address + :param loop: (optional) Event loop instance + :return: :class:`StreamConnection` object + """ + def __init__(self, session_name, destination, + sam_address=sam.DEFAULT_ADDRESS, loop=None): + self.session_name = session_name + self.sam_address = sam_address + self.loop = loop + self.destination = destination + + async def __aenter__(self): + self.reader, self.writer = await stream_connect(self.session_name, + self.destination, sam_address=self.sam_address, loop=self.loop) + self.read = self.reader.read + self.write = self.writer.write + return self + + async def __aexit__(self, exc_type, exc, tb): + ### TODO handle exceptions + self.writer.close() + +class StreamAcceptor: + """Async stream acceptor context manager. + + :param session_name: Session nick name + :param sam_address: (optional) SAM API address + :param loop: (optional) Event loop instance + :return: :class:`StreamAcceptor` object + """ + def __init__(self, session_name, sam_address=sam.DEFAULT_ADDRESS, + loop=None): + self.session_name = session_name + self.sam_address = sam_address + self.loop = loop + + async def __aenter__(self): + self.reader, self.writer = await stream_accept(self.session_name, + sam_address=self.sam_address, loop=self.loop) + self.read = self.reader.read + self.write = self.writer.write + return self + + async def __aexit__(self, exc_type, exc, tb): + ### TODO handle exceptions + self.writer.close() diff --git a/RNS/vendor/i2plib/exceptions.py b/RNS/vendor/i2plib/exceptions.py new file mode 100644 index 0000000..d693c84 --- /dev/null +++ b/RNS/vendor/i2plib/exceptions.py @@ -0,0 +1,44 @@ +# SAM exceptions + +class SAMException(IOError): + """Base class for SAM exceptions""" + +class CantReachPeer(SAMException): + """The peer exists, but cannot be reached""" + +class DuplicatedDest(SAMException): + """The specified Destination is already in use""" + +class DuplicatedId(SAMException): + """The nickname is already associated with a session""" + +class I2PError(SAMException): + """A generic I2P error""" + +class InvalidId(SAMException): + """STREAM SESSION ID doesn't exist""" + +class InvalidKey(SAMException): + """The specified key is not valid (bad format, etc.)""" + +class KeyNotFound(SAMException): + """The naming system can't resolve the given name""" + +class PeerNotFound(SAMException): + """The peer cannot be found on the network""" + +class Timeout(SAMException): + """The peer cannot be found on the network""" + +SAM_EXCEPTIONS = { + "CANT_REACH_PEER": CantReachPeer, + "DUPLICATED_DEST": DuplicatedDest, + "DUPLICATED_ID": DuplicatedId, + "I2P_ERROR": I2PError, + "INVALID_ID": InvalidId, + "INVALID_KEY": InvalidKey, + "KEY_NOT_FOUND": KeyNotFound, + "PEER_NOT_FOUND": PeerNotFound, + "TIMEOUT": Timeout, +} + diff --git a/RNS/vendor/i2plib/log.py b/RNS/vendor/i2plib/log.py new file mode 100644 index 0000000..d632698 --- /dev/null +++ b/RNS/vendor/i2plib/log.py @@ -0,0 +1,5 @@ +"""Logging configuration.""" +import logging + +# Name the logger after the package. +logger = logging.getLogger(__package__) diff --git a/RNS/vendor/i2plib/sam.py b/RNS/vendor/i2plib/sam.py new file mode 100644 index 0000000..1bad75d --- /dev/null +++ b/RNS/vendor/i2plib/sam.py @@ -0,0 +1,147 @@ +from base64 import b64decode, b64encode, b32encode +from hashlib import sha256 +import struct +import re + + +I2P_B64_CHARS = "-~" + +def i2p_b64encode(x): + """Encode I2P destination""" + return b64encode(x, altchars=I2P_B64_CHARS.encode()).decode() + +def i2p_b64decode(x): + """Decode I2P destination""" + return b64decode(x, altchars=I2P_B64_CHARS, validate=True) + +SAM_BUFSIZE = 4096 +DEFAULT_ADDRESS = ("127.0.0.1", 7656) +DEFAULT_MIN_VER = "3.1" +DEFAULT_MAX_VER = "3.1" +TRANSIENT_DESTINATION = "TRANSIENT" + +VALID_BASE32_ADDRESS = re.compile(r"^([a-zA-Z0-9]{52}).b32.i2p$") +VALID_BASE64_ADDRESS = re.compile(r"^([a-zA-Z0-9-~=]{516,528})$") + +class Message(object): + """Parse SAM message to an object""" + def __init__(self, s): + self.opts = {} + if type(s) != str: + self._reply_string = s.decode().strip() + else: + self._reply_string = s + + self.cmd, self.action, opts = self._reply_string.split(" ", 2) + for v in opts.split(" "): + data = v.split("=", 1) if "=" in v else (v, True) + self.opts[data[0]] = data[1] + + def __getitem__(self, key): + return self.opts[key] + + @property + def ok(self): + return self["RESULT"] == "OK" + + def __repr__(self): + return self._reply_string + + +# SAM request messages + +def hello(min_version, max_version): + return "HELLO VERSION MIN={} MAX={}\n".format(min_version, + max_version).encode() + +def session_create(style, session_id, destination, options=""): + return "SESSION CREATE STYLE={} ID={} DESTINATION={} {}\n".format( + style, session_id, destination, options).encode() + + +def stream_connect(session_id, destination, silent="false"): + return "STREAM CONNECT ID={} DESTINATION={} SILENT={}\n".format( + session_id, destination, silent).encode() + +def stream_accept(session_id, silent="false"): + return "STREAM ACCEPT ID={} SILENT={}\n".format(session_id, silent).encode() + +def stream_forward(session_id, port, options=""): + return "STREAM FORWARD ID={} PORT={} {}\n".format( + session_id, port, options).encode() + + + +def naming_lookup(name): + return "NAMING LOOKUP NAME={}\n".format(name).encode() + +def dest_generate(signature_type): + return "DEST GENERATE SIGNATURE_TYPE={}\n".format(signature_type).encode() + +class Destination(object): + """I2P destination + + https://geti2p.net/spec/common-structures#destination + + :param data: (optional) Base64 encoded data or binary data + :param path: (optional) A path to a file with binary data + :param has_private_key: (optional) Does data have a private key? + """ + + ECDSA_SHA256_P256 = 1 + ECDSA_SHA384_P384 = 2 + ECDSA_SHA512_P521 = 3 + EdDSA_SHA512_Ed25519 = 7 + + default_sig_type = EdDSA_SHA512_Ed25519 + + _pubkey_size = 256 + _signkey_size = 128 + _min_cert_size = 3 + + def __init__(self, data=None, path=None, has_private_key=False): + #: Binary destination + self.data = bytes() + #: Base64 encoded destination + self.base64 = "" + #: :class:`RNS.vendor.i2plib.PrivateKey` instance or None + self.private_key = None + + if path: + with open(path, "rb") as f: data = f.read() + + if data and has_private_key: + self.private_key = PrivateKey(data) + + cert_len = struct.unpack("!H", self.private_key.data[385:387])[0] + data = self.private_key.data[:387+cert_len] + + if not data: + raise Exception("Can't create a destination with no data") + + self.data = data if type(data) == bytes else i2p_b64decode(data) + self.base64 = data if type(data) == str else i2p_b64encode(data) + + def __repr__(self): + return "".format(self.base32) + + @property + def base32(self): + """Base32 destination hash of this destination""" + desthash = sha256(self.data).digest() + return b32encode(desthash).decode()[:52].lower() + +class PrivateKey(object): + """I2P private key + + https://geti2p.net/spec/common-structures#keysandcert + + :param data: Base64 encoded data or binary data + """ + + def __init__(self, data): + #: Binary private key + self.data = data if type(data) == bytes else i2p_b64decode(data) + #: Base64 encoded private key + self.base64 = data if type(data) == str else i2p_b64encode(data) + diff --git a/RNS/vendor/i2plib/tunnel.py b/RNS/vendor/i2plib/tunnel.py new file mode 100644 index 0000000..30d5d9a --- /dev/null +++ b/RNS/vendor/i2plib/tunnel.py @@ -0,0 +1,203 @@ +import logging +import asyncio +import argparse + +from . import sam +from . import aiosam +from . import utils +from .log import logger + +BUFFER_SIZE = 65536 + +async def proxy_data(reader, writer): + """Proxy data from reader to writer""" + try: + while True: + data = await reader.read(BUFFER_SIZE) + if not data: + break + writer.write(data) + except Exception as e: + logger.debug('proxy_data_task exception {}'.format(e)) + finally: + try: + writer.close() + except RuntimeError: + pass + logger.debug('close connection') + +class I2PTunnel(object): + """Base I2P Tunnel object, not to be used directly + + :param local_address: A local address to use for a tunnel. + E.g. ("127.0.0.1", 6668) + :param destination: (optional) Destination to use for this tunnel. Can be + a base64 encoded string, :class:`Destination` + instance or None. A new destination is created when it + is None. + :param session_name: (optional) Session nick name. A new session nickname is + generated if not specified. + :param options: (optional) A dict object with i2cp options + :param loop: (optional) Event loop instance + :param sam_address: (optional) SAM API address + """ + + def __init__(self, local_address, destination=None, session_name=None, + options={}, loop=None, sam_address=sam.DEFAULT_ADDRESS): + self.local_address = local_address + self.destination = destination + self.session_name = session_name or utils.generate_session_id() + self.options = options + self.loop = loop + self.sam_address = sam_address + + async def _pre_run(self): + if not self.destination: + self.destination = await aiosam.new_destination( + sam_address=self.sam_address, loop=self.loop) + _, self.session_writer = await aiosam.create_session( + self.session_name, style=self.style, options=self.options, + sam_address=self.sam_address, + loop=self.loop, destination=self.destination) + + def stop(self): + """Stop the tunnel""" + self.session_writer.close() + +class ClientTunnel(I2PTunnel): + """Client tunnel, a subclass of tunnel.I2PTunnel + + If you run a client tunnel with a local address ("127.0.0.1", 6668) and + a remote destination "irc.echelon.i2p", all connections to 127.0.0.1:6668 + will be proxied to irc.echelon.i2p. + + :param remote_destination: Remote I2P destination, can be either .i2p + domain, .b32.i2p address, base64 destination or + :class:`Destination` instance + """ + + def __init__(self, remote_destination, *args, **kwargs): + super().__init__(*args, **kwargs) + self.style = "STREAM" + self.remote_destination = remote_destination + + async def run(self): + """A coroutine used to run the tunnel""" + await self._pre_run() + + async def handle_client(client_reader, client_writer): + """Handle local client connection""" + remote_reader, remote_writer = await aiosam.stream_connect( + self.session_name, self.remote_destination, + sam_address=self.sam_address, loop=self.loop) + asyncio.ensure_future(proxy_data(remote_reader, client_writer), + loop=self.loop) + asyncio.ensure_future(proxy_data(client_reader, remote_writer), + loop=self.loop) + + self.server = await asyncio.start_server(handle_client, *self.local_address, + loop=self.loop) + + def stop(self): + super().stop() + self.server.close() + +class ServerTunnel(I2PTunnel): + """Server tunnel, a subclass of tunnel.I2PTunnel + + If you want to expose a local service 127.0.0.1:80 to the I2P network, run + a server tunnel with a local address ("127.0.0.1", 80). If you don't + provide a private key or a session name, it will use a TRANSIENT + destination. + """ + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.style = "STREAM" + + async def run(self): + """A coroutine used to run the tunnel""" + await self._pre_run() + + async def handle_client(incoming, client_reader, client_writer): + # data and dest may come in one chunk + dest, data = incoming.split(b"\n", 1) + remote_destination = sam.Destination(dest.decode()) + logger.debug("{} client connected: {}.b32.i2p".format( + self.session_name, remote_destination.base32)) + + try: + remote_reader, remote_writer = await asyncio.wait_for( + asyncio.open_connection( + host=self.local_address[0], + port=self.local_address[1], loop=self.loop), + timeout=5, loop=self.loop) + if data: remote_writer.write(data) + asyncio.ensure_future(proxy_data(remote_reader, client_writer), + loop=self.loop) + asyncio.ensure_future(proxy_data(client_reader, remote_writer), + loop=self.loop) + except ConnectionRefusedError: + client_writer.close() + + async def server_loop(): + try: + while True: + client_reader, client_writer = await aiosam.stream_accept( + self.session_name, sam_address=self.sam_address, + loop=self.loop) + incoming = await client_reader.read(BUFFER_SIZE) + asyncio.ensure_future(handle_client( + incoming, client_reader, client_writer), loop=self.loop) + except asyncio.CancelledError: + pass + + self.server_loop = asyncio.ensure_future(server_loop(), loop=self.loop) + + def stop(self): + super().stop() + self.server_loop.cancel() + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('type', metavar="TYPE", choices=('server', 'client'), + help="Tunnel type (server or client)") + parser.add_argument('address', metavar="ADDRESS", + help="Local address (e.g. 127.0.0.1:8000)") + parser.add_argument('--debug', '-d', action='store_true', + help='Debugging') + parser.add_argument('--key', '-k', default='', metavar='PRIVATE_KEY', + help='Path to private key file') + parser.add_argument('--destination', '-D', default='', + metavar='DESTINATION', help='Remote destination') + args = parser.parse_args() + + SAM_ADDRESS = utils.get_sam_address() + + logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO) + loop = asyncio.get_event_loop() + loop.set_debug(args.debug) + + if args.key: + destination = sam.Destination(path=args.key, has_private_key=True) + else: + destination = None + + local_address = utils.address_from_string(args.address) + + if args.type == "client": + tunnel = ClientTunnel(args.destination, local_address, loop=loop, + destination=destination, sam_address=SAM_ADDRESS) + elif args.type == "server": + tunnel = ServerTunnel(local_address, loop=loop, destination=destination, + sam_address=SAM_ADDRESS) + + asyncio.ensure_future(tunnel.run(), loop=loop) + + try: + loop.run_forever() + except KeyboardInterrupt: + tunnel.stop() + finally: + loop.stop() + loop.close() diff --git a/RNS/vendor/i2plib/utils.py b/RNS/vendor/i2plib/utils.py new file mode 100644 index 0000000..3c59846 --- /dev/null +++ b/RNS/vendor/i2plib/utils.py @@ -0,0 +1,42 @@ +import socket +import os +import random +import string + +from . import sam + +def get_free_port(): + """Get a free port on your local host""" + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + s.bind(('', 0)) + free_port = s.getsockname()[1] + s.close() + return free_port + +def is_address_accessible(address): + """Check if address is accessible or down""" + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + is_accessible = s.connect_ex(address) == 0 + s.close() + return is_accessible + +def address_from_string(address_string): + """Address tuple from host:port string""" + address = address_string.split(":") + return (address[0], int(address[1])) + +def get_sam_address(): + """ + Get SAM address from environment variable I2P_SAM_ADDRESS, or use a default + value + """ + value = os.getenv("I2P_SAM_ADDRESS") + return address_from_string(value) if value else sam.DEFAULT_ADDRESS + +def generate_session_id(length=6): + """Generate random session id""" + rand = random.SystemRandom() + sid = [rand.choice(string.ascii_letters) for _ in range(length)] + return "reticulum-" + "".join(sid) + diff --git a/setup.py b/setup.py index a078057..62ad8af 100644 --- a/setup.py +++ b/setup.py @@ -29,6 +29,6 @@ setuptools.setup( ] }, - install_requires=['cryptography>=3.4.7', 'pyserial', 'netifaces'], + install_requires=['cryptography>=3.4.7', 'pyserial>=3.5', 'netifaces'], python_requires='>=3.6', )