From 7867d7ded9530cac681adb4e76847240a1b799cd Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Sun, 1 Mar 2020 16:56:49 +0100 Subject: [PATCH] Implemented basic multi-hop transport --- RNS/Interfaces/UdpInterface.py | 17 +++-- RNS/Packet.py | 15 +++- RNS/Resource.py | 1 - RNS/Transport.py | 130 ++++++++++++++++++++++++++------- 4 files changed, 124 insertions(+), 39 deletions(-) diff --git a/RNS/Interfaces/UdpInterface.py b/RNS/Interfaces/UdpInterface.py index 44b4583..097991c 100755 --- a/RNS/Interfaces/UdpInterface.py +++ b/RNS/Interfaces/UdpInterface.py @@ -20,10 +20,14 @@ class UdpInterface(Interface): self.bind_ip = bindip self.bind_port = bindport - UdpInterfaceHandler.interface = self + def handlerFactory(callback): + def createHandler(*args, **keys): + return UdpInterfaceHandler(callback, *args, **keys) + return createHandler + self.owner = owner address = (self.bind_ip, self.bind_port) - self.server = SocketServer.UDPServer(address, UdpInterfaceHandler) + self.server = SocketServer.UDPServer(address, handlerFactory(self.processIncoming)) thread = threading.Thread(target=self.server.serve_forever) thread.setDaemon(True) @@ -49,9 +53,10 @@ class UdpInterface(Interface): return "UdpInterface["+self.name+"/"+self.bind_ip+":"+str(self.bind_port)+"]" class UdpInterfaceHandler(SocketServer.BaseRequestHandler): - interface = None + def __init__(self, callback, *args, **keys): + self.callback = callback + SocketServer.BaseRequestHandler.__init__(self, *args, **keys) def handle(self): - if (UdpInterfaceHandler.interface != None): - data = self.request[0] - UdpInterfaceHandler.interface.processIncoming(data) \ No newline at end of file + data = self.request[0] + self.callback(data) \ No newline at end of file diff --git a/RNS/Packet.py b/RNS/Packet.py index 0cd561a..9e63f76 100755 --- a/RNS/Packet.py +++ b/RNS/Packet.py @@ -12,7 +12,7 @@ class Packet: # Header types HEADER_1 = 0x00 # Normal header format - HEADER_2 = 0x01 # Header format used for link packets in transport + HEADER_2 = 0x01 # Header format used for packets in transport HEADER_3 = 0x02 # Reserved HEADER_4 = 0x03 # Reserved header_types = [HEADER_1, HEADER_2, HEADER_3, HEADER_4] @@ -85,6 +85,7 @@ class Packet: return packed_flags def pack(self): + self.destination_hash = self.destination.hash self.header = "" self.header += struct.pack("!B", self.flags) self.header += struct.pack("!B", self.hops) @@ -141,6 +142,7 @@ class Packet: raise IOError("Packet size of "+str(len(self.raw))+" exceeds MTU of "+str(self.MTU)+" bytes") self.packed = True + self.updateHash() def unpack(self): self.flags = ord(self.raw[0]) @@ -163,6 +165,7 @@ class Packet: self.data = self.raw[13:] self.packed = False + self.updateHash() def send(self): if not self.sent: @@ -222,9 +225,13 @@ class Packet: return RNS.Identity.fullHash(self.getHashablePart()) def getHashablePart(self): - # TODO: This assumes transport headers are stripped - # by Transport before going anywhere else - return self.raw[0:1]+self.raw[2:] + hashable_part = struct.pack("!B", struct.unpack("!B", self.raw[0])[0] & 0b00001111) + if self.header_type == Packet.HEADER_2: + hashable_part += self.raw[12:] + else: + hashable_part += self.raw[2:] + + return hashable_part class ProofDestination: def __init__(self, packet): diff --git a/RNS/Resource.py b/RNS/Resource.py index c3bbec9..be02343 100644 --- a/RNS/Resource.py +++ b/RNS/Resource.py @@ -265,7 +265,6 @@ class Resource: expected_data = self.hash + self.expected_proof expected_proof_packet = RNS.Packet(self.link, expected_data, packet_type=RNS.Packet.PROOF, context=RNS.Packet.RESOURCE_PRF) expected_proof_packet.pack() - expected_proof_packet.updateHash() RNS.Transport.cache_request(expected_proof_packet.packet_hash) self.last_part_sent = time.time() sleep_time = 0.001 diff --git a/RNS/Transport.py b/RNS/Transport.py index 122d42b..b18def5 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -2,6 +2,7 @@ import os import RNS import time import math +import struct import threading import traceback from time import sleep @@ -41,6 +42,7 @@ class Transport: announce_table = {} # A table for storing announces currently waiting to be retransmitted destination_table = {} # A lookup table containing the next hop to a given destination + reverse_table = {} # A lookup table for storing packet hashes used to return proofs and replies jobs_locked = False jobs_running = False @@ -110,10 +112,6 @@ class Transport: if time.time() > Transport.announces_last_checked+Transport.announces_check_interval: for destination_hash in Transport.announce_table: announce_entry = Transport.announce_table[destination_hash] - # TODO: remove comment and log output - # [time_heard, retransmit_timeout, retries, received_from, packet.hops, packet] - # RNS.log("Announce entry retries: "+str(announce_entry[2]), RNS.LOG_INFO) - # RNS.log("Max retries: "+str(Transport.PATHFINDER_R), RNS.LOG_INFO) if announce_entry[2] > Transport.PATHFINDER_R: RNS.log("Dropping announce for "+RNS.prettyhexrep(destination_hash)+", retries exceeded", RNS.LOG_DEBUG) Transport.announce_table.pop(destination_hash) @@ -140,6 +138,12 @@ class Transport: while (len(Transport.packet_hashlist) > Transport.hashlist_maxsize): Transport.packet_hashlist.pop(0) + # Cull the reverse table according to max size and/or age of entries + # TODO: Implement this + + # Cull the destination table in some way + # TODO: Implement this + except Exception as e: RNS.log("An exception occurred while running Transport jobs.", RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) @@ -156,24 +160,53 @@ class Transport: sleep(0.01) Transport.jobs_locked = True + # TODO: This updateHash call might be redundant packet.updateHash() sent = False - - for interface in Transport.interfaces: - if interface.OUT: - should_transmit = True - if packet.destination.type == RNS.Destination.LINK: - if packet.destination.status == RNS.Link.CLOSED: - should_transmit = False - if interface != packet.destination.attached_interface: - should_transmit = False - if should_transmit: - # TODO: Remove - RNS.log("Transmitting "+str(len(packet.raw))+" bytes via: "+str(interface), RNS.LOG_EXTREME) - RNS.log("Hash is "+RNS.prettyhexrep(packet.packet_hash), RNS.LOG_EXTREME) - interface.processOutgoing(packet.raw) - sent = True + # Check if we have a known path for the destination + # in the destination table + if packet.packet_type != RNS.Packet.ANNOUNCE and packet.destination_hash in Transport.destination_table: + outbound_interface = Transport.destination_table[packet.destination_hash][5] + + if Transport.destination_table[packet.destination_hash][2] > 1: + # Insert packet into transport + new_flags = (RNS.Packet.HEADER_2) << 6 | (Transport.TRANSPORT) << 4 | (packet.flags & 0b00001111) + new_raw = struct.pack("!B", new_flags) + new_raw += packet.raw[1:2] + new_raw += Transport.destination_table[packet.destination_hash][1] + new_raw += packet.raw[2:] + # RNS.log("Transporting "+str(len(packet.raw))+" bytes via "+RNS.prettyhexrep(Transport.destination_table[packet.destination_hash][1])+" on: "+str(outbound_interface), RNS.LOG_EXTREME) + # RNS.log("Hash is "+RNS.prettyhexrep(packet.packet_hash), RNS.LOG_EXTREME) + RNS.log("Packet was inserted into transport via "+RNS.prettyhexrep(Transport.destination_table[packet.destination_hash][1])+" on: "+str(outbound_interface), RNS.LOG_DEBUG) + outbound_interface.processOutgoing(new_raw) + sent = True + else: + # Destination is directly reachable, and we know on + # what interface, so transmit only on that one + + # TODO: Strip transport headers here + RNS.log("Transmitting "+str(len(packet.raw))+" bytes on: "+str(outbound_interface), RNS.LOG_EXTREME) + RNS.log("Hash is "+RNS.prettyhexrep(packet.packet_hash), RNS.LOG_EXTREME) + outbound_interface.processOutgoing(packet.raw) + sent = True + + else: + # Broadcast packet on all outgoing interfaces + for interface in Transport.interfaces: + if interface.OUT: + should_transmit = True + if packet.destination.type == RNS.Destination.LINK: + if packet.destination.status == RNS.Link.CLOSED: + should_transmit = False + if interface != packet.destination.attached_interface: + should_transmit = False + + if should_transmit: + RNS.log("Transmitting "+str(len(packet.raw))+" bytes on: "+str(interface), RNS.LOG_EXTREME) + RNS.log("Hash is "+RNS.prettyhexrep(packet.packet_hash), RNS.LOG_EXTREME) + interface.processOutgoing(packet.raw) + sent = True if sent: packet.sent = True @@ -199,11 +232,15 @@ class Transport: return True if not packet.packet_hash in Transport.packet_hashlist: return True + else: + if packet.packet_type == RNS.Packet.ANNOUNCE: + return True return False @staticmethod def inbound(raw, interface=None): + # TODO: Rewrite the redundant cache calls in this method while (Transport.jobs_running): sleep(0.1) @@ -211,18 +248,44 @@ class Transport: packet = RNS.Packet(None, raw) packet.unpack() - packet.updateHash() packet.receiving_interface = interface + packet.hops += 1 RNS.log(str(interface)+" received packet with hash "+RNS.prettyhexrep(packet.packet_hash), RNS.LOG_EXTREME) - # TODO: Rewrite these redundant cache calls if Transport.packet_filter(packet): Transport.packet_hashlist.append(packet.packet_hash) + if packet.transport_id != None and packet.packet_type != RNS.Packet.ANNOUNCE: + if packet.transport_id == Transport.identity.hash: + RNS.log("Received packet in transport for "+RNS.prettyhexrep(packet.destination_hash)+" with matching transport ID, transporting it...", RNS.LOG_DEBUG) + if packet.destination_hash in Transport.destination_table: + next_hop = Transport.destination_table[packet.destination_hash][1] + RNS.log("Packet hops: "+str(packet.hops), RNS.LOG_DEBUG) + RNS.log("Next hop to destination is "+RNS.prettyhexrep(next_hop)+", transporting it.", RNS.LOG_DEBUG) + new_raw = packet.raw[0:1] + new_raw += struct.pack("!B", packet.hops) + new_raw += next_hop + new_raw += packet.raw[12:] + outbound_interface = Transport.destination_table[packet.destination_hash][5] + outbound_interface.processOutgoing(new_raw) + + Transport.reverse_table[packet.packet_hash[:10]] = [packet.receiving_interface, outbound_interface, time.time()] + else: + # TODO: There should probably be some kind of REJECT + # mechanism here, to signal to the source that their + # expected path failed + RNS.log("Got packet in transport, but no known path to final destination. Dropping packet.", RNS.LOG_DEBUG) + else: + # TODO: Remove this log statement + RNS.log("Received packet in transport, but transport ID doesn't match, not transporting it further.", RNS.LOG_DEBUG) + + # Announce handling. Handles logic related to incoming + # announces, queueing rebroadcasts of these, and removal + # of queued announce rebroadcasts once handed to the next node if packet.packet_type == RNS.Packet.ANNOUNCE: if RNS.Identity.validateAnnounce(packet): - if (packet.transport_id != None): + if packet.transport_id != None: received_from = packet.transport_id # Check if this is a next retransmission from @@ -231,14 +294,14 @@ class Transport: if packet.destination_hash in Transport.announce_table: announce_entry = Transport.announce_table[packet.destination_hash] - if packet.hops == announce_entry[4]: + if packet.hops-1 == announce_entry[4]: RNS.log("Heard a local rebroadcast of announce for "+RNS.prettyhexrep(packet.destination_hash), RNS.LOG_DEBUG) announce_entry[6] += 1 if announce_entry[6] >= Transport.LOCAL_REBROADCASTS_MAX: RNS.log("Max local rebroadcasts of announce for "+RNS.prettyhexrep(packet.destination_hash)+" reached, dropping announce from our table", RNS.LOG_DEBUG) Transport.announce_table.pop(packet.destination_hash) - if packet.hops == announce_entry[4]+1 and announce_entry[2] > 0: + if packet.hops-1 == announce_entry[4]+1 and announce_entry[2] > 0: now = time.time() if now < announce_entry[1]: RNS.log("Rebroadcasted announce for "+RNS.prettyhexrep(packet.destination_hash)+" has been passed on to next node, no further tries needed", RNS.LOG_DEBUG) @@ -250,7 +313,7 @@ class Transport: # Check if this announce should be inserted into # announce and destination tables should_add = False - packet.hops += 1 + # First, check that the announce is not for a destination # local to this system, and that hops are less than the max if (not any(packet.destination_hash == d.hash for d in Transport.destinations) and packet.hops < Transport.PATHFINDER_M+1): @@ -302,7 +365,8 @@ class Transport: random_blobs.append(random_blob) retransmit_timeout = now + math.pow(Transport.PATHFINDER_C, packet.hops) + (RNS.rand() * Transport.PATHFINDER_RW) Transport.announce_table[packet.destination_hash] = [now, retransmit_timeout, retries, received_from, packet.hops, packet, local_rebroadcasts] - Transport.destination_table[packet.destination_hash] = [now, received_from, packet.hops, expires, random_blobs] + Transport.destination_table[packet.destination_hash] = [now, received_from, packet.hops, expires, random_blobs, packet.receiving_interface] + RNS.log("Path to "+RNS.prettyhexrep(packet.destination_hash)+" is now via "+RNS.prettyhexrep(received_from)+" on "+str(packet.receiving_interface), RNS.LOG_DEBUG) elif packet.packet_type == RNS.Packet.LINKREQUEST: for destination in Transport.destinations: @@ -358,6 +422,18 @@ class Transport: else: proof_hash = None + # Check if this proof neds to be transported + if packet.destination_hash in Transport.reverse_table: + reverse_entry = Transport.reverse_table[packet.destination_hash] + if packet.receiving_interface == reverse_entry[1]: + RNS.log("Proof received on correct interface, transporting it via "+str(reverse_entry[0]), RNS.LOG_DEBUG) + new_raw = packet.raw[0:1] + new_raw += struct.pack("!B", packet.hops) + new_raw += packet.raw[2:] + reverse_entry[0].processOutgoing(new_raw) + else: + RNS.log("Proof received on wrong interface, not transporting it.", RNS.LOG_DEBUG) + for receipt in Transport.receipts: receipt_validated = False if proof_hash != None: @@ -425,8 +501,6 @@ class Transport: def cache_request_packet(packet): if len(packet.data) == RNS.Identity.HASHLENGTH/8: packet_hash = RNS.hexrep(packet.data, delimit=False) - # TODO: There's some pretty obvious file access - # issues here. Make sure this can't happen path = RNS.Reticulum.cachepath+"/"+packet_hash if os.path.isfile(path): file = open(path, "r")