Implemented basic channel windowing

This commit is contained in:
Mark Qvist 2023-05-10 19:15:20 +02:00
parent 01c59ab0c6
commit e543d5c27f

View File

@ -231,10 +231,10 @@ class Channel(contextlib.AbstractContextManager):
WINDOW_MIN = 1 WINDOW_MIN = 1
# The maximum window size for transfers on slow links # The maximum window size for transfers on slow links
WINDOW_MAX_SLOW = 10 WINDOW_MAX_SLOW = 5
# The maximum window size for transfers on fast links # The maximum window size for transfers on fast links
WINDOW_MAX_FAST = 75 WINDOW_MAX_FAST = 48
# For calculating maps and guard segments, this # For calculating maps and guard segments, this
# must be set to the global maximum window. # must be set to the global maximum window.
@ -242,13 +242,11 @@ class Channel(contextlib.AbstractContextManager):
# If the fast rate is sustained for this many request # If the fast rate is sustained for this many request
# rounds, the fast link window size will be allowed. # rounds, the fast link window size will be allowed.
FAST_RATE_THRESHOLD = WINDOW_MAX_SLOW - WINDOW - 2 FAST_RATE_THRESHOLD = 5
# If the RTT rate is higher than this value, # If the RTT rate is higher than this value,
# the max window size for fast links will be used. # the max window size for fast links will be used.
# The default is 50 Kbps (the value is stored in RTT_FAST = 0.2
# bytes per second, hence the "/ 8").
RATE_FAST = (50*1000) / 8
# The minimum allowed flexibility of the window size. # The minimum allowed flexibility of the window size.
# The difference between window_max and window_min # The difference between window_max and window_min
@ -269,7 +267,11 @@ class Channel(contextlib.AbstractContextManager):
self._next_rx_sequence = 0 self._next_rx_sequence = 0
self._message_factories: dict[int, Type[MessageBase]] = {} self._message_factories: dict[int, Type[MessageBase]] = {}
self._max_tries = 5 self._max_tries = 5
self._max_outstanding = Channel.WINDOW self.window = Channel.WINDOW
self.window_max = Channel.WINDOW_MAX_SLOW
self.window_min = Channel.WINDOW_MIN
self.window_flexibility = Channel.WINDOW_FLEXIBILITY
self.fast_rate_rounds = 0
def __enter__(self) -> Channel: def __enter__(self) -> Channel:
return self return self
@ -413,6 +415,29 @@ class Channel(contextlib.AbstractContextManager):
self._rx_ring.remove(e) self._rx_ring.remove(e)
threading.Thread(target=self._run_callbacks, name="Message Callback", args=[m], daemon=True).start() threading.Thread(target=self._run_callbacks, name="Message Callback", args=[m], daemon=True).start()
if self.window < self.window_max:
self.window += 1
if (self.window - self.window_min) > (self.window_flexibility-1):
self.window_min += 1
# TODO: Remove
RNS.log("Increased channel window to "+str(self.window), RNS.LOG_DEBUG)
if self._outlet.rtt != 0:
# TODO: Remove
# RNS.log("Link RTT is "+str(self._outlet.rtt), RNS.LOG_DEBUG)
if self._outlet.rtt > Channel.RTT_FAST:
self.fast_rate_rounds = 0
else:
self.fast_rate_rounds += 1
if self.window_max < Channel.WINDOW_MAX_FAST and self.fast_rate_rounds == Channel.FAST_RATE_THRESHOLD:
self.window_max = Channel.WINDOW_MAX_FAST
# TODO: Remove
RNS.log("Increased max channel window to "+str(self.window_max), RNS.LOG_DEBUG)
except Exception as ex: except Exception as ex:
RNS.log(f"Channel: Error receiving data: {ex}") RNS.log(f"Channel: Error receiving data: {ex}")
@ -433,7 +458,7 @@ class Channel(contextlib.AbstractContextManager):
if not envelope.packet or not self._outlet.get_packet_state(envelope.packet) == MessageState.MSGSTATE_DELIVERED: if not envelope.packet or not self._outlet.get_packet_state(envelope.packet) == MessageState.MSGSTATE_DELIVERED:
outstanding += 1 outstanding += 1
if outstanding >= self._max_outstanding: if outstanding >= self.window:
return False return False
return True return True
@ -468,6 +493,17 @@ class Channel(contextlib.AbstractContextManager):
self._outlet.resend(envelope.packet) self._outlet.resend(envelope.packet)
self._outlet.set_packet_delivered_callback(envelope.packet, self._packet_delivered) self._outlet.set_packet_delivered_callback(envelope.packet, self._packet_delivered)
self._outlet.set_packet_timeout_callback(envelope.packet, self._packet_timeout, self._get_packet_timeout_time(envelope.tries)) self._outlet.set_packet_timeout_callback(envelope.packet, self._packet_timeout, self._get_packet_timeout_time(envelope.tries))
if self.window > self.window_min:
self.window -= 1
if self.window_max > self.window_min:
self.window_max -= 1
if (self.window_max - self.window) > (self.window_flexibility-1):
self.window_max -= 1
# TODO: Remove
RNS.log("Decreased channel window to "+str(self.window))
return False return False
if self._outlet.get_packet_state(packet) != MessageState.MSGSTATE_DELIVERED: if self._outlet.get_packet_state(packet) != MessageState.MSGSTATE_DELIVERED: