Improved link status detection and recovery of TCP interfaces over unreliable IP links.

This commit is contained in:
Mark Qvist 2021-09-23 16:07:57 +02:00
parent aaf0ace027
commit acae9e34c2
3 changed files with 94 additions and 33 deletions

View File

@ -2,6 +2,7 @@ from .Interface import Interface
import socketserver import socketserver
import threading import threading
import netifaces import netifaces
import platform
import socket import socket
import time import time
import sys import sys
@ -26,6 +27,12 @@ class TCPClientInterface(Interface):
RECONNECT_WAIT = 5 RECONNECT_WAIT = 5
RECONNECT_MAX_TRIES = None RECONNECT_MAX_TRIES = None
# TCP socket options
TCP_USER_TIMEOUT = 20
TCP_PROBE_AFTER = 5
TCP_PROBE_INTERVAL = 3
TCP_PROBES = 5
def __init__(self, owner, name, target_ip=None, target_port=None, connected_socket=None, max_reconnect_tries=None): def __init__(self, owner, name, target_ip=None, target_port=None, connected_socket=None, max_reconnect_tries=None):
self.IN = True self.IN = True
self.OUT = False self.OUT = False
@ -33,6 +40,8 @@ class TCPClientInterface(Interface):
self.parent_interface = None self.parent_interface = None
self.name = name self.name = name
self.initiator = False self.initiator = False
self.reconnecting = False
self.never_connected = True
if max_reconnect_tries == None: if max_reconnect_tries == None:
self.max_reconnect_tries = TCPClientInterface.RECONNECT_MAX_TRIES self.max_reconnect_tries = TCPClientInterface.RECONNECT_MAX_TRIES
@ -45,13 +54,16 @@ class TCPClientInterface(Interface):
self.target_port = None self.target_port = None
self.socket = connected_socket self.socket = connected_socket
if platform.system() == "Linux":
self.set_timeouts_linux()
elif platform.system() == "Darwin":
self.set_timeouts_osx()
elif target_ip != None and target_port != None: elif target_ip != None and target_port != None:
self.receives = True self.receives = True
self.target_ip = target_ip self.target_ip = target_ip
self.target_port = target_port self.target_port = target_port
self.connect()
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.target_ip, self.target_port))
self.owner = owner self.owner = owner
self.online = True self.online = True
@ -64,32 +76,65 @@ class TCPClientInterface(Interface):
thread.start() thread.start()
self.wants_tunnel = True self.wants_tunnel = True
def set_timeouts_linux(self):
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, int(TCPClientInterface.TCP_USER_TIMEOUT * 1000))
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, int(TCPClientInterface.TCP_PROBE_AFTER))
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, int(TCPClientInterface.TCP_PROBE_INTERVAL))
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, int(TCPClientInterface.TCP_PROBES))
def set_timeouts_osx(self):
if hasattr(socket, "TCP_KEEPALIVE"):
TCP_KEEPIDLE = socket.TCP_KEEPALIVE
else:
TCP_KEEPIDLE = 0x10
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
sock.setsockopt(socket.IPPROTO_TCP, TCP_KEEPIDLE, int(TCPClientInterface.TCP_PROBE_AFTER))
def connect(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((self.target_ip, self.target_port))
if platform.system() == "Linux":
self.set_timeouts_linux()
elif platform.system() == "Darwin":
self.set_timeouts_osx()
self.online = True
self.writing = False
self.never_connected = False
def reconnect(self): def reconnect(self):
if self.initiator: if self.initiator:
attempts = 0 if not self.reconnecting:
while not self.online: self.reconnecting = True
attempts += 1 attempts = 0
while not self.online:
time.sleep(TCPClientInterface.RECONNECT_WAIT)
attempts += 1
if self.max_reconnect_tries != None and attempts > self.max_reconnect_tries: if self.max_reconnect_tries != None and attempts > self.max_reconnect_tries:
RNS.log("Max reconnection attempts reached for "+str(self), RNS.LOG_ERROR) RNS.log("Max reconnection attempts reached for "+str(self), RNS.LOG_ERROR)
self.teardown() self.teardown()
break break
try: try:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.connect()
self.socket.connect((self.target_ip, self.target_port))
self.online = True
self.writing = False
thread = threading.Thread(target=self.read_loop) except Exception as e:
thread.setDaemon(True) RNS.log("Connection attempt for "+str(self)+" failed: "+str(e), RNS.LOG_DEBUG)
thread.start()
RNS.Transport.synthesize_tunnel(self)
except Exception as e: if not self.never_connected:
RNS.log("Reconnection attempt for "+str(self)+" failed. The contained exception was: "+str(e), RNS.LOG_ERROR) RNS.log("Reconnected TCP socket for "+str(self)+".", RNS.LOG_INFO)
time.sleep(TCPClientInterface.RECONNECT_WAIT) self.reconnecting = False
thread = threading.Thread(target=self.read_loop)
thread.setDaemon(True)
thread.start()
RNS.Transport.synthesize_tunnel(self)
else: else:
RNS.log("Attempt to reconnect on a non-initiator TCP interface. This should not happen.", RNS.LOG_ERROR) RNS.log("Attempt to reconnect on a non-initiator TCP interface. This should not happen.", RNS.LOG_ERROR)
@ -145,30 +190,43 @@ class TCPClientInterface(Interface):
escape = False escape = False
data_buffer = data_buffer+bytes([byte]) data_buffer = data_buffer+bytes([byte])
else: else:
RNS.log("TCP socket for "+str(self)+" was closed, attempting to reconnect...", RNS.LOG_WARNING)
self.online = False self.online = False
if self.initiator: if self.initiator:
RNS.log("TCP socket for "+str(self)+" was closed, attempting to reconnect...", RNS.LOG_WARNING)
self.reconnect() self.reconnect()
else:
RNS.log("TCP socket for remote client "+str(self)+" was closed.", RNS.LOG_VERBOSE)
self.teardown()
break break
except Exception as e: except Exception as e:
self.online = False self.online = False
RNS.log("An interface error occurred, the contained exception was: "+str(e), RNS.LOG_ERROR) RNS.log("An interface error occurred for "+str(self)+", the contained exception was: "+str(e), RNS.LOG_WARNING)
self.teardown()
if self.initiator:
RNS.log("Attempting to reconnect...", RNS.LOG_WARNING)
self.reconnect()
else:
self.teardown()
def teardown(self): def teardown(self):
RNS.log("The interface "+str(self)+" experienced an unrecoverable error and is being torn down. Restart Reticulum to attempt to open this interface again.", RNS.LOG_ERROR) if self.initiator:
RNS.log("The interface "+str(self)+" experienced an unrecoverable error and is being torn down. Restart Reticulum to attempt to open this interface again.", RNS.LOG_ERROR)
if RNS.Reticulum.panic_on_interface_error:
RNS.panic()
else:
RNS.log("The interface "+str(self)+" is being torn down.", RNS.LOG_VERBOSE)
self.online = False self.online = False
self.OUT = False self.OUT = False
self.IN = False self.IN = False
if self in RNS.Transport.interfaces: if self in RNS.Transport.interfaces:
RNS.Transport.interfaces.remove(self) RNS.Transport.interfaces.remove(self)
if RNS.Reticulum.panic_on_interface_error:
RNS.panic()
def __str__(self): def __str__(self):
return "TCPInterface["+str(self.name)+"/"+str(self.target_ip)+":"+str(self.target_port)+"]" return "TCPInterface["+str(self.name)+"/"+str(self.target_ip)+":"+str(self.target_port)+"]"

View File

@ -57,9 +57,12 @@ class UDPInterface(Interface):
self.owner.inbound(data, self) self.owner.inbound(data, self)
def processOutgoing(self,data): def processOutgoing(self,data):
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try:
udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_socket.sendto(data, (self.forward_ip, self.forward_port)) udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
udp_socket.sendto(data, (self.forward_ip, self.forward_port))
except Exception as e:
RNS.log("Could not transmit on "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR)
def __str__(self): def __str__(self):

View File

@ -1099,7 +1099,7 @@ class Transport:
if should_add: if should_add:
Transport.destination_table[destination_hash] = new_entry Transport.destination_table[destination_hash] = new_entry
RNS.log("Restored path to "+RNS.prettyhexrep(packet.destination_hash)+" is now "+str(announce_hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(receiving_interface), RNS.LOG_VERBOSE) RNS.log("Restored path to "+RNS.prettyhexrep(packet.destination_hash)+" is now "+str(announce_hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(receiving_interface), RNS.LOG_DEBUG)