From 20412a4815c66584ac1b81913cd486a8bc1e2005 Mon Sep 17 00:00:00 2001 From: John Doty Date: Sun, 16 Oct 2022 09:40:42 -0700 Subject: [PATCH] Radical simplification Honestly the new design is so much better --- src/lib.rs | 204 +++++++++++++++-------------------------------------- 1 file changed, 58 insertions(+), 146 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index c4dcbaf..2cbeb99 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,68 +15,9 @@ mod message; mod refresh; mod ui; -// ---------------------------------------------------------------------------- -// Write Management - -/// Gathers writes from an mpsc queue and writes them to the specified -/// writer. -/// -/// This is kind of an odd function. It raises a lot of questions. -/// -/// *Why can't this just be a wrapper function on top of MessageWriter that -/// everybody calls?* Well, we could do that, but we also need to synchronize -/// writes to the underlying stream. -/// -/// *Why not use an async mutex?* Because this function has a nice side -/// benefit: if it ever quits, we're *either* doing an orderly shutdown -/// (because the last write end of this channel closed) *or* the remote -/// connection has closed. [client_main] uses this fact to its advantage to -/// detect when the connection has failed. -async fn pump_write( - messages: &mut mpsc::Receiver, - writer: &mut MessageWriter, -) -> Result<()> { - while let Some(msg) = messages.recv().await { - writer.write(msg).await?; - } - Ok(()) -} - // ---------------------------------------------------------------------------- // Server -async fn server_handle_messages( - reader: &mut MessageReader, - writer: mpsc::Sender, -) -> Result<()> { - // info!("< Processing packets..."); - loop { - let message = reader.read().await?; - - use Message::*; - match message { - Ping => (), - Refresh => { - let writer = writer.clone(); - tokio::spawn(async move { - let ports = match refresh::get_entries() { - Ok(ports) => ports, - Err(_e) => { - error!("< Error scanning: {:?}", _e); - vec![] - } - }; - if let Err(_e) = writer.send(Message::Ports(ports)).await { - // Writer has been closed for some reason, we can just quit.... I hope everything is OK? - warn!("< Warning: Error sending: {:?}", _e); - } - }); - } - _ => panic!("Unsupported: {:?}", message), - }; - } -} - async fn server_main( reader: &mut MessageReader, writer: &mut MessageWriter, @@ -84,35 +25,48 @@ async fn server_main( // The first message we send must be an announcement. writer.write(Message::Hello(0, 1, vec![])).await?; - // Jump into it... - let (msg_sender, mut msg_receiver) = mpsc::channel(32); - let writing = pump_write(&mut msg_receiver, writer); - let reading = server_handle_messages(reader, msg_sender); - tokio::pin!(reading); - tokio::pin!(writing); - - let (mut done_writing, mut done_reading) = (false, false); loop { - tokio::select! { - result = &mut writing, if !done_writing => { - done_writing = true; - if let Err(e) = result { - return Err(e); + use Message::*; + match reader.read().await? { + Ping => (), + Refresh => { + let ports = match refresh::get_entries() { + Ok(ports) => ports, + Err(e) => { + error!("Error scanning: {:?}", e); + vec![] + } + }; + if let Err(e) = writer.write(Message::Ports(ports)).await { + // Writer has been closed for some reason, we can just + // quit.... I hope everything is OK? + warn!("Warning: Error sending: {:?}", e); } - if done_reading && done_writing { - return Ok(()); - } - }, - result = &mut reading, if !done_reading => { - done_reading = true; - if let Err(e) = result { - return Err(e); - } - if done_reading && done_writing { - return Ok(()); - } - }, - } + } + message => panic!("Unsupported: {:?}", message), + }; + } +} + +pub async fn run_server() { + let reader = BufReader::new(tokio::io::stdin()); + let mut writer = BufWriter::new(tokio::io::stdout()); + + // Write the 8-byte synchronization marker. + writer + .write_u64(0x00_00_00_00_00_00_00_00) + .await + .expect("Error writing marker"); + + if let Err(e) = writer.flush().await { + eprintln!("Error writing sync marker: {:?}", e); + return; + } + + let mut writer = MessageWriter::new(writer); + let mut reader = MessageReader::new(reader); + if let Err(e) = server_main(&mut reader, &mut writer).await { + eprintln!("Error: {:?}", e); } } @@ -349,71 +303,29 @@ async fn client_main( // And now really get into it... _ = events.send(ui::UIEvent::Connected(socks_port)).await; - let (msg_sender, mut msg_receiver) = mpsc::channel(32); - let writing = pump_write(&mut msg_receiver, writer); - let reading = client_handle_messages(reader, events); - tokio::pin!(reading); - tokio::pin!(writing); - - let (mut done_writing, mut done_reading) = (false, false); - while !(done_reading && done_writing) { - tokio::select! { - result = async { - loop { - use tokio::time::{sleep, Duration}; - if let Err(e) = msg_sender.send(Message::Refresh).await { - break Err::<(), _>(e); - } - sleep(Duration::from_millis(500)).await; + tokio::select! { + result = async { + loop { + use tokio::time::{sleep, Duration}; + if let Err(e) = writer.write(Message::Refresh).await { + break Err::<(), _>(e); } - }, if !done_writing => { - if let Err(e) = result { - return Err(e.into()); - } - }, - result = &mut writing, if !done_writing => { - done_writing = true; - if let Err(e) = result { - return Err(e); - } - }, - result = &mut reading, if !done_reading => { - done_reading = true; - if let Err(e) = result { - return Err(e); - } - }, - } + sleep(Duration::from_millis(500)).await; + } + } => { + if let Err(e) = result { + return Err(e.into()); + } + }, + result = client_handle_messages(reader, events) => { + if let Err(e) = result { + return Err(e.into()); + } + }, } Ok(()) } -///// - -pub async fn run_server() { - let reader = BufReader::new(tokio::io::stdin()); - let mut writer = BufWriter::new(tokio::io::stdout()); - - // Write the 8-byte synchronization marker. - // eprintln!("< Writing marker..."); - writer - .write_u64(0x00_00_00_00_00_00_00_00) - .await - .expect("Error writing marker"); - - if let Err(_) = writer.flush().await { - // eprintln!("Error writing sync marker: {:?}", e); - return; - } - // eprintln!("< Done!"); - - let mut writer = MessageWriter::new(writer); - let mut reader = MessageReader::new(reader); - if let Err(_) = server_main(&mut reader, &mut writer).await { - // eprintln!("Error: {:?}", e); - } -} - async fn spawn_ssh( server: &str, ) -> Result<(tokio::process::Child, u16), std::io::Error> {