From acae9e34c2d0ba892107499ae79ab94219ae52be Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Thu, 23 Sep 2021 16:07:57 +0200 Subject: [PATCH] Improved link status detection and recovery of TCP interfaces over unreliable IP links. --- RNS/Interfaces/TCPInterface.py | 116 ++++++++++++++++++++++++--------- RNS/Interfaces/UDPInterface.py | 9 ++- RNS/Transport.py | 2 +- 3 files changed, 94 insertions(+), 33 deletions(-) diff --git a/RNS/Interfaces/TCPInterface.py b/RNS/Interfaces/TCPInterface.py index bc33fcc..091c7c0 100644 --- a/RNS/Interfaces/TCPInterface.py +++ b/RNS/Interfaces/TCPInterface.py @@ -2,6 +2,7 @@ from .Interface import Interface import socketserver import threading import netifaces +import platform import socket import time import sys @@ -26,6 +27,12 @@ class TCPClientInterface(Interface): RECONNECT_WAIT = 5 RECONNECT_MAX_TRIES = None + # TCP socket options + TCP_USER_TIMEOUT = 20 + TCP_PROBE_AFTER = 5 + TCP_PROBE_INTERVAL = 3 + TCP_PROBES = 5 + def __init__(self, owner, name, target_ip=None, target_port=None, connected_socket=None, max_reconnect_tries=None): self.IN = True self.OUT = False @@ -33,6 +40,8 @@ class TCPClientInterface(Interface): self.parent_interface = None self.name = name self.initiator = False + self.reconnecting = False + self.never_connected = True if max_reconnect_tries == None: self.max_reconnect_tries = TCPClientInterface.RECONNECT_MAX_TRIES @@ -45,13 +54,16 @@ class TCPClientInterface(Interface): self.target_port = None self.socket = connected_socket + if platform.system() == "Linux": + self.set_timeouts_linux() + elif platform.system() == "Darwin": + self.set_timeouts_osx() + elif target_ip != None and target_port != None: self.receives = True self.target_ip = target_ip self.target_port = target_port - - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.connect((self.target_ip, self.target_port)) + self.connect() self.owner = owner self.online = True @@ -64,32 +76,65 @@ class TCPClientInterface(Interface): thread.start() self.wants_tunnel = True + def set_timeouts_linux(self): + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, int(TCPClientInterface.TCP_USER_TIMEOUT * 1000)) + + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, int(TCPClientInterface.TCP_PROBE_AFTER)) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, int(TCPClientInterface.TCP_PROBE_INTERVAL)) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, int(TCPClientInterface.TCP_PROBES)) + + def set_timeouts_osx(self): + if hasattr(socket, "TCP_KEEPALIVE"): + TCP_KEEPIDLE = socket.TCP_KEEPALIVE + else: + TCP_KEEPIDLE = 0x10 + + sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + sock.setsockopt(socket.IPPROTO_TCP, TCP_KEEPIDLE, int(TCPClientInterface.TCP_PROBE_AFTER)) + + def connect(self): + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.connect((self.target_ip, self.target_port)) + + if platform.system() == "Linux": + self.set_timeouts_linux() + elif platform.system() == "Darwin": + self.set_timeouts_osx() + + self.online = True + self.writing = False + self.never_connected = False + + def reconnect(self): if self.initiator: - attempts = 0 - while not self.online: - attempts += 1 + if not self.reconnecting: + self.reconnecting = True + attempts = 0 + while not self.online: + time.sleep(TCPClientInterface.RECONNECT_WAIT) + attempts += 1 - if self.max_reconnect_tries != None and attempts > self.max_reconnect_tries: - RNS.log("Max reconnection attempts reached for "+str(self), RNS.LOG_ERROR) - self.teardown() - break + if self.max_reconnect_tries != None and attempts > self.max_reconnect_tries: + RNS.log("Max reconnection attempts reached for "+str(self), RNS.LOG_ERROR) + self.teardown() + break - try: - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.connect((self.target_ip, self.target_port)) - self.online = True - self.writing = False + try: + self.connect() - thread = threading.Thread(target=self.read_loop) - thread.setDaemon(True) - thread.start() - RNS.Transport.synthesize_tunnel(self) + except Exception as e: + RNS.log("Connection attempt for "+str(self)+" failed: "+str(e), RNS.LOG_DEBUG) - except Exception as e: - RNS.log("Reconnection attempt for "+str(self)+" failed. The contained exception was: "+str(e), RNS.LOG_ERROR) + if not self.never_connected: + RNS.log("Reconnected TCP socket for "+str(self)+".", RNS.LOG_INFO) - time.sleep(TCPClientInterface.RECONNECT_WAIT) + self.reconnecting = False + thread = threading.Thread(target=self.read_loop) + thread.setDaemon(True) + thread.start() + RNS.Transport.synthesize_tunnel(self) else: RNS.log("Attempt to reconnect on a non-initiator TCP interface. This should not happen.", RNS.LOG_ERROR) @@ -145,30 +190,43 @@ class TCPClientInterface(Interface): escape = False data_buffer = data_buffer+bytes([byte]) else: - RNS.log("TCP socket for "+str(self)+" was closed, attempting to reconnect...", RNS.LOG_WARNING) self.online = False if self.initiator: + RNS.log("TCP socket for "+str(self)+" was closed, attempting to reconnect...", RNS.LOG_WARNING) self.reconnect() + else: + RNS.log("TCP socket for remote client "+str(self)+" was closed.", RNS.LOG_VERBOSE) + self.teardown() break except Exception as e: self.online = False - RNS.log("An interface error occurred, the contained exception was: "+str(e), RNS.LOG_ERROR) - self.teardown() + RNS.log("An interface error occurred for "+str(self)+", the contained exception was: "+str(e), RNS.LOG_WARNING) + + if self.initiator: + RNS.log("Attempting to reconnect...", RNS.LOG_WARNING) + self.reconnect() + else: + self.teardown() def teardown(self): - RNS.log("The interface "+str(self)+" experienced an unrecoverable error and is being torn down. Restart Reticulum to attempt to open this interface again.", RNS.LOG_ERROR) + if self.initiator: + RNS.log("The interface "+str(self)+" experienced an unrecoverable error and is being torn down. Restart Reticulum to attempt to open this interface again.", RNS.LOG_ERROR) + if RNS.Reticulum.panic_on_interface_error: + RNS.panic() + + else: + RNS.log("The interface "+str(self)+" is being torn down.", RNS.LOG_VERBOSE) + self.online = False self.OUT = False self.IN = False + if self in RNS.Transport.interfaces: RNS.Transport.interfaces.remove(self) - if RNS.Reticulum.panic_on_interface_error: - RNS.panic() - def __str__(self): return "TCPInterface["+str(self.name)+"/"+str(self.target_ip)+":"+str(self.target_port)+"]" diff --git a/RNS/Interfaces/UDPInterface.py b/RNS/Interfaces/UDPInterface.py index ab2622e..a92b40e 100644 --- a/RNS/Interfaces/UDPInterface.py +++ b/RNS/Interfaces/UDPInterface.py @@ -57,9 +57,12 @@ class UDPInterface(Interface): self.owner.inbound(data, self) def processOutgoing(self,data): - udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) - udp_socket.sendto(data, (self.forward_ip, self.forward_port)) + try: + udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + udp_socket.sendto(data, (self.forward_ip, self.forward_port)) + except Exception as e: + RNS.log("Could not transmit on "+str(self)+". The contained exception was: "+str(e), RNS.LOG_ERROR) def __str__(self): diff --git a/RNS/Transport.py b/RNS/Transport.py index d3abec1..e43b22f 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -1099,7 +1099,7 @@ class Transport: if should_add: Transport.destination_table[destination_hash] = new_entry - RNS.log("Restored path to "+RNS.prettyhexrep(packet.destination_hash)+" is now "+str(announce_hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(receiving_interface), RNS.LOG_VERBOSE) + RNS.log("Restored path to "+RNS.prettyhexrep(packet.destination_hash)+" is now "+str(announce_hops)+" hops away via "+RNS.prettyhexrep(received_from)+" on "+str(receiving_interface), RNS.LOG_DEBUG)