diff --git a/RNS/Interfaces/Interface.py b/RNS/Interfaces/Interface.py index 0f1dc57..d0afb51 100755 --- a/RNS/Interfaces/Interface.py +++ b/RNS/Interfaces/Interface.py @@ -1,6 +1,6 @@ # MIT License # -# Copyright (c) 2016-2022 Mark Qvist / unsigned.io +# Copyright (c) 2016-2023 Mark Qvist / unsigned.io # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -40,26 +40,116 @@ class Interface: MODE_BOUNDARY = 0x05 MODE_GATEWAY = 0x06 - # Which interface modes a Transport Node - # should actively discover paths for. + # Which interface modes a Transport Node should + # actively discover paths for. DISCOVER_PATHS_FOR = [MODE_ACCESS_POINT, MODE_GATEWAY] - # How many samples to use for incoming - # announce frequency calculation + # How many samples to use for announce + # frequency calculations IA_FREQ_SAMPLES = 6 OA_FREQ_SAMPLES = 6 + # Maximum amount of ingress limited announces + # to hold at any given time. + MAX_HELD_ANNOUNCES = 256 + + # How long a spawned interface will be + # considered to be newly created. Two + # hours by default. + IC_NEW_TIME = 2*60*60 + IC_BURST_FREQ_NEW = 3.5 + IC_BURST_FREQ = 12 + IC_BURST_HOLD = 1*60 + IC_BURST_PENALTY = 5*60 + IC_HELD_RELEASE_INTERVAL = 30 + def __init__(self): self.rxb = 0 self.txb = 0 + self.created = time.time() self.online = False + self.ingress_control = True + self.ic_max_held_announces = Interface.MAX_HELD_ANNOUNCES + self.ic_burst_hold = Interface.IC_BURST_HOLD + self.ic_burst_active = False + self.ic_burst_activated = 0 + self.ic_held_release = 0 + self.ic_burst_freq_new = Interface.IC_BURST_FREQ_NEW + self.ic_burst_freq = Interface.IC_BURST_FREQ + self.ic_new_time = Interface.IC_NEW_TIME + self.ic_burst_penalty = Interface.IC_BURST_PENALTY + self.ic_held_release_interval = Interface.IC_HELD_RELEASE_INTERVAL + self.held_announces = {} + self.ia_freq_deque = deque(maxlen=Interface.IA_FREQ_SAMPLES) self.oa_freq_deque = deque(maxlen=Interface.OA_FREQ_SAMPLES) def get_hash(self): return RNS.Identity.full_hash(str(self).encode("utf-8")) + # This is a generic function for determining when an interface + # should activate ingress limiting. Since this can vary for + # different interface types, this function should be overwritten + # in case a particular interface requires a different approach. + def should_ingress_limit(self): + if self.ingress_control: + freq_threshold = self.ic_burst_freq_new if self.age() < self.ic_new_time else self.ic_burst_freq + ia_freq = self.incoming_announce_frequency() + + if self.ic_burst_active: + if ia_freq < freq_threshold and time.time() > self.ic_burst_activated+self.ic_burst_hold: + self.ic_burst_active = False + self.ic_held_release = time.time() + self.ic_burst_penalty + return True + + else: + if ia_freq > freq_threshold: + self.ic_burst_active = True + self.ic_burst_activated = time.time() + return True + + else: + return False + + else: + return False + + def age(self): + return time.time()-self.created + + def hold_announce(self, announce_packet): + if announce_packet.destination_hash in self.held_announces: + self.held_announces[announce_packet.destination_hash] = announce_packet + elif not len(self.held_announces) >= self.ic_max_held_announces: + self.held_announces[announce_packet.destination_hash] = announce_packet + + def process_held_announces(self): + try: + if not self.should_ingress_limit() and len(self.held_announces) > 0 and time.time() > self.ic_held_release: + freq_threshold = self.ic_burst_freq_new if self.age() < self.ic_new_time else self.ic_burst_freq + ia_freq = self.incoming_announce_frequency() + if ia_freq < freq_threshold: + selected_announce_packet = None + min_hops = RNS.Transport.PATHFINDER_M + for destination_hash in self.held_announces: + announce_packet = self.held_announces[destination_hash] + if announce_packet.hops < min_hops: + min_hops = announce_packet.hops + selected_announce_packet = announce_packet + + if selected_announce_packet != None: + RNS.log("Releasing held announce packet "+str(selected_announce_packet)+" from "+str(self), RNS.LOG_EXTREME) + self.ic_held_release = time.time() + self.ic_held_release_interval + self.held_announces.pop(selected_announce_packet.destination_hash) + def release(): + RNS.Transport.inbound(selected_announce_packet.raw, selected_announce_packet.receiving_interface) + threading.Thread(target=release, daemon=True).start() + + except Exception as e: + RNS.log("An error occurred while processing held announces for "+str(self), RNS.LOG_ERROR) + RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR) + def received_announce(self): self.ia_freq_deque.append(time.time()) if hasattr(self, "parent_interface") and self.parent_interface != None: diff --git a/RNS/Interfaces/LocalInterface.py b/RNS/Interfaces/LocalInterface.py index 624cbe4..bba4413 100644 --- a/RNS/Interfaces/LocalInterface.py +++ b/RNS/Interfaces/LocalInterface.py @@ -1,6 +1,6 @@ # MIT License # -# Copyright (c) 2016-2022 Mark Qvist / unsigned.io +# Copyright (c) 2016-2023 Mark Qvist / unsigned.io # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -102,6 +102,9 @@ class LocalClientInterface(Interface): thread.daemon = True thread.start() + def should_ingress_limit(self): + return False + def connect(self): self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.connect((self.target_ip, self.target_port)) @@ -327,7 +330,7 @@ class LocalServerInterface(Interface): spawned_interface.target_port = str(handler.client_address[1]) spawned_interface.parent_interface = self spawned_interface.bitrate = self.bitrate - RNS.log("Accepting new connection to shared instance: "+str(spawned_interface), RNS.LOG_EXTREME) + # RNS.log("Accepting new connection to shared instance: "+str(spawned_interface), RNS.LOG_EXTREME) RNS.Transport.interfaces.append(spawned_interface) RNS.Transport.local_client_interfaces.append(spawned_interface) self.clients += 1 diff --git a/RNS/Interfaces/RNodeInterface.py b/RNS/Interfaces/RNodeInterface.py index 475b090..08a4031 100644 --- a/RNS/Interfaces/RNodeInterface.py +++ b/RNS/Interfaces/RNodeInterface.py @@ -1,6 +1,6 @@ # MIT License # -# Copyright (c) 2016-2022 Mark Qvist / unsigned.io +# Copyright (c) 2016-2023 Mark Qvist / unsigned.io # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -837,5 +837,8 @@ class RNodeInterface(Interface): self.setRadioState(KISS.RADIO_STATE_OFF) self.leave() + def should_ingress_limit(self): + return False + def __str__(self): return "RNodeInterface["+str(self.name)+"]" diff --git a/RNS/Reticulum.py b/RNS/Reticulum.py index 28bc80e..735c563 100755 --- a/RNS/Reticulum.py +++ b/RNS/Reticulum.py @@ -1150,6 +1150,7 @@ class Reticulum: ifstats["txb"] = interface.txb ifstats["incoming_announce_frequency"] = interface.incoming_announce_frequency() ifstats["outgoing_announce_frequency"] = interface.outgoing_announce_frequency() + ifstats["held_announces"] = len(interface.held_announces) ifstats["status"] = interface.online ifstats["mode"] = interface.mode diff --git a/RNS/Transport.py b/RNS/Transport.py index 1166641..8379607 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -128,6 +128,8 @@ class Transport: hashlist_maxsize = 1000000 tables_last_culled = 0.0 tables_cull_interval = 5.0 + interface_last_jobs = 0.0 + interface_jobs_interval = 5.0 identity = None @@ -608,6 +610,11 @@ class Transport: Transport.tables_last_culled = time.time() + if time.time() > Transport.interface_last_jobs + Transport.interface_jobs_interval: + for interface in Transport.interfaces: + interface.process_held_announces() + Transport.interface_last_jobs = time.time() + else: # Transport jobs were locked, do nothing pass @@ -1042,6 +1049,7 @@ class Transport: packet = RNS.Packet(None, raw) if not packet.unpack(): + Transport.jobs_locked = False return packet.receiving_interface = interface @@ -1123,6 +1131,7 @@ class Transport: # normal processing. if packet.context == RNS.Packet.CACHE_REQUEST: if Transport.cache_request_packet(packet): + Transport.jobs_locked = False return # If the packet is in transport, check whether we @@ -1232,6 +1241,16 @@ class Transport: if interface != None and RNS.Identity.validate_announce(packet, only_validate_signature=True): interface.received_announce() + if not packet.destination_hash in Transport.destination_table: + # This is an unknown destination, and we'll apply + # potential ingress limiting. Already known + # destinations will have re-announces controlled + # by normal announce rate limiting. + if interface.should_ingress_limit(): + interface.hold_announce(packet) + Transport.jobs_locked = False + return + local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None) if local_destination == None and RNS.Identity.validate_announce(packet): if packet.transport_id != None: