Implemented requests and responses of arbitrary sizes using resources.

This commit is contained in:
Mark Qvist 2021-08-21 14:52:31 +02:00
parent 1dc6655017
commit 212518a345
6 changed files with 206 additions and 89 deletions

View File

@ -502,7 +502,6 @@ def download_concluded(resource):
saved_filename = current_filename saved_filename = current_filename
if resource.status == RNS.Resource.COMPLETE: if resource.status == RNS.Resource.COMPLETE:
counter = 0 counter = 0
while os.path.isfile(saved_filename): while os.path.isfile(saved_filename):

View File

@ -23,7 +23,7 @@ APP_NAME = "example_utilities"
# A reference to the latest client link that connected # A reference to the latest client link that connected
latest_client_link = None latest_client_link = None
def random_text_generator(path, data, request_id, remote_identity_hash, requested_at): def random_text_generator(path, data, request_id, remote_identity, requested_at):
RNS.log("Generating response to request "+RNS.prettyhexrep(request_id)) RNS.log("Generating response to request "+RNS.prettyhexrep(request_id))
texts = ["They looked up", "On each full moon", "Becky was upset", "Ill stay away from it", "The pet shop stocks everything"] texts = ["They looked up", "On each full moon", "Becky was upset", "Ill stay away from it", "The pet shop stocks everything"]
return texts[random.randint(0, len(texts)-1)] return texts[random.randint(0, len(texts)-1)]

View File

@ -220,7 +220,7 @@ class Destination:
Registers a request handler. Registers a request handler.
:param path: The path for the request handler to be registered. :param path: The path for the request handler to be registered.
:param response_generator: A function or method with the signature *response_generator(path, data, request_id, remote_identity_hash, requested_at)* to be called. Whatever this funcion returns will be sent as a response to the requester. If the function returns ``None``, no response will be sent. :param response_generator: A function or method with the signature *response_generator(path, data, request_id, remote_identity, requested_at)* to be called. Whatever this funcion returns will be sent as a response to the requester. If the function returns ``None``, no response will be sent.
:param allow: One of ``RNS.Destination.ALLOW_NONE``, ``RNS.Destination.ALLOW_ALL`` or ``RNS.Destination.ALLOW_LIST``. If ``RNS.Destination.ALLOW_LIST`` is set, the request handler will only respond to requests for identified peers in the supplied list. :param allow: One of ``RNS.Destination.ALLOW_NONE``, ``RNS.Destination.ALLOW_ALL`` or ``RNS.Destination.ALLOW_LIST``. If ``RNS.Destination.ALLOW_LIST`` is set, the request handler will only respond to requests for identified peers in the supplied list.
:param allowed_list: A list of *bytes-like* :ref:`RNS.Identity<api-identity>` hashes. :param allowed_list: A list of *bytes-like* :ref:`RNS.Identity<api-identity>` hashes.
:raises: ``ValueError`` if any of the supplied arguments are invalid. :raises: ``ValueError`` if any of the supplied arguments are invalid.

View File

