Added continous resource timeout adjustment. Fixes missing response timeout check.
This commit is contained in:
parent
42a3d23e99
commit
07cf180ea8
24
RNS/Link.py
24
RNS/Link.py
@ -292,7 +292,7 @@ class Link:
|
|||||||
packed_request = umsgpack.packb(unpacked_request)
|
packed_request = umsgpack.packb(unpacked_request)
|
||||||
|
|
||||||
if timeout == None:
|
if timeout == None:
|
||||||
timeout = self.rtt * self.traffic_timeout_factor
|
timeout = self.rtt * self.traffic_timeout_factor + RNS.Resource.RESPONSE_MAX_GRACE_TIME
|
||||||
|
|
||||||
if len(packed_request) <= Link.MDU:
|
if len(packed_request) <= Link.MDU:
|
||||||
request_packet = RNS.Packet(self, packed_request, RNS.Packet.DATA, context = RNS.Packet.REQUEST)
|
request_packet = RNS.Packet(self, packed_request, RNS.Packet.DATA, context = RNS.Packet.REQUEST)
|
||||||
@ -867,7 +867,8 @@ class RequestReceipt():
|
|||||||
FAILED = 0x00
|
FAILED = 0x00
|
||||||
SENT = 0x01
|
SENT = 0x01
|
||||||
DELIVERED = 0x02
|
DELIVERED = 0x02
|
||||||
READY = 0x03
|
RECEIVING = 0x03
|
||||||
|
READY = 0x04
|
||||||
|
|
||||||
def __init__(self, link, packet_receipt = None, resource = None, response_callback = None, failed_callback = None, progress_callback = None, timeout = None):
|
def __init__(self, link, packet_receipt = None, resource = None, response_callback = None, failed_callback = None, progress_callback = None, timeout = None):
|
||||||
self.packet_receipt = packet_receipt
|
self.packet_receipt = packet_receipt
|
||||||
@ -914,6 +915,9 @@ class RequestReceipt():
|
|||||||
self.started_at = time.time()
|
self.started_at = time.time()
|
||||||
self.status = RequestReceipt.DELIVERED
|
self.status = RequestReceipt.DELIVERED
|
||||||
self.__resource_response_timeout = time.time()+self.timeout
|
self.__resource_response_timeout = time.time()+self.timeout
|
||||||
|
response_timeout_thread = threading.Thread(target=self.__response_timeout_job)
|
||||||
|
response_timeout_thread.setDaemon(True)
|
||||||
|
response_timeout_thread.start()
|
||||||
else:
|
else:
|
||||||
RNS.log("Sending request "+RNS.prettyhexrep(self.request_id)+" as resource failed with status: "+RNS.hexrep([resource.status]), RNS.LOG_DEBUG)
|
RNS.log("Sending request "+RNS.prettyhexrep(self.request_id)+" as resource failed with status: "+RNS.hexrep([resource.status]), RNS.LOG_DEBUG)
|
||||||
self.status = RequestReceipt.FAILED
|
self.status = RequestReceipt.FAILED
|
||||||
@ -924,6 +928,15 @@ class RequestReceipt():
|
|||||||
self.callbacks.failed(self)
|
self.callbacks.failed(self)
|
||||||
|
|
||||||
|
|
||||||
|
def __response_timeout_job(self):
|
||||||
|
while self.status == RequestReceipt.DELIVERED:
|
||||||
|
now = time.time()
|
||||||
|
if now > self.__resource_response_timeout:
|
||||||
|
self.request_timed_out(None)
|
||||||
|
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
|
||||||
def request_timed_out(self, packet_receipt):
|
def request_timed_out(self, packet_receipt):
|
||||||
self.status = RequestReceipt.FAILED
|
self.status = RequestReceipt.FAILED
|
||||||
self.concluded_at = time.time()
|
self.concluded_at = time.time()
|
||||||
@ -932,9 +945,10 @@ class RequestReceipt():
|
|||||||
if self.callbacks.failed != None:
|
if self.callbacks.failed != None:
|
||||||
self.callbacks.failed(self)
|
self.callbacks.failed(self)
|
||||||
|
|
||||||
|
|
||||||
def response_resource_progress(self, resource):
|
def response_resource_progress(self, resource):
|
||||||
if not self.status == RequestReceipt.FAILED:
|
if not self.status == RequestReceipt.FAILED:
|
||||||
self.status = RequestReceipt.DELIVERED
|
self.status = RequestReceipt.RECEIVING
|
||||||
if self.packet_receipt != None:
|
if self.packet_receipt != None:
|
||||||
self.packet_receipt.status = RNS.PacketReceipt.DELIVERED
|
self.packet_receipt.status = RNS.PacketReceipt.DELIVERED
|
||||||
self.packet_receipt.proved = True
|
self.packet_receipt.proved = True
|
||||||
@ -943,9 +957,7 @@ class RequestReceipt():
|
|||||||
self.packet_receipt.callbacks.delivery(self.packet_receipt)
|
self.packet_receipt.callbacks.delivery(self.packet_receipt)
|
||||||
|
|
||||||
self.progress = resource.get_progress()
|
self.progress = resource.get_progress()
|
||||||
now = time.time()
|
|
||||||
self.__resource_response_timeout = time.time()+self.timeout
|
|
||||||
|
|
||||||
if self.callbacks.progress != None:
|
if self.callbacks.progress != None:
|
||||||
self.callbacks.progress(self)
|
self.callbacks.progress(self)
|
||||||
else:
|
else:
|
||||||
|
@ -32,15 +32,16 @@ class Resource:
|
|||||||
# maximum size a resource should be, if
|
# maximum size a resource should be, if
|
||||||
# it is to be handled within reasonable
|
# it is to be handled within reasonable
|
||||||
# time constraint, even on small systems.
|
# time constraint, even on small systems.
|
||||||
|
#
|
||||||
# A small system in this regard is
|
# A small system in this regard is
|
||||||
# defined as a Raspberry Pi, which should
|
# defined as a Raspberry Pi, which should
|
||||||
# be able to compress, encrypt and hash-map
|
# be able to compress, encrypt and hash-map
|
||||||
# the resource in about 10 seconds.
|
# the resource in about 10 seconds.
|
||||||
|
#
|
||||||
# This constant will be used when determining
|
# This constant will be used when determining
|
||||||
# how to sequence the sending of large resources.
|
# how to sequence the sending of large resources.
|
||||||
MAX_EFFICIENT_SIZE = 16 * 1024 * 1024
|
MAX_EFFICIENT_SIZE = 16 * 1024 * 1024
|
||||||
|
RESPONSE_MAX_GRACE_TIME = 10
|
||||||
|
|
||||||
# The maximum size to auto-compress with
|
# The maximum size to auto-compress with
|
||||||
# bz2 before sending.
|
# bz2 before sending.
|
||||||
@ -389,7 +390,12 @@ class Resource:
|
|||||||
|
|
||||||
elif self.status == Resource.TRANSFERRING:
|
elif self.status == Resource.TRANSFERRING:
|
||||||
if not self.initiator:
|
if not self.initiator:
|
||||||
rtt = self.link.rtt if self.rtt == None else self.rtt
|
|
||||||
|
if self.rtt == None:
|
||||||
|
rtt = self.link.rtt
|
||||||
|
else:
|
||||||
|
rtt = self.rtt
|
||||||
|
|
||||||
sleep_time = self.last_activity + (rtt*self.part_timeout_factor) + Resource.RETRY_GRACE_TIME - time.time()
|
sleep_time = self.last_activity + (rtt*self.part_timeout_factor) + Resource.RETRY_GRACE_TIME - time.time()
|
||||||
|
|
||||||
if sleep_time < 0:
|
if sleep_time < 0:
|
||||||
@ -541,15 +547,14 @@ class Resource:
|
|||||||
self.req_resp = self.last_activity
|
self.req_resp = self.last_activity
|
||||||
rtt = self.req_resp-self.req_sent
|
rtt = self.req_resp-self.req_sent
|
||||||
|
|
||||||
if rtt >= self.link.rtt:
|
self.part_timeout_factor = Resource.PART_TIMEOUT_FACTOR_AFTER_RTT
|
||||||
self.part_timeout_factor = Resource.PART_TIMEOUT_FACTOR_AFTER_RTT
|
if self.rtt == None:
|
||||||
if self.rtt == None:
|
self.rtt = self.link.rtt
|
||||||
self.rtt = rtt
|
self.watchdog_job()
|
||||||
self.watchdog_job()
|
elif rtt < self.rtt:
|
||||||
elif rtt < self.rtt:
|
self.rtt = max(self.rtt - self.rtt*0.05, rtt)
|
||||||
self.rtt = max(self.rtt - self.rtt*0.05, rtt)
|
elif rtt > self.rtt:
|
||||||
elif rtt > self.rtt:
|
self.rtt = min(self.rtt + self.rtt*0.05, rtt)
|
||||||
self.rtt = min(self.rtt + self.rtt*0.05, rtt)
|
|
||||||
|
|
||||||
if not self.status == Resource.FAILED:
|
if not self.status == Resource.FAILED:
|
||||||
self.status = Resource.TRANSFERRING
|
self.status = Resource.TRANSFERRING
|
||||||
|
Loading…
Reference in New Issue
Block a user