From e184bba39e750d81834a0a8842cb338e6c33af37 Mon Sep 17 00:00:00 2001 From: John Doty Date: Fri, 14 Oct 2022 08:46:42 -0700 Subject: [PATCH] Cleaner logging and better UI events Including connected/disconnected events for an even prettier UI --- src/lib.rs | 61 +++++++++++++------------ src/ui.rs | 128 +++++++++++++++++++++++++++++++---------------------- 2 files changed, 107 insertions(+), 82 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b1ca95d..093edb8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -168,7 +168,7 @@ async fn server_main( } async fn client_sync(reader: &mut Read) -> Result<(), tokio::io::Error> { - info!("> Waiting for synchronization marker..."); + info!("Waiting for synchronization marker..."); // Run these two loops in parallel; the copy of stdin should stop when // we've seen the marker from the client. If the pipe closes for whatever @@ -206,10 +206,8 @@ async fn client_handle_connection( if let Ok(_) = connected.await { let mut writer = writer.clone(); connection::process(channel, socket, &mut data, &mut writer).await; - - info!("> Done client!"); } else { - error!("> Failed to connect to remote"); + error!("Failed to connect to remote"); } } } @@ -238,11 +236,11 @@ async fn client_read( reader: &mut MessageReader, writer: mpsc::Sender, connections: ConnectionTable, - port_sender: mpsc::Sender>, + events: mpsc::Sender, ) -> Result<()> { let mut listeners: HashMap> = HashMap::new(); - info!("> Processing packets..."); + info!("Running"); loop { let message = reader.read().await?; // info!("> packet {:?}", message); // TODO: Smaller @@ -294,17 +292,17 @@ async fn client_read( r = client_listen(port, writer, connections) => r, _ = stop => Ok(()), }; - if let Err(_e) = result { - error!("> Error listening on port {port}: {_e:?}"); + if let Err(e) = result { + error!("Error listening on port {port}: {e:?}"); } else { - info!("> Stopped listening on port {port}"); + info!("Stopped listening on port {port}"); } }); } } listeners = new_listeners; - if let Err(_) = port_sender.send(ports).await { + if let Err(_) = events.send(ui::UIEvent::Ports(ports)).await { // TODO: Log } } @@ -313,7 +311,10 @@ async fn client_read( } } -async fn client_pipe_stderr(debug: &mut Debug) { +async fn client_pipe_stderr( + debug: &mut Debug, + events: mpsc::Sender, +) { loop { let mut line = String::new(); match debug.read_line(&mut line).await { @@ -325,7 +326,9 @@ async fn client_pipe_stderr(debug: &mut Debug) { warn!("stderr stream closed"); break; } - _ => info!("[Server] {}", line.trim()), + _ => { + _ = events.send(ui::UIEvent::ServerLine(line)).await; + } } } } @@ -333,7 +336,7 @@ async fn client_pipe_stderr(debug: &mut Debug) { async fn client_main( reader: &mut MessageReader, writer: &mut MessageWriter, - port_sender: mpsc::Sender>, + events: mpsc::Sender, ) -> Result<()> { // Wait for the server's announcement. if let Message::Hello(major, minor, _) = reader.read().await? { @@ -344,10 +347,6 @@ async fn client_main( bail!("Expected a hello message from the remote server"); } - // Kick things off with a listing of the ports... - // eprintln!("> Sending initial list command..."); - // writer.write(Message::Refresh).await?; - let connections = ConnectionTable::new(); // And now really get into it... @@ -355,7 +354,7 @@ async fn client_main( let refresher = msg_sender.clone(); // Special for loop. let writing = pump_write(&mut msg_receiver, writer); - let reading = client_read(reader, msg_sender, connections, port_sender); + let reading = client_read(reader, msg_sender, connections, events); tokio::pin!(reading); tokio::pin!(writing); @@ -428,8 +427,10 @@ async fn spawn_ssh(server: &str) -> Result>) { +async fn client_connect_loop(remote: &str, events: mpsc::Sender) { loop { + _ = events.send(ui::UIEvent::Disconnected).await; + let mut child = spawn_ssh(remote).await.expect("failed to spawn"); let mut stderr = BufReader::new( @@ -452,18 +453,22 @@ async fn client_connect_loop(remote: &str, port_sender: mpsc::Sender (), - _ = client_connect_loop(remote, port_sender) => () + _ = ui::run_ui(&mut event_receiver) => (), + _ = client_connect_loop(remote, event_sender) => () } } diff --git a/src/ui.rs b/src/ui.rs index 32e2b4b..cd42329 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -4,7 +4,7 @@ use crossterm::{ cursor::MoveTo, event::{Event, EventStream, KeyCode, KeyEvent}, execute, queue, - style::Stylize, + style::{Color, PrintStyledContent, Stylize}, terminal::{ disable_raw_mode, enable_raw_mode, size, Clear, ClearType, DisableLineWrap, EnableLineWrap, EnterAlternateScreen, LeaveAlternateScreen, @@ -17,13 +17,21 @@ use std::io::{stdout, Write}; use tokio::sync::mpsc; use tokio_stream::StreamExt; +pub enum UIEvent { + Connected, + Disconnected, + ServerLine(String), + LogLine(log::Level, String), + Ports(Vec), +} + #[derive(Debug, Clone)] pub struct Logger { - line_sender: mpsc::Sender, + line_sender: mpsc::Sender, } impl Logger { - pub fn new(line_sender: mpsc::Sender) -> Box { + pub fn new(line_sender: mpsc::Sender) -> Box { Box::new(Logger { line_sender }) } } @@ -35,41 +43,38 @@ impl log::Log for Logger { fn log(&self, record: &Record) { if self.enabled(record.metadata()) { - let line = format!("{} - {}", record.level(), record.args()); - _ = self.line_sender.try_send(line); + let line = format!("{}", record.args()); + _ = self + .line_sender + .try_send(UIEvent::LogLine(record.level(), line)); } } fn flush(&self) {} } -pub async fn run_ui( - port_receiver: &mut mpsc::Receiver>, - log_receiver: &mut mpsc::Receiver, -) -> Result<()> { +pub async fn run_ui(events: &mut mpsc::Receiver) -> Result<()> { enable_raw_mode()?; - let result = run_ui_core(port_receiver, log_receiver).await; + let result = run_ui_core(events).await; execute!(stdout(), EnableLineWrap, LeaveAlternateScreen)?; disable_raw_mode()?; result } -async fn run_ui_core( - port_receiver: &mut mpsc::Receiver>, - log: &mut mpsc::Receiver, -) -> Result<()> { +async fn run_ui_core(events: &mut mpsc::Receiver) -> Result<()> { let mut stdout = stdout(); execute!(stdout, EnterAlternateScreen, DisableLineWrap)?; - let mut events = EventStream::new(); + let mut console_events = EventStream::new(); + let mut connected = false; let mut selection = 0; let mut show_logs = false; let mut lines: VecDeque = VecDeque::with_capacity(1024); let mut ports: Option> = None; loop { tokio::select! { - ev = events.next() => { + ev = console_events.next() => { match ev { Some(Ok(Event::Key(ev))) => { match ev { @@ -107,59 +112,76 @@ async fn run_ui_core( None => (), // ....no events? what? } } - pr = port_receiver.recv() => { - match pr { - Some(mut p) => { + ev = events.recv() => { + match ev { + Some(UIEvent::Disconnected) => { + connected = false; + } + Some(UIEvent::Connected) => { + connected = true; + } + Some(UIEvent::Ports(mut p)) => { p.sort_by(|a, b| a.port.partial_cmp(&b.port).unwrap()); ports = Some(p); } - None => break, - } - } - l = log.recv() => { - match l { - Some(line) => { - if lines.len() > 1024 { + Some(UIEvent::ServerLine(line)) => { + while lines.len() >= 1024 { lines.pop_front(); } - lines.push_back(line); - }, + lines.push_back(format!("[SERVER] {line}")); + } + Some(UIEvent::LogLine(_level, line)) => { + while lines.len() >= 1024 { + lines.pop_front(); + } + lines.push_back(format!("[CLIENT] {line}")); + } None => break, } } } let (columns, rows) = size()?; + let columns: usize = columns.into(); queue!(stdout, Clear(ClearType::All), MoveTo(0, 0))?; + if connected { + // List of open ports + // How wide are all the things? + let padding = 1; + let port_width = 5; // 5 characters for 16-bit number - // List of open ports - // How wide are all the things? - let columns: usize = columns.into(); - let padding = 1; - let port_width = 5; // 5 characters for 16-bit number + let description_width = columns - (padding + padding + port_width + padding); - let description_width = columns - (padding + padding + port_width + padding); - - print!( - "{}", - format!( - " {port:>port_width$} {description:port_width$} {description: