Buffer: send and receive binary data over Channel
(also some minor fixes in channel)
This commit is contained in:
		
							parent
							
								
									58004d7c05
								
							
						
					
					
						commit
						aac2b9f987
					
				
							
								
								
									
										323
									
								
								Examples/Buffer.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										323
									
								
								Examples/Buffer.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,323 @@ | ||||
| ########################################################## | ||||
| # This RNS example demonstrates how to set up a link to  # | ||||
| # a destination, and pass binary data over it using a    # | ||||
| # using a channel buffer.                                # | ||||
| ########################################################## | ||||
| from __future__ import annotations | ||||
| import os | ||||
| import sys | ||||
| import time | ||||
| import argparse | ||||
| from datetime import datetime | ||||
| 
 | ||||
| import RNS | ||||
| from RNS.vendor import umsgpack | ||||
| 
 | ||||
| # Let's define an app name. We'll use this for all | ||||
| # destinations we create. Since this echo example | ||||
| # is part of a range of example utilities, we'll put | ||||
| # them all within the app namespace "example_utilities" | ||||
| APP_NAME = "example_utilities" | ||||
| 
 | ||||
| 
 | ||||
| ########################################################## | ||||
| #### Server Part ######################################### | ||||
| ########################################################## | ||||
| 
 | ||||
| # A reference to the latest client link that connected | ||||
| latest_client_link = None | ||||
| 
 | ||||
| # A reference to the latest buffer object | ||||
| latest_buffer = None | ||||
| 
 | ||||
| # This initialisation is executed when the users chooses | ||||
| # to run as a server | ||||
| def server(configpath): | ||||
|     # We must first initialise Reticulum | ||||
|     reticulum = RNS.Reticulum(configpath) | ||||
|      | ||||
|     # Randomly create a new identity for our link example | ||||
|     server_identity = RNS.Identity() | ||||
| 
 | ||||
|     # We create a destination that clients can connect to. We | ||||
|     # want clients to create links to this destination, so we | ||||
|     # need to create a "single" destination type. | ||||
|     server_destination = RNS.Destination( | ||||
|         server_identity, | ||||
|         RNS.Destination.IN, | ||||
|         RNS.Destination.SINGLE, | ||||
|         APP_NAME, | ||||
|         "bufferexample" | ||||
|     ) | ||||
| 
 | ||||
|     # We configure a function that will get called every time | ||||
|     # a new client creates a link to this destination. | ||||
|     server_destination.set_link_established_callback(client_connected) | ||||
| 
 | ||||
|     # Everything's ready! | ||||
|     # Let's Wait for client requests or user input | ||||
|     server_loop(server_destination) | ||||
| 
 | ||||
| def server_loop(destination): | ||||
|     # Let the user know that everything is ready | ||||
|     RNS.log( | ||||
|         "Link example "+ | ||||
|         RNS.prettyhexrep(destination.hash)+ | ||||
|         " running, waiting for a connection." | ||||
|     ) | ||||
| 
 | ||||
|     RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)") | ||||
| 
 | ||||
|     # We enter a loop that runs until the users exits. | ||||
|     # If the user hits enter, we will announce our server | ||||
|     # destination on the network, which will let clients | ||||
|     # know how to create messages directed towards it. | ||||
|     while True: | ||||
|         entered = input() | ||||
|         destination.announce() | ||||
|         RNS.log("Sent announce from "+RNS.prettyhexrep(destination.hash)) | ||||
| 
 | ||||
| # When a client establishes a link to our server | ||||
| # destination, this function will be called with | ||||
| # a reference to the link. | ||||
| def client_connected(link): | ||||
|     global latest_client_link, latest_buffer | ||||
|     latest_client_link = link | ||||
| 
 | ||||
|     RNS.log("Client connected") | ||||
|     link.set_link_closed_callback(client_disconnected) | ||||
| 
 | ||||
|     # If a new connection is received, the old reader | ||||
|     # needs to be disconnected. | ||||
|     if latest_buffer: | ||||
|         latest_buffer.close() | ||||
| 
 | ||||
| 
 | ||||
|     # Create buffer objects. | ||||
|     #   The stream_id parameter to these functions is | ||||
|     #   a bit like a file descriptor, except that it | ||||
|     #   is unique to the *receiver*. | ||||
|     # | ||||
|     #   In this example, both the reader and the writer | ||||
|     #   use stream_id = 0, but there are actually two | ||||
|     #   separate unidirectional streams flowing in | ||||
|     #   opposite directions. | ||||
|     # | ||||
|     channel = link.get_channel() | ||||
|     latest_buffer = RNS.Buffer.create_bidirectional_buffer(0, 0, channel, server_buffer_ready) | ||||
| 
 | ||||
| def client_disconnected(link): | ||||
|     RNS.log("Client disconnected") | ||||
| 
 | ||||
| def server_buffer_ready(ready_bytes: int): | ||||
|     """ | ||||
|     Callback from buffer when buffer has data available | ||||
| 
 | ||||
|     :param ready_bytes: The number of bytes ready to read | ||||
|     """ | ||||
|     global latest_buffer | ||||
| 
 | ||||
