diff --git a/RNS/Link.py b/RNS/Link.py index 54839d7..ec3de32 100644 --- a/RNS/Link.py +++ b/RNS/Link.py @@ -4,7 +4,9 @@ from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.fernet import Fernet +from time import sleep import vendor.umsgpack as umsgpack +import threading import base64 import time import RNS @@ -24,12 +26,23 @@ class Link: ECPUBSIZE = 91 BLOCKSIZE = 16 + # TODO: This should not be hardcoded, + # but calculated from something like + # first-hop RTT latency and distance + PROOF_TIMEOUT = 10 + TIMEOUT_FACTOR = 4 + KEEPALIVE = 120 + PENDING = 0x00 HANDSHAKE = 0x01 ACTIVE = 0x02 STALE = 0x03 CLOSED = 0x04 + TIMEOUT = 0x01 + INITIATOR_CLOSED = 0x02 + DESTINATION_CLOSED = 0x03 + ACCEPT_NONE = 0x00 ACCEPT_APP = 0x01 ACCEPT_ALL = 0x02 @@ -47,6 +60,9 @@ class Link: link.prove() link.request_time = time.time() RNS.Transport.registerLink(link) + link.last_inbound = time.time() + link.start_watchdog() + if link.owner.callbacks.link_established != None: link.owner.callbacks.link_established(link) RNS.log("Incoming link request "+str(link)+" accepted", RNS.LOG_VERBOSE) @@ -69,6 +85,16 @@ class Link: self.resource_strategy = Link.ACCEPT_NONE self.outgoing_resources = [] self.incoming_resources = [] + self.last_inbound = 0 + self.last_outbound = 0 + self.tx = 0 + self.rx = 0 + self.txbytes = 0 + self.rxbytes = 0 + self.proof_timeout = Link.PROOF_TIMEOUT + self.timeout_factor = Link.TIMEOUT_FACTOR + self.keepalive = Link.KEEPALIVE + self.watchdog_lock = False self.status = Link.PENDING self.type = RNS.Destination.LINK self.owner = owner @@ -100,6 +126,7 @@ class Link: self.setLinkID(self.packet) RNS.Transport.registerLink(self) self.request_time = time.time() + self.start_watchdog() self.packet.send() RNS.log("Link request "+RNS.prettyhexrep(self.link_id)+" sent to "+str(self.destination), RNS.LOG_VERBOSE) @@ -133,29 +160,30 @@ class Link: proof.send() def validateProof(self, packet): - peer_pub_bytes = packet.data[:Link.ECPUBSIZE] - signed_data = self.link_id+peer_pub_bytes - signature = packet.data[Link.ECPUBSIZE:RNS.Identity.KEYSIZE/8+Link.ECPUBSIZE] + if self.initiator: + peer_pub_bytes = packet.data[:Link.ECPUBSIZE] + signed_data = self.link_id+peer_pub_bytes + signature = packet.data[Link.ECPUBSIZE:RNS.Identity.KEYSIZE/8+Link.ECPUBSIZE] - if self.destination.identity.validate(signature, signed_data): - self.loadPeer(peer_pub_bytes) - self.handshake() - self.rtt = time.time() - self.request_time - self.attached_interface = packet.receiving_interface - RNS.Transport.activateLink(self) - RNS.log("Link "+str(self)+" established with "+str(self.destination)+", RTT is "+str(self.rtt), RNS.LOG_VERBOSE) - rtt_data = umsgpack.packb(self.rtt) - rtt_packet = RNS.Packet(self, rtt_data, context=RNS.Packet.LRRTT) - rtt_packet.send() + if self.destination.identity.validate(signature, signed_data): + self.loadPeer(peer_pub_bytes) + self.handshake() + self.rtt = time.time() - self.request_time + self.attached_interface = packet.receiving_interface + RNS.Transport.activateLink(self) + RNS.log("Link "+str(self)+" established with "+str(self.destination)+", RTT is "+str(self.rtt), RNS.LOG_VERBOSE) + rtt_data = umsgpack.packb(self.rtt) + rtt_packet = RNS.Packet(self, rtt_data, context=RNS.Packet.LRRTT) + rtt_packet.send() - self.status = Link.ACTIVE - if self.callbacks.link_established != None: - self.callbacks.link_established(self) - else: - RNS.log("Invalid link proof signature received by "+str(self), RNS.LOG_VERBOSE) - # TODO: should we really do this, or just wait - # for a valid one? Needs analysis. - self.teardown() + self.status = Link.ACTIVE + if self.callbacks.link_established != None: + self.callbacks.link_established(self) + else: + RNS.log("Invalid link proof signature received by "+str(self), RNS.LOG_VERBOSE) + # TODO: should we really do this, or just wait + # for a valid one? Needs analysis. + self.teardown() def rtt_packet(self, packet): @@ -168,7 +196,6 @@ class Link: measured_rtt = time.time() - self.request_time plaintext = self.decrypt(packet.data) rtt = umsgpack.unpackb(plaintext) - #RNS.log("Measured RTT is "+str(measured_rtt)+", received RTT is "+str(rtt)) self.rtt = max(measured_rtt, rtt) self.status = Link.ACTIVE except Exception as e: @@ -185,6 +212,10 @@ class Link: teardown_packet = RNS.Packet(self, self.link_id, context=RNS.Packet.LINKCLOSE) teardown_packet.send() self.status = Link.CLOSED + if self.initiator: + self.teardown_reason = Link.INITIATOR_CLOSED + else: + self.teardown_reason = Link.DESTINATION_CLOSED self.link_closed() def teardown_packet(self, packet): @@ -192,6 +223,10 @@ class Link: plaintext = self.decrypt(packet.data) if plaintext == self.link_id: self.status = Link.CLOSED + if self.initiator: + self.teardown_reason = Link.DESTINATION_CLOSED + else: + self.teardown_reason = Link.INITIATOR_CLOSED self.link_closed() except Exception as e: pass @@ -206,12 +241,80 @@ class Link: if self.callbacks.link_closed != None: self.callbacks.link_closed(self) + def start_watchdog(self): + thread = threading.Thread(target=self.__watchdog_job) + thread.setDaemon(True) + thread.start() + + def __watchdog_job(self): + while not self.status == Link.CLOSED: + while (self.watchdog_lock): + sleep(max(self.rtt, 0.025)) + + if not self.status == Link.CLOSED: + # Link was initiated, but no response + # from destination yet + if self.status == Link.PENDING: + next_check = self.request_time + self.proof_timeout + sleep_time = next_check - time.time() + if time.time() >= self.request_time + self.proof_timeout: + RNS.log("Link establishment timed out", RNS.LOG_VERBOSE) + self.status = Link.CLOSED + self.teardown_reason = Link.TIMEOUT + self.link_closed() + sleep_time = 0.001 + + elif self.status == Link.HANDSHAKE: + next_check = self.request_time + self.proof_timeout + sleep_time = next_check - time.time() + if time.time() >= self.request_time + self.proof_timeout: + RNS.log("Timeout waiting for RTT packet from link initiator", RNS.LOG_VERBOSE) + self.status = Link.CLOSED + self.teardown_reason = Link.TIMEOUT + self.link_closed() + sleep_time = 0.001 + + elif self.status == Link.ACTIVE: + if time.time() >= self.last_inbound + self.keepalive: + sleep_time = self.rtt * self.timeout_factor + self.status = Link.STALE + if self.initiator: + self.send_keepalive() + else: + sleep_time = (self.last_inbound + self.keepalive) - time.time() + + elif self.status == Link.STALE: + sleep_time = 0.001 + self.status = Link.CLOSED + self.teardown_reason = Link.TIMEOUT + self.link_closed() + + + if sleep_time == 0: + RNS.log("Warning! Link watchdog sleep time of 0!", RNS.LOG_ERROR) + if sleep_time == None or sleep_time < 0: + RNS.log("Timing error! Closing Reticulum now.", RNS.LOG_CRITICAL) + RNS.panic() + + sleep(sleep_time) + + + def send_keepalive(self): + keepalive_packet = RNS.Packet(self, chr(0xFF), context=RNS.Packet.KEEPALIVE) + keepalive_packet.send() def receive(self, packet): - if not self.status == Link.CLOSED: + self.watchdog_lock = True + if not self.status == Link.CLOSED and not (self.initiator and packet.context == RNS.Packet.KEEPALIVE and packet.data == chr(0xFF)): if packet.receiving_interface != self.attached_interface: RNS.log("Link-associated packet received on unexpected interface! Someone might be trying to manipulate your communication!", RNS.LOG_ERROR) else: + self.last_inbound = time.time() + self.rx += 1 + self.rxbytes += len(packet.data) + if self.status == Link.STALE: + self.status = Link.ACTIVE + if packet.packet_type == RNS.Packet.DATA: if packet.context == RNS.Packet.NONE: plaintext = self.decrypt(packet.data) @@ -259,6 +362,12 @@ class Link: if resource_hash == resource.hash: resource.cancel() + elif packet.context == RNS.Packet.KEEPALIVE: + if not self.initiator and packet.data == chr(0xFF): + keepalive_packet = RNS.Packet(self, chr(0xFE), context=RNS.Packet.KEEPALIVE) + keepalive_packet.send() + + # TODO: find the most efficient way to allow multiple # transfers at the same time, sending resource hash on # each packet is a huge overhead. Probably some kind @@ -274,6 +383,8 @@ class Link: if resource_hash == resource.hash: resource.validateProof(packet.data) + self.watchdog_lock = False + def encrypt(self, plaintext): if self.__encryption_disabled: diff --git a/RNS/Packet.py b/RNS/Packet.py index 7bb34fe..96bcad3 100755 --- a/RNS/Packet.py +++ b/RNS/Packet.py @@ -29,6 +29,7 @@ class Packet: RESPONSE = 0x09 COMMAND = 0x0A COMMAND_STAT = 0x0B + KEEPALIVE = 0xFC LINKCLOSE = 0xFD LRRTT = 0xFE LRPROOF = 0xFF @@ -98,6 +99,10 @@ class Packet: # A resource takes care of symmetric # encryption by itself self.ciphertext = self.data + elif self.context == Packet.KEEPALIVE: + # Keepalive packets contain no actual + # data + self.ciphertext = self.data else: # In all other cases, we encrypt the packet # with the destination's public key @@ -148,6 +153,11 @@ class Packet: if self.destination.type == RNS.Destination.LINK: if self.destination.status == RNS.Link.CLOSED: raise IOError("Attempt to transmit over a closed link") + else: + self.destination.last_outbound = time.time() + self.destination.tx += 1 + self.destination.txbytes += len(self.data) + if not self.packed: self.pack() diff --git a/RNS/Resource.py b/RNS/Resource.py index be08bba..2901887 100644 --- a/RNS/Resource.py +++ b/RNS/Resource.py @@ -4,6 +4,7 @@ import math import time import threading import vendor.umsgpack as umsgpack +from time import sleep class Resource: diff --git a/RNS/Transport.py b/RNS/Transport.py index 83b61ad..945ec95 100755 --- a/RNS/Transport.py +++ b/RNS/Transport.py @@ -97,6 +97,13 @@ class Transport: Transport.jobs_locked = False return sent + @staticmethod + def packet_filter(packet): + if packet.context == RNS.Packet.KEEPALIVE: + return True + if not packet.packet_hash in Transport.packet_hashlist: + return True + @staticmethod def inbound(raw, interface=None): while (Transport.jobs_running): @@ -111,7 +118,7 @@ class Transport: RNS.log(str(interface)+" received packet with hash "+RNS.prettyhexrep(packet.packet_hash), RNS.LOG_DEBUG) - if not packet.packet_hash in Transport.packet_hashlist: + if Transport.packet_filter(packet): Transport.packet_hashlist.append(packet.packet_hash) if packet.packet_type == RNS.Packet.ANNOUNCE: