Work on I2P Interface

This commit is contained in:
Mark Qvist 2022-02-23 21:19:43 +01:00
parent fa82989a2e
commit b62e9af5d4

View File

@ -9,6 +9,11 @@ import os
import RNS import RNS
import asyncio import asyncio
# TODO: Remove
import logging
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
class HDLC(): class HDLC():
FLAG = 0x7E FLAG = 0x7E
ESC = 0x7D ESC = 0x7D
@ -34,11 +39,102 @@ class KISS():
data = data.replace(bytes([0xc0]), bytes([0xdb, 0xdc])) data = data.replace(bytes([0xc0]), bytes([0xdb, 0xdc]))
return data return data
class I2PController:
def __init__(self, rns_storagepath):
import RNS.vendor.i2plib as i2plib
import RNS.vendor.i2plib.utils
self.loop = None
self.i2plib = i2plib
self.utils = i2plib.utils
self.sam_address = i2plib.get_sam_address()
self.storagepath = rns_storagepath+"/i2p"
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
def start(self):
asyncio.set_event_loop(asyncio.new_event_loop())
self.loop = asyncio.get_event_loop()
try:
self.loop.run_forever()
except Exception as e:
RNS.log("Exception on event loop for "+str(self)+": "+str(e), RNS.LOG_ERROR)
finally:
self.loop.close()
RNS.log("EVENT LOOP DOWN")
def stop(self):
for task in asyncio.Task.all_tasks(loop=self.loop):
task.cancel()
self.loop.stop()
def get_free_port(self):
return self.i2plib.utils.get_free_port()
def client_tunnel(self, owner, i2p_destination):
try:
async def tunnel_up():
RNS.log("Bringing up I2P tunnel to "+str(owner)+" in background, this may take a while...", RNS.LOG_INFO)
tunnel = self.i2plib.ClientTunnel(i2p_destination, owner.local_addr, sam_address=self.sam_address)
await tunnel.run()
tunnel.aclose()
RNS.log(str(owner)+ " tunnel setup complete", RNS.LOG_VERBOSE)
asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop)
except Exception as e:
raise IOError("Could not connect to I2P SAM API while configuring to "+str(owner)+". Check that I2P is running and SAM is enabled.")
def server_tunnel(self, owner):
i2p_dest_hash = RNS.Identity.full_hash(RNS.Identity.full_hash(owner.name.encode("utf-8")))
i2p_keyfile = self.storagepath+"/"+RNS.hexrep(i2p_dest_hash, delimit=False)+".i2p"
i2p_dest = None
if not os.path.isfile(i2p_keyfile):
coro = self.i2plib.new_destination(sam_address=self.sam_address, loop=self.loop)
i2p_dest = asyncio.run_coroutine_threadsafe(coro, self.loop).result()
key_file = open(i2p_keyfile, "w")
key_file.write(i2p_dest.private_key.base64)
key_file.close()
# TODO: Remove
RNS.log("Created")
else:
key_file = open(i2p_keyfile, "r")
prvd = key_file.read()
key_file.close()
i2p_dest = self.i2plib.Destination(data=prvd, has_private_key=True)
# TODO: Remove
RNS.log("Loaded")
i2p_b32 = i2p_dest.base32
try:
async def tunnel_up():
RNS.log(str(owner)+" Bringing up I2P tunnel in background, this may take a while...", RNS.LOG_INFO)
tunnel = self.i2plib.ServerTunnel((owner.bind_ip, owner.bind_port), loop=self.loop, destination=i2p_dest, sam_address=self.sam_address)
await tunnel.run()
tunnel.aclose()
RNS.log(str(owner)+ " tunnel setup complete, instance reachable at: "+str(i2p_dest.base32)+".b32.i2p", RNS.LOG_VERBOSE)
asyncio.run_coroutine_threadsafe(tunnel_up(), self.loop)
except Exception as e:
raise IOError("Could not connect to I2P SAM API while configuring "+str(self)+". Check that I2P is running and SAM is enabled.")
def get_loop(self):
return asyncio.get_event_loop()
class ThreadingI2PServer(socketserver.ThreadingMixIn, socketserver.TCPServer): class ThreadingI2PServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass pass
class I2PInterfacePeer(Interface): class I2PInterfacePeer(Interface):
RECONNECT_WAIT = 5 RECONNECT_WAIT = 10
RECONNECT_MAX_TRIES = None RECONNECT_MAX_TRIES = None
# TCP socket options # TCP socket options
@ -71,9 +167,7 @@ class I2PInterfacePeer(Interface):
self.kiss_framing = False self.kiss_framing = False
self.i2p_tunneled = True self.i2p_tunneled = True
self.i2p_dest = None self.i2p_dest = None
self.i2p_tunnel_ready = False
i2plib = I2PInterfacePeer.i2plib
i2plib.utils = I2PInterfacePeer.utils
if max_reconnect_tries == None: if max_reconnect_tries == None:
self.max_reconnect_tries = I2PInterfacePeer.RECONNECT_MAX_TRIES self.max_reconnect_tries = I2PInterfacePeer.RECONNECT_MAX_TRIES
@ -95,27 +189,17 @@ class I2PInterfacePeer(Interface):
self.receives = True self.receives = True
self.initiator = True self.initiator = True
self.sam_address = i2plib.get_sam_address()
self.aio_loop = I2PInterfacePeer.aio_loop
self.bind_ip = "127.0.0.1" self.bind_ip = "127.0.0.1"
self.bind_port = i2plib.utils.get_free_port() self.bind_port = self.owner.i2p.get_free_port()
self.local_addr = (self.bind_ip, self.bind_port)
self.i2p_dest = i2plib.Destination(data=target_i2p_dest)
RNS.log("Bringing up I2P tunnel to "+str(self)+", this may take a while...", RNS.LOG_INFO)
try:
tunnel = i2plib.ClientTunnel(self.i2p_dest, (self.bind_ip, self.bind_port), sam_address=self.sam_address)
self.aio_loop.run_until_complete(tunnel.run())
except Exception as e:
raise e
raise IOError("Could not connect to I2P SAM API while configuring "+str(self)+". Check that I2P is running and SAM is enabled.")
RNS.log(str(self)+ " tunnel setup complete", RNS.LOG_VERBOSE)
self.target_ip = self.bind_ip self.target_ip = self.bind_ip
self.target_port = self.bind_port self.target_port = self.bind_port
self.owner.i2p.client_tunnel(self, target_i2p_dest)
# TODO: Remove
RNS.log("TCP params: "+str((self.bind_ip, self.bind_port)))
if not self.connect(initial=True): if not self.connect(initial=True):
# TODO: Remove # TODO: Remove
RNS.log("Initial TCP attempt failed, trying reconnects...") RNS.log("Initial TCP attempt failed, trying reconnects...")
@ -382,85 +466,48 @@ class I2PInterfacePeer(Interface):
class I2PInterface(Interface): class I2PInterface(Interface):
def __init__(self, owner, name, rns_storagepath, peers): def __init__(self, owner, name, rns_storagepath, peers):
import RNS.vendor.i2plib as i2plib
import RNS.vendor.i2plib.utils
self.rxb = 0 self.rxb = 0
self.txb = 0 self.txb = 0
self.online = False self.online = False
self.clients = 0 self.clients = 0
self.storagepath = rns_storagepath+"/i2p" self.owner = owner
self.i2p_tunneled = True
self.i2p = I2PController(rns_storagepath)
self.IN = True self.IN = True
self.OUT = False self.OUT = False
self.name = name self.name = name
self.b32a = None
self.i2p_tunneled = True
self.receives = True self.receives = True
self.bind_ip = "127.0.0.1" self.bind_ip = "127.0.0.1"
self.sam_address = i2plib.get_sam_address() self.bind_port = self.i2p.get_free_port()
self.bind_port = i2plib.utils.get_free_port() self.address = (self.bind_ip, self.bind_port)
self.aio_loop = asyncio.get_event_loop()
I2PInterfacePeer.i2plib = i2plib i2p_thread = threading.Thread(target=self.i2p.start)
I2PInterfacePeer.utils = RNS.vendor.i2plib.utils i2p_thread.setDaemon(True)
I2PInterfacePeer.aio_loop = self.aio_loop i2p_thread.start()
I2PInterfacePeer.sam_address = self.sam_address
if not os.path.isdir(self.storagepath):
os.makedirs(self.storagepath)
self.i2p_dest = None
self.i2p_b32 = None
self.i2p_dest_hash = RNS.Identity.full_hash(RNS.Identity.full_hash(self.name.encode("utf-8")))
self.i2p_keyfile = self.storagepath+"/"+RNS.hexrep(self.i2p_dest_hash, delimit=False)+".i2p"
if not os.path.isfile(self.i2p_keyfile):
self.i2p_dest = self.aio_loop.run_until_complete(i2plib.new_destination(sam_address=self.sam_address, loop=self.aio_loop))
key_file = open(self.i2p_keyfile, "w")
key_file.write(self.i2p_dest.private_key.base64)
key_file.close()
else:
key_file = open(self.i2p_keyfile, "r")
prvd = key_file.read()
key_file.close()
self.i2p_dest = i2plib.Destination(data=prvd, has_private_key=True)
self.i2p_b32 = self.i2p_dest.base32
def handlerFactory(callback): def handlerFactory(callback):
def createHandler(*args, **keys): def createHandler(*args, **keys):
return I2PInterfaceHandler(callback, *args, **keys) return I2PInterfaceHandler(callback, *args, **keys)
return createHandler return createHandler
self.owner = owner
address = (self.bind_ip, self.bind_port)
ThreadingI2PServer.allow_reuse_address = True ThreadingI2PServer.allow_reuse_address = True
self.server = ThreadingI2PServer(address, handlerFactory(self.incoming_connection)) self.server = ThreadingI2PServer(self.address, handlerFactory(self.incoming_connection))
thread = threading.Thread(target=self.server.serve_forever) thread = threading.Thread(target=self.server.serve_forever)
thread.setDaemon(True) thread.setDaemon(True)
thread.start() thread.start()
# TODO: Remove # TODO: Remove
RNS.log("Started TCP server for I2P on "+str(address)+" "+str(self.server)) RNS.log("Started TCP server for I2P on "+str(self.address)+" "+str(self.server))
RNS.log("Bringing up I2P tunnel for "+str(self)+", this may take a while...", RNS.LOG_INFO) self.i2p.server_tunnel(self)
try:
tunnel = i2plib.ServerTunnel((self.bind_ip, self.bind_port), loop=self.aio_loop, destination=self.i2p_dest, sam_address=self.sam_address)
self.aio_loop.run_until_complete(tunnel.run())
except Exception as e:
raise IOError("Could not connect to I2P SAM API while configuring "+str(self)+". Check that I2P is running and SAM is enabled.")
RNS.log(str(self)+ " tunnel setup complete, instance reachable at: "+str(self.i2p_dest.base32), RNS.LOG_VERBOSE)
if peers != None: if peers != None:
for peer_addr in peers: for peer_addr in peers:
RNS.log("Establishing I2P tunnel to "+str(peer_addr), RNS.LOG_VERBOSE)
interface_name = peer_addr interface_name = peer_addr
peer_interface = I2PInterfacePeer(self, interface_name, peer_addr) peer_interface = I2PInterfacePeer(self, interface_name, peer_addr)
@ -483,6 +530,9 @@ class I2PInterface(Interface):
def processOutgoing(self, data): def processOutgoing(self, data):
pass pass
def detach(self):
self.i2p.stop()
def __str__(self): def __str__(self):
return "I2PInterface["+self.name+"]" return "I2PInterface["+self.name+"]"