|     data = latest_buffer.read(ready_bytes) | ||||
|     data = data.decode("utf-8") | ||||
| 
 | ||||
|     RNS.log("Received data on the link: " + data) | ||||
| 
 | ||||
|     reply_message = "I received \""+data+"\" over the buffer" | ||||
|     reply_message = reply_message.encode("utf-8") | ||||
|     latest_buffer.write(reply_message) | ||||
|     latest_buffer.flush() | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
| ########################################################## | ||||
| #### Client Part ######################################### | ||||
| ########################################################## | ||||
| 
 | ||||
| # A reference to the server link | ||||
| server_link = None | ||||
| 
 | ||||
| # A reference to the buffer object, needed to share the | ||||
| # object from the link connected callback to the client | ||||
| # loop. | ||||
| buffer = None | ||||
| 
 | ||||
| # This initialisation is executed when the users chooses | ||||
| # to run as a client | ||||
| def client(destination_hexhash, configpath): | ||||
|     # We need a binary representation of the destination | ||||
|     # hash that was entered on the command line | ||||
|     try: | ||||
|         dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2 | ||||
|         if len(destination_hexhash) != dest_len: | ||||
|             raise ValueError( | ||||
|                 "Destination length is invalid, must be {hex} hexadecimal characters ({byte} bytes).".format(hex=dest_len, byte=dest_len//2) | ||||
|             ) | ||||
|              | ||||
|         destination_hash = bytes.fromhex(destination_hexhash) | ||||
|     except: | ||||
|         RNS.log("Invalid destination entered. Check your input!\n") | ||||
|         exit() | ||||
| 
 | ||||
|     # We must first initialise Reticulum | ||||
|     reticulum = RNS.Reticulum(configpath) | ||||
| 
 | ||||
|     # Check if we know a path to the destination | ||||
|     if not RNS.Transport.has_path(destination_hash): | ||||
|         RNS.log("Destination is not yet known. Requesting path and waiting for announce to arrive...") | ||||
|         RNS.Transport.request_path(destination_hash) | ||||
|         while not RNS.Transport.has_path(destination_hash): | ||||
|             time.sleep(0.1) | ||||
| 
 | ||||
|     # Recall the server identity | ||||
|     server_identity = RNS.Identity.recall(destination_hash) | ||||
| 
 | ||||
|     # Inform the user that we'll begin connecting | ||||
|     RNS.log("Establishing link with server...") | ||||
| 
 | ||||
|     # When the server identity is known, we set | ||||
|     # up a destination | ||||
|     server_destination = RNS.Destination( | ||||
|         server_identity, | ||||
|         RNS.Destination.OUT, | ||||
|         RNS.Destination.SINGLE, | ||||
|         APP_NAME, | ||||
|         "bufferexample" | ||||
|     ) | ||||
| 
 | ||||
|     # And create a link | ||||
|     link = RNS.Link(server_destination) | ||||
| 
 | ||||
|     # We'll also set up functions to inform the | ||||
|     # user when the link is established or closed | ||||
|     link.set_link_established_callback(link_established) | ||||
|     link.set_link_closed_callback(link_closed) | ||||
| 
 | ||||
|     # Everything is set up, so let's enter a loop | ||||
|     # for the user to interact with the example | ||||
|     client_loop() | ||||
| 
 | ||||
| def client_loop(): | ||||
|     global server_link | ||||
| 
 | ||||
|     # Wait for the link to become active | ||||
|     while not server_link: | ||||
|         time.sleep(0.1) | ||||
| 
 | ||||
|     should_quit = False | ||||
|     while not should_quit: | ||||
|         try: | ||||
|             print("> ", end=" ") | ||||
|             text = input() | ||||
| 
 | ||||
|             # Check if we should quit the example | ||||
|             if text == "quit" or text == "q" or text == "exit": | ||||
|                 should_quit = True | ||||
|                 server_link.teardown() | ||||
|             else: | ||||
|                 # Otherwise, encode the text and write it to the buffer. | ||||
|                 text = text.encode("utf-8") | ||||
|                 buffer.write(text) | ||||
|                 # Flush the buffer to force the data to be sent. | ||||
|                 buffer.flush() | ||||
| 
 | ||||
| 
 | ||||
|         except Exception as e: | ||||
|             RNS.log("Error while sending data over the link: "+str(e)) | ||||
|             should_quit = True | ||||
|             server_link.teardown() | ||||
| 
 | ||||
| # This function is called when a link | ||||
| # has been established with the server | ||||
| def link_established(link): | ||||
|     # We store a reference to the link | ||||
|     # instance for later use | ||||
|     global server_link, buffer | ||||
|     server_link = link | ||||
| 
 | ||||
|     # Create buffer, see server_client_connected() for | ||||
|     # more detail about setting up the buffer. | ||||
|     channel = link.get_channel() | ||||
|     buffer = RNS.Buffer.create_bidirectional_buffer(0, 0, channel, client_buffer_ready) | ||||
| 
 | ||||
|     # Inform the user that the server is | ||||
|     # connected | ||||
|     RNS.log("Link established with server, enter some text to send, or \"quit\" to quit") | ||||
| 
 | ||||
| # When a link is closed, we'll inform the | ||||
| # user, and exit the program | ||||
| def link_closed(link): | ||||
|     if link.teardown_reason == RNS.Link.TIMEOUT: | ||||
|         RNS.log("The link timed out, exiting now") | ||||
|     elif link.teardown_reason == RNS.Link.DESTINATION_CLOSED: | ||||
|         RNS.log("The link was closed by the server, exiting now") | ||||
|     else: | ||||
|         RNS.log("Link closed, exiting now") | ||||
|      | ||||
|     RNS.Reticulum.exit_handler() | ||||
|     time.sleep(1.5) | ||||
|     os._exit(0) | ||||
| 
 | ||||
| # When the buffer has new data, read it and write it to the terminal. | ||||
| def client_buffer_ready(ready_bytes: int): | ||||
|     global buffer | ||||
|     data = buffer.read(ready_bytes) | ||||
|     RNS.log("Received data on the link: " + data.decode("utf-8")) | ||||
|     print("> ", end=" ") | ||||
|     sys.stdout.flush() | ||||
| 
 | ||||
| 
 | ||||
| ########################################################## | ||||
| #### Program Startup ##################################### | ||||
| ########################################################## | ||||
| 
 | ||||
| # This part of the program runs at startup, | ||||
| # and parses input of from the user, and then | ||||
| # starts up the desired program mode. | ||||
| if __name__ == "__main__": | ||||
|     try: | ||||
|         parser = argparse.ArgumentParser(description="Simple buffer example") | ||||
| 
 | ||||
|         parser.add_argument( | ||||
|             "-s", | ||||
|             "--server", | ||||
|             action="store_true", | ||||
|             help="wait for incoming link requests from clients" | ||||
|         ) | ||||
| 
 | ||||
|         parser.add_argument( | ||||
|             "--config", | ||||
|             action="store", | ||||
|             default=None, | ||||
|             help="path to alternative Reticulum config directory", | ||||
|             type=str | ||||
|         ) | ||||
| 
 | ||||
|         parser.add_argument( | ||||
|             "destination", | ||||
|             nargs="?", | ||||
|             default=None, | ||||
|             help="hexadecimal hash of the server destination", | ||||
|             type=str | ||||
|         ) | ||||
| 
 | ||||
|         args = parser.parse_args() | ||||
| 
 | ||||
|         if args.config: | ||||
|             configarg = args.config | ||||
|         else: | ||||
|             configarg = None | ||||
| 
 | ||||
|         if args.server: | ||||
|             server(configarg) | ||||
|         else: | ||||
|             if (args.destination == None): | ||||
|                 print("") | ||||
|                 parser.print_help() | ||||
|                 print("") | ||||
|             else: | ||||
|                 client(args.destination, configarg) | ||||
| 
 | ||||
|     except KeyboardInterrupt: | ||||
|         print("") | ||||
|         exit() | ||||
| @ -243,11 +243,6 @@ def client(destination_hexhash, configpath): | ||||
|     # And create a link | ||||
|     link = RNS.Link(server_destination) | ||||
| 
 | ||||
|     # We set a callback that will get executed | ||||
|     # every time a packet is received over the | ||||
|     # link | ||||
|     link.set_packet_callback(client_message_received) | ||||
| 
 | ||||
|     # We'll also set up functions to inform the | ||||
|     # user when the link is established or closed | ||||
|     link.set_link_established_callback(link_established) | ||||
| @ -330,7 +325,7 @@ def link_closed(link): | ||||
|     time.sleep(1.5) | ||||
|     os._exit(0) | ||||
| 
 | ||||
| # When a packet is received over the link, we | ||||
| # When a packet is received over the channel, we | ||||
| # simply print out the data. | ||||
| def client_message_received(message): | ||||
|     if isinstance(message, StringMessage): | ||||
| @ -348,7 +343,7 @@ def client_message_received(message): | ||||
| # starts up the desired program mode. | ||||
| if __name__ == "__main__": | ||||
|     try: | ||||
|         parser = argparse.ArgumentParser(description="Simple link example") | ||||
|         parser = argparse.ArgumentParser(description="Simple channel example") | ||||
| 
 | ||||
|         parser.add_argument( | ||||
|             "-s", | ||||
|  | ||||
							
								
								
									
										305
									
								
								RNS/Buffer.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										305
									
								
								RNS/Buffer.py
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,305 @@ | ||||
| from __future__ import annotations | ||||
| import sys | ||||
| from threading import RLock | ||||
| from RNS.vendor import umsgpack | ||||
| from RNS.Channel import Channel, MessageBase, SystemMessageTypes | ||||
| import RNS | ||||
| from io import RawIOBase, BufferedRWPair, BufferedReader, BufferedWriter | ||||
| from typing import Callable | ||||
| from contextlib import AbstractContextManager | ||||
| 
 | ||||
| 
 | ||||
| class StreamDataMessage(MessageBase): | ||||
|     MSGTYPE = SystemMessageTypes.SMT_STREAM_DATA | ||||
|     """ | ||||
|     Message type for ``Channel``. ``StreamDataMessage`` | ||||
|     uses a system-reserved message type. | ||||
|     """ | ||||
| 
 | ||||
|     STREAM_ID_MAX = 65535 | ||||
|     """ | ||||
|     While not essential for the current message packing | ||||
|     method (umsgpack), the stream id is clamped to the | ||||
|     size of a UInt16 for future struct packing. | ||||
|     """ | ||||
| 
 | ||||
|     OVERHEAD = 0 | ||||
|     """ | ||||
|     The number of bytes used by this messa | ||||
|      | ||||
|     When the Buffer package is imported, this value is | ||||
|     calculated based on the value of RNS.Link.MDU. | ||||
|     """ | ||||
| 
 | ||||
|     MAX_DATA_LEN = 0 | ||||
|     """ | ||||
|     When the Buffer package is imported, this value is | ||||
|     calculcated based on the value of OVERHEAD | ||||
|     """ | ||||
| 
 | ||||
|     def __init__(self, stream_id: int = None, data: bytes = None, eof: bool = False): | ||||
|         """ | ||||
|         This class is used to encapsulate binary stream | ||||
|         data to be sent over a ``Channel``. | ||||
| 
 | ||||
|         :param stream_id: id of stream relative to receiver | ||||
|         :param data: binary data | ||||
|         :param eof: set to True if signalling End of File | ||||
|         """ | ||||
|         super().__init__() | ||||
|         if stream_id is not None and stream_id > self.STREAM_ID_MAX: | ||||
|             raise ValueError("stream_id must be 0-65535") | ||||
|         self.stream_id = stream_id | ||||
|         self.data = data or bytes() | ||||
|         self.eof = eof | ||||
| 
 | ||||
|     def pack(self) -> bytes: | ||||
|         if self.stream_id is None: | ||||
|             raise ValueError("stream_id") | ||||
|         return umsgpack.packb((self.stream_id, self.eof, bytes(self.data))) | ||||
| 
 | ||||
|     def unpack(self, raw): | ||||
|         self.stream_id, self.eof, self.data = umsgpack.unpackb(raw) | ||||
| 
 | ||||
| 
 | ||||
| _link_sized_bytes = ("\0"*RNS.Link.MDU).encode("utf-8") | ||||
| StreamDataMessage.OVERHEAD = len(StreamDataMessage(stream_id=StreamDataMessage.STREAM_ID_MAX, | ||||
|                                                    data=_link_sized_bytes, | ||||
|                                                    eof=True).pack()) - len(_link_sized_bytes) - 6  # 6 is envelope overhead | ||||
| StreamDataMessage.MAX_DATA_LEN = RNS.Link.MDU - StreamDataMessage.OVERHEAD | ||||
| _link_sized_bytes = None | ||||
| 
 | ||||
| 
 | ||||
| class RawChannelReader(RawIOBase, AbstractContextManager): | ||||
|     """ | ||||
|     An implementation of RawIOBase that receives | ||||
|     binary stream data sent over a ``Channel``. | ||||
| 
 | ||||
|       This class generally need not be instantiated directly. | ||||
|       Use :func:`RNS.Buffer.create_reader`, | ||||
|       :func:`RNS.Buffer.create_writer`, and | ||||
|       :func:`RNS.Buffer.create_bidirectional_buffer` functions | ||||
|       to create buffered streams with optional callbacks. | ||||
| 
 | ||||
|       For additional information on the API of this | ||||
|       object, see the Python documentation for | ||||
|       ``RawIOBase``. | ||||
|     """ | ||||
|     def __init__(self, stream_id: int, channel: Channel): | ||||
|         """ | ||||
|         Create a raw channel reader. | ||||
| 
 | ||||
|         :param stream_id: local stream id to receive at | ||||
|         :param channel: ``Channel`` object to receive from | ||||
|         """ | ||||
|         self._stream_id = stream_id | ||||
|         self._channel = channel | ||||
|         self._lock = RLock() | ||||
|         self._buffer = bytearray() | ||||
|         self._eof = False | ||||
|         self._channel._register_message_type(StreamDataMessage, is_system_type=True) | ||||
|         self._channel.add_message_handler(self._handle_message) | ||||
|         self._listeners: [Callable[[int], None]] = [] | ||||
| 
 | ||||
|     def add_ready_callback(self, cb: Callable[[int], None]): | ||||
|         """ | ||||
|         Add a function to be called when new data is available. | ||||
|         The function should have the signature ``(ready_bytes: int) -> None`` | ||||
| 
 | ||||
|         :param cb: function to call | ||||
|         """ | ||||
|         with self._lock: | ||||
|             self._listeners.append(cb) | ||||
| 
 | ||||
|     def remove_ready_callback(self, cb: Callable[[int], None]): | ||||
|         """ | ||||
|         Remove a function added with :func:`RNS.RawChannelReader.add_ready_callback()` | ||||
| 
 | ||||
|         :param cb: function to remove | ||||
|         """ | ||||
|         with self._lock: | ||||
|             self._listeners.remove(cb) | ||||
| 
 | ||||
|     def _handle_message(self, message: MessageBase): | ||||
|         if isinstance(message, StreamDataMessage): | ||||
|             if message.stream_id == self._stream_id: | ||||
|                 with self._lock: | ||||
|                     if message.data is not None: | ||||
|                         self._buffer.extend(message.data) | ||||
|                     if message.eof: | ||||
|                         self._eof = True | ||||
|                     for listener in self._listeners: | ||||
|                         try: | ||||
|                             listener(len(self._buffer)) | ||||
|                         except Exception as ex: | ||||
|                             RNS.log("Error calling RawChannelReader(" + str(self._stream_id) + ") callback: " + str(ex)) | ||||
|                     return True | ||||
|         return False | ||||
| 
 | ||||
|     def _read(self, __size: int) -> bytes | None: | ||||
|         with self._lock: | ||||
|             result = self._buffer[:__size] | ||||
|             self._buffer = self._buffer[__size:] | ||||
|             return result if len(result) > 0 or self._eof else None | ||||
| 
 | ||||
|     def readinto(self, __buffer: bytearray) -> int | None: | ||||
|         ready = self._read(len(__buffer)) | ||||
|         if ready: | ||||
|             __buffer[:len(ready)] = ready | ||||
|         return len(ready) if ready else None | ||||
| 
 | ||||
|     def writable(self) -> bool: | ||||
|         return False | ||||
| 
 | ||||
|     def seekable(self) -> bool: | ||||
|         return False | ||||
| 
 | ||||
|     def readable(self) -> bool: | ||||
|         return True | ||||
| 
 | ||||
|     def close(self): | ||||
|         with self._lock: | ||||
|             self._channel.remove_message_handler(self._handle_message) | ||||
|             self._listeners.clear() | ||||
| 
 | ||||
|     def __enter__(self): | ||||
|         return self | ||||
| 
 | ||||
|     def __exit__(self, exc_type, exc_val, exc_tb): | ||||
|         self.close() | ||||
|         return False | ||||
| 
 | ||||
| 
 | ||||
| class RawChannelWriter(RawIOBase, AbstractContextManager): | ||||
|     """ | ||||
|     An implementation of RawIOBase that receives | ||||
|     binary stream data sent over a channel. | ||||
| 
 | ||||
|       This class generally need not be instantiated directly. | ||||
|       Use :func:`RNS.Buffer.create_reader`, | ||||
|       :func:`RNS.Buffer.create_writer`, and | ||||
|       :func:`RNS.Buffer.create_bidirectional_buffer` functions | ||||
|       to create buffered streams with optional callbacks. | ||||
| 
 | ||||
|       For additional information on the API of this | ||||
|       object, see the Python documentation for | ||||
|       ``RawIOBase``. | ||||
|     """ | ||||
|     def __init__(self, stream_id: int, channel: Channel): | ||||
|         """ | ||||
|         Create a raw channel writer. | ||||
| 
 | ||||
|         :param stream_id: remote stream id to sent do | ||||
|         :param channel: ``Channel`` object to send on | ||||
|         """ | ||||
|         self._stream_id = stream_id | ||||
|         self._channel = channel | ||||
|         self._eof = False | ||||
| 
 | ||||
|     def write(self, __b: bytes) -> int | None: | ||||
|         try: | ||||
|             if self._channel.is_ready_to_send(): | ||||
|                 chunk = __b[:StreamDataMessage.MAX_DATA_LEN] | ||||
|                 message = StreamDataMessage(self._stream_id, chunk, self._eof) | ||||
|                 self._channel.send(message) | ||||
|                 return len(chunk) | ||||
|         except RNS.Channel.ChannelException as cex: | ||||
|             if cex.type != RNS.Channel.CEType.ME_LINK_NOT_READY: | ||||
|                 raise | ||||
|         return 0 | ||||
| 
 | ||||
|     def close(self): | ||||
|         self._eof = True | ||||
|         self.write(bytes()) | ||||
| 
 | ||||
|     def __enter__(self): | ||||
|         return self | ||||
| 
 | ||||
|     def __exit__(self, exc_type, exc_val, exc_tb): | ||||
|         self.close() | ||||
|         return False | ||||
| 
 | ||||
|     def seekable(self) -> bool: | ||||
|         return False | ||||
| 
 | ||||
|     def readable(self) -> bool: | ||||
|         return False | ||||
| 
 | ||||
|     def writable(self) -> bool: | ||||
|         return True | ||||
| 
 | ||||
| class Buffer: | ||||
|     """ | ||||
|     Static functions for creating buffered streams that send | ||||
|     and receive over a ``Channel``. | ||||
| 
 | ||||
|     These functions use ``BufferedReader``, ``BufferedWriter``, | ||||
|     and ``BufferedRWPair`` to add buffering to | ||||
|     ``RawChannelReader`` and ``RawChannelWriter``. | ||||
|     """ | ||||
|     @staticmethod | ||||
|     def create_reader(stream_id: int, channel: Channel, | ||||
|                       ready_callback: Callable[[int], None] | None = None) -> BufferedReader: | ||||
|         """ | ||||
|         Create a buffered reader that reads binary data sent | ||||
|         over a ``Channel``, with an optional callback when | ||||
|         new data is available. | ||||
| 
 | ||||
|         Callback signature: ``(ready_bytes: int) -> None`` | ||||
| 
 | ||||
|         For more information on the reader-specific functions | ||||
|         of this object, see the Python documentation for | ||||
|         ``BufferedReader`` | ||||
| 
 | ||||
|         :param stream_id: the local stream id to receive from | ||||
|         :param channel: the channel to receive on | ||||
|         :param ready_callback: function to call when new data is available | ||||
|         :return: a BufferedReader object | ||||
|         """ | ||||
|         reader = RawChannelReader(stream_id, channel) | ||||
|         if ready_callback: | ||||
|             reader.add_ready_callback(ready_callback) | ||||
|         return BufferedReader(reader) | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def create_writer(stream_id: int, channel: Channel) -> BufferedWriter: | ||||
|         """ | ||||
|         Create a buffered writer that writes binary data over | ||||
|         a ``Channel``. | ||||
| 
 | ||||
|         For more information on the writer-specific functions | ||||
|         of this object, see the Python documentation for | ||||
|         ``BufferedWriter`` | ||||
| 
 | ||||
|         :param stream_id: the remote stream id to send to | ||||
|         :param channel: the channel to send on | ||||
|         :return: a BufferedWriter object | ||||
|         """ | ||||
|         writer = RawChannelWriter(stream_id, channel) | ||||
|         return BufferedWriter(writer) | ||||
| 
 | ||||
|     @staticmethod | ||||
|     def create_bidirectional_buffer(receive_stream_id: int, send_stream_id: int, channel: Channel, | ||||
|                                     ready_callback: Callable[[int], None] | None = None) -> BufferedRWPair: | ||||
|         """ | ||||
|         Create a buffered reader/writer pair that reads and | ||||
|         writes binary data over a ``Channel``, with an | ||||
|         optional callback when new data is available. | ||||
| 
 | ||||
|         Callback signature: ``(ready_bytes: int) -> None`` | ||||
| 
 | ||||
|         For more information on the reader-specific functions | ||||
|         of this object, see the Python documentation for | ||||
|         ``BufferedRWPair`` | ||||
| 
 | ||||
|         :param receive_stream_id: the local stream id to receive at | ||||
|         :param send_stream_id:  the remote stream id to send to | ||||
|         :param channel: the channel to send and receive on | ||||
|         :param ready_callback: function to call when new data is available | ||||
|         :return: a BufferedRWPair object | ||||
|         """ | ||||
|         reader = RawChannelReader(receive_stream_id, channel) | ||||
|         if ready_callback: | ||||
|             reader.add_ready_callback(ready_callback) | ||||
|         writer = RawChannelWriter(send_stream_id, channel) | ||||
|         return BufferedRWPair(reader, writer) | ||||
| @ -34,6 +34,8 @@ import RNS | ||||
| from abc import ABC, abstractmethod | ||||
| TPacket = TypeVar("TPacket") | ||||
| 
 | ||||
| class SystemMessageTypes(enum.IntEnum): | ||||
|     SMT_STREAM_DATA = 0xff00 | ||||
| 
 | ||||
| class ChannelOutletBase(ABC, Generic[TPacket]): | ||||
|     """ | ||||
|  | ||||
| @ -33,6 +33,7 @@ from .Reticulum import Reticulum | ||||
| from .Identity import Identity | ||||
| from .Link import Link, RequestReceipt | ||||
| from .Channel import MessageBase | ||||
| from .Buffer import Buffer, RawChannelReader, RawChannelWriter | ||||
| from .Transport import Transport | ||||
| from .Destination import Destination | ||||
| from .Packet import Packet | ||||
|  | ||||
| @ -149,6 +149,48 @@ This chapter lists and explains all classes exposed by the Reticulum Network Sta | ||||
| .. autoclass:: RNS.MessageBase() | ||||
|    :members: | ||||
| 
 | ||||
| .. _api-buffer: | ||||
| 
 | ||||
| .. only:: html | ||||
| 
 | ||||
|    |start-h3| Buffer |end-h3| | ||||
| 
 | ||||
| .. only:: latex | ||||
| 
 | ||||
|    Buffer | ||||
|    ------ | ||||
| 
 | ||||
| .. autoclass:: RNS.Buffer | ||||
|    :members: | ||||
| 
 | ||||
| .. _api-rawchannelreader: | ||||
| 
 | ||||
| .. only:: html | ||||
| 
 | ||||
|    |start-h3| RawChannelReader |end-h3| | ||||
| 
 | ||||
| .. only:: latex | ||||
| 
 | ||||
|    RawChannelReader | ||||
|    ---------------- | ||||
| 
 | ||||
| .. autoclass:: RNS.RawChannelReader | ||||
|    :members: __init__, add_ready_callback, remove_ready_callback | ||||
| 
 | ||||
| .. _api-rawchannelwriter: | ||||
| 
 | ||||
| .. only:: html | ||||
| 
 | ||||
|    |start-h3| RawChannelWriter |end-h3| | ||||
| 
 | ||||
| .. only:: latex | ||||
| 
 | ||||
|    RawChannelWriter | ||||
|    ---------------- | ||||
| 
 | ||||
| .. autoclass:: RNS.RawChannelWriter | ||||
|    :members: __init__ | ||||
| 
 | ||||
| .. _api-transport: | ||||
| 
 | ||||
| .. only:: html | ||||
|  | ||||
							
								
								
									
										115
									
								
								tests/channel.py
									
									
									
									
									
								
							
							
						
						
									
										115
									
								
								tests/channel.py
									
									
									
									
									
								
							| @ -2,6 +2,7 @@ from __future__ import annotations | ||||
| import threading | ||||
| import RNS | ||||
| from RNS.Channel import MessageState, ChannelOutletBase, Channel, MessageBase | ||||
| import RNS.Buffer | ||||
| from RNS.vendor import umsgpack | ||||
| from typing import Callable | ||||
| import contextlib | ||||
| @ -91,17 +92,20 @@ class ChannelOutletTest(ChannelOutletBase): | ||||
|         self._rtt = rtt | ||||
|         self._usable = True | ||||
|         self.packets = [] | ||||
|         self.lock = threading.RLock() | ||||
|         self.packet_callback: Callable[[ChannelOutletBase, bytes], None] | None = None | ||||
| 
 | ||||
|     def send(self, raw: bytes) -> Packet: | ||||
|         packet = Packet(raw) | ||||
|         packet.send() | ||||
|         self.packets.append(packet) | ||||
|         return packet | ||||
|         with self.lock: | ||||
|             packet = Packet(raw) | ||||
|             packet.send() | ||||
|             self.packets.append(packet) | ||||
|             return packet | ||||
| 
 | ||||
|     def resend(self, packet: Packet) -> Packet: | ||||
|         packet.send() | ||||
|         return packet | ||||
|         with self.lock: | ||||
|             packet.send() | ||||
|             return packet | ||||
| 
 | ||||
|     @property | ||||
|     def mdu(self): | ||||
| @ -370,6 +374,105 @@ class TestChannel(unittest.TestCase): | ||||
| 
 | ||||
|         self.eat_own_dog_food(message, check) | ||||
| 
 | ||||
|     def test_buffer_small_bidirectional(self): | ||||
|         data = "Hello\n" | ||||
|         with RNS.Buffer.create_bidirectional_buffer(0, 0, self.h.channel) as buffer: | ||||
|             count = buffer.write(data.encode("utf-8")) | ||||
|             buffer.flush() | ||||
| 
 | ||||
|             self.assertEqual(len(data), count) | ||||
|             self.assertEqual(1, len(self.h.outlet.packets)) | ||||
| 
 | ||||
|             packet = self.h.outlet.packets[0] | ||||
|             self.h.channel._receive(packet.raw) | ||||
|             result = buffer.readline() | ||||
| 
 | ||||
|             self.assertIsNotNone(result) | ||||
|             self.assertEqual(len(result), len(data)) | ||||
| 
 | ||||
|             decoded = result.decode("utf-8") | ||||
| 
 | ||||
|             self.assertEqual(data, decoded) | ||||
| 
 | ||||
|     def test_buffer_big(self): | ||||
|         writer = RNS.Buffer.create_writer(15, self.h.channel) | ||||
|         reader = RNS.Buffer.create_reader(15, self.h.channel) | ||||
|         data = "01234556789"*1024  # 10 KB | ||||
|         count = 0 | ||||
|         write_finished = False | ||||
| 
 | ||||
|         def write_thread(): | ||||
|             nonlocal count, write_finished | ||||
|             count = writer.write(data.encode("utf-8")) | ||||
|             writer.flush() | ||||
|             writer.close() | ||||
|             write_finished = True | ||||
|         threading.Thread(target=write_thread, name="Write Thread", daemon=True).start() | ||||
| 
 | ||||
|         while not write_finished or next(filter(lambda x: x.state != MessageState.MSGSTATE_DELIVERED, | ||||
|                                                 self.h.outlet.packets), None) is not None: | ||||
|             with self.h.outlet.lock: | ||||
|                 for packet in self.h.outlet.packets: | ||||
|                     if packet.state != MessageState.MSGSTATE_DELIVERED: | ||||
|                         self.h.channel._receive(packet.raw) | ||||
|                         packet.delivered() | ||||
|             time.sleep(0.0001) | ||||
| 
 | ||||
|         self.assertEqual(len(data), count) | ||||
| 
 | ||||
|         read_finished = False | ||||
|         result = bytes() | ||||
| 
 | ||||
|         def read_thread(): | ||||
|             nonlocal read_finished, result | ||||
|             result = reader.read() | ||||
|             read_finished = True | ||||
|         threading.Thread(target=read_thread, name="Read Thread", daemon=True).start() | ||||
| 
 | ||||
|         timeout_at = time.time() + 7 | ||||
|         while not read_finished and time.time() < timeout_at: | ||||
|             time.sleep(0.001) | ||||
| 
 | ||||
|         self.assertTrue(read_finished) | ||||
|         self.assertEqual(len(data), len(result)) | ||||
| 
 | ||||
|         decoded = result.decode("utf-8") | ||||
| 
 | ||||
|         self.assertSequenceEqual(data, decoded) | ||||
| 
 | ||||
|     def test_buffer_small_with_callback(self): | ||||
|         callbacks = 0 | ||||
|         last_cb_value = None | ||||
| 
 | ||||
|         def callback(ready: int): | ||||
|             nonlocal callbacks, last_cb_value | ||||
|             callbacks += 1 | ||||
|             last_cb_value = ready | ||||
| 
 | ||||
|         data = "Hello\n" | ||||
|         with RNS.RawChannelWriter(0, self.h.channel) as writer, RNS.RawChannelReader(0, self.h.channel) as reader: | ||||
|             reader.add_ready_callback(callback) | ||||
|             count = writer.write(data.encode("utf-8")) | ||||
|             writer.flush() | ||||
| 
 | ||||
|             self.assertEqual(len(data), count) | ||||
|             self.assertEqual(1, len(self.h.outlet.packets)) | ||||
| 
 | ||||
|             packet = self.h.outlet.packets[0] | ||||
|             self.h.channel._receive(packet.raw) | ||||
| 
 | ||||
|             self.assertEqual(1, callbacks) | ||||
|             self.assertEqual(len(data), last_cb_value) | ||||
| 
 | ||||
|             result = reader.readline() | ||||
| 
 | ||||
|             self.assertIsNotNone(result) | ||||
|             self.assertEqual(len(result), len(data)) | ||||
| 
 | ||||
|             decoded = result.decode("utf-8") | ||||
| 
 | ||||
|             self.assertEqual(data, decoded) | ||||
| 
 | ||||
| 
 | ||||
| if __name__ == '__main__': | ||||
|     unittest.main(verbosity=2) | ||||
|  | ||||
| @ -396,6 +396,48 @@ class TestLink(unittest.TestCase): | ||||
|         self.assertEqual(l1.status, RNS.Link.CLOSED) | ||||
|         self.assertEqual(0, len(l1._channel._rx_ring)) | ||||
| 
 | ||||
|     def test_11_buffer_round_trip(self): | ||||
|         global c_rns | ||||
|         init_rns(self) | ||||
|         print("") | ||||
|         print("Buffer round trip test") | ||||
| 
 | ||||
|         # TODO: Load this from public bytes only | ||||
|         id1 = RNS.Identity.from_bytes(bytes.fromhex(fixed_keys[0][0])) | ||||
|         self.assertEqual(id1.hash, bytes.fromhex(fixed_keys[0][1])) | ||||
| 
 | ||||
|         dest = RNS.Destination(id1, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "link", "establish") | ||||
| 
 | ||||
|         self.assertEqual(dest.hash, bytes.fromhex("fb48da0e82e6e01ba0c014513f74540d")) | ||||
| 
 | ||||
|         l1 = RNS.Link(dest) | ||||
|         time.sleep(1) | ||||
|         self.assertEqual(l1.status, RNS.Link.ACTIVE) | ||||
|         buffer = None | ||||
| 
 | ||||
|         received = [] | ||||
|         def handle_data(ready_bytes: int): | ||||
|             data = buffer.read(ready_bytes) | ||||
|             received.append(data) | ||||
| 
 | ||||
|         channel = l1.get_channel() | ||||
|         buffer = RNS.Buffer.create_bidirectional_buffer(0, 0, channel, handle_data) | ||||
| 
 | ||||
|         buffer.write("Hi there".encode("utf-8")) | ||||
|         buffer.flush() | ||||
| 
 | ||||
|         time.sleep(0.5) | ||||
| 
 | ||||
|         self.assertEqual(1 , len(received)) | ||||
| 
 | ||||
|         rx_message = received[0].decode("utf-8") | ||||
| 
 | ||||
|         self.assertEqual("Hi there back at you", rx_message) | ||||
| 
 | ||||
|         l1.teardown() | ||||
|         time.sleep(0.5) | ||||
|         self.assertEqual(l1.status, RNS.Link.CLOSED) | ||||
| 
 | ||||
| 
 | ||||
|     def size_str(self, num, suffix='B'): | ||||
|         units = ['','K','M','G','T','P','E','Z'] | ||||
| @ -462,6 +504,15 @@ def targets(yp=False): | ||||
|         channel.register_message_type(MessageTest) | ||||
|         channel.add_message_handler(handle_message) | ||||
| 
 | ||||
|         buffer = None | ||||
| 
 | ||||
|         def handle_buffer(ready_bytes: int): | ||||
|             data = buffer.read(ready_bytes) | ||||
|             buffer.write((data.decode("utf-8") + " back at you").encode("utf-8")) | ||||
|             buffer.flush() | ||||
| 
 | ||||
|         buffer = RNS.Buffer.create_bidirectional_buffer(0, 0, channel, handle_buffer) | ||||
| 
 | ||||
|     m_rns = RNS.Reticulum("./tests/rnsconfig") | ||||
|     id1 = RNS.Identity.from_bytes(bytes.fromhex(fixed_keys[0][0])) | ||||
|     d1 = RNS.Destination(id1, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "link", "establish") | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user