Cleanup
This commit is contained in:
parent
1542c5f4fe
commit
66fda34b20
@ -253,7 +253,8 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
# will never be smaller than this value.
|
# will never be smaller than this value.
|
||||||
WINDOW_FLEXIBILITY = 4
|
WINDOW_FLEXIBILITY = 4
|
||||||
|
|
||||||
SEQ_MODULUS = 0x10000
|
SEQ_MAX = 0xFFFF
|
||||||
|
SEQ_MODULUS = SEQ_MAX+1
|
||||||
|
|
||||||
def __init__(self, outlet: ChannelOutletBase):
|
def __init__(self, outlet: ChannelOutletBase):
|
||||||
"""
|
"""
|
||||||
@ -378,8 +379,8 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
try:
|
try:
|
||||||
if cb(message):
|
if cb(message):
|
||||||
return
|
return
|
||||||
except Exception as ex:
|
except Exception as e:
|
||||||
RNS.log(f"Channel "+str(self)+" experienced an error while running message callback: {ex}", RNS.LOG_ERROR)
|
RNS.log("Channel "+str(self)+" experienced an error while running a message callback. The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||||
|
|
||||||
def _receive(self, raw: bytes):
|
def _receive(self, raw: bytes):
|
||||||
try:
|
try:
|
||||||
@ -387,7 +388,6 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
with self._lock:
|
with self._lock:
|
||||||
message = envelope.unpack(self._message_factories)
|
message = envelope.unpack(self._message_factories)
|
||||||
|
|
||||||
# TODO: Test sequence overflow
|
|
||||||
if envelope.sequence < self._next_rx_sequence:
|
if envelope.sequence < self._next_rx_sequence:
|
||||||
window_overflow = (self._next_rx_sequence+Channel.WINDOW_MAX) % Channel.SEQ_MODULUS
|
window_overflow = (self._next_rx_sequence+Channel.WINDOW_MAX) % Channel.SEQ_MODULUS
|
||||||
if window_overflow < self._next_rx_sequence:
|
if window_overflow < self._next_rx_sequence:
|
||||||
@ -411,24 +411,14 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
if e.sequence == self._next_rx_sequence:
|
if e.sequence == self._next_rx_sequence:
|
||||||
contigous.append(e)
|
contigous.append(e)
|
||||||
self._next_rx_sequence = (self._next_rx_sequence + 1) % Channel.SEQ_MODULUS
|
self._next_rx_sequence = (self._next_rx_sequence + 1) % Channel.SEQ_MODULUS
|
||||||
# TODO: Remove
|
|
||||||
if self._next_rx_sequence > Channel.SEQ_MODULUS - 100:
|
|
||||||
RNS.log("RX SEQ "+str(self._next_rx_sequence))
|
|
||||||
print("RX SEQ "+str(self._next_rx_sequence))
|
|
||||||
|
|
||||||
if self._next_rx_sequence == 0:
|
|
||||||
RNS.log("RX SEQ OVERFLOW")
|
|
||||||
RNS.log("RX SEQ OVERFLOW")
|
|
||||||
print("RX SEQ OVERFLOW")
|
|
||||||
print("RX SEQ OVERFLOW")
|
|
||||||
|
|
||||||
for e in contigous:
|
for e in contigous:
|
||||||
m = e.unpack(self._message_factories)
|
m = e.unpack(self._message_factories)
|
||||||
self._rx_ring.remove(e)
|
self._rx_ring.remove(e)
|
||||||
threading.Thread(target=self._run_callbacks, name="Message Callback", args=[m], daemon=True).start()
|
threading.Thread(target=self._run_callbacks, name="Message Callback", args=[m], daemon=True).start()
|
||||||
|
|
||||||
except Exception as ex:
|
except Exception as e:
|
||||||
RNS.log(f"Channel: Error receiving data: {ex}")
|
RNS.log("An error ocurred while receiving data on "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
|
||||||
|
|
||||||
def is_ready_to_send(self) -> bool:
|
def is_ready_to_send(self) -> bool:
|
||||||
"""
|
"""
|
||||||
@ -437,7 +427,6 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
:return: True if ready
|
:return: True if ready
|
||||||
"""
|
"""
|
||||||
if not self._outlet.is_usable:
|
if not self._outlet.is_usable:
|
||||||
RNS.log("Channel: Link is not usable.", RNS.LOG_EXTREME)
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
with self._lock:
|
with self._lock:
|
||||||
@ -466,12 +455,10 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
if (self.window - self.window_min) > (self.window_flexibility-1):
|
if (self.window - self.window_min) > (self.window_flexibility-1):
|
||||||
self.window_min += 1
|
self.window_min += 1
|
||||||
|
|
||||||
# TODO: Remove
|
# TODO: Remove at some point
|
||||||
RNS.log("Increased channel window to "+str(self.window), RNS.LOG_DEBUG)
|
RNS.log("Increased "+str(self)+" window to "+str(self.window), RNS.LOG_DEBUG)
|
||||||
|
|
||||||
if self._outlet.rtt != 0:
|
if self._outlet.rtt != 0:
|
||||||
# TODO: Remove
|
|
||||||
# RNS.log("Link RTT is "+str(self._outlet.rtt), RNS.LOG_DEBUG)
|
|
||||||
if self._outlet.rtt > Channel.RTT_FAST:
|
if self._outlet.rtt > Channel.RTT_FAST:
|
||||||
self.fast_rate_rounds = 0
|
self.fast_rate_rounds = 0
|
||||||
|
|
||||||
@ -481,13 +468,13 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
if self.window_max < Channel.WINDOW_MAX_FAST and self.fast_rate_rounds == Channel.FAST_RATE_THRESHOLD:
|
if self.window_max < Channel.WINDOW_MAX_FAST and self.fast_rate_rounds == Channel.FAST_RATE_THRESHOLD:
|
||||||
self.window_max = Channel.WINDOW_MAX_FAST
|
self.window_max = Channel.WINDOW_MAX_FAST
|
||||||
|
|
||||||
# TODO: Remove
|
# TODO: Remove at some point
|
||||||
RNS.log("Increased max channel window to "+str(self.window_max), RNS.LOG_DEBUG)
|
RNS.log("Increased "+str(self)+" max window to "+str(self.window_max), RNS.LOG_EXTREME)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
RNS.log("Channel: Envelope not found in TX ring", RNS.LOG_DEBUG)
|
RNS.log("Envelope not found in TX ring for "+str(self), RNS.LOG_DEBUG)
|
||||||
if not envelope:
|
if not envelope:
|
||||||
RNS.log("Channel: Spurious message received.", RNS.LOG_EXTREME)
|
RNS.log("Spurious message received on "+str(self), RNS.LOG_EXTREME)
|
||||||
|
|
||||||
def _packet_delivered(self, packet: TPacket):
|
def _packet_delivered(self, packet: TPacket):
|
||||||
self._packet_tx_op(packet, lambda env: True)
|
self._packet_tx_op(packet, lambda env: True)
|
||||||
@ -498,7 +485,7 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
def _packet_timeout(self, packet: TPacket):
|
def _packet_timeout(self, packet: TPacket):
|
||||||
def retry_envelope(envelope: Envelope) -> bool:
|
def retry_envelope(envelope: Envelope) -> bool:
|
||||||
if envelope.tries >= self._max_tries:
|
if envelope.tries >= self._max_tries:
|
||||||
RNS.log("Channel: Retry count exceeded, tearing down Link.", RNS.LOG_ERROR)
|
RNS.log("Retry count exceeded on "+str(self)+", tearing down Link.", RNS.LOG_ERROR)
|
||||||
self._shutdown() # start on separate thread?
|
self._shutdown() # start on separate thread?
|
||||||
self._outlet.timed_out()
|
self._outlet.timed_out()
|
||||||
return True
|
return True
|
||||||
@ -514,8 +501,8 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
if (self.window_max - self.window) > (self.window_flexibility-1):
|
if (self.window_max - self.window) > (self.window_flexibility-1):
|
||||||
self.window_max -= 1
|
self.window_max -= 1
|
||||||
|
|
||||||
# TODO: Remove
|
# TODO: Remove at some point
|
||||||
RNS.log("Decreased channel window to "+str(self.window), RNS.LOG_DEBUG)
|
RNS.log("Decreased "+str(self)+" window to "+str(self.window), RNS.LOG_EXTREME)
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@ -535,16 +522,6 @@ class Channel(contextlib.AbstractContextManager):
|
|||||||
raise ChannelException(CEType.ME_LINK_NOT_READY, f"Link is not ready")
|
raise ChannelException(CEType.ME_LINK_NOT_READY, f"Link is not ready")
|
||||||
envelope = Envelope(self._outlet, message=message, sequence=self._next_sequence)
|
envelope = Envelope(self._outlet, message=message, sequence=self._next_sequence)
|
||||||
self._next_sequence = (self._next_sequence + 1) % Channel.SEQ_MODULUS
|
self._next_sequence = (self._next_sequence + 1) % Channel.SEQ_MODULUS
|
||||||
# TODO: Remove
|
|
||||||
if self._next_sequence > Channel.SEQ_MODULUS - 100:
|
|
||||||
RNS.log("RX SEQ "+str(self._next_rx_sequence))
|
|
||||||
print("RX SEQ "+str(self._next_rx_sequence))
|
|
||||||
|
|
||||||
if self._next_sequence == 0:
|
|
||||||
RNS.log("TX SEQ OVERFLOW")
|
|
||||||
RNS.log("TX SEQ OVERFLOW")
|
|
||||||
print("TX SEQ OVERFLOW")
|
|
||||||
print("TX SEQ OVERFLOW")
|
|
||||||
|
|
||||||
self._emplace_envelope(envelope, self._tx_ring)
|
self._emplace_envelope(envelope, self._tx_ring)
|
||||||
if envelope is None:
|
if envelope is None:
|
||||||
|
Loading…
Reference in New Issue
Block a user