Implemented bandwidth-based announce propagation calculation
This commit is contained in:
parent
64593e27be
commit
31104c6e9c
@ -21,6 +21,8 @@
|
|||||||
# SOFTWARE.
|
# SOFTWARE.
|
||||||
|
|
||||||
import RNS
|
import RNS
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
|
||||||
class Interface:
|
class Interface:
|
||||||
IN = False
|
IN = False
|
||||||
@ -41,5 +43,61 @@ class Interface:
|
|||||||
def get_hash(self):
|
def get_hash(self):
|
||||||
return RNS.Identity.full_hash(str(self).encode("utf-8"))
|
return RNS.Identity.full_hash(str(self).encode("utf-8"))
|
||||||
|
|
||||||
|
# TODO: Clean
|
||||||
|
# def bogus_queue(self):
|
||||||
|
# self.announce_queue = []
|
||||||
|
|
||||||
|
# import random
|
||||||
|
# import time
|
||||||
|
|
||||||
|
# now = time.time()
|
||||||
|
# random.seed(45)
|
||||||
|
# for i in range(1,32):
|
||||||
|
# entry = {"time": now+i*3, "hops":random.randint(4,16), "raw": str("bogus_data_"+str(i)).encode("utf-8")}
|
||||||
|
# self.announce_queue.append(entry)
|
||||||
|
|
||||||
|
def process_announce_queue(self):
|
||||||
|
if not hasattr(self, "announce_cap"):
|
||||||
|
self.announce_cap = RNS.Reticulum.ANNOUNCE_CAP
|
||||||
|
|
||||||
|
if hasattr(self, "announce_queue"):
|
||||||
|
# TODO: Clean
|
||||||
|
# RNS.log("Processing announce queue on "+str(self), RNS.LOG_DEBUG)
|
||||||
|
try:
|
||||||
|
now = time.time()
|
||||||
|
stale = []
|
||||||
|
for a in self.announce_queue:
|
||||||
|
if now > a["time"]+RNS.Reticulum.QUEUED_ANNOUNCE_LIFE:
|
||||||
|
stale.append(a)
|
||||||
|
|
||||||
|
for s in stale:
|
||||||
|
self.announce_queue.remove(s)
|
||||||
|
|
||||||
|
if len(self.announce_queue) > 0:
|
||||||
|
min_hops = min(entry["hops"] for entry in self.announce_queue)
|
||||||
|
entries = list(filter(lambda e: e["hops"] == min_hops, self.announce_queue))
|
||||||
|
entries.sort(key=lambda e: e["time"])
|
||||||
|
selected = entries[0]
|
||||||
|
|
||||||
|
now = time.time()
|
||||||
|
tx_time = (len(selected["raw"])*8) / self.bitrate
|
||||||
|
wait_time = (tx_time / self.announce_cap)
|
||||||
|
self.announce_allowed_at = now + wait_time
|
||||||
|
|
||||||
|
self.processOutgoing(selected["raw"])
|
||||||
|
self.announce_queue.remove(selected)
|
||||||
|
# TODO: Clean debug statements
|
||||||
|
# RNS.log("Sent queued announce with "+str(selected["hops"])+" hops on "+str(self))
|
||||||
|
if len(self.announce_queue) > 0:
|
||||||
|
# TODO: Clean debug statements
|
||||||
|
# RNS.log("Still have "+str(len(self.announce_queue))+" announces in queue, scheduling next for tx in "+str(round(wait_time*1000,6))+"ms", RNS.LOG_DEBUG)
|
||||||
|
timer = threading.Timer(wait_time, self.process_announce_queue)
|
||||||
|
timer.start()
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.announce_queue = []
|
||||||
|
RNS.log("Error while processing announce queue on "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||||
|
RNS.log("The announce queue for this interface has been cleared.", RNS.LOG_ERROR)
|
||||||
|
|
||||||
def detach(self):
|
def detach(self):
|
||||||
pass
|
pass
|
@ -85,6 +85,9 @@ class Reticulum:
|
|||||||
the default value.
|
the default value.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
MAX_QUEUED_ANNOUNCES = 16384
|
||||||
|
QUEUED_ANNOUNCE_LIFE = 60*60*24
|
||||||
|
|
||||||
ANNOUNCE_CAP = 2
|
ANNOUNCE_CAP = 2
|
||||||
"""
|
"""
|
||||||
The maximum percentage of interface bandwidth that, at any given time,
|
The maximum percentage of interface bandwidth that, at any given time,
|
||||||
|
@ -56,7 +56,7 @@ class Transport:
|
|||||||
PATHFINDER_D = 10 # Fixed per-hop delay
|
PATHFINDER_D = 10 # Fixed per-hop delay
|
||||||
PATHFINDER_R = 1 # Retransmit retries
|
PATHFINDER_R = 1 # Retransmit retries
|
||||||
PATHFINDER_T = 10 # Retry grace period
|
PATHFINDER_T = 10 # Retry grace period
|
||||||
PATHFINDER_RW = 5 # Random window for announce rebroadcast
|
PATHFINDER_RW = 4 # Random window for announce rebroadcast
|
||||||
PATHFINDER_E = 60*60*24*7 # Path expiration of one week
|
PATHFINDER_E = 60*60*24*7 # Path expiration of one week
|
||||||
AP_PATH_TIME = 60*60*24 # Path expiration of one day for Access Point paths
|
AP_PATH_TIME = 60*60*24 # Path expiration of one day for Access Point paths
|
||||||
|
|
||||||
@ -82,6 +82,7 @@ class Transport:
|
|||||||
# TODO: "destination_table" should really be renamed to "path_table"
|
# TODO: "destination_table" should really be renamed to "path_table"
|
||||||
# Notes on memory usage: 1 megabyte of memory can store approximately
|
# Notes on memory usage: 1 megabyte of memory can store approximately
|
||||||
# 55.100 path table entries or approximately 22.300 link table entries.
|
# 55.100 path table entries or approximately 22.300 link table entries.
|
||||||
|
|
||||||
announce_table = {} # A table for storing announces currently waiting to be retransmitted
|
announce_table = {} # A table for storing announces currently waiting to be retransmitted
|
||||||
destination_table = {} # A lookup table containing the next hop to a given destination
|
destination_table = {} # A lookup table containing the next hop to a given destination
|
||||||
reverse_table = {} # A lookup table for storing packet hashes used to return proofs and replies
|
reverse_table = {} # A lookup table for storing packet hashes used to return proofs and replies
|
||||||
@ -302,7 +303,6 @@ class Transport:
|
|||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
if time.time() > announce_entry[1]:
|
if time.time() > announce_entry[1]:
|
||||||
# announce_entry[1] = time.time() + math.pow(Transport.PATHFINDER_C, announce_entry[4]) + Transport.PATHFINDER_T + Transport.PATHFINDER_RW
|
|
||||||
announce_entry[1] = time.time() + math.pow(Transport.PATHFINDER_C, announce_entry[4]) + Transport.PATHFINDER_T + Transport.PATHFINDER_RW
|
announce_entry[1] = time.time() + math.pow(Transport.PATHFINDER_C, announce_entry[4]) + Transport.PATHFINDER_T + Transport.PATHFINDER_RW
|
||||||
announce_entry[2] += 1
|
announce_entry[2] += 1
|
||||||
packet = announce_entry[5]
|
packet = announce_entry[5]
|
||||||
@ -476,12 +476,14 @@ class Transport:
|
|||||||
@staticmethod
|
@staticmethod
|
||||||
def outbound(packet):
|
def outbound(packet):
|
||||||
while (Transport.jobs_running):
|
while (Transport.jobs_running):
|
||||||
|
# TODO: Profile actual impact here on faster links
|
||||||
sleep(0.01)
|
sleep(0.01)
|
||||||
|
|
||||||
Transport.jobs_locked = True
|
Transport.jobs_locked = True
|
||||||
# TODO: This updateHash call might be redundant
|
# TODO: This updateHash call might be redundant
|
||||||
packet.update_hash()
|
packet.update_hash()
|
||||||
sent = False
|
sent = False
|
||||||
|
outbound_time = time.time()
|
||||||
|
|
||||||
# Check if we have a known path for the destination in the path table
|
# Check if we have a known path for the destination in the path table
|
||||||
if packet.packet_type != RNS.Packet.ANNOUNCE and packet.destination.type != RNS.Destination.PLAIN and packet.destination.type != RNS.Destination.GROUP and packet.destination_hash in Transport.destination_table:
|
if packet.packet_type != RNS.Packet.ANNOUNCE and packet.destination.type != RNS.Destination.PLAIN and packet.destination.type != RNS.Destination.GROUP and packet.destination_hash in Transport.destination_table:
|
||||||
@ -550,17 +552,75 @@ class Transport:
|
|||||||
should_transmit = False
|
should_transmit = False
|
||||||
|
|
||||||
if packet.packet_type == RNS.Packet.ANNOUNCE:
|
if packet.packet_type == RNS.Packet.ANNOUNCE:
|
||||||
if packet.attached_interface == None and interface.mode == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT:
|
if packet.attached_interface == None:
|
||||||
RNS.log("Blocking announce broadcast on "+str(interface)+" due to AP mode", RNS.LOG_DEBUG)
|
if interface.mode == RNS.Interfaces.Interface.Interface.MODE_ACCESS_POINT:
|
||||||
should_transmit = False
|
RNS.log("Blocking announce broadcast on "+str(interface)+" due to AP mode", RNS.LOG_EXTREME)
|
||||||
# TODO: Add capacity limit based on interface bandwidth
|
should_transmit = False
|
||||||
|
|
||||||
|
else:
|
||||||
|
# TODO: Remove logging statements
|
||||||
|
if packet.hops > 0:
|
||||||
|
|
||||||
|
if not hasattr(interface, "announce_cap"):
|
||||||
|
interface.announce_cap = RNS.Reticulum.ANNOUNCE_CAP
|
||||||
|
|
||||||
|
if not hasattr(interface, "announce_allowed_at"):
|
||||||
|
interface.announce_allowed_at = 0
|
||||||
|
|
||||||
|
if not hasattr(interface, "announce_queue"):
|
||||||
|
interface.announce_queue = []
|
||||||
|
|
||||||
|
queued_announces = True if len(interface.announce_queue) > 0 else False
|
||||||
|
|
||||||
|
if not queued_announces and outbound_time > interface.announce_allowed_at:
|
||||||
|
tx_time = (len(packet.raw)*8) / interface.bitrate
|
||||||
|
wait_time = (tx_time / interface.announce_cap)
|
||||||
|
interface.announce_allowed_at = outbound_time + wait_time
|
||||||
|
|
||||||
|
else:
|
||||||
|
should_transmit = False
|
||||||
|
if not len(interface.announce_queue) >= RNS.Reticulum.MAX_QUEUED_ANNOUNCES:
|
||||||
|
entry = {"time": outbound_time, "hops": packet.hops, "raw": packet.raw}
|
||||||
|
queued_announces = True if len(interface.announce_queue) > 0 else False
|
||||||
|
interface.announce_queue.append(entry)
|
||||||
|
# TODO: Clean
|
||||||
|
# RNS.log("Added announce to queue on "+str(interface), RNS.LOG_DEBUG)
|
||||||
|
|
||||||
|
if not queued_announces:
|
||||||
|
# TODO: Clean
|
||||||
|
# RNS.log("Interface "+str(interface)+" still waiting for announce allowance", RNS.LOG_DEBUG)
|
||||||
|
wait_time = max(interface.announce_allowed_at - time.time(), 0)
|
||||||
|
timer = threading.Timer(wait_time, interface.process_announce_queue)
|
||||||
|
timer.start()
|
||||||
|
# TODO: Clean
|
||||||
|
# RNS.log("Triggering run in "+str(wait_time)+" seconds", RNS.LOG_DEBUG)
|
||||||
|
else:
|
||||||
|
# TODO: Clean
|
||||||
|
# RNS.log("Interface "+str(interface)+" has announces in queue, adding directly to it", RNS.LOG_DEBUG)
|
||||||
|
pass
|
||||||
|
|
||||||
|
else:
|
||||||
|
# TODO: Clean
|
||||||
|
# RNS.log("Not retransmitting announce on "+str(interface)+" since the queue is full", RNS.LOG_DEBUG)
|
||||||
|
pass
|
||||||
|
|
||||||
|
else:
|
||||||
|
# TODO: Clean
|
||||||
|
# RNS.log("Skipping announce cap calculations for "+str(packet.hops)+" hop packet", RNS.LOG_DEBUG)
|
||||||
|
pass
|
||||||
|
|
||||||
if should_transmit:
|
if should_transmit:
|
||||||
if not stored_hash:
|
if not stored_hash:
|
||||||
Transport.packet_hashlist.append(packet.packet_hash)
|
Transport.packet_hashlist.append(packet.packet_hash)
|
||||||
stored_hash = True
|
stored_hash = True
|
||||||
|
|
||||||
interface.processOutgoing(packet.raw)
|
def send_packet():
|
||||||
|
interface.processOutgoing(packet.raw)
|
||||||
|
|
||||||
|
thread = threading.Thread(target=send_packet)
|
||||||
|
thread.daemon = True
|
||||||
|
thread.start()
|
||||||
|
|
||||||
sent = True
|
sent = True
|
||||||
|
|
||||||
if sent:
|
if sent:
|
||||||
@ -641,6 +701,9 @@ class Transport:
|
|||||||
def inbound(raw, interface=None):
|
def inbound(raw, interface=None):
|
||||||
while (Transport.jobs_running):
|
while (Transport.jobs_running):
|
||||||
sleep(0.01)
|
sleep(0.01)
|
||||||
|
|
||||||
|
if Transport.identity == None:
|
||||||
|
return
|
||||||
|
|
||||||
Transport.jobs_locked = True
|
Transport.jobs_locked = True
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user