From 5c943242309a8ffd89e59187d48caa2e84c08bdc Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Mon, 23 Apr 2018 23:42:16 +0200 Subject: [PATCH] Resource timing, retries --- RNS/Link.py | 9 +-- RNS/Packet.py | 41 +++++++----- RNS/Resource.py | 171 +++++++++++++++++++++++++++++++++++++++-------- RNS/Reticulum.py | 4 +- RNS/Transport.py | 46 ++++++++++++- RNS/__init__.py | 1 + 6 files changed, 218 insertions(+), 54 deletions(-) diff --git a/RNS/Link.py b/RNS/Link.py index 03914a8..11b44a8 100644 --- a/RNS/Link.py +++ b/RNS/Link.py @@ -29,8 +29,8 @@ class Link: # TODO: This should not be hardcoded, # but calculated from something like # first-hop RTT latency and distance - PROOF_TIMEOUT = 10 - TIMEOUT_FACTOR = 4 + DEFAULT_TIMEOUT = 5 + TIMEOUT_FACTOR = 3 KEEPALIVE = 120 PENDING = 0x00 @@ -93,7 +93,8 @@ class Link: self.rx = 0 self.txbytes = 0 self.rxbytes = 0 - self.proof_timeout = Link.PROOF_TIMEOUT + self.default_timeout = Link.DEFAULT_TIMEOUT + self.proof_timeout = self.default_timeout self.timeout_factor = Link.TIMEOUT_FACTOR self.keepalive = Link.KEEPALIVE self.watchdog_lock = False @@ -275,7 +276,7 @@ class Link: next_check = self.request_time + self.proof_timeout sleep_time = next_check - time.time() if time.time() >= self.request_time + self.proof_timeout: - RNS.log("Timeout waiting for RTT packet from link initiator", RNS.LOG_VERBOSE) + #RNS.log("Timeout waiting for RTT packet from link initiator", RNS.LOG_DEBUG) self.status = Link.CLOSED self.teardown_reason = Link.TIMEOUT self.link_closed() diff --git a/RNS/Packet.py b/RNS/Packet.py index 96bcad3..c12569c 100755 --- a/RNS/Packet.py +++ b/RNS/Packet.py @@ -17,22 +17,23 @@ class Packet: header_types = [HEADER_1, HEADER_2, HEADER_3, HEADER_4] # Context types - NONE = 0x00 - RESOURCE = 0x01 - RESOURCE_ADV = 0x02 - RESOURCE_REQ = 0x03 - RESOURCE_HMU = 0x04 - RESOURCE_PRF = 0x05 - RESOURCE_ICL = 0x06 - RESOURCE_RCL = 0x07 - REQUEST = 0x08 - RESPONSE = 0x09 - COMMAND = 0x0A - COMMAND_STAT = 0x0B - KEEPALIVE = 0xFC - LINKCLOSE = 0xFD - LRRTT = 0xFE - LRPROOF = 0xFF + NONE = 0x00 + RESOURCE = 0x01 + RESOURCE_ADV = 0x02 + RESOURCE_REQ = 0x03 + RESOURCE_HMU = 0x04 + RESOURCE_PRF = 0x05 + RESOURCE_ICL = 0x06 + RESOURCE_RCL = 0x07 + CACHE_REQUEST = 0x08 + REQUEST = 0x09 + RESPONSE = 0x0A + COMMAND = 0x0B + COMMAND_STAT = 0x0C + KEEPALIVE = 0xFC + LINKCLOSE = 0xFD + LRRTT = 0xFE + LRPROOF = 0xFF HEADER_MAXSIZE = 23 @@ -171,7 +172,11 @@ class Packet: def resend(self): if self.sent: - Transport.outbound(self.raw) + if RNS.Transport.outbound(self): + return self.receipt + else: + # TODO: Don't raise error here, handle gracefully + raise IOError("Packet could not be sent! Do you have any outbound interfaces configured?") else: raise IOError("Packet was not sent yet") @@ -198,6 +203,8 @@ class Packet: return RNS.Identity.fullHash(self.getHashablePart()) def getHashablePart(self): + # TODO: This assumes transport headers are stripped + # by Transport before going anywhere else return self.raw[0:1]+self.raw[2:] class ProofDestination: diff --git a/RNS/Resource.py b/RNS/Resource.py index c42943c..a4edf87 100644 --- a/RNS/Resource.py +++ b/RNS/Resource.py @@ -6,7 +6,6 @@ import threading import vendor.umsgpack as umsgpack from time import sleep - class Resource: WINDOW_MIN = 1 WINDOW_MAX = 10 @@ -15,9 +14,10 @@ class Resource: SDU = RNS.Reticulum.MTU - RNS.Packet.HEADER_MAXSIZE RANDOM_HASH_SIZE = 4 - DEFAULT_TIMEOUT = RNS.Packet.TIMEOUT - MAX_RETRIES = 3 - ROUNDTRIP_FACTOR = 1.5 + # TODO: Should be allocated more + # intelligently + MAX_RETRIES = 5 + SENDER_GRACE_TIME = 10 HASHMAP_IS_NOT_EXHAUSTED = 0x00 HASHMAP_IS_EXHAUSTED = 0xFF @@ -27,9 +27,11 @@ class Resource: QUEUED = 0x01 ADVERTISED = 0x02 TRANSFERRING = 0x03 - COMPLETE = 0x04 - FAILED = 0x05 - CORRUPT = 0x06 + AWAITING_PROOF = 0x04 + ASSEMBLING = 0x05 + COMPLETE = 0x06 + FAILED = 0x07 + CORRUPT = 0x08 @staticmethod def accept(advertisement_packet, callback=None, progress_callback = None): @@ -68,15 +70,24 @@ class Resource: resource.hashmap_update(0, resource.hashmap_raw) + resource.watchdog_job() + return resource except Exception as e: - RNS.log("Could not decode resource advertisement, dropping resource", RNS.LOG_VERBOSE) - traceback.print_exc() + RNS.log("Could not decode resource advertisement, dropping resource", RNS.LOG_DEBUG) return None def __init__(self, data, link, advertise=True, auto_compress=True, callback=None, progress_callback=None): self.status = Resource.NONE self.link = link + self.max_retries = Resource.MAX_RETRIES + self.retries_left = self.max_retries + self.default_timeout = self.link.default_timeout + self.timeout_factor = self.link.timeout_factor + self.sender_grace_time = Resource.SENDER_GRACE_TIME + self.hmu_retry_ok = False + self.watchdog_lock = False + self.__watchdog_job_id = 0 self.rtt = None if data != None: @@ -112,6 +123,7 @@ class Resource: self.size = len(self.data) self.hashmap = "" + self.sent_parts = 0 self.parts = [] for i in range(0,int(math.ceil(self.size/float(Resource.SDU)))): data = self.data[i*Resource.SDU:(i+1)*Resource.SDU] @@ -142,17 +154,22 @@ class Resource: def hashmap_update_packet(self, plaintext): if not self.status == Resource.FAILED: + self.last_activity = time.time() + self.retries_left = self.max_retries + update = umsgpack.unpackb(plaintext[RNS.Identity.HASHLENGTH/8:]) self.hashmap_update(update[0], update[1]) def hashmap_update(self, segment, hashmap): if not self.status == Resource.FAILED: + self.status = Resource.TRANSFERRING seg_len = ResourceAdvertisement.HASHMAP_MAX_LEN hashes = len(hashmap)/Resource.MAPHASH_LEN for i in range(0,hashes): + if self.hashmap[i+segment*seg_len] == None: + self.hashmap_height += 1 self.hashmap[i+segment*seg_len] = hashmap[i*Resource.MAPHASH_LEN:(i+1)*Resource.MAPHASH_LEN] - self.hashmap_height += 1 self.waiting_for_hmu = False self.request_next() @@ -167,22 +184,105 @@ class Resource: def __advertise_job(self): data = ResourceAdvertisement(self).pack() - packet = RNS.Packet(self.link, data, context=RNS.Packet.RESOURCE_ADV) + self.advertisement_packet = RNS.Packet(self.link, data, context=RNS.Packet.RESOURCE_ADV) while not self.link.ready_for_new_resource(): self.status = Resource.QUEUED sleep(0.25) - packet.send() + self.advertisement_packet.send() self.last_activity = time.time() self.adv_sent = self.last_activity self.rtt = None self.status = Resource.ADVERTISED self.link.register_outgoing_resource(self) + self.watchdog_job() + + def watchdog_job(self): + thread = threading.Thread(target=self.__watchdog_job) + thread.setDaemon(True) + thread.start() + + def __watchdog_job(self): + self.__watchdog_job_id += 1 + this_job_id = self.__watchdog_job_id + + while self.status < Resource.ASSEMBLING and this_job_id == self.__watchdog_job_id: + while self.watchdog_lock: + sleep(0.025) + + sleep_time = None + + if self.status == Resource.ADVERTISED: + sleep_time = (self.adv_sent+self.default_timeout)-time.time() + if sleep_time < 0: + if self.retries_left <= 0: + RNS.log("Resource transfer timeout after sending advertisement", RNS.LOG_DEBUG) + self.cancel() + sleep_time = 0.001 + else: + RNS.log("No part requests received, retrying resource advertisement...", RNS.LOG_DEBUG) + self.retries_left -= 1 + self.advertisement_packet.resend() + self.last_activity = time.time() + self.adv_sent = self.last_activity + sleep_time = 0.001 + + + elif self.status == Resource.TRANSFERRING: + if not self.initiator: + rtt = self.link.rtt if self.rtt == None else self.rtt + sleep_time = self.last_activity + (rtt*self.timeout_factor) - time.time() + + if sleep_time < 0: + if self.retries_left > 0: + RNS.log("Timeout waiting for parts, requesting retry", RNS.LOG_DEBUG) + sleep_time = 0.001 + self.retries_left -= 1 + self.waiting_for_hmu = False + self.request_next() + else: + self.cancel() + sleep_time = 0.001 + else: + max_wait = self.rtt * self.timeout_factor * self.max_retries + self.sender_grace_time + sleep_time = self.last_activity + max_wait - time.time() + if sleep_time < 0: + RNS.log("Resource timed out waiting for part requests", RNS.LOG_DEBUG) + self.cancel() + sleep_time = 0.001 + + elif self.status == Resource.AWAITING_PROOF: + sleep_time = self.last_part_sent + (self.rtt*self.timeout_factor+self.sender_grace_time) - time.time() + if sleep_time < 0: + if self.retries_left <= 0: + RNS.log("Resource timed out waiting for proof", RNS.LOG_DEBUG) + self.cancel() + sleep_time = 0.001 + else: + RNS.log("All parts sent, but no resource proof received, querying network cache...", RNS.LOG_DEBUG) + self.retries_left -= 1 + expected_data = self.hash + self.expected_proof + expected_proof_packet = RNS.Packet(self.link, expected_data, packet_type=RNS.Packet.PROOF, context=RNS.Packet.RESOURCE_PRF) + expected_proof_packet.pack() + expected_proof_packet.updateHash() + RNS.Transport.cache_request(expected_proof_packet.packet_hash) + self.last_part_sent = time.time() + sleep_time = 0.001 + + if sleep_time == 0: + RNS.log("Warning! Link watchdog sleep time of 0!", RNS.LOG_WARNING) + if sleep_time == None or sleep_time < 0: + # TODO: This should probably not be here forever + RNS.log("Timing error! Closing Reticulum now.", RNS.LOG_CRITICAL) + RNS.panic() + + sleep(sleep_time) + def assemble(self): if not self.status == Resource.FAILED: try: - RNS.log("Assembling parts...") + self.status = Resource.ASSEMBLING stream = "" for part in self.parts: stream += part @@ -236,11 +336,14 @@ class Resource: def receive_part(self, packet): self.last_activity = time.time() + self.retries_left = self.max_retries + if self.req_resp == None: self.req_resp = self.last_activity rtt = self.req_resp-self.req_sent if self.rtt == None: self.rtt = rtt + self.watchdog_job() elif self.rtt < rtt: self.rtt = rtt @@ -313,7 +416,12 @@ class Resource: if self.rtt == None: self.rtt = rtt - self.status == Resource.TRANSFERRING + if self.status != Resource.TRANSFERRING: + self.status = Resource.TRANSFERRING + self.watchdog_job() + + self.retries_left = self.max_retries + wants_more_hashmap = True if ord(request_data[0]) == Resource.HASHMAP_IS_EXHAUSTED else False pad = 1+Resource.MAPHASH_LEN if wants_more_hashmap else 1 @@ -322,16 +430,18 @@ class Resource: for i in range(0,len(requested_hashes)/Resource.MAPHASH_LEN): requested_hash = requested_hashes[i*Resource.MAPHASH_LEN:(i+1)*Resource.MAPHASH_LEN] - i = 0 + pi = 0 for part in self.parts: if part.map_hash == requested_hash: if not part.sent: part.send() + self.sent_parts += 1 else: part.resend() self.last_activity = time.time() + self.last_part_sent = self.last_activity break - i += 1 + pi += 1 if wants_more_hashmap: last_map_hash = request_data[1:Resource.MAPHASH_LEN+1] @@ -358,21 +468,26 @@ class Resource: hmu = self.hash+umsgpack.packb([segment, hashmap]) hmu_packet = RNS.Packet(self.link, hmu, context = RNS.Packet.RESOURCE_HMU) + hmu_packet.send() self.last_activity = time.time() + if self.sent_parts == len(self.parts): + self.status = Resource.AWAITING_PROOF + def cancel(self): - self.status = Resource.FAILED - if self.initiator: - if self.link.status == RNS.Link.ACTIVE: - cancel_packet = RNS.Packet(self.link, self.hash, context=RNS.Packet.RESOURCE_ICL) - cancel_packet.send() - self.link.cancel_outgoing_resource(self) - else: - self.link.cancel_incoming_resource(self) - - if self.callback != None: - self.callback(self) + if self.status < Resource.COMPLETE: + self.status = Resource.FAILED + if self.initiator: + if self.link.status == RNS.Link.ACTIVE: + cancel_packet = RNS.Packet(self.link, self.hash, context=RNS.Packet.RESOURCE_ICL) + cancel_packet.send() + self.link.cancel_outgoing_resource(self) + else: + self.link.cancel_incoming_resource(self) + + if self.callback != None: + self.callback(self) def progress_callback(self, callback): self.__progress_callback = callback @@ -382,7 +497,7 @@ class Resource: return progress def __str__(self): - return RNS.prettyHexRep(self.hash) + return RNS.prettyhexrep(self.hash)+str(self.link) class ResourceAdvertisement: diff --git a/RNS/Reticulum.py b/RNS/Reticulum.py index 177d9ba..38d5f46 100755 --- a/RNS/Reticulum.py +++ b/RNS/Reticulum.py @@ -63,8 +63,8 @@ class Reticulum: RNS.loglevel = int(value) if RNS.loglevel < 0: RNS.loglevel = 0 - if RNS.loglevel > 6: - RNS.loglevel = 6 + if RNS.loglevel > 7: + RNS.loglevel = 7 if "reticulum" in self.config: for option in self.config["reticulum"]: diff --git a/RNS/Transport.py b/RNS/Transport.py index 945ec95..45091ec 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -1,3 +1,4 @@ +import os import RNS import time import threading @@ -64,7 +65,7 @@ class Transport: @staticmethod def outbound(packet): while (Transport.jobs_running): - sleep(0.1) + sleep(0.01) Transport.jobs_locked = True packet.updateHash() @@ -101,9 +102,15 @@ class Transport: def packet_filter(packet): if packet.context == RNS.Packet.KEEPALIVE: return True + if packet.context == RNS.Packet.RESOURCE_REQ: + return True + if packet.context == RNS.Packet.RESOURCE_PRF: + return True if not packet.packet_hash in Transport.packet_hashlist: return True + return False + @staticmethod def inbound(raw, interface=None): while (Transport.jobs_running): @@ -116,8 +123,9 @@ class Transport: packet.updateHash() packet.receiving_interface = interface - RNS.log(str(interface)+" received packet with hash "+RNS.prettyhexrep(packet.packet_hash), RNS.LOG_DEBUG) + RNS.log(str(interface)+" received packet with hash "+RNS.prettyhexrep(packet.packet_hash), RNS.LOG_EXTREME) + # TODO: Rewrite these redundant cache calls if Transport.packet_filter(packet): Transport.packet_hashlist.append(packet.packet_hash) @@ -216,6 +224,9 @@ class Transport: def shouldCache(packet): # TODO: Implement sensible rules for which # packets to cache + if packet.context == RNS.Packet.RESOURCE_PRF: + return True + return False @staticmethod @@ -226,8 +237,37 @@ class Transport: file = open(RNS.Reticulum.cachepath+"/"+packet_hash, "w") file.write(packet.raw) file.close() - RNS.log("Wrote packet "+packet_hash+" to cache", RNS.LOG_DEBUG) + RNS.log("Wrote packet "+packet_hash+" to cache", RNS.LOG_EXTREME) except Exception as e: RNS.log("Error writing packet to cache", RNS.LOG_ERROR) RNS.log("The contained exception was: "+str(e)) + @staticmethod + def cache_request_packet(packet): + if len(packet.data) == RNS.Identity.HASHLENGTH/8: + packet_hash = RNS.hexrep(packet.data, delimit=False) + path = RNS.Reticulum.cachepath+"/"+packet_hash + if os.path.isfile(path): + file = open(path, "r") + raw = file.read() + file.close() + packet = RNS.Packet(None, raw) + # TODO: Implement outbound for this + + + @staticmethod + def cache_request(packet_hash): + RNS.log("Cache request for "+RNS.prettyhexrep(packet_hash), RNS.LOG_EXTREME) + path = RNS.Reticulum.cachepath+"/"+RNS.hexrep(packet_hash, delimit=False) + if os.path.isfile(path): + file = open(path, "r") + raw = file.read() + Transport.inbound(raw) + file.close() + else: + cache_request_packet = RNS.Packet(Transport.transport_destination(), packet_hash, context = RNS.Packet.CACHE_REQUEST) + + @staticmethod + def transport_destination(): + # TODO: implement this + pass diff --git a/RNS/__init__.py b/RNS/__init__.py index 0483e7a..f356758 100755 --- a/RNS/__init__.py +++ b/RNS/__init__.py @@ -22,6 +22,7 @@ LOG_NOTICE = 3 LOG_INFO = 4 LOG_VERBOSE = 5 LOG_DEBUG = 6 +LOG_EXTREME = 7 LOG_STDOUT = 0x91 LOG_FILE = 0x92