Improved resource transfers over unreliable links

Mark Qvist 2023-11-04 18:05:20 +01:00
parent db1cdec2a2
commit c4d0f08767


@@ -26,6 +26,7 @@ import bz2
 import math
 import time
 import threading
+from threading import Lock
 from .vendor import umsgpack as umsgpack
 from time import sleep
@@ -47,7 +48,7 @@ class Resource:
     WINDOW = 4

     # Absolute minimum window size during transfer
-    WINDOW_MIN = 1
+    WINDOW_MIN = 2

     # The maximum window size for transfers on slow links
     WINDOW_MAX_SLOW = 10
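The window constants above interact with the growth rule in the receive_part hunk further down: whenever every outstanding part of a request has arrived, the window grows by one toward its maximum, and the minimum window is dragged along so the spread never exceeds the configured flexibility. A minimal standalone sketch of that rule, using illustrative names rather than the Resource API and assuming a window flexibility of 4:

    WINDOW = 4              # initial window size (per the hunk above)
    WINDOW_MIN = 2          # new absolute minimum
    WINDOW_MAX_SLOW = 10    # maximum window on slow links
    WINDOW_FLEXIBILITY = 4  # assumed value, for illustration only

    def grow_window(window, window_min,
                    window_max=WINDOW_MAX_SLOW,
                    window_flexibility=WINDOW_FLEXIBILITY):
        # Mirrors the adjustment applied when outstanding_parts reaches zero.
        if window < window_max:
            window += 1
            if (window - window_min) > (window_flexibility - 1):
                window_min += 1
        return window, window_min

    w, w_min = WINDOW, WINDOW_MIN
    for _ in range(8):
        w, w_min = grow_window(w, w_min)
    # w climbs to WINDOW_MAX_SLOW while w_min ends up WINDOW_FLEXIBILITY-1 behind it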
@@ -103,7 +104,7 @@ class Resource:
     PART_TIMEOUT_FACTOR = 4
     PART_TIMEOUT_FACTOR_AFTER_RTT = 2
-    MAX_RETRIES = 8
+    MAX_RETRIES = 16
     MAX_ADV_RETRIES = 4
     SENDER_GRACE_TIME = 10
     RETRY_GRACE_TIME = 0.25
@@ -170,7 +171,8 @@ class Resource:
         resource.receiving_part = False
-        resource.consecutive_completed_height = 0
+        # TODO: Recheck
+        resource.consecutive_completed_height = -1

         if not resource.link.has_incoming_resource(resource):
             resource.link.register_incoming_resource(resource)
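Initialising consecutive_completed_height to -1 rather than 0 acts as a sentinel for "no part confirmed in order yet". Read together with the later hunks, the next request then starts at index -1 + 1 = 0, and the hashmap scan in receive_part clamps its start index to 0. A small illustration with hypothetical helper names:

    def first_search_index(consecutive_completed_height):
        # Matches pn = self.consecutive_completed_height+1 in request_next
        return consecutive_completed_height + 1

    def hashmap_scan_start(consecutive_completed_height):
        # Matches the guard added in receive_part: never slice from -1
        return consecutive_completed_height if consecutive_completed_height >= 0 else 0

    assert first_search_index(-1) == 0    # a fresh transfer starts at part 0
    assert hashmap_scan_start(-1) == 0
    assert hashmap_scan_start(7) == 7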
@@ -366,7 +368,8 @@ class Resource:
             if advertise:
                 self.advertise()
         else:
-            pass
+            self.receive_lock = Lock()

     def hashmap_update_packet(self, plaintext):
         if not self.status == Resource.FAILED:
@@ -623,99 +626,99 @@ class Resource:
     def receive_part(self, packet):
-        while self.receiving_part:
-            sleep(0.001)
-
-        self.receiving_part = True
+        with self.receive_lock:
+            self.receiving_part = True
             self.last_activity = time.time()
             self.retries_left = self.max_retries

             if self.req_resp == None:
                 self.req_resp = self.last_activity
                 rtt = self.req_resp-self.req_sent
                 self.part_timeout_factor = Resource.PART_TIMEOUT_FACTOR_AFTER_RTT

                 if self.rtt == None:
                     self.rtt = self.link.rtt
                     self.watchdog_job()
                 elif rtt < self.rtt:
                     self.rtt = max(self.rtt - self.rtt*0.05, rtt)
                 elif rtt > self.rtt:
                     self.rtt = min(self.rtt + self.rtt*0.05, rtt)

                 if rtt > 0:
                     req_resp_cost = len(packet.raw)+self.req_sent_bytes
                     self.req_resp_rtt_rate = req_resp_cost / rtt

                     if self.req_resp_rtt_rate > Resource.RATE_FAST and self.fast_rate_rounds < Resource.FAST_RATE_THRESHOLD:
                         self.fast_rate_rounds += 1

                         if self.fast_rate_rounds == Resource.FAST_RATE_THRESHOLD:
                             self.window_max = Resource.WINDOW_MAX_FAST

             if not self.status == Resource.FAILED:
                 self.status = Resource.TRANSFERRING
                 part_data = packet.data
                 part_hash = self.get_map_hash(part_data)

-                i = self.consecutive_completed_height
-                for map_hash in self.hashmap[self.consecutive_completed_height:self.consecutive_completed_height+self.window]:
+                consecutive_index = self.consecutive_completed_height if self.consecutive_completed_height >= 0 else 0
+                i = consecutive_index
+                for map_hash in self.hashmap[consecutive_index:consecutive_index+self.window]:
                     if map_hash == part_hash:
                         if self.parts[i] == None:
                             # Insert data into parts list
                             self.parts[i] = part_data
                             self.rtt_rxd_bytes += len(part_data)
                             self.received_count += 1
                             self.outstanding_parts -= 1

                             # Update consecutive completed pointer
                             if i == self.consecutive_completed_height + 1:
                                 self.consecutive_completed_height = i

                             cp = self.consecutive_completed_height + 1
                             while cp < len(self.parts) and self.parts[cp] != None:
                                 self.consecutive_completed_height = cp
                                 cp += 1

                             if self.__progress_callback != None:
                                 try:
                                     self.__progress_callback(self)
                                 except Exception as e:
                                     RNS.log("Error while executing progress callback from "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)

                     i += 1

                 self.receiving_part = False

                 if self.received_count == self.total_parts and not self.assembly_lock:
                     self.assembly_lock = True
                     self.assemble()
                 elif self.outstanding_parts == 0:
                     # TODO: Figure out if there is a mathematically
                     # optimal way to adjust windows
                     if self.window < self.window_max:
                         self.window += 1

                         if (self.window - self.window_min) > (self.window_flexibility-1):
                             self.window_min += 1

                     if self.req_sent != 0:
                         rtt = time.time()-self.req_sent
                         req_transferred = self.rtt_rxd_bytes - self.rtt_rxd_bytes_at_part_req

                         if rtt != 0:
                             self.req_data_rtt_rate = req_transferred/rtt
                             self.rtt_rxd_bytes_at_part_req = self.rtt_rxd_bytes

                             if self.req_data_rtt_rate > Resource.RATE_FAST and self.fast_rate_rounds < Resource.FAST_RATE_THRESHOLD:
                                 self.fast_rate_rounds += 1

                                 if self.fast_rate_rounds == Resource.FAST_RATE_THRESHOLD:
                                     self.window_max = Resource.WINDOW_MAX_FAST

                     self.request_next()
             else:
                 self.receiving_part = False

     # Called on incoming resource to send a request for more data
     def request_next(self):
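Outside the diff, a minimal sketch of the serialization change this hunk makes: the busy-wait on a receiving_part flag is replaced by a context-managed threading.Lock, which blocks concurrent callers without polling and is always released even if part processing raises. Class and method names here are illustrative, not the actual Resource API:

    import threading, time

    class PartReceiver:
        def __init__(self):
            self.receive_lock = threading.Lock()
            self.receiving_part = False
            self.parts = []

        def receive_part_busywait(self, part):
            # Former pattern: poll a flag and sleep until the previous call clears it.
            while self.receiving_part:
                time.sleep(0.001)
            self.receiving_part = True
            try:
                self.parts.append(part)
            finally:
                self.receiving_part = False

        def receive_part_locked(self, part):
            # New pattern: the lock serializes callers and is released on exceptions too.
            with self.receive_lock:
                self.parts.append(part)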
@@ -728,11 +731,25 @@ class Resource:
                 hashmap_exhausted = Resource.HASHMAP_IS_NOT_EXHAUSTED
                 requested_hashes = b""

-                offset = (1 if self.consecutive_completed_height > 0 else 0)
-                i = 0; pn = self.consecutive_completed_height+offset
+                i = 0; pn = self.consecutive_completed_height+1
                 search_start = pn
+                search_size = self.window
+
+                # TODO: Remove
+                # tpm = []
+                # tpi = 0
+                # try:
+                #     for p in self.parts:
+                #         if p == None:
+                #             tpm.append(None)
+                #         else:
+                #             tpm.append(tpi)
+                #         tpi+=1
+                # except Exception as e:
+                #     print(str(e))
+                # RNS.log(f"Partmap: "+str(tpm))

-                for part in self.parts[search_start:search_start+self.window]:
+                for part in self.parts[search_start:search_start+search_size]:
                     if part == None:
                         part_hash = self.hashmap[pn]
                         if part_hash != None:
@@ -752,7 +769,6 @@ class Resource:
                     hmu_part += last_map_hash
                     self.waiting_for_hmu = True

-                requested_data = b""
                 request_data = hmu_part + self.hash + requested_hashes
                 request_packet = RNS.Packet(self.link, request_data, context = RNS.Packet.RESOURCE_REQ)
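A self-contained sketch of the request-window selection that the two hunks above adjust: starting just past the highest consecutively completed part, scan one window of the parts list and collect the map hashes of the entries that are still missing. The function name and bare-list arguments are illustrative; the real method also tracks outstanding parts and hashmap exhaustion.

    def select_missing_hashes(parts, hashmap, consecutive_completed_height, window):
        requested_hashes = b""
        pn = consecutive_completed_height + 1   # first part not yet confirmed in order
        search_start = pn
        search_size = window
        for part in parts[search_start:search_start + search_size]:
            if part is None and hashmap[pn] is not None:
                requested_hashes += hashmap[pn]  # request this part from the sender
            pn += 1
        return requested_hashes

    parts   = [b"a", None, b"c", None]
    hashmap = [b"H0", b"H1", b"H2", b"H3"]
    assert select_missing_hashes(parts, hashmap, consecutive_completed_height=0, window=3) == b"H1" + b"H3"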
@@ -908,8 +924,7 @@ class Resource:
         else:
             self.progress_total_parts = float(self.total_parts)

-        progress = self.processed_parts / self.progress_total_parts
-
+        progress = min(1.0, self.processed_parts / self.progress_total_parts)
         return progress

     def get_transfer_size(self):
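An illustrative note on the get_progress hunk: the min(1.0, ...) form caps the reported ratio, so that if retransmissions or segment bookkeeping briefly push the processed count past the expected total, progress callbacks never see a value above 1.0. A standalone example with a hypothetical helper name:

    def clamped_progress(processed_parts, progress_total_parts):
        return min(1.0, processed_parts / float(progress_total_parts))

    assert clamped_progress(5, 10) == 0.5
    assert clamped_progress(12, 10) == 1.0   # capped instead of reporting 120 %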