diff --git a/RNS/Transport.py b/RNS/Transport.py index b8505dc..460eb76 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -592,7 +592,6 @@ class Transport: interface.announce_queue = [] queued_announces = True if len(interface.announce_queue) > 0 else False - if not queued_announces and outbound_time > interface.announce_allowed_at: tx_time = (len(packet.raw)*8) / interface.bitrate wait_time = (tx_time / interface.announce_cap) @@ -605,24 +604,50 @@ class Transport: else: should_transmit = False if not len(interface.announce_queue) >= RNS.Reticulum.MAX_QUEUED_ANNOUNCES: - entry = {"time": outbound_time, "hops": packet.hops, "raw": packet.raw} - queued_announces = True if len(interface.announce_queue) > 0 else False - interface.announce_queue.append(entry) + should_queue = True - if not queued_announces: - wait_time = max(interface.announce_allowed_at - time.time(), 0) - timer = threading.Timer(wait_time, interface.process_announce_queue) - timer.start() + already_queued = False + for e in interface.announce_queue: + if e["destination"] == packet.destination_hash: + already_queued = True + existing_entry = e - wait_time_str = str(round(wait_time*1000,3))+"ms" - ql_str = str(len(interface.announce_queue)) - RNS.log("Added announce to queue (height "+ql_str+") on "+str(interface)+" for processing in "+wait_time_str, RNS.LOG_EXTREME) + emission_timestamp = Transport.announce_emitted(packet) + if already_queued: + should_queue = False - else: - wait_time = max(interface.announce_allowed_at - time.time(), 0) - wait_time_str = str(round(wait_time*1000,3))+"ms" - ql_str = str(len(interface.announce_queue)) - RNS.log("Added announce to queue (height "+ql_str+") on "+str(interface)+" for processing in "+wait_time_str, RNS.LOG_EXTREME) + if emission_timestamp > existing_entry["emitted"]: + e["time"] = outbound_time + e["hops"] = packet.hops + e["emitted"] = emission_timestamp + e["raw"] = packet.raw + + if should_queue: + entry = { + "destination": packet.destination_hash, + "time": outbound_time, + "hops": packet.hops, + "emitted": Transport.announce_emitted(packet), + "raw": packet.raw + } + + queued_announces = True if len(interface.announce_queue) > 0 else False + interface.announce_queue.append(entry) + + if not queued_announces: + wait_time = max(interface.announce_allowed_at - time.time(), 0) + timer = threading.Timer(wait_time, interface.process_announce_queue) + timer.start() + + wait_time_str = str(round(wait_time*1000,3))+"ms" + ql_str = str(len(interface.announce_queue)) + RNS.log("Added announce to queue (height "+ql_str+") on "+str(interface)+" for processing in "+wait_time_str, RNS.LOG_EXTREME) + + else: + wait_time = max(interface.announce_allowed_at - time.time(), 0) + wait_time_str = str(round(wait_time*1000,3))+"ms" + ql_str = str(len(interface.announce_queue)) + RNS.log("Added announce to queue (height "+ql_str+") on "+str(interface)+" for processing in "+wait_time_str, RNS.LOG_EXTREME) else: pass @@ -985,8 +1010,9 @@ class Transport: # First, check that the announce is not for a destination # local to this system, and that hops are less than the max if (not any(packet.destination_hash == d.hash for d in Transport.destinations) and packet.hops < Transport.PATHFINDER_M+1): + announce_emitted = Transport.announce_emitted(packet) + random_blob = packet.data[RNS.Identity.KEYSIZE//8:RNS.Identity.KEYSIZE//8+RNS.Reticulum.TRUNCATED_HASHLENGTH//8] - announce_emitted = int.from_bytes(random_blob[5:10], "big") random_blobs = [] if packet.destination_hash in Transport.destination_table: random_blobs = Transport.destination_table[packet.destination_hash][4] @@ -1772,6 +1798,13 @@ class Transport: if registered_destination.type == RNS.Destination.SINGLE: registered_destination.announce(path_response=True) + @staticmethod + def announce_emitted(packet): + random_blob = packet.data[RNS.Identity.KEYSIZE//8:RNS.Identity.KEYSIZE//8+RNS.Reticulum.TRUNCATED_HASHLENGTH//8] + announce_emitted = int.from_bytes(random_blob[5:10], "big") + + return announce_emitted + @staticmethod def exit_handler(): try: