Radical simplification

Honestly the new design is so much better
This commit is contained in:
John Doty 2022-10-16 09:40:42 -07:00
parent 45e39220e2
commit 20412a4815

View file

@ -15,68 +15,9 @@ mod message;
mod refresh; mod refresh;
mod ui; mod ui;
// ----------------------------------------------------------------------------
// Write Management
/// Gathers writes from an mpsc queue and writes them to the specified
/// writer.
///
/// This is kind of an odd function. It raises a lot of questions.
///
/// *Why can't this just be a wrapper function on top of MessageWriter that
/// everybody calls?* Well, we could do that, but we also need to synchronize
/// writes to the underlying stream.
///
/// *Why not use an async mutex?* Because this function has a nice side
/// benefit: if it ever quits, we're *either* doing an orderly shutdown
/// (because the last write end of this channel closed) *or* the remote
/// connection has closed. [client_main] uses this fact to its advantage to
/// detect when the connection has failed.
async fn pump_write<T: AsyncWrite + Unpin>(
messages: &mut mpsc::Receiver<Message>,
writer: &mut MessageWriter<T>,
) -> Result<()> {
while let Some(msg) = messages.recv().await {
writer.write(msg).await?;
}
Ok(())
}
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// Server // Server
async fn server_handle_messages<T: AsyncRead + Unpin>(
reader: &mut MessageReader<T>,
writer: mpsc::Sender<Message>,
) -> Result<()> {
// info!("< Processing packets...");
loop {
let message = reader.read().await?;
use Message::*;
match message {
Ping => (),
Refresh => {
let writer = writer.clone();
tokio::spawn(async move {
let ports = match refresh::get_entries() {
Ok(ports) => ports,
Err(_e) => {
error!("< Error scanning: {:?}", _e);
vec![]
}
};
if let Err(_e) = writer.send(Message::Ports(ports)).await {
// Writer has been closed for some reason, we can just quit.... I hope everything is OK?
warn!("< Warning: Error sending: {:?}", _e);
}
});
}
_ => panic!("Unsupported: {:?}", message),
};
}
}
async fn server_main<Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin>( async fn server_main<Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin>(
reader: &mut MessageReader<Reader>, reader: &mut MessageReader<Reader>,
writer: &mut MessageWriter<Writer>, writer: &mut MessageWriter<Writer>,
@ -84,35 +25,48 @@ async fn server_main<Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin>(
// The first message we send must be an announcement. // The first message we send must be an announcement.
writer.write(Message::Hello(0, 1, vec![])).await?; writer.write(Message::Hello(0, 1, vec![])).await?;
// Jump into it...
let (msg_sender, mut msg_receiver) = mpsc::channel(32);
let writing = pump_write(&mut msg_receiver, writer);
let reading = server_handle_messages(reader, msg_sender);
tokio::pin!(reading);
tokio::pin!(writing);
let (mut done_writing, mut done_reading) = (false, false);
loop { loop {
tokio::select! { use Message::*;
result = &mut writing, if !done_writing => { match reader.read().await? {
done_writing = true; Ping => (),
if let Err(e) = result { Refresh => {
return Err(e); let ports = match refresh::get_entries() {
Ok(ports) => ports,
Err(e) => {
error!("Error scanning: {:?}", e);
vec![]
}
};
if let Err(e) = writer.write(Message::Ports(ports)).await {
// Writer has been closed for some reason, we can just
// quit.... I hope everything is OK?
warn!("Warning: Error sending: {:?}", e);
} }
if done_reading && done_writing { }
return Ok(()); message => panic!("Unsupported: {:?}", message),
} };
}, }
result = &mut reading, if !done_reading => { }
done_reading = true;
if let Err(e) = result { pub async fn run_server() {
return Err(e); let reader = BufReader::new(tokio::io::stdin());
} let mut writer = BufWriter::new(tokio::io::stdout());
if done_reading && done_writing {
return Ok(()); // Write the 8-byte synchronization marker.
} writer
}, .write_u64(0x00_00_00_00_00_00_00_00)
} .await
.expect("Error writing marker");
if let Err(e) = writer.flush().await {
eprintln!("Error writing sync marker: {:?}", e);
return;
}
let mut writer = MessageWriter::new(writer);
let mut reader = MessageReader::new(reader);
if let Err(e) = server_main(&mut reader, &mut writer).await {
eprintln!("Error: {:?}", e);
} }
} }
@ -349,71 +303,29 @@ async fn client_main<Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin>(
// And now really get into it... // And now really get into it...
_ = events.send(ui::UIEvent::Connected(socks_port)).await; _ = events.send(ui::UIEvent::Connected(socks_port)).await;
let (msg_sender, mut msg_receiver) = mpsc::channel(32); tokio::select! {
let writing = pump_write(&mut msg_receiver, writer); result = async {
let reading = client_handle_messages(reader, events); loop {
tokio::pin!(reading); use tokio::time::{sleep, Duration};
tokio::pin!(writing); if let Err(e) = writer.write(Message::Refresh).await {
break Err::<(), _>(e);
let (mut done_writing, mut done_reading) = (false, false);
while !(done_reading && done_writing) {
tokio::select! {
result = async {
loop {
use tokio::time::{sleep, Duration};
if let Err(e) = msg_sender.send(Message::Refresh).await {
break Err::<(), _>(e);
}
sleep(Duration::from_millis(500)).await;
} }
}, if !done_writing => { sleep(Duration::from_millis(500)).await;
if let Err(e) = result { }
return Err(e.into()); } => {
} if let Err(e) = result {
}, return Err(e.into());
result = &mut writing, if !done_writing => { }
done_writing = true; },
if let Err(e) = result { result = client_handle_messages(reader, events) => {
return Err(e); if let Err(e) = result {
} return Err(e.into());
}, }
result = &mut reading, if !done_reading => { },
done_reading = true;
if let Err(e) = result {
return Err(e);
}
},
}
} }
Ok(()) Ok(())
} }
/////
pub async fn run_server() {
let reader = BufReader::new(tokio::io::stdin());
let mut writer = BufWriter::new(tokio::io::stdout());
// Write the 8-byte synchronization marker.
// eprintln!("< Writing marker...");
writer
.write_u64(0x00_00_00_00_00_00_00_00)
.await
.expect("Error writing marker");
if let Err(_) = writer.flush().await {
// eprintln!("Error writing sync marker: {:?}", e);
return;
}
// eprintln!("< Done!");
let mut writer = MessageWriter::new(writer);
let mut reader = MessageReader::new(reader);
if let Err(_) = server_main(&mut reader, &mut writer).await {
// eprintln!("Error: {:?}", e);
}
}
async fn spawn_ssh( async fn spawn_ssh(
server: &str, server: &str,
) -> Result<(tokio::process::Child, u16), std::io::Error> { ) -> Result<(tokio::process::Child, u16), std::io::Error> {