@ -280,10 +280,25 @@ class Link:
if len(packed_request) <= Link.MDU: if len(packed_request) <= Link.MDU:
request_packet = RNS.Packet(self, packed_request, RNS.Packet.DATA, context = RNS.Packet.REQUEST) request_packet = RNS.Packet(self, packed_request, RNS.Packet.DATA, context = RNS.Packet.REQUEST)
return RequestReceipt(self, request_packet.send(), response_callback, failed_callback)
return RequestReceipt(
self,
packet_receipt = request_packet.send(),
response_callback = response_callback,
failed_callback = failed_callback
)
else: else:
# TODO: Implement sending requests as Resources request_id = RNS.Identity.truncated_hash(packed_request)
raise IOError("Request size of "+str(len(packed_request))+" exceeds MDU of "+str(Link.MDU)+" bytes") RNS.log("Sending request "+RNS.prettyhexrep(request_id)+" as resource.", RNS.LOG_DEBUG)
request_resource = RNS.Resource(packed_request, self, request_id = request_id, is_response = False)
return RequestReceipt(
self,
resource = request_resource,
response_callback = response_callback,
failed_callback = failed_callback
)
def rtt_packet(self, packet): def rtt_packet(self, packet):
@ -446,6 +461,77 @@ class Link:
keepalive_packet.send() keepalive_packet.send()
self.had_outbound() self.had_outbound()
def handle_request(self, request_id, unpacked_request):
requested_at = unpacked_request[0]
path_hash = unpacked_request[1]
request_data = unpacked_request[2]
if path_hash in self.destination.request_handlers:
request_handler = self.destination.request_handlers[path_hash]
path = request_handler[0]
response_generator = request_handler[1]
allow = request_handler[2]
allowed_list = request_handler[3]
allowed = False
if not allow == RNS.Destination.ALLOW_NONE:
if allow == RNS.Destination.ALLOW_LIST:
if self.__remote_identity.hash in allowed_list:
allowed = True
elif allow == RNS.Destination.ALLOW_ALL:
allowed = True
if allowed:
RNS.log("Handling request "+RNS.prettyhexrep(request_id)+" for: "+str(path), RNS.LOG_DEBUG)
response = response_generator(path, request_data, request_id, self.__remote_identity, requested_at)
if response != None:
packed_response = umsgpack.packb([request_id, response])
if len(packed_response) <= Link.MDU:
RNS.Packet(self, packed_response, RNS.Packet.DATA, context = RNS.Packet.RESPONSE).send()
else:
response_resource = RNS.Resource(packed_response, self, request_id = request_id, is_response = True)
else:
identity_string = RNS.prettyhexrep(self.get_remote_identity()) if self.get_remote_identity() != None else "<Unknown>"
RNS.log("Request "+RNS.prettyhexrep(request_id)+" from "+identity_string+" not allowed for: "+str(path), RNS.LOG_DEBUG)
def handle_response(self, request_id, response_data):
remove = None
for pending_request in self.pending_requests:
if pending_request.request_id == request_id:
remove = pending_request
try:
pending_request.response_received(response_data)
except Exception as e:
RNS.log("Error occurred while handling response. The contained exception was: "+str(e), RNS.LOG_ERROR)
break
if remove != None:
self.pending_requests.remove(remove)
def request_resource_concluded(self, resource):
if resource.status == RNS.Resource.COMPLETE:
packed_request = resource.data.read()
unpacked_request = umsgpack.unpackb(packed_request)
request_id = RNS.Identity.truncated_hash(packed_request)
request_data = unpacked_request
self.handle_request(request_id, request_data)
else:
RNS.log("Incoming request resource failed with status: "+RNS.hexrep([resource.status]), RNS.LOG_DEBUG)
def response_resource_concluded(self, resource):
if resource.status == RNS.Resource.COMPLETE:
packed_response = resource.data.read()
unpacked_response = umsgpack.unpackb(packed_response)
request_id = unpacked_response[0]
response_data = unpacked_response[1]
self.handle_response(request_id, response_data)
else:
RNS.log("Incoming response resource failed with status: "+RNS.hexrep([resource.status]), RNS.LOG_DEBUG)
def receive(self, packet): def receive(self, packet):
self.watchdog_lock = True self.watchdog_lock = True
if not self.status == Link.CLOSED and not (self.initiator and packet.context == RNS.Packet.KEEPALIVE and packet.data == bytes([0xFF])): if not self.status == Link.CLOSED and not (self.initiator and packet.context == RNS.Packet.KEEPALIVE and packet.data == bytes([0xFF])):
@ -493,64 +579,19 @@ class Link:
request_id = packet.getTruncatedHash() request_id = packet.getTruncatedHash()
packed_request = self.decrypt(packet.data) packed_request = self.decrypt(packet.data)
unpacked_request = umsgpack.unpackb(packed_request) unpacked_request = umsgpack.unpackb(packed_request)
requested_at = unpacked_request[0] self.handle_request(request_id, unpacked_request)
path_hash = unpacked_request[1]
request_data = unpacked_request[2]
if path_hash in self.destination.request_handlers:
request_handler = self.destination.request_handlers[path_hash]
path = request_handler[0]
response_generator = request_handler[1]
allow = request_handler[2]
allowed_list = request_handler[3]
allowed = False
if not allow == RNS.Destination.ALLOW_NONE:
if allow == RNS.Destination.ALLOW_LIST:
if self.__remote_identity in allowed_list:
allowed = True
elif allow == RNS.Destination.ALLOW_ALL:
allowed = True
if allowed:
response = response_generator(path, request_data, request_id, self.__remote_identity, requested_at)
if response != None:
packed_response = umsgpack.packb([request_id, True, response])
if len(packed_response) <= Link.MDU:
RNS.Packet(self, packed_response, RNS.Packet.DATA, context = RNS.Packet.RESPONSE).send()
else:
# TODO: Implement transfer as resource
packed_response = umsgpack.packb([request_id, False, None])
raise Exception("Response transfer as resource not implemented")
except Exception as e: except Exception as e:
RNS.log("Error occurred while handling request. The contained exception was: "+str(e), RNS.LOG_ERROR) RNS.log("Error occurred while handling request. The contained exception was: "+str(e), RNS.LOG_ERROR)
elif packet.context == RNS.Packet.RESPONSE: elif packet.context == RNS.Packet.RESPONSE:
packed_response = self.decrypt(packet.data) try:
unpacked_response = umsgpack.unpackb(packed_response) packed_response = self.decrypt(packet.data)
request_id = unpacked_response[0] unpacked_response = umsgpack.unpackb(packed_response)
request_id = unpacked_response[0]
if unpacked_response[1] == True: response_data = unpacked_response[1]
remove = None self.handle_response(request_id, response_data)
for pending_request in self.pending_requests: except Exception as e:
if pending_request.request_id == request_id: RNS.log("Error occurred while handling response. The contained exception was: "+str(e), RNS.LOG_ERROR)
response_data = unpacked_response[2]
remove = pending_request
try:
pending_request.response_received(response_data)
except Exception as e:
RNS.log("Error occurred while handling response. The contained exception was: "+str(e), RNS.LOG_ERROR)
break
if remove != None:
self.pending_requests.remove(remove)
else:
# TODO: Implement receiving responses as Resources
raise Exception("Response transfer as resource not implemented")
elif packet.context == RNS.Packet.LRRTT: elif packet.context == RNS.Packet.LRRTT:
if not self.initiator: if not self.initiator:
@ -561,7 +602,12 @@ class Link:
elif packet.context == RNS.Packet.RESOURCE_ADV: elif packet.context == RNS.Packet.RESOURCE_ADV:
packet.plaintext = self.decrypt(packet.data) packet.plaintext = self.decrypt(packet.data)
if self.resource_strategy == Link.ACCEPT_NONE:
if RNS.ResourceAdvertisement.is_request(packet):
RNS.Resource.accept(packet, callback=self.request_resource_concluded)
elif RNS.ResourceAdvertisement.is_response(packet):
RNS.Resource.accept(packet, callback=self.response_resource_concluded)
elif self.resource_strategy == Link.ACCEPT_NONE:
pass pass
elif self.resource_strategy == Link.ACCEPT_APP: elif self.resource_strategy == Link.ACCEPT_APP:
if self.callbacks.resource != None: if self.callbacks.resource != None:
@ -785,8 +831,18 @@ class RequestReceipt():
DELIVERED = 0x02 DELIVERED = 0x02
READY = 0x03 READY = 0x03
def __init__(self, link, packet_receipt, response_callback = None, failed_callback = None): def __init__(self, link, packet_receipt = None, resource = None, response_callback = None, failed_callback = None):
self.hash = packet_receipt.truncated_hash self.packet_receipt = packet_receipt
self.resource = resource
if self.packet_receipt != None:
self.hash = packet_receipt.truncated_hash
self.packet_receipt.set_timeout_callback(self.request_timed_out)
elif self.resource != None:
self.hash = resource.request_id
resource.set_callback(self.request_resource_concluded)
self.link = link self.link = link
self.request_id = self.hash self.request_id = self.hash
@ -800,11 +856,19 @@ class RequestReceipt():
self.callbacks.response = response_callback self.callbacks.response = response_callback
self.callbacks.failed = failed_callback self.callbacks.failed = failed_callback
self.packet_receipt = packet_receipt
self.packet_receipt.set_timeout_callback(self.request_timed_out)
self.link.pending_requests.append(self) self.link.pending_requests.append(self)
def request_resource_concluded(self, resource):
if resource.status == RNS.Resource.COMPLETE:
RNS.log("Request "+RNS.prettyhexrep(self.request_id)+" successfully sent as resource.", RNS.LOG_DEBUG)
else:
RNS.log("Sending request "+RNS.prettyhexrep(self.request_id)+" as resource failed with status: "+RNS.hexrep([resource.status]), RNS.LOG_DEBUG)
self.status = RequestReceipt.FAILED
self.concluded_at = time.time()
self.link.pending_requests.remove(self)
if self.callbacks.failed != None:
self.callbacks.failed(self)
def request_timed_out(self, packet_receipt): def request_timed_out(self, packet_receipt):
self.status = RequestReceipt.FAILED self.status = RequestReceipt.FAILED
@ -817,11 +881,12 @@ class RequestReceipt():
def response_received(self, response): def response_received(self, response):
self.response = response self.response = response
self.packet_receipt.status = RNS.PacketReceipt.DELIVERED if self.packet_receipt != None:
self.packet_receipt.proved = True self.packet_receipt.status = RNS.PacketReceipt.DELIVERED
self.packet_receipt.concluded_at = time.time() self.packet_receipt.proved = True
if self.packet_receipt.callbacks.delivery != None: self.packet_receipt.concluded_at = time.time()
self.packet_receipt.callbacks.delivery(self) if self.packet_receipt.callbacks.delivery != None:
self.packet_receipt.callbacks.delivery(self)
if self.callbacks.response != None: if self.callbacks.response != None:
self.callbacks.response(self) self.callbacks.response(self)

