Implemented announce ingress control

This commit is contained in:
Mark Qvist 2023-10-01 00:16:32 +02:00
parent 803a5736c9
commit 3c4791a622
5 changed files with 124 additions and 8 deletions

View File

@ -1,6 +1,6 @@
# MIT License # MIT License
# #
# Copyright (c) 2016-2022 Mark Qvist / unsigned.io # Copyright (c) 2016-2023 Mark Qvist / unsigned.io
# #
# Permission is hereby granted, free of charge, to any person obtaining a copy # Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal # of this software and associated documentation files (the "Software"), to deal
@ -40,26 +40,116 @@ class Interface:
MODE_BOUNDARY = 0x05 MODE_BOUNDARY = 0x05
MODE_GATEWAY = 0x06 MODE_GATEWAY = 0x06
# Which interface modes a Transport Node # Which interface modes a Transport Node should
# should actively discover paths for. # actively discover paths for.
DISCOVER_PATHS_FOR = [MODE_ACCESS_POINT, MODE_GATEWAY] DISCOVER_PATHS_FOR = [MODE_ACCESS_POINT, MODE_GATEWAY]
# How many samples to use for incoming # How many samples to use for announce
# announce frequency calculation # frequency calculations
IA_FREQ_SAMPLES = 6 IA_FREQ_SAMPLES = 6
OA_FREQ_SAMPLES = 6 OA_FREQ_SAMPLES = 6
# Maximum amount of ingress limited announces
# to hold at any given time.
MAX_HELD_ANNOUNCES = 256
# How long a spawned interface will be
# considered to be newly created. Two
# hours by default.
IC_NEW_TIME = 2*60*60
IC_BURST_FREQ_NEW = 3.5
IC_BURST_FREQ = 12
IC_BURST_HOLD = 1*60
IC_BURST_PENALTY = 5*60
IC_HELD_RELEASE_INTERVAL = 30
def __init__(self): def __init__(self):
self.rxb = 0 self.rxb = 0
self.txb = 0 self.txb = 0
self.created = time.time()
self.online = False self.online = False
self.ingress_control = True self.ingress_control = True
self.ic_max_held_announces = Interface.MAX_HELD_ANNOUNCES
self.ic_burst_hold = Interface.IC_BURST_HOLD
self.ic_burst_active = False
self.ic_burst_activated = 0
self.ic_held_release = 0
self.ic_burst_freq_new = Interface.IC_BURST_FREQ_NEW
self.ic_burst_freq = Interface.IC_BURST_FREQ
self.ic_new_time = Interface.IC_NEW_TIME
self.ic_burst_penalty = Interface.IC_BURST_PENALTY
self.ic_held_release_interval = Interface.IC_HELD_RELEASE_INTERVAL
self.held_announces = {}
self.ia_freq_deque = deque(maxlen=Interface.IA_FREQ_SAMPLES) self.ia_freq_deque = deque(maxlen=Interface.IA_FREQ_SAMPLES)
self.oa_freq_deque = deque(maxlen=Interface.OA_FREQ_SAMPLES) self.oa_freq_deque = deque(maxlen=Interface.OA_FREQ_SAMPLES)
def get_hash(self): def get_hash(self):
return RNS.Identity.full_hash(str(self).encode("utf-8")) return RNS.Identity.full_hash(str(self).encode("utf-8"))
# This is a generic function for determining when an interface
# should activate ingress limiting. Since this can vary for
# different interface types, this function should be overwritten
# in case a particular interface requires a different approach.
def should_ingress_limit(self):
if self.ingress_control:
freq_threshold = self.ic_burst_freq_new if self.age() < self.ic_new_time else self.ic_burst_freq
ia_freq = self.incoming_announce_frequency()
if self.ic_burst_active:
if ia_freq < freq_threshold and time.time() > self.ic_burst_activated+self.ic_burst_hold:
self.ic_burst_active = False
self.ic_held_release = time.time() + self.ic_burst_penalty
return True
else:
if ia_freq > freq_threshold:
self.ic_burst_active = True
self.ic_burst_activated = time.time()
return True
else:
return False
else:
return False
def age(self):
return time.time()-self.created
def hold_announce(self, announce_packet):
if announce_packet.destination_hash in self.held_announces:
self.held_announces[announce_packet.destination_hash] = announce_packet
elif not len(self.held_announces) >= self.ic_max_held_announces:
self.held_announces[announce_packet.destination_hash] = announce_packet
def process_held_announces(self):
try:
if not self.should_ingress_limit() and len(self.held_announces) > 0 and time.time() > self.ic_held_release:
freq_threshold = self.ic_burst_freq_new if self.age() < self.ic_new_time else self.ic_burst_freq
ia_freq = self.incoming_announce_frequency()
if ia_freq < freq_threshold:
selected_announce_packet = None
min_hops = RNS.Transport.PATHFINDER_M
for destination_hash in self.held_announces:
announce_packet = self.held_announces[destination_hash]
if announce_packet.hops < min_hops:
min_hops = announce_packet.hops
selected_announce_packet = announce_packet
if selected_announce_packet != None:
RNS.log("Releasing held announce packet "+str(selected_announce_packet)+" from "+str(self), RNS.LOG_EXTREME)
self.ic_held_release = time.time() + self.ic_held_release_interval
self.held_announces.pop(selected_announce_packet.destination_hash)
def release():
RNS.Transport.inbound(selected_announce_packet.raw, selected_announce_packet.receiving_interface)
threading.Thread(target=release, daemon=True).start()
except Exception as e:
RNS.log("An error occurred while processing held announces for "+str(self), RNS.LOG_ERROR)
RNS.log("The contained exception was: "+str(e), RNS.LOG_ERROR)
def received_announce(self): def received_announce(self):
self.ia_freq_deque.append(time.time()) self.ia_freq_deque.append(time.time())
if hasattr(self, "parent_interface") and self.parent_interface != None: if hasattr(self, "parent_interface") and self.parent_interface != None:

