There's a lot here that feels like it should be better given the new design.
This commit is contained in:
John Doty 2022-10-16 09:23:49 -07:00
parent 1f91fc68ca
commit 93ac35df93

View file

@ -32,9 +32,6 @@ mod ui;
/// (because the last write end of this channel closed) *or* the remote /// (because the last write end of this channel closed) *or* the remote
/// connection has closed. [client_main] uses this fact to its advantage to /// connection has closed. [client_main] uses this fact to its advantage to
/// detect when the connection has failed. /// detect when the connection has failed.
///
/// At some point we may even automatically reconnect in response!
///
async fn pump_write<T: AsyncWrite + Unpin>( async fn pump_write<T: AsyncWrite + Unpin>(
messages: &mut mpsc::Receiver<Message>, messages: &mut mpsc::Receiver<Message>,
writer: &mut MessageWriter<T>, writer: &mut MessageWriter<T>,
@ -48,7 +45,7 @@ async fn pump_write<T: AsyncWrite + Unpin>(
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// Server // Server
async fn server_read<T: AsyncRead + Unpin>( async fn server_handle_messages<T: AsyncRead + Unpin>(
reader: &mut MessageReader<T>, reader: &mut MessageReader<T>,
writer: mpsc::Sender<Message>, writer: mpsc::Sender<Message>,
) -> Result<()> { ) -> Result<()> {
@ -90,7 +87,7 @@ async fn server_main<Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin>(
// Jump into it... // Jump into it...
let (msg_sender, mut msg_receiver) = mpsc::channel(32); let (msg_sender, mut msg_receiver) = mpsc::channel(32);
let writing = pump_write(&mut msg_receiver, writer); let writing = pump_write(&mut msg_receiver, writer);
let reading = server_read(reader, msg_sender); let reading = server_handle_messages(reader, msg_sender);
tokio::pin!(reading); tokio::pin!(reading);
tokio::pin!(writing); tokio::pin!(writing);
@ -119,21 +116,30 @@ async fn server_main<Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin>(
} }
} }
// ----------------------------------------------------------------------------
// Client
/// Wait for the server to be ready; we know the server is there and
/// listening when we see the special sync marker, which is 8 NUL bytes in a
/// row.
///
/// TODO: We should be pumping stderr too.
async fn client_sync<Read: AsyncRead + Unpin>( async fn client_sync<Read: AsyncRead + Unpin>(
reader: &mut Read, reader: &mut Read,
) -> Result<(), tokio::io::Error> { ) -> Result<(), tokio::io::Error> {
info!("Waiting for synchronization marker..."); info!("Waiting for synchronization marker...");
let mut stdout = tokio::io::stdout(); let mut stdout = tokio::io::stdout();
let mut seen = 0;
tokio::select! { tokio::select! {
result = async { result = async {
let mut seen = 0;
while seen < 8 { while seen < 8 {
let byte = reader.read_u8().await?; let byte = reader.read_u8().await?;
if byte == 0 { if byte == 0 {
seen += 1; seen += 1;
} else { } else {
stdout.write_u8(byte).await?; stdout.write_u8(byte).await?;
seen = 0;
} }
} }
@ -143,7 +149,7 @@ async fn client_sync<Read: AsyncRead + Unpin>(
} }
/// Handle an incoming client connection, by forwarding it to the SOCKS5 /// Handle an incoming client connection, by forwarding it to the SOCKS5
/// server at the specified port. /// server at the specified port. This is the core of the entire thing.
/// ///
/// This contains a very simplified implementation of a SOCKS5 connector, /// This contains a very simplified implementation of a SOCKS5 connector,
/// enough to work with the SSH I have. I would have liked it to be SOCKS4, /// enough to work with the SSH I have. I would have liked it to be SOCKS4,
@ -258,6 +264,8 @@ async fn client_handle_connection(
Ok(()) Ok(())
} }
/// Listen on a port that we are currently forwarding, and use the SOCKS5
/// proxy on the specified port to handle the connections.
async fn client_listen(port: u16, socks_port: u16) -> Result<()> { async fn client_listen(port: u16, socks_port: u16) -> Result<()> {
loop { loop {
let listener = let listener =
@ -281,24 +289,20 @@ async fn client_listen(port: u16, socks_port: u16) -> Result<()> {
} }
} }
async fn client_read<T: AsyncRead + Unpin>( async fn client_handle_messages<T: AsyncRead + Unpin>(
reader: &mut MessageReader<T>, reader: &mut MessageReader<T>,
events: mpsc::Sender<ui::UIEvent>, events: mpsc::Sender<ui::UIEvent>,
) -> Result<()> { ) -> Result<()> {
info!("Running");
loop { loop {
let message = reader.read().await?;
// info!("> packet {:?}", message); // TODO: Smaller
use Message::*; use Message::*;
match message { match reader.read().await? {
Ping => (), Ping => (),
Ports(ports) => { Ports(ports) => {
if let Err(_) = events.send(ui::UIEvent::Ports(ports)).await { if let Err(_) = events.send(ui::UIEvent::Ports(ports)).await {
// TODO: Log // TODO: Log
} }
} }
_ => panic!("Unsupported: {:?}", message), message => panic!("Unsupported: {:?}", message),
}; };
} }
} }
@ -346,7 +350,7 @@ async fn client_main<Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin>(
_ = events.send(ui::UIEvent::Connected(socks_port)).await; _ = events.send(ui::UIEvent::Connected(socks_port)).await;
let writing = pump_write(&mut msg_receiver, writer); let writing = pump_write(&mut msg_receiver, writer);
let reading = client_read(reader, events); let reading = client_handle_messages(reader, events);
tokio::pin!(reading); tokio::pin!(reading);
tokio::pin!(writing); tokio::pin!(writing);