From a199e4c929b8a54fa45b1c5cbe4802b387b3c1ba Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Fri, 27 Aug 2021 19:52:48 +0200 Subject: [PATCH] Improved link and resource callbacks and resource handling. --- RNS/Link.py | 58 +++++++++++++++++++++++++++++++++++++++---------- RNS/Resource.py | 11 ++++++---- 2 files changed, 54 insertions(+), 15 deletions(-) diff --git a/RNS/Link.py b/RNS/Link.py index f959ea7..a1edc54 100644 --- a/RNS/Link.py +++ b/RNS/Link.py @@ -30,6 +30,8 @@ class Link: This class. :param destination: A :ref:`RNS.Destination` instance which to establish a link to. + :param established_callback: A function or method with the signature *callback(link)* to be called when the link has been established. + :param closed_callback: A function or method with the signature *callback(link)* to be called when the link is closed. :param owner: Internal use by :ref:`RNS.Transport`, ignore this argument. :param peer_pub_bytes: Internal use, ignore this argument. :param peer_sig_pub_bytes: Internal use, ignore this argument. @@ -47,7 +49,7 @@ class Link: # TODO: This should not be hardcoded, # but calculated from something like # first-hop RTT latency and distance - DEFAULT_TIMEOUT = 15.0 + DEFAULT_TIMEOUT = 60.0 """ Default timeout for link establishment in seconds. """ @@ -102,7 +104,7 @@ class Link: return None - def __init__(self, destination=None, owner=None, peer_pub_bytes = None, peer_sig_pub_bytes = None): + def __init__(self, destination=None, established_callback = None, closed_callback = None, owner=None, peer_pub_bytes = None, peer_sig_pub_bytes = None): if destination != None and destination.type != RNS.Destination.SINGLE: raise TypeError("Links can only be established to the \"single\" destination type") self.rtt = None @@ -158,6 +160,12 @@ class Link: else: self.load_peer(peer_pub_bytes, peer_sig_pub_bytes) + if established_callback != None: + self.set_link_established_callback(established_callback) + + if closed_callback != None: + self.set_link_closed_callback(closed_callback) + if (self.initiator): peer_pub_bytes = self.destination.identity.get_public_key()[:Link.ECPUBSIZE//2] peer_sig_pub_bytes = self.destination.identity.get_public_key()[Link.ECPUBSIZE//2:Link.ECPUBSIZE] @@ -266,13 +274,14 @@ class Link: self.had_outbound() - def request(self, path, data = None, response_callback = None, failed_callback = None): + def request(self, path, data = None, response_callback = None, failed_callback = None, timeout = None): """ Sends a request to the remote peer. :param path: The request path. - :param response_callback: A function or method with the signature *response_callback(request_receipt)* to be called when a response is received. See the :ref:`Request Example` for more info. - :param failed_callback: A function or method with the signature *failed_callback(request_receipt)* to be called when a request fails. See the :ref:`Request Example` for more info. + :param response_callback: An optional function or method with the signature *response_callback(request_receipt)* to be called when a response is received. See the :ref:`Request Example` for more info. + :param failed_callback: An optional function or method with the signature *failed_callback(request_receipt)* to be called when a request fails. See the :ref:`Request Example` for more info. + :param timeout: An optional timeout in seconds for the request. If *None* is supplied, this defaults to ``RNS.Packet.TIMEOUT``. """ request_path_hash = RNS.Identity.truncated_hash(path.encode("utf-8")) unpacked_request = [time.time(), request_path_hash, data] @@ -280,24 +289,30 @@ class Link: if len(packed_request) <= Link.MDU: request_packet = RNS.Packet(self, packed_request, RNS.Packet.DATA, context = RNS.Packet.REQUEST) + packet_receipt = request_packet.send() + + if timeout != None: + packet_receipt.set_timeout(timeout) return RequestReceipt( self, - packet_receipt = request_packet.send(), + packet_receipt = packet_receipt, response_callback = response_callback, - failed_callback = failed_callback + failed_callback = failed_callback, + timeout = timeout ) else: request_id = RNS.Identity.truncated_hash(packed_request) RNS.log("Sending request "+RNS.prettyhexrep(request_id)+" as resource.", RNS.LOG_DEBUG) - request_resource = RNS.Resource(packed_request, self, request_id = request_id, is_response = False) + request_resource = RNS.Resource(packed_request, self, request_id = request_id, is_response = False, timeout = timeout) return RequestReceipt( self, resource = request_resource, response_callback = response_callback, - failed_callback = failed_callback + failed_callback = failed_callback, + timeout = timeout ) @@ -831,7 +846,7 @@ class RequestReceipt(): DELIVERED = 0x02 READY = 0x03 - def __init__(self, link, packet_receipt = None, resource = None, response_callback = None, failed_callback = None): + def __init__(self, link, packet_receipt = None, resource = None, response_callback = None, failed_callback = None, timeout = None): self.packet_receipt = packet_receipt self.resource = resource @@ -849,18 +864,28 @@ class RequestReceipt(): self.response = None self.status = RequestReceipt.SENT self.sent_at = time.time() - self.timeout = RNS.Packet.TIMEOUT self.concluded_at = None + if timeout != None: + self.timeout = timeout + else: + self.timeout = RNS.Packet.TIMEOUT + self.callbacks = RequestReceiptCallbacks() self.callbacks.response = response_callback self.callbacks.failed = failed_callback self.link.pending_requests.append(self) + def request_resource_concluded(self, resource): if resource.status == RNS.Resource.COMPLETE: RNS.log("Request "+RNS.prettyhexrep(self.request_id)+" successfully sent as resource.", RNS.LOG_DEBUG) + self.status = RequestReceipt.DELIVERED + self.__resource_response_timeout = time.time()+self.timeout + load_thread = threading.Thread(target=self.__resource_response_timeout_job) + load_thread.setDaemon(True) + load_thread.start() else: RNS.log("Sending request "+RNS.prettyhexrep(self.request_id)+" as resource failed with status: "+RNS.hexrep([resource.status]), RNS.LOG_DEBUG) self.status = RequestReceipt.FAILED @@ -870,6 +895,7 @@ class RequestReceipt(): if self.callbacks.failed != None: self.callbacks.failed(self) + def request_timed_out(self, packet_receipt): self.status = RequestReceipt.FAILED self.concluded_at = time.time() @@ -878,8 +904,18 @@ class RequestReceipt(): if self.callbacks.failed != None: self.callbacks.failed(self) + + def __resource_response_timeout_job(self): + while self.status == RequestReceipt.DELIVERED: + if time.time() > self.__resource_response_timeout: + self.request_timed_out(None) + + time.sleep(0.1) + + def response_received(self, response): self.response = response + self.status = RequestReceipt.READY if self.packet_receipt != None: self.packet_receipt.status = RNS.PacketReceipt.DELIVERED diff --git a/RNS/Resource.py b/RNS/Resource.py index 511d496..91b1908 100644 --- a/RNS/Resource.py +++ b/RNS/Resource.py @@ -52,7 +52,6 @@ class Resource: # TODO: Should be allocated more # intelligently - # TODO: Set higher MAX_RETRIES = 5 SENDER_GRACE_TIME = 10 RETRY_GRACE_TIME = 0.25 @@ -136,7 +135,7 @@ class Resource: # Create a resource for transmission to a remote destination # The data passed can be either a bytes-array or a file opened # in binary read mode. - def __init__(self, data, link, advertise=True, auto_compress=True, callback=None, progress_callback=None, segment_index = 1, original_hash = None, request_id = None, is_response = False): + def __init__(self, data, link, advertise=True, auto_compress=True, callback=None, progress_callback=None, timeout = None, segment_index = 1, original_hash = None, request_id = None, is_response = False): data_size = None resource_data = None if hasattr(data, "read"): @@ -183,7 +182,6 @@ class Resource: self.link = link self.max_retries = Resource.MAX_RETRIES self.retries_left = self.max_retries - self.default_timeout = self.link.default_timeout self.timeout_factor = self.link.timeout_factor self.sender_grace_time = Resource.SENDER_GRACE_TIME self.hmu_retry_ok = False @@ -196,6 +194,11 @@ class Resource: self.receiver_min_consecutive_height = 0 + if timeout != None: + self.timeout = timeout + else: + self.timeout = self.link.default_timeout + if data != None: self.initiator = True self.callback = callback @@ -370,7 +373,7 @@ class Resource: sleep_time = None if self.status == Resource.ADVERTISED: - sleep_time = (self.adv_sent+self.default_timeout)-time.time() + sleep_time = (self.adv_sent+self.timeout)-time.time() if sleep_time < 0: if self.retries_left <= 0: RNS.log("Resource transfer timeout after sending advertisement", RNS.LOG_DEBUG)