View File

@ -1,6 +1,6 @@
# MIT License # MIT License
# #
# Copyright (c) 2016-2022 Mark Qvist / unsigned.io # Copyright (c) 2016-2023 Mark Qvist / unsigned.io
# #
# Permission is hereby granted, free of charge, to any person obtaining a copy # Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal # of this software and associated documentation files (the "Software"), to deal
@ -102,6 +102,9 @@ class LocalClientInterface(Interface):
thread.daemon = True thread.daemon = True
thread.start() thread.start()
def should_ingress_limit(self):
return False
def connect(self): def connect(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.target_ip, self.target_port)) self.socket.connect((self.target_ip, self.target_port))
@ -327,7 +330,7 @@ class LocalServerInterface(Interface):
spawned_interface.target_port = str(handler.client_address[1]) spawned_interface.target_port = str(handler.client_address[1])
spawned_interface.parent_interface = self spawned_interface.parent_interface = self
spawned_interface.bitrate = self.bitrate spawned_interface.bitrate = self.bitrate
RNS.log("Accepting new connection to shared instance: "+str(spawned_interface), RNS.LOG_EXTREME) # RNS.log("Accepting new connection to shared instance: "+str(spawned_interface), RNS.LOG_EXTREME)
RNS.Transport.interfaces.append(spawned_interface) RNS.Transport.interfaces.append(spawned_interface)
RNS.Transport.local_client_interfaces.append(spawned_interface) RNS.Transport.local_client_interfaces.append(spawned_interface)
self.clients += 1 self.clients += 1

View File

@ -1,6 +1,6 @@
# MIT License # MIT License
# #
# Copyright (c) 2016-2022 Mark Qvist / unsigned.io # Copyright (c) 2016-2023 Mark Qvist / unsigned.io
# #
# Permission is hereby granted, free of charge, to any person obtaining a copy # Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal # of this software and associated documentation files (the "Software"), to deal
@ -837,5 +837,8 @@ class RNodeInterface(Interface):
self.setRadioState(KISS.RADIO_STATE_OFF) self.setRadioState(KISS.RADIO_STATE_OFF)
self.leave() self.leave()
def should_ingress_limit(self):
return False
def __str__(self): def __str__(self):
return "RNodeInterface["+str(self.name)+"]" return "RNodeInterface["+str(self.name)+"]"

View File

@ -1150,6 +1150,7 @@ class Reticulum:
ifstats["txb"] = interface.txb ifstats["txb"] = interface.txb
ifstats["incoming_announce_frequency"] = interface.incoming_announce_frequency() ifstats["incoming_announce_frequency"] = interface.incoming_announce_frequency()
ifstats["outgoing_announce_frequency"] = interface.outgoing_announce_frequency() ifstats["outgoing_announce_frequency"] = interface.outgoing_announce_frequency()
ifstats["held_announces"] = len(interface.held_announces)
ifstats["status"] = interface.online ifstats["status"] = interface.online
ifstats["mode"] = interface.mode ifstats["mode"] = interface.mode

View File

@ -128,6 +128,8 @@ class Transport:
hashlist_maxsize = 1000000 hashlist_maxsize = 1000000
tables_last_culled = 0.0 tables_last_culled = 0.0
tables_cull_interval = 5.0 tables_cull_interval = 5.0
interface_last_jobs = 0.0
interface_jobs_interval = 5.0
identity = None identity = None
@ -608,6 +610,11 @@ class Transport:
Transport.tables_last_culled = time.time() Transport.tables_last_culled = time.time()
if time.time() > Transport.interface_last_jobs + Transport.interface_jobs_interval:
for interface in Transport.interfaces:
interface.process_held_announces()
Transport.interface_last_jobs = time.time()
else: else:
# Transport jobs were locked, do nothing # Transport jobs were locked, do nothing
pass pass
@ -1042,6 +1049,7 @@ class Transport:
packet = RNS.Packet(None, raw) packet = RNS.Packet(None, raw)
if not packet.unpack(): if not packet.unpack():
Transport.jobs_locked = False
return return
packet.receiving_interface = interface packet.receiving_interface = interface
@ -1123,6 +1131,7 @@ class Transport:
# normal processing. # normal processing.
if packet.context == RNS.Packet.CACHE_REQUEST: if packet.context == RNS.Packet.CACHE_REQUEST:
if Transport.cache_request_packet(packet): if Transport.cache_request_packet(packet):
Transport.jobs_locked = False
return return
# If the packet is in transport, check whether we # If the packet is in transport, check whether we
@ -1232,6 +1241,16 @@ class Transport:
if interface != None and RNS.Identity.validate_announce(packet, only_validate_signature=True): if interface != None and RNS.Identity.validate_announce(packet, only_validate_signature=True):
interface.received_announce() interface.received_announce()
if not packet.destination_hash in Transport.destination_table:
# This is an unknown destination, and we'll apply
# potential ingress limiting. Already known
# destinations will have re-announces controlled
# by normal announce rate limiting.
if interface.should_ingress_limit():
interface.hold_announce(packet)
Transport.jobs_locked = False
return
local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None) local_destination = next((d for d in Transport.destinations if d.hash == packet.destination_hash), None)
if local_destination == None and RNS.Identity.validate_announce(packet): if local_destination == None and RNS.Identity.validate_announce(packet):
if packet.transport_id != None: if packet.transport_id != None: