From 3157ff3cbb038b215efa1b843b1fb4d8d6cbad6a Mon Sep 17 00:00:00 2001 From: John Doty Date: Sat, 8 Oct 2022 20:00:44 +0000 Subject: [PATCH] Start writing I guess --- src/connection.rs | 34 ++++++++++++++++++++++++++++++---- 1 file changed, 30 insertions(+), 4 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 9521c7d..87d62a4 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -5,15 +5,31 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::net::TcpStream; use tokio::sync::mpsc; +const MAX_PACKET: usize = u16::max_value() as usize; + /// Read from a socket and convert the reads into Messages to put into the -/// queue until the socket is closed for reading or an error occurs. +/// queue until the socket is closed for reading or an error occurs. We read +/// at most 2^16-1 bytes at a time, accepting the overhead of multiple reads +/// to keep one writer from clogging the pipe for everybody else. Each read +/// is converted into a [Message::Data] message that is sent to the `writer` +/// channel. +/// +/// Once we're done reading (either because of a connection error or a clean +/// shutdown) we send a [Message::Close] message on the channel before +/// returning. +/// +/// # Errors +/// If an error occurs reading from `read` we return [Error::IO]. If the +/// message channel is closed before we can send to it then we return +/// [Error::ConnectionReset]. +/// async fn connection_read( channel: u64, read: &mut T, writer: &mut mpsc::Sender, ) -> Result<(), Error> { let result = loop { - let mut buffer = BytesMut::with_capacity(64 * 1024); + let mut buffer = BytesMut::with_capacity(MAX_PACKET); if let Err(e) = read.read_buf(&mut buffer).await { break Err(Error::IO(e)); } @@ -37,7 +53,11 @@ async fn connection_read( } /// Get messages from a queue and write them out to a socket until there are -/// no more messages in the queue or the write breaks for some reason. +/// no more messages in the queue or a write fails for some reason. +/// +/// # Errors +/// If a write fails this returns `Error::IO`. +/// async fn connection_write( data: &mut mpsc::Receiver, write: &mut T, @@ -51,7 +71,13 @@ async fn connection_write( } /// Handle a connection, from the socket to the multiplexer and from the -/// multiplexer to the socket. +/// multiplexer to the socket. Keeps running until both the read and write +/// side are closed. In natural circumstances, we expect the write side to +/// close when the `data` sender is dropped from the connection table (see +/// `ConnectionTable`), and we expect the read side to close when the +/// socket's read half closes (which will cause a `Close` to be sent which +/// should drop the `data` sender on the other side, etc...). +/// pub async fn process( channel: u64, stream: &mut TcpStream,