Added Identity docstrings. Renamed Identity method.
This commit is contained in:
parent
59f83ee1a5
commit
522204d8a5
109
RNS/Identity.py
109
RNS/Identity.py
@ -14,8 +14,17 @@ from cryptography.hazmat.primitives.asymmetric import rsa
|
|||||||
from cryptography.hazmat.primitives.asymmetric import padding
|
from cryptography.hazmat.primitives.asymmetric import padding
|
||||||
|
|
||||||
class Identity:
|
class Identity:
|
||||||
#KEYSIZE = 1536
|
"""
|
||||||
|
This class is used to manage identities in Reticulum. It provides methods
|
||||||
|
for encryption, decryption, signatures and verification, and is the basis
|
||||||
|
for all encrypted communication over Reticulum networks.
|
||||||
|
|
||||||
|
:param public_only: Specifies whether this destination only holds a public key.
|
||||||
|
"""
|
||||||
KEYSIZE = 1024
|
KEYSIZE = 1024
|
||||||
|
"""
|
||||||
|
RSA key size in bits.
|
||||||
|
"""
|
||||||
DERKEYSIZE = KEYSIZE+272
|
DERKEYSIZE = KEYSIZE+272
|
||||||
|
|
||||||
# Non-configurable constants
|
# Non-configurable constants
|
||||||
@ -27,6 +36,10 @@ class Identity:
|
|||||||
DECRYPT_CHUNKSIZE = KEYSIZE//8
|
DECRYPT_CHUNKSIZE = KEYSIZE//8
|
||||||
|
|
||||||
TRUNCATED_HASHLENGTH = 80 # In bits
|
TRUNCATED_HASHLENGTH = 80 # In bits
|
||||||
|
"""
|
||||||
|
Constant specifying the truncated hash length (in bits) used by Reticulum
|
||||||
|
for addressable hashes. Non-configurable.
|
||||||
|
"""
|
||||||
|
|
||||||
# Storage
|
# Storage
|
||||||
known_destinations = {}
|
known_destinations = {}
|
||||||
@ -38,6 +51,12 @@ class Identity:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def recall(destination_hash):
|
def recall(destination_hash):
|
||||||
|
"""
|
||||||
|
Recall identity for a destination hash.
|
||||||
|
|
||||||
|
:param destination_hash: Destination hash as *bytes*.
|
||||||
|
:returns: An :ref:`RNS.Identity<api-identity>` instance that can be used to create an outgoing :ref:`RNS.Destination<api-destination>`, or *None* if the destination is unknown.
|
||||||
|
"""
|
||||||
RNS.log("Searching for "+RNS.prettyhexrep(destination_hash)+"...", RNS.LOG_EXTREME)
|
RNS.log("Searching for "+RNS.prettyhexrep(destination_hash)+"...", RNS.LOG_EXTREME)
|
||||||
if destination_hash in Identity.known_destinations:
|
if destination_hash in Identity.known_destinations:
|
||||||
identity_data = Identity.known_destinations[destination_hash]
|
identity_data = Identity.known_destinations[destination_hash]
|
||||||
@ -52,6 +71,12 @@ class Identity:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def recall_app_data(destination_hash):
|
def recall_app_data(destination_hash):
|
||||||
|
"""
|
||||||
|
Recall last heard app_data for a destination hash.
|
||||||
|
|
||||||
|
:param destination_hash: Destination hash as *bytes*.
|
||||||
|
:returns: *Bytes* containing app_data, or *None* if the destination is unknown.
|
||||||
|
"""
|
||||||
RNS.log("Searching for app_data for "+RNS.prettyhexrep(destination_hash)+"...", RNS.LOG_EXTREME)
|
RNS.log("Searching for app_data for "+RNS.prettyhexrep(destination_hash)+"...", RNS.LOG_EXTREME)
|
||||||
if destination_hash in Identity.known_destinations:
|
if destination_hash in Identity.known_destinations:
|
||||||
app_data = Identity.known_destinations[destination_hash][3]
|
app_data = Identity.known_destinations[destination_hash][3]
|
||||||
@ -84,6 +109,12 @@ class Identity:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def full_hash(data):
|
def full_hash(data):
|
||||||
|
"""
|
||||||
|
Get a SHA-256 hash of passed data.
|
||||||
|
|
||||||
|
:param data: Data to be hashed as *bytes*.
|
||||||
|
:returns: SHA-256 hash as *bytes*
|
||||||
|
"""
|
||||||
digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
|
digest = hashes.Hash(hashes.SHA256(), backend=default_backend())
|
||||||
digest.update(data)
|
digest.update(data)
|
||||||
|
|
||||||
@ -91,10 +122,22 @@ class Identity:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def truncated_hash(data):
|
def truncated_hash(data):
|
||||||
|
"""
|
||||||
|
Get a truncated SHA-256 hash of passed data.
|
||||||
|
|
||||||
|
:param data: Data to be hashed as *bytes*.
|
||||||
|
:returns: Truncated SHA-256 hash as *bytes*
|
||||||
|
"""
|
||||||
return Identity.full_hash(data)[:(Identity.TRUNCATED_HASHLENGTH//8)]
|
return Identity.full_hash(data)[:(Identity.TRUNCATED_HASHLENGTH//8)]
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_random_hash():
|
def get_random_hash():
|
||||||
|
"""
|
||||||
|
Get a random SHA-256 hash.
|
||||||
|
|
||||||
|
:param data: Data to be hashed as *bytes*.
|
||||||
|
:returns: Truncated SHA-256 hash of random data as *bytes*
|
||||||
|
"""
|
||||||
return Identity.truncated_hash(os.urandom(10))
|
return Identity.truncated_hash(os.urandom(10))
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
@ -134,6 +177,13 @@ class Identity:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def from_file(path):
|
def from_file(path):
|
||||||
|
"""
|
||||||
|
Create a new :ref:`RNS.Identity<api-identity>` instance from a file.
|
||||||
|
Can be used to load previously created and saved identities into Reticulum.
|
||||||
|
|
||||||
|
:param path: The full path to the saved :ref:`RNS.Identity<api-identity>` data
|
||||||
|
:returns: A :ref:`RNS.Identity<api-identity>` instance, or *None* if the loaded data was invalid.
|
||||||
|
"""
|
||||||
identity = Identity(public_only=True)
|
identity = Identity(public_only=True)
|
||||||
if identity.load(path):
|
if identity.load(path):
|
||||||
return identity
|
return identity
|
||||||
@ -175,12 +225,24 @@ class Identity:
|
|||||||
RNS.log("Identity keys created for "+RNS.prettyhexrep(self.hash), RNS.LOG_VERBOSE)
|
RNS.log("Identity keys created for "+RNS.prettyhexrep(self.hash), RNS.LOG_VERBOSE)
|
||||||
|
|
||||||
def get_private_key(self):
|
def get_private_key(self):
|
||||||
|
"""
|
||||||
|
:returns: The private key as *bytes*
|
||||||
|
"""
|
||||||
return self.prv_bytes
|
return self.prv_bytes
|
||||||
|
|
||||||
def get_public_key(self):
|
def get_public_key(self):
|
||||||
|
"""
|
||||||
|
:returns: The public key as *bytes*
|
||||||
|
"""
|
||||||
return self.pub_bytes
|
return self.pub_bytes
|
||||||
|
|
||||||
def load_private_key(self, prv_bytes):
|
def load_private_key(self, prv_bytes):
|
||||||
|
"""
|
||||||
|
Load a private key into the instance.
|
||||||
|
|
||||||
|
:param prv_bytes: The private key as *bytes*.
|
||||||
|
:returns: True if the key was loaded, otherwise False.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
self.prv_bytes = prv_bytes
|
self.prv_bytes = prv_bytes
|
||||||
self.prv = serialization.load_der_private_key(
|
self.prv = serialization.load_der_private_key(
|
||||||
@ -203,6 +265,12 @@ class Identity:
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
def load_public_key(self, key):
|
def load_public_key(self, key):
|
||||||
|
"""
|
||||||
|
Load a public key into the instance.
|
||||||
|
|
||||||
|
:param prv_bytes: The public key as *bytes*.
|
||||||
|
:returns: True if the key was loaded, otherwise False.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
self.pub_bytes = key
|
self.pub_bytes = key
|
||||||
self.pub = load_der_public_key(self.pub_bytes, backend=default_backend())
|
self.pub = load_der_public_key(self.pub_bytes, backend=default_backend())
|
||||||
@ -214,7 +282,15 @@ class Identity:
|
|||||||
self.hash = Identity.truncated_hash(self.pub_bytes)
|
self.hash = Identity.truncated_hash(self.pub_bytes)
|
||||||
self.hexhash = self.hash.hex()
|
self.hexhash = self.hash.hex()
|
||||||
|
|
||||||
def save(self, path):
|
def to_file(self, path):
|
||||||
|
"""
|
||||||
|
Saves the identity to a file. This will write the private key to disk,
|
||||||
|
and anyone with access to this file will be able to decrypt all
|
||||||
|
communication for the identity. Be very careful with this method.
|
||||||
|
|
||||||
|
:param path: The full path specifying where to save the identity.
|
||||||
|
:returns: True if the file was saved, otherwise False.
|
||||||
|
"""
|
||||||
try:
|
try:
|
||||||
with open(path, "wb") as key_file:
|
with open(path, "wb") as key_file:
|
||||||
key_file.write(self.prv_bytes)
|
key_file.write(self.prv_bytes)
|
||||||
@ -235,6 +311,13 @@ class Identity:
|
|||||||
RNS.log("The contained exception was: "+str(e))
|
RNS.log("The contained exception was: "+str(e))
|
||||||
|
|
||||||
def encrypt(self, plaintext):
|
def encrypt(self, plaintext):
|
||||||
|
"""
|
||||||
|
Encrypts information for the identity.
|
||||||
|
|
||||||
|
:param plaintext: The plaintext to be encrypted as *bytes*.
|
||||||
|
:returns: Ciphertext as *bytes*.
|
||||||
|
:raises: *KeyError* if the instance does not hold a public key
|
||||||
|
"""
|
||||||
if self.pub != None:
|
if self.pub != None:
|
||||||
chunksize = Identity.ENCRYPT_CHUNKSIZE
|
chunksize = Identity.ENCRYPT_CHUNKSIZE
|
||||||
chunks = int(math.ceil(len(plaintext)/(float(chunksize))))
|
chunks = int(math.ceil(len(plaintext)/(float(chunksize))))
|
||||||
@ -260,6 +343,13 @@ class Identity:
|
|||||||
|
|
||||||
|
|
||||||
def decrypt(self, ciphertext):
|
def decrypt(self, ciphertext):
|
||||||
|
"""
|
||||||
|
Decrypts information for the identity.
|
||||||
|
|
||||||
|
:param ciphertext: The ciphertext to be decrypted as *bytes*.
|
||||||
|
:returns: Plaintext as *bytes*, or *None* if decryption fails.
|
||||||
|
:raises: *KeyError* if the instance does not hold a private key
|
||||||
|
"""
|
||||||
if self.prv != None:
|
if self.prv != None:
|
||||||
plaintext = None
|
plaintext = None
|
||||||
try:
|
try:
|
||||||
@ -290,6 +380,13 @@ class Identity:
|
|||||||
|
|
||||||
|
|
||||||
def sign(self, message):
|
def sign(self, message):
|
||||||
|
"""
|
||||||
|
Signs information by the identity.
|
||||||
|
|
||||||
|
:param message: The message to be signed as *bytes*.
|
||||||
|
:returns: Signature as *bytes*.
|
||||||
|
:raises: *KeyError* if the instance does not hold a private key
|
||||||
|
"""
|
||||||
if self.prv != None:
|
if self.prv != None:
|
||||||
signature = self.prv.sign(
|
signature = self.prv.sign(
|
||||||
message,
|
message,
|
||||||
@ -304,6 +401,14 @@ class Identity:
|
|||||||
raise KeyError("Signing failed because identity does not hold a private key")
|
raise KeyError("Signing failed because identity does not hold a private key")
|
||||||
|
|
||||||
def validate(self, signature, message):
|
def validate(self, signature, message):
|
||||||
|
"""
|
||||||
|
Validates the signature of a signed message.
|
||||||
|
|
||||||
|
:param signature: The signature to be validated as *bytes*.
|
||||||
|
:param message: The message to be validated as *bytes*.
|
||||||
|
:returns: True if the signature is valid, otherwise False.
|
||||||
|
:raises: *KeyError* if the instance does not hold a public key
|
||||||
|
"""
|
||||||
if self.pub != None:
|
if self.pub != None:
|
||||||
try:
|
try:
|
||||||
self.pub.verify(
|
self.pub.verify(
|
||||||
|
@ -93,7 +93,7 @@ class Transport:
|
|||||||
if Transport.identity == None:
|
if Transport.identity == None:
|
||||||
RNS.log("No valid Transport Identity in storage, creating...", RNS.LOG_VERBOSE)
|
RNS.log("No valid Transport Identity in storage, creating...", RNS.LOG_VERBOSE)
|
||||||
Transport.identity = RNS.Identity()
|
Transport.identity = RNS.Identity()
|
||||||
Transport.identity.save(transport_identity_path)
|
Transport.identity.to_file(transport_identity_path)
|
||||||
else:
|
else:
|
||||||
RNS.log("Loaded Transport Identity from storage", RNS.LOG_VERBOSE)
|
RNS.log("Loaded Transport Identity from storage", RNS.LOG_VERBOSE)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user