Optimised resource transfers, fixed resource transfer regression, removed txdelay from UDPInterface.
This commit is contained in:
		
							parent
							
								
									b61fa6ce8d
								
							
						
					
					
						commit
						459f6b792f
					
				| @ -20,7 +20,7 @@ import RNS.vendor.umsgpack as umsgpack | ||||
| APP_NAME = "example_utilitites" | ||||
| 
 | ||||
| # We'll also define a default timeout, in seconds | ||||
| APP_TIMEOUT = 15.0 | ||||
| APP_TIMEOUT = 45.0 | ||||
| 
 | ||||
| ########################################################## | ||||
| #### Server Part ######################################### | ||||
|  | ||||
| @ -5,15 +5,15 @@ Header Types | ||||
| type 1          00  Two byte header, one 10 byte address field | ||||
| type 2          01  Two byte header, two 10 byte address fields | ||||
| type 3          10  Reserved | ||||
| type 4          11  Reserved for extended header format | ||||
| type 4          11  Reserved | ||||
| 
 | ||||
| 
 | ||||
| Propagation Types | ||||
| ----------------- | ||||
| broadcast       00 | ||||
| transport       01 | ||||
| relay           10 | ||||
| tunnel          11 | ||||
| reserved        10 | ||||
| reserved        11 | ||||
| 
 | ||||
| 
 | ||||
| Destination Types | ||||
|  | ||||
| @ -11,10 +11,6 @@ class UdpInterface(Interface): | ||||
|     def __init__(self, owner, name, bindip=None, bindport=None, forwardip=None, forwardport=None): | ||||
|         self.IN  = True | ||||
|         self.OUT = False | ||||
| 
 | ||||
|         # TODO: Optimise so this is not needed | ||||
|         self.transmit_delay = 0.001 | ||||
| 
 | ||||
|         self.name = name | ||||
| 
 | ||||
|         if (bindip != None and bindport != None): | ||||
| @ -45,7 +41,6 @@ class UdpInterface(Interface): | ||||
|         self.owner.inbound(data, self) | ||||
| 
 | ||||
|     def processOutgoing(self,data): | ||||
|         time.sleep(self.transmit_delay) | ||||
|         udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | ||||
|         udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) | ||||
|         udp_socket.sendto(data, (self.forward_ip, self.forward_port)) | ||||
|  | ||||
| @ -15,13 +15,25 @@ class Resource: | ||||
| 	SDU                  = RNS.Packet.MDU | ||||
| 	RANDOM_HASH_SIZE     = 4 | ||||
| 
 | ||||
| 	# This is an indication of what the | ||||
| 	# maximum size a resource should be, if | ||||
| 	# it is to be handled within reasonable | ||||
| 	# time constraint, even on small systems. | ||||
| 	# | ||||
| 	# A small system in this regard is | ||||
| 	# defined as a Raspberry Pi, which should | ||||
| 	# be able to compress, encrypt and hasmap | ||||
| 	# the resource in about 10 seconds. | ||||
| 	MAX_EFFICIENT_SIZE   = 16 * 1024 * 1024 | ||||
| 
 | ||||
| 	# The maximum size to auto-compress with | ||||
| 	# bz2 before sending. | ||||
| 	AUTO_COMPRESS_MAX_SIZE = 32 * 1024 * 1024 | ||||
| 	AUTO_COMPRESS_MAX_SIZE = MAX_EFFICIENT_SIZE | ||||
| 
 | ||||
| 	# TODO: Should be allocated more | ||||
| 	# intelligently | ||||
| 	MAX_RETRIES       = 2 | ||||
| 	# TODO: Set higher | ||||
| 	MAX_RETRIES       = 5 | ||||
| 	SENDER_GRACE_TIME = 10 | ||||
| 	RETRY_GRACE_TIME  = 0.25 | ||||
| 
 | ||||
| @ -72,6 +84,8 @@ class Resource: | ||||
| 			resource.hashmap_height = 0 | ||||
| 			resource.waiting_for_hmu = False | ||||
| 
 | ||||
| 			resource.receiving_part = False | ||||
| 
 | ||||
| 			resource.consecutive_completed_height = 0 | ||||
| 			 | ||||
| 			resource.link.register_incoming_resource(resource) | ||||
| @ -344,14 +358,10 @@ class Resource: | ||||
| 				sleep(sleep_time) | ||||
| 
 | ||||
| 	def assemble(self): | ||||
| 		# TODO: Optimise assembly. It's way too | ||||
| 		# slow for larger files | ||||
| 		if not self.status == Resource.FAILED: | ||||
| 			try: | ||||
| 				self.status = Resource.ASSEMBLING | ||||
| 				stream = b"" | ||||
| 				for part in self.parts: | ||||
| 					stream += part | ||||
| 				stream = b"".join(self.parts) | ||||
| 
 | ||||
| 				if self.encrypted: | ||||
| 					data = self.link.decrypt(stream) | ||||
| @ -412,6 +422,10 @@ class Resource: | ||||
| 
 | ||||
| 
 | ||||
| 	def receive_part(self, packet): | ||||
| 		while self.receiving_part: | ||||
| 			sleep(0.001) | ||||
| 
 | ||||
| 		self.receiving_part = True | ||||
| 		self.last_activity = time.time() | ||||
| 		self.retries_left = self.max_retries | ||||
| 
 | ||||
| @ -439,21 +453,25 @@ class Resource: | ||||
| 						self.outstanding_parts -= 1 | ||||
| 
 | ||||
| 						# Update consecutive completed pointer | ||||
| 						if i == 0 or self.parts[i-1] != None and i == self.consecutive_completed_height: | ||||
| 							self.consecutive_completed_height = i+1 | ||||
| 							cp = i+1 | ||||
| 							while cp < len(self.parts) and self.parts[cp] != None: | ||||
| 								self.consecutive_completed_height = cp | ||||
| 								cp += 1 | ||||
| 						if i == self.consecutive_completed_height + 1: | ||||
| 							self.consecutive_completed_height = i | ||||
| 						 | ||||
| 						cp = self.consecutive_completed_height + 1 | ||||
| 						while cp < len(self.parts) and self.parts[cp] != None: | ||||
| 							self.consecutive_completed_height = cp | ||||
| 							cp += 1 | ||||
| 
 | ||||
| 				i += 1 | ||||
| 
 | ||||
| 			self.receiving_part = False | ||||
| 
 | ||||
| 			if self.__progress_callback != None: | ||||
| 				self.__progress_callback(self) | ||||
| 
 | ||||
| 			if self.outstanding_parts == 0 and self.received_count == self.total_parts: | ||||
| 				self.assemble() | ||||
| 			elif self.outstanding_parts == 0: | ||||
| 				# TODO: Figure out if ther is a mathematically | ||||
| 				# TODO: Figure out if there is a mathematically | ||||
| 				# optimal way to adjust windows | ||||
| 				if self.window < self.window_max: | ||||
| 					self.window += 1 | ||||
| @ -461,18 +479,25 @@ class Resource: | ||||
| 						self.window_min += 1 | ||||
| 
 | ||||
| 				self.request_next() | ||||
| 		else: | ||||
| 			self.receiving_part = False | ||||
| 
 | ||||
| 	# Called on incoming resource to send a request for more data | ||||
| 	def request_next(self): | ||||
| 		while self.receiving_part: | ||||
| 			sleep(0.001) | ||||
| 
 | ||||
| 		if not self.status == Resource.FAILED: | ||||
| 			if not self.waiting_for_hmu: | ||||
| 				self.outstanding_parts = 0 | ||||
| 				hashmap_exhausted = Resource.HASHMAP_IS_NOT_EXHAUSTED | ||||
| 				requested_hashes = b"" | ||||
| 
 | ||||
| 				i = 0; pn = self.consecutive_completed_height | ||||
| 				for part in self.parts[self.consecutive_completed_height:self.consecutive_completed_height+self.window]: | ||||
| 					 | ||||
| 				offset = (1 if self.consecutive_completed_height > 0 else 0) | ||||
| 				i = 0; pn = self.consecutive_completed_height+offset | ||||
| 				search_start = pn | ||||
| 
 | ||||
| 				for part in self.parts[search_start:search_start+self.window]: | ||||
| 					if part == None: | ||||
| 						part_hash = self.hashmap[pn] | ||||
| 						if part_hash != None: | ||||
| @ -622,7 +647,6 @@ class Resource: | ||||
| 
 | ||||
| 
 | ||||
| class ResourceAdvertisement: | ||||
| 	# TODO: Can this be allocated dynamically? Keep in mind hashmap_update inference | ||||
| 	HASHMAP_MAX_LEN      = 84 | ||||
| 	COLLISION_GUARD_SIZE = 2*Resource.WINDOW_MAX+HASHMAP_MAX_LEN | ||||
| 
 | ||||
|  | ||||
							
								
								
									
										2
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								setup.py
									
									
									
									
									
								
							| @ -5,7 +5,7 @@ with open("README.md", "r") as fh: | ||||
| 
 | ||||
| setuptools.setup( | ||||
|     name="rns", | ||||
|     version="0.1.5", | ||||
|     version="0.1.6", | ||||
|     author="Mark Qvist", | ||||
|     author_email="mark@unsigned.io", | ||||
|     description="Self-configuring, encrypted and resilient mesh networking stack for LoRa, packet radio, WiFi and everything in between", | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user