Tuned channel windowing
This commit is contained in:
parent
e10ddf9d2d
commit
73faf04ea1
@ -242,7 +242,7 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
|
|
||||||
# If the fast rate is sustained for this many request
|
# If the fast rate is sustained for this many request
|
||||||
# rounds, the fast link window size will be allowed.
|
# rounds, the fast link window size will be allowed.
|
||||||
FAST_RATE_THRESHOLD = 5
|
FAST_RATE_THRESHOLD = 10
|
||||||
|
|
||||||
# If the RTT rate is higher than this value,
|
# If the RTT rate is higher than this value,
|
||||||
# the max window size for fast links will be used.
|
# the max window size for fast links will be used.
|
||||||
@ -412,37 +412,21 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
contigous.append(e)
|
contigous.append(e)
|
||||||
self._next_rx_sequence = (self._next_rx_sequence + 1) % Channel.SEQ_MODULUS
|
self._next_rx_sequence = (self._next_rx_sequence + 1) % Channel.SEQ_MODULUS
|
||||||
# TODO: Remove
|
# TODO: Remove
|
||||||
|
if self._next_rx_sequence > Channel.SEQ_MODULUS - 100:
|
||||||
|
RNS.log("RX SEQ "+str(self._next_rx_sequence))
|
||||||
|
print("RX SEQ "+str(self._next_rx_sequence))
|
||||||
|
|
||||||
if self._next_rx_sequence == 0:
|
if self._next_rx_sequence == 0:
|
||||||
RNS.log("SEQ OVERFLOW")
|
RNS.log("RX SEQ OVERFLOW")
|
||||||
|
RNS.log("RX SEQ OVERFLOW")
|
||||||
|
print("RX SEQ OVERFLOW")
|
||||||
|
print("RX SEQ OVERFLOW")
|
||||||
|
|
||||||
for e in contigous:
|
for e in contigous:
|
||||||
m = e.unpack(self._message_factories)
|
m = e.unpack(self._message_factories)
|
||||||
self._rx_ring.remove(e)
|
self._rx_ring.remove(e)
|
||||||
threading.Thread(target=self._run_callbacks, name="Message Callback", args=[m], daemon=True).start()
|
threading.Thread(target=self._run_callbacks, name="Message Callback", args=[m], daemon=True).start()
|
||||||
|
|
||||||
if self.window < self.window_max:
|
|
||||||
self.window += 1
|
|
||||||
if (self.window - self.window_min) > (self.window_flexibility-1):
|
|
||||||
self.window_min += 1
|
|
||||||
|
|
||||||
# TODO: Remove
|
|
||||||
RNS.log("Increased channel window to "+str(self.window), RNS.LOG_DEBUG)
|
|
||||||
|
|
||||||
if self._outlet.rtt != 0:
|
|
||||||
# TODO: Remove
|
|
||||||
# RNS.log("Link RTT is "+str(self._outlet.rtt), RNS.LOG_DEBUG)
|
|
||||||
if self._outlet.rtt > Channel.RTT_FAST:
|
|
||||||
self.fast_rate_rounds = 0
|
|
||||||
|
|
||||||
else:
|
|
||||||
self.fast_rate_rounds += 1
|
|
||||||
|
|
||||||
if self.window_max < Channel.WINDOW_MAX_FAST and self.fast_rate_rounds == Channel.FAST_RATE_THRESHOLD:
|
|
||||||
self.window_max = Channel.WINDOW_MAX_FAST
|
|
||||||
|
|
||||||
# TODO: Remove
|
|
||||||
RNS.log("Increased max channel window to "+str(self.window_max), RNS.LOG_DEBUG)
|
|
||||||
|
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
RNS.log(f"Channel: Error receiving data: {ex}")
|
RNS.log(f"Channel: Error receiving data: {ex}")
|
||||||
|
|
||||||
@ -476,6 +460,30 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
envelope.tracked = False
|
envelope.tracked = False
|
||||||
if envelope in self._tx_ring:
|
if envelope in self._tx_ring:
|
||||||
self._tx_ring.remove(envelope)
|
self._tx_ring.remove(envelope)
|
||||||
|
|
||||||
|
if self.window < self.window_max:
|
||||||
|
self.window += 1
|
||||||
|
if (self.window - self.window_min) > (self.window_flexibility-1):
|
||||||
|
self.window_min += 1
|
||||||
|
|
||||||
|
# TODO: Remove
|
||||||
|
RNS.log("Increased channel window to "+str(self.window), RNS.LOG_DEBUG)
|
||||||
|
|
||||||
|
if self._outlet.rtt != 0:
|
||||||
|
# TODO: Remove
|
||||||
|
# RNS.log("Link RTT is "+str(self._outlet.rtt), RNS.LOG_DEBUG)
|
||||||
|
if self._outlet.rtt > Channel.RTT_FAST:
|
||||||
|
self.fast_rate_rounds = 0
|
||||||
|
|
||||||
|
else:
|
||||||
|
self.fast_rate_rounds += 1
|
||||||
|
|
||||||
|
if self.window_max < Channel.WINDOW_MAX_FAST and self.fast_rate_rounds == Channel.FAST_RATE_THRESHOLD:
|
||||||
|
self.window_max = Channel.WINDOW_MAX_FAST
|
||||||
|
|
||||||
|
# TODO: Remove
|
||||||
|
RNS.log("Increased max channel window to "+str(self.window_max), RNS.LOG_DEBUG)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
RNS.log("Channel: Envelope not found in TX ring", RNS.LOG_DEBUG)
|
RNS.log("Channel: Envelope not found in TX ring", RNS.LOG_DEBUG)
|
||||||
if not envelope:
|
if not envelope:
|
||||||
@ -527,6 +535,17 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
raise ChannelException(CEType.ME_LINK_NOT_READY, f"Link is not ready")
|
raise ChannelException(CEType.ME_LINK_NOT_READY, f"Link is not ready")
|
||||||
envelope = Envelope(self._outlet, message=message, sequence=self._next_sequence)
|
envelope = Envelope(self._outlet, message=message, sequence=self._next_sequence)
|
||||||
self._next_sequence = (self._next_sequence + 1) % Channel.SEQ_MODULUS
|
self._next_sequence = (self._next_sequence + 1) % Channel.SEQ_MODULUS
|
||||||
|
# TODO: Remove
|
||||||
|
if self._next_sequence > Channel.SEQ_MODULUS - 100:
|
||||||
|
RNS.log("RX SEQ "+str(self._next_rx_sequence))
|
||||||
|
print("RX SEQ "+str(self._next_rx_sequence))
|
||||||
|
|
||||||
|
if self._next_sequence == 0:
|
||||||
|
RNS.log("TX SEQ OVERFLOW")
|
||||||
|
RNS.log("TX SEQ OVERFLOW")
|
||||||
|
print("TX SEQ OVERFLOW")
|
||||||
|
print("TX SEQ OVERFLOW")
|
||||||
|
|
||||||
self._emplace_envelope(envelope, self._tx_ring)
|
self._emplace_envelope(envelope, self._tx_ring)
|
||||||
if envelope is None:
|
if envelope is None:
|
||||||
raise BlockingIOError()
|
raise BlockingIOError()
|
||||||
|
Loading…
Reference in New Issue
Block a user