From a8ea7bcca6ff9a79b537def6fbc79331d1c26135 Mon Sep 17 00:00:00 2001 From: Mark Qvist Date: Fri, 10 Jun 2022 11:27:52 +0200 Subject: [PATCH] Updated tests --- tests/identity.py | 162 ++++++++++++++++++++++++++++++++++------------ tests/link.py | 13 +++- 2 files changed, 130 insertions(+), 45 deletions(-) diff --git a/tests/identity.py b/tests/identity.py index 982c1ac..85c52d1 100644 --- a/tests/identity.py +++ b/tests/identity.py @@ -27,7 +27,125 @@ class TestIdentity(unittest.TestCase): self.assertEqual(i.hash, bytes.fromhex(id_hash)) self.assertEqual(i.get_private_key(), bytes.fromhex(key)) - def test_1_encrypt(self): + def test_1_sign(self): + print("") + + # Test known signature + fid = RNS.Identity.from_bytes(bytes.fromhex(fixed_keys[0][0])) + sig = fid.sign(signed_message.encode("utf-8")) + + self.assertEqual(sig, bytes.fromhex(sig_from_key_0)) + + # Test signature time jitter + id1 = RNS.Identity() + id2 = RNS.Identity(create_keys=False) + id2.load_public_key(id1.get_public_key()) + + if RNS.Cryptography.backend() == "internal": + rounds = 2000 + else: + rounds = 20000 + + times = [] + for i in range(1, rounds): + msg = os.urandom(512) + start = time.time() + signature = id1.sign(msg) + t = time.time() - start + times.append(t) + + import statistics + tmin = min(times)*1000 + tmax = max(times)*1000 + tmean = (sum(times)/len(times))*1000 + tmed = statistics.median(times)*1000 + tmdev = tmax - tmin + mpct = (tmax/tmed)*100 + print("Random messages:") + print(" Signature timing min/avg/med/max/mdev: "+str(round(tmin, 3))+"/"+str(round(tmean, 3))+"/"+str(round(tmed, 3))+"/"+str(round(tmax, 3))+"/"+str(round(tmdev, 3))) + print(" Max deviation from median: "+str(round(mpct, 1))+"%") + print() + + id1 = RNS.Identity() + id2 = RNS.Identity(create_keys=False) + id2.load_public_key(id1.get_public_key()) + + times = [] + for i in range(1, rounds): + msg = bytes([0x00])*512 + start = time.time() + signature = id1.sign(msg) + t = time.time() - start + times.append(t) + + tmin = min(times)*1000 + tmax = max(times)*1000 + tmean = (sum(times)/len(times))*1000 + tmed = statistics.median(times)*1000 + tmdev = tmax - tmin + mpct = (tmax/tmed)*100 + print("All 0xff messages:") + print(" Signature timing min/avg/med/max/mdev: "+str(round(tmin, 3))+"/"+str(round(tmean, 3))+"/"+str(round(tmed, 3))+"/"+str(round(tmax, 3))+"/"+str(round(tmdev, 3))) + print(" Max deviation from median: "+str(round(mpct, 1))+"%") + print() + + id1 = RNS.Identity() + id2 = RNS.Identity(create_keys=False) + id2.load_public_key(id1.get_public_key()) + + times = [] + for i in range(1, rounds): + msg = bytes([0xff])*512 + start = time.time() + signature = id1.sign(msg) + t = time.time() - start + times.append(t) + + tmin = min(times)*1000 + tmax = max(times)*1000 + tmean = (sum(times)/len(times))*1000 + tmed = statistics.median(times)*1000 + tmdev = tmax - tmin + mpct = (tmax/tmed)*100 + print("All 0x00 messages:") + print(" Signature timing min/avg/med/max/mdev: "+str(round(tmin, 3))+"/"+str(round(tmean, 3))+"/"+str(round(tmed, 3))+"/"+str(round(tmax, 3))+"/"+str(round(tmdev, 3))) + print(" Max deviation from median: "+str(round(mpct, 1))+"%") + print() + + b = 0 + t = 0 + for i in range(1, 500): + mlen = i % (RNS.Reticulum.MTU//2) + (RNS.Reticulum.MTU//2) + msg = os.urandom(mlen) + b += mlen + id1 = RNS.Identity() + id2 = RNS.Identity(create_keys=False) + id2.load_public_key(id1.get_public_key()) + + start = time.time() + signature = id1.sign(msg) + self.assertEqual(True, id2.validate(signature, msg)) + t += time.time() - start + + print("Sign/validate chunks < MTU: "+self.size_str(b/t, "b")+"ps") + + for i in range(1, 500): + mlen = 16*1024 + msg = os.urandom(mlen) + b += mlen + id1 = RNS.Identity() + id2 = RNS.Identity(create_keys=False) + id2.load_public_key(id1.get_public_key()) + + start = time.time() + signature = id1.sign(msg) + self.assertEqual(True, id2.validate(signature, msg)) + t += time.time() - start + + print("Sign/validate 16KB chunks: "+self.size_str(b/t, "b")+"ps") + + + def test_2_encrypt(self): print("") # Test decryption of known token @@ -83,48 +201,6 @@ class TestIdentity(unittest.TestCase): print("Encrypt "+self.size_str(mlen)+" chunks: "+self.size_str(b/e_t, "b")+"ps") print("Decrypt "+self.size_str(mlen)+" chunks: "+self.size_str(b/d_t, "b")+"ps") - def test_2_sign(self): - print("") - - # Test known signature - fid = RNS.Identity.from_bytes(bytes.fromhex(fixed_keys[0][0])) - sig = fid.sign(signed_message.encode("utf-8")) - - self.assertEqual(sig, bytes.fromhex(sig_from_key_0)) - - b = 0 - t = 0 - for i in range(1, 500): - mlen = i % (RNS.Reticulum.MTU//2) + (RNS.Reticulum.MTU//2) - msg = os.urandom(mlen) - b += mlen - id1 = RNS.Identity() - id2 = RNS.Identity(create_keys=False) - id2.load_public_key(id1.get_public_key()) - - start = time.time() - signature = id1.sign(msg) - self.assertEqual(True, id2.validate(signature, msg)) - t += time.time() - start - - print("Sign/validate chunks < MTU: "+self.size_str(b/t, "b")+"ps") - - for i in range(1, 500): - mlen = 16*1024 - msg = os.urandom(mlen) - b += mlen - id1 = RNS.Identity() - id2 = RNS.Identity(create_keys=False) - id2.load_public_key(id1.get_public_key()) - - start = time.time() - signature = id1.sign(msg) - self.assertEqual(True, id2.validate(signature, msg)) - t += time.time() - start - - print("Sign/validate 16KB chunks: "+self.size_str(b/t, "b")+"ps") - - def size_str(self, num, suffix='B'): units = ['','K','M','G','T','P','E','Z'] last_unit = 'Y' diff --git a/tests/link.py b/tests/link.py index 5581e19..67c8ce0 100644 --- a/tests/link.py +++ b/tests/link.py @@ -19,11 +19,11 @@ fixed_keys = [ def targets_job(caller): cmd = "python -c \"from tests.link import targets; targets()\"" - print("Opening subprocess for "+str(caller)+"...", RNS.LOG_VERBOSE) + print("Opening subprocess for "+str(cmd)+"...", RNS.LOG_VERBOSE) ppath = os.getcwd() try: - caller.process = subprocess.Popen(shlex.split(cmd), cwd=ppath) + caller.process = subprocess.Popen(shlex.split(cmd), cwd=ppath, stdout=subprocess.PIPE) except Exception as e: raise e caller.pipe_is_open = False @@ -39,10 +39,19 @@ def init_rns(caller=None): c_rns.m_proc = caller.process print("Done starting local RNS instance...") +def close_rns(): + global c_rns + if c_rns != None: + c_rns.m_proc.kill() + class TestLink(unittest.TestCase): def setUp(self): pass + @classmethod + def tearDownClass(cls): + close_rns() + def test_0_establish(self): init_rns(self) print("")