From 01ef65c78738fbcca0d20498ad240a9a2b03afdb Mon Sep 17 00:00:00 2001 From: John Doty Date: Sat, 8 Oct 2022 16:28:41 +0000 Subject: [PATCH] Split out connection --- src/connection.rs | 76 +++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 83 +++-------------------------------------------- 2 files changed, 81 insertions(+), 78 deletions(-) create mode 100644 src/connection.rs diff --git a/src/connection.rs b/src/connection.rs new file mode 100644 index 0000000..9521c7d --- /dev/null +++ b/src/connection.rs @@ -0,0 +1,76 @@ +use crate::message::Message; +use crate::Error; +use bytes::{Bytes, BytesMut}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use tokio::net::TcpStream; +use tokio::sync::mpsc; + +/// Read from a socket and convert the reads into Messages to put into the +/// queue until the socket is closed for reading or an error occurs. +async fn connection_read( + channel: u64, + read: &mut T, + writer: &mut mpsc::Sender, +) -> Result<(), Error> { + let result = loop { + let mut buffer = BytesMut::with_capacity(64 * 1024); + if let Err(e) = read.read_buf(&mut buffer).await { + break Err(Error::IO(e)); + } + if buffer.len() == 0 { + break Ok(()); + } + + if let Err(_) = writer.send(Message::Data(channel, buffer.into())).await { + break Err(Error::ConnectionReset); + } + + // TODO: Flow control here, wait for the packet to be acknowleged so + // there isn't head-of-line blocking or infinite bufferingon the + // remote side. Also buffer re-use! + }; + + // We are effectively closed on this side, send the close to drop the + // corresponding write side on the other end of the pipe. + _ = writer.send(Message::Close(channel)).await; + return result; +} + +/// Get messages from a queue and write them out to a socket until there are +/// no more messages in the queue or the write breaks for some reason. +async fn connection_write( + data: &mut mpsc::Receiver, + write: &mut T, +) -> Result<(), Error> { + while let Some(buf) = data.recv().await { + if let Err(e) = write.write_all(&buf[..]).await { + return Err(Error::IO(e)); + } + } + Ok(()) +} + +/// Handle a connection, from the socket to the multiplexer and from the +/// multiplexer to the socket. +pub async fn process( + channel: u64, + stream: &mut TcpStream, + data: &mut mpsc::Receiver, + writer: &mut mpsc::Sender, +) { + let (mut read_half, mut write_half) = stream.split(); + + let read = connection_read(channel, &mut read_half, writer); + let write = connection_write(data, &mut write_half); + + tokio::pin!(read); + tokio::pin!(write); + + let (mut done_reading, mut done_writing) = (false, false); + while !(done_reading && done_writing) { + tokio::select! { + _ = &mut read, if !done_reading => { done_reading = true; }, + _ = &mut write, if !done_writing => { done_writing = true;}, + } + } +} diff --git a/src/lib.rs b/src/lib.rs index e0c5b05..7d78d8e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; +use message::{Message, MessageReader, MessageWriter}; use std::collections::HashMap; use std::net::{Ipv4Addr, SocketAddrV4}; use std::sync::{Arc, Mutex}; @@ -8,11 +9,10 @@ use tokio::process; use tokio::sync::mpsc; use tokio::sync::oneshot; +mod connection; mod message; mod refresh; -use message::{Message, MessageReader, MessageWriter}; - #[derive(Debug)] pub enum Error { Protocol, @@ -70,79 +70,6 @@ async fn pump_write( Ok(()) } -// ---------------------------------------------------------------------------- -// Connection - -/// Read from a socket and convert the reads into Messages to put into the -/// queue until the socket is closed for reading or an error occurs. -async fn connection_read( - channel: u64, - read: &mut T, - writer: &mut mpsc::Sender, -) -> Result<(), Error> { - let result = loop { - let mut buffer = BytesMut::with_capacity(64 * 1024); - if let Err(e) = read.read_buf(&mut buffer).await { - break Err(Error::IO(e)); - } - if buffer.len() == 0 { - break Ok(()); - } - - if let Err(_) = writer.send(Message::Data(channel, buffer.into())).await { - break Err(Error::ConnectionReset); - } - - // TODO: Flow control here, wait for the packet to be acknowleged so - // there isn't head-of-line blocking or infinite bufferingon the - // remote side. Also buffer re-use! - }; - - // We are effectively closed on this side, send the close to drop the - // corresponding write side on the other end of the pipe. - _ = writer.send(Message::Close(channel)).await; - return result; -} - -/// Get messages from a queue and write them out to a socket until there are -/// no more messages in the queue or the write breaks for some reason. -async fn connection_write( - data: &mut mpsc::Receiver, - write: &mut T, -) -> Result<(), Error> { - while let Some(buf) = data.recv().await { - if let Err(e) = write.write_all(&buf[..]).await { - return Err(Error::IO(e)); - } - } - Ok(()) -} - -/// Handle a connection, from the socket to the multiplexer and from the -/// multiplexer to the socket. -async fn connection_process( - channel: u64, - stream: &mut TcpStream, - data: &mut mpsc::Receiver, - writer: &mut mpsc::Sender, -) { - let (mut read_half, mut write_half) = stream.split(); - - let read = connection_read(channel, &mut read_half, writer); - let write = connection_write(data, &mut write_half); - - tokio::pin!(read); - tokio::pin!(write); - - let (mut done_reading, mut done_writing) = (false, false); - while !(done_reading && done_writing) { - tokio::select! { - _ = &mut read, if !done_reading => { done_reading = true; }, - _ = &mut write, if !done_writing => { done_writing = true;}, - } - } -} - // ---------------------------------------------------------------------------- // Server @@ -200,7 +127,7 @@ async fn server_handle_connection( connections.add(channel, send_data); if let Ok(_) = writer.send(Message::Connected(channel)).await { let mut writer = writer.clone(); - connection_process(channel, &mut stream, &mut data, &mut writer).await; + connection::process(channel, &mut stream, &mut data, &mut writer).await; eprintln!("< Done server!"); } @@ -417,7 +344,7 @@ async fn client_handle_connection( if let Ok(_) = writer.send(Message::Connect(channel, port)).await { if let Ok(_) = connected.await { let mut writer = writer.clone(); - connection_process(channel, socket, &mut data, &mut writer).await; + connection::process(channel, socket, &mut data, &mut writer).await; eprintln!("> Done client!"); } else {