View File

@ -21,6 +21,8 @@ class Resource:
:param progress_callback: A *callable* with the signature *callback(resource)*. Will be called whenever the resource transfer progress is updated. :param progress_callback: A *callable* with the signature *callback(resource)*. Will be called whenever the resource transfer progress is updated.
:param segment_index: Internal use, ignore. :param segment_index: Internal use, ignore.
:param original_hash: Internal use, ignore. :param original_hash: Internal use, ignore.
:param is_request: Internal use, ignore.
:param is_response: Internal use, ignore.
""" """
WINDOW_FLEXIBILITY = 4 WINDOW_FLEXIBILITY = 4
WINDOW_MIN = 1 WINDOW_MIN = 1
@ -119,7 +121,8 @@ class Resource:
resource.link.register_incoming_resource(resource) resource.link.register_incoming_resource(resource)
RNS.log("Accepting resource advertisement for "+RNS.prettyhexrep(resource.hash), RNS.LOG_DEBUG) RNS.log("Accepting resource advertisement for "+RNS.prettyhexrep(resource.hash), RNS.LOG_DEBUG)
resource.link.callbacks.resource_started(resource) if resource.link.callbacks.resource_started != None:
resource.link.callbacks.resource_started(resource)
resource.hashmap_update(0, resource.hashmap_raw) resource.hashmap_update(0, resource.hashmap_raw)
@ -133,7 +136,7 @@ class Resource:
# Create a resource for transmission to a remote destination # Create a resource for transmission to a remote destination
# The data passed can be either a bytes-array or a file opened # The data passed can be either a bytes-array or a file opened
# in binary read mode. # in binary read mode.
def __init__(self, data, link, advertise=True, auto_compress=True, callback=None, progress_callback=None, segment_index = 1, original_hash = None): def __init__(self, data, link, advertise=True, auto_compress=True, callback=None, progress_callback=None, segment_index = 1, original_hash = None, request_id = None, is_response = False):
data_size = None data_size = None
resource_data = None resource_data = None
if hasattr(data, "read"): if hasattr(data, "read"):
@ -188,6 +191,8 @@ class Resource:
self.__watchdog_job_id = 0 self.__watchdog_job_id = 0
self.__progress_callback = progress_callback self.__progress_callback = progress_callback
self.rtt = None self.rtt = None
self.request_id = request_id
self.is_response = is_response
self.receiver_min_consecutive_height = 0 self.receiver_min_consecutive_height = 0
@ -250,6 +255,7 @@ class Resource:
self.random_hash = RNS.Identity.get_random_hash()[:Resource.RANDOM_HASH_SIZE] self.random_hash = RNS.Identity.get_random_hash()[:Resource.RANDOM_HASH_SIZE]
self.hash = RNS.Identity.full_hash(data+self.random_hash) self.hash = RNS.Identity.full_hash(data+self.random_hash)
self.truncated_hash = RNS.Identity.truncated_hash(data+self.random_hash)
self.expected_proof = RNS.Identity.full_hash(data+self.hash) self.expected_proof = RNS.Identity.full_hash(data+self.hash)
if original_hash == None: if original_hash == None:
@ -739,6 +745,9 @@ class Resource:
self.link.resource_concluded(self) self.link.resource_concluded(self)
self.callback(self) self.callback(self)
def set_callback(self, callback):
self.callback = callback
def progress_callback(self, callback): def progress_callback(self, callback):
self.__progress_callback = callback self.__progress_callback = callback
@ -767,24 +776,63 @@ class Resource:
class ResourceAdvertisement: class ResourceAdvertisement:
HASHMAP_MAX_LEN = 73 HASHMAP_MAX_LEN = 70
COLLISION_GUARD_SIZE = 2*Resource.WINDOW_MAX+HASHMAP_MAX_LEN COLLISION_GUARD_SIZE = 2*Resource.WINDOW_MAX+HASHMAP_MAX_LEN
def __init__(self, resource=None): @staticmethod
def is_request(advertisement_packet):
adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext)
if adv.q != None and adv.u:
return True
else:
return False
@staticmethod
def is_response(advertisement_packet):
adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext)
if adv.q != None and adv.p:
return True
else:
return False
@staticmethod
def get_request_id(advertisement_packet):
adv = ResourceAdvertisement.unpack(advertisement_packet.plaintext)
return adv.q
def __init__(self, resource=None, request_id=None, is_response=False):
if resource != None: if resource != None:
self.t = resource.size # Transfer size self.t = resource.size # Transfer size
self.d = resource.total_size # Total uncompressed data size self.d = resource.total_size # Total uncompressed data size
self.n = len(resource.parts) # Number of parts self.n = len(resource.parts) # Number of parts
self.h = resource.hash # Resource hash self.h = resource.hash # Resource hash
self.r = resource.random_hash # Resource random hash self.r = resource.random_hash # Resource random hash
self.o = resource.original_hash # First-segment hash self.o = resource.original_hash # First-segment hash
self.m = resource.hashmap # Resource hashmap self.m = resource.hashmap # Resource hashmap
self.c = resource.compressed # Compression flag self.c = resource.compressed # Compression flag
self.e = resource.encrypted # Encryption flag self.e = resource.encrypted # Encryption flag
self.s = resource.split # Split flag self.s = resource.split # Split flag
self.i = resource.segment_index # Segment index self.i = resource.segment_index # Segment index
self.l = resource.total_segments # Total segments self.l = resource.total_segments # Total segments
self.f = 0x00 | self.s << 2 | self.c << 1 | self.e # Flags self.q = resource.request_id # ID of associated request
self.u = False # Is request flag
self.p = False # Is response flag
if self.q != None:
if not resource.is_response:
self.u = True
self.p = False
else:
self.u = False
self.p = True
# Flags
self.f = 0x00 | self.p << 4 | self.u << 3 | self.s << 2 | self.c << 1 | self.e
def pack(self, segment=0): def pack(self, segment=0):
hashmap_start = segment*ResourceAdvertisement.HASHMAP_MAX_LEN hashmap_start = segment*ResourceAdvertisement.HASHMAP_MAX_LEN
@ -803,12 +851,14 @@ class ResourceAdvertisement:
"o": self.o, # Original hash "o": self.o, # Original hash
"i": self.i, # Segment index "i": self.i, # Segment index
"l": self.l, # Total segments "l": self.l, # Total segments
"q": self.q, # Request ID
"f": self.f, # Resource flags "f": self.f, # Resource flags
"m": hashmap "m": hashmap
} }
return umsgpack.packb(dictionary) return umsgpack.packb(dictionary)
@staticmethod @staticmethod
def unpack(data): def unpack(data):
dictionary = umsgpack.unpackb(data) dictionary = umsgpack.unpackb(data)
@ -824,8 +874,11 @@ class ResourceAdvertisement:
adv.f = dictionary["f"] adv.f = dictionary["f"]
adv.i = dictionary["i"] adv.i = dictionary["i"]
adv.l = dictionary["l"] adv.l = dictionary["l"]
adv.q = dictionary["q"]
adv.e = True if (adv.f & 0x01) == 0x01 else False adv.e = True if (adv.f & 0x01) == 0x01 else False
adv.c = True if ((adv.f >> 1) & 0x01) == 0x01 else False adv.c = True if ((adv.f >> 1) & 0x01) == 0x01 else False
adv.s = True if ((adv.f >> 2) & 0x01) == 0x01 else False adv.s = True if ((adv.f >> 2) & 0x01) == 0x01 else False
adv.u = True if ((adv.f >> 3) & 0x01) == 0x01 else False
adv.p = True if ((adv.f >> 4) & 0x01) == 0x01 else False
return adv return adv

View File

@ -14,7 +14,7 @@ from .Transport import Transport
from .Destination import Destination from .Destination import Destination
from .Packet import Packet from .Packet import Packet
from .Packet import PacketReceipt from .Packet import PacketReceipt
from .Resource import Resource from .Resource import Resource, ResourceAdvertisement
modules = glob.glob(os.path.dirname(__file__)+"/*.py") modules = glob.glob(os.path.dirname(__file__)+"/*.py")
__all__ = [ os.path.basename(f)[:-3] for f in modules if not f.endswith('__init__.py')] __all__ = [ os.path.basename(f)[:-3] for f in modules if not f.endswith('__init__.py')]