Improved Channel sequencing, retries and transfer efficiency
This commit is contained in:
parent
481062fca1
commit
8c8affc800
@ -234,13 +234,16 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
WINDOW = 2
|
WINDOW = 2
|
||||||
|
|
||||||
# Absolute minimum window size
|
# Absolute minimum window size
|
||||||
WINDOW_MIN = 1
|
WINDOW_MIN = 2
|
||||||
|
WINDOW_MIN_LIMIT_SLOW = 2
|
||||||
|
WINDOW_MIN_LIMIT_MEDIUM = 5
|
||||||
|
WINDOW_MIN_LIMIT_FAST = 16
|
||||||
|
|
||||||
# The maximum window size for transfers on slow links
|
# The maximum window size for transfers on slow links
|
||||||
WINDOW_MAX_SLOW = 5
|
WINDOW_MAX_SLOW = 5
|
||||||
|
|
||||||
# The maximum window size for transfers on mid-speed links
|
# The maximum window size for transfers on mid-speed links
|
||||||
WINDOW_MAX_MEDIUM = 16
|
WINDOW_MAX_MEDIUM = 12
|
||||||
|
|
||||||
# The maximum window size for transfers on fast links
|
# The maximum window size for transfers on fast links
|
||||||
WINDOW_MAX_FAST = 48
|
WINDOW_MAX_FAST = 48
|
||||||
@ -255,7 +258,7 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
|
|
||||||
# If the RTT rate is higher than this value,
|
# If the RTT rate is higher than this value,
|
||||||
# the max window size for fast links will be used.
|
# the max window size for fast links will be used.
|
||||||
RTT_FAST = 0.25
|
RTT_FAST = 0.18
|
||||||
RTT_MEDIUM = 0.75
|
RTT_MEDIUM = 0.75
|
||||||
RTT_SLOW = 1.45
|
RTT_SLOW = 1.45
|
||||||
|
|
||||||
@ -385,9 +388,8 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
RNS.log(f"Envelope: Emplacement of duplicate envelope with sequence "+str(envelope.sequence), RNS.LOG_EXTREME)
|
RNS.log(f"Envelope: Emplacement of duplicate envelope with sequence "+str(envelope.sequence), RNS.LOG_EXTREME)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
if envelope.sequence < existing.sequence and not envelope.sequence < window_overflow:
|
if envelope.sequence < existing.sequence and not (self._next_rx_sequence - envelope.sequence) > (Channel.SEQ_MAX//2):
|
||||||
ring.insert(i, envelope)
|
ring.insert(i, envelope)
|
||||||
RNS.log("Inserted seq "+str(envelope.sequence)+" at "+str(i), RNS.LOG_DEBUG)
|
|
||||||
|
|
||||||
envelope.tracked = True
|
envelope.tracked = True
|
||||||
return True
|
return True
|
||||||
@ -396,6 +398,7 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
|
|
||||||
envelope.tracked = True
|
envelope.tracked = True
|
||||||
ring.append(envelope)
|
ring.append(envelope)
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _run_callbacks(self, message: MessageBase):
|
def _run_callbacks(self, message: MessageBase):
|
||||||
@ -429,13 +432,18 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
if not is_new:
|
if not is_new:
|
||||||
RNS.log("Duplicate message received on channel "+str(self), RNS.LOG_EXTREME)
|
RNS.log("Duplicate message received on channel "+str(self), RNS.LOG_EXTREME)
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
with self._lock:
|
with self._lock:
|
||||||
contigous = []
|
contigous = []
|
||||||
for e in self._rx_ring:
|
for e in self._rx_ring:
|
||||||
if e.sequence == self._next_rx_sequence:
|
if e.sequence == self._next_rx_sequence:
|
||||||
contigous.append(e)
|
contigous.append(e)
|
||||||
self._next_rx_sequence = (self._next_rx_sequence + 1) % Channel.SEQ_MODULUS
|
self._next_rx_sequence = (self._next_rx_sequence + 1) % Channel.SEQ_MODULUS
|
||||||
|
if self._next_rx_sequence == 0:
|
||||||
|
for e in self._rx_ring:
|
||||||
|
if e.sequence == self._next_rx_sequence:
|
||||||
|
contigous.append(e)
|
||||||
|
self._next_rx_sequence = (self._next_rx_sequence + 1) % Channel.SEQ_MODULUS
|
||||||
|
|
||||||
for e in contigous:
|
for e in contigous:
|
||||||
if not e.unpacked:
|
if not e.unpacked:
|
||||||
@ -474,6 +482,7 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
with self._lock:
|
with self._lock:
|
||||||
envelope = next(filter(lambda e: self._outlet.get_packet_id(e.packet) == self._outlet.get_packet_id(packet),
|
envelope = next(filter(lambda e: self._outlet.get_packet_id(e.packet) == self._outlet.get_packet_id(packet),
|
||||||
self._tx_ring), None)
|
self._tx_ring), None)
|
||||||
|
|
||||||
if envelope and op(envelope):
|
if envelope and op(envelope):
|
||||||
envelope.tracked = False
|
envelope.tracked = False
|
||||||
if envelope in self._tx_ring:
|
if envelope in self._tx_ring:
|
||||||
@ -481,11 +490,9 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
|
|
||||||
if self.window < self.window_max:
|
if self.window < self.window_max:
|
||||||
self.window += 1
|
self.window += 1
|
||||||
if (self.window - self.window_min) > (self.window_flexibility-1):
|
|
||||||
self.window_min += 1
|
|
||||||
|
|
||||||
# TODO: Remove at some point
|
# TODO: Remove at some point
|
||||||
# RNS.log("Increased "+str(self)+" window to "+str(self.window), RNS.LOG_EXTREME)
|
# RNS.log("Increased "+str(self)+" window to "+str(self.window), RNS.LOG_DEBUG)
|
||||||
|
|
||||||
if self._outlet.rtt != 0:
|
if self._outlet.rtt != 0:
|
||||||
if self._outlet.rtt > Channel.RTT_FAST:
|
if self._outlet.rtt > Channel.RTT_FAST:
|
||||||
@ -498,15 +505,20 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
self.medium_rate_rounds += 1
|
self.medium_rate_rounds += 1
|
||||||
if self.window_max < Channel.WINDOW_MAX_MEDIUM and self.medium_rate_rounds == Channel.FAST_RATE_THRESHOLD:
|
if self.window_max < Channel.WINDOW_MAX_MEDIUM and self.medium_rate_rounds == Channel.FAST_RATE_THRESHOLD:
|
||||||
self.window_max = Channel.WINDOW_MAX_MEDIUM
|
self.window_max = Channel.WINDOW_MAX_MEDIUM
|
||||||
|
self.window_min = Channel.WINDOW_MIN_LIMIT_MEDIUM
|
||||||
# TODO: Remove at some point
|
# TODO: Remove at some point
|
||||||
# RNS.log("Increased "+str(self)+" max window to "+str(self.window_max), RNS.LOG_EXTREME)
|
# RNS.log("Increased "+str(self)+" max window to "+str(self.window_max), RNS.LOG_DEBUG)
|
||||||
|
# RNS.log("Increased "+str(self)+" min window to "+str(self.window_min), RNS.LOG_DEBUG)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
self.fast_rate_rounds += 1
|
self.fast_rate_rounds += 1
|
||||||
if self.window_max < Channel.WINDOW_MAX_FAST and self.fast_rate_rounds == Channel.FAST_RATE_THRESHOLD:
|
if self.window_max < Channel.WINDOW_MAX_FAST and self.fast_rate_rounds == Channel.FAST_RATE_THRESHOLD:
|
||||||
self.window_max = Channel.WINDOW_MAX_FAST
|
self.window_max = Channel.WINDOW_MAX_FAST
|
||||||
|
self.window_min = Channel.WINDOW_MIN_LIMIT_FAST
|
||||||
# TODO: Remove at some point
|
# TODO: Remove at some point
|
||||||
# RNS.log("Increased "+str(self)+" max window to "+str(self.window_max), RNS.LOG_EXTREME)
|
# RNS.log("Increased "+str(self)+" max window to "+str(self.window_max), RNS.LOG_DEBUG)
|
||||||
|
# RNS.log("Increased "+str(self)+" min window to "+str(self.window_min), RNS.LOG_DEBUG)
|
||||||
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
RNS.log("Envelope not found in TX ring for "+str(self), RNS.LOG_EXTREME)
|
RNS.log("Envelope not found in TX ring for "+str(self), RNS.LOG_EXTREME)
|
||||||
@ -516,8 +528,15 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
def _packet_delivered(self, packet: TPacket):
|
def _packet_delivered(self, packet: TPacket):
|
||||||
self._packet_tx_op(packet, lambda env: True)
|
self._packet_tx_op(packet, lambda env: True)
|
||||||
|
|
||||||
|
def _update_packet_timeouts(self):
|
||||||
|
for envelope in self._tx_ring:
|
||||||
|
updated_timeout = self._get_packet_timeout_time(envelope.tries)
|
||||||
|
if updated_timeout > envelope.packet.receipt.timeout:
|
||||||
|
envelope.packet.receipt.set_timeout(updated_timeout)
|
||||||
|
|
||||||
def _get_packet_timeout_time(self, tries: int) -> float:
|
def _get_packet_timeout_time(self, tries: int) -> float:
|
||||||
return pow(2, tries - 1) * max(self._outlet.rtt, 0.01) * 5
|
to = pow(1.5, tries - 1) * max(self._outlet.rtt*2.5, 0.025) * (len(self._tx_ring)+1.5)
|
||||||
|
return to
|
||||||
|
|
||||||
def _packet_timeout(self, packet: TPacket):
|
def _packet_timeout(self, packet: TPacket):
|
||||||
def retry_envelope(envelope: Envelope) -> bool:
|
def retry_envelope(envelope: Envelope) -> bool:
|
||||||
@ -526,17 +545,22 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
self._shutdown() # start on separate thread?
|
self._shutdown() # start on separate thread?
|
||||||
self._outlet.timed_out()
|
self._outlet.timed_out()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
envelope.tries += 1
|
envelope.tries += 1
|
||||||
self._outlet.resend(envelope.packet)
|
self._outlet.resend(envelope.packet)
|
||||||
self._outlet.set_packet_delivered_callback(envelope.packet, self._packet_delivered)
|
self._outlet.set_packet_delivered_callback(envelope.packet, self._packet_delivered)
|
||||||
self._outlet.set_packet_timeout_callback(envelope.packet, self._packet_timeout, self._get_packet_timeout_time(envelope.tries))
|
self._outlet.set_packet_timeout_callback(envelope.packet, self._packet_timeout, self._get_packet_timeout_time(envelope.tries))
|
||||||
|
self._update_packet_timeouts()
|
||||||
|
|
||||||
if self.window > self.window_min:
|
if self.window > self.window_min:
|
||||||
self.window -= 1
|
self.window -= 1
|
||||||
if self.window_max > self.window_min:
|
# TODO: Remove at some point
|
||||||
|
# RNS.log("Decreased "+str(self)+" window to "+str(self.window), RNS.LOG_DEBUG)
|
||||||
|
|
||||||
|
if self.window_max > (self.window_min+self.window_flexibility):
|
||||||
self.window_max -= 1
|
self.window_max -= 1
|
||||||
if (self.window_max - self.window) > (self.window_flexibility-1):
|
# TODO: Remove at some point
|
||||||
self.window_max -= 1
|
# RNS.log("Decreased "+str(self)+" max window to "+str(self.window_max), RNS.LOG_DEBUG)
|
||||||
|
|
||||||
# TODO: Remove at some point
|
# TODO: Remove at some point
|
||||||
# RNS.log("Decreased "+str(self)+" window to "+str(self.window), RNS.LOG_EXTREME)
|
# RNS.log("Decreased "+str(self)+" window to "+str(self.window), RNS.LOG_EXTREME)
|
||||||
@ -573,6 +597,7 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
envelope.tries += 1
|
envelope.tries += 1
|
||||||
self._outlet.set_packet_delivered_callback(envelope.packet, self._packet_delivered)
|
self._outlet.set_packet_delivered_callback(envelope.packet, self._packet_delivered)
|
||||||
self._outlet.set_packet_timeout_callback(envelope.packet, self._packet_timeout, self._get_packet_timeout_time(envelope.tries))
|
self._outlet.set_packet_timeout_callback(envelope.packet, self._packet_timeout, self._get_packet_timeout_time(envelope.tries))
|
||||||
|
self._update_packet_timeouts()
|
||||||
|
|
||||||
return envelope
|
return envelope
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user