Cleaner logging and better UI events

Including connected/disconnected events for an even prettier UI
This commit is contained in:
John Doty 2022-10-14 08:46:42 -07:00
parent 006eba0dfe
commit e184bba39e
2 changed files with 107 additions and 82 deletions

View file

@ -168,7 +168,7 @@ async fn server_main<Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin>(
}
async fn client_sync<Read: AsyncRead + Unpin>(reader: &mut Read) -> Result<(), tokio::io::Error> {
info!("> Waiting for synchronization marker...");
info!("Waiting for synchronization marker...");
// Run these two loops in parallel; the copy of stdin should stop when
// we've seen the marker from the client. If the pipe closes for whatever
@ -206,10 +206,8 @@ async fn client_handle_connection(
if let Ok(_) = connected.await {
let mut writer = writer.clone();
connection::process(channel, socket, &mut data, &mut writer).await;
info!("> Done client!");
} else {
error!("> Failed to connect to remote");
error!("Failed to connect to remote");
}
}
}
@ -238,11 +236,11 @@ async fn client_read<T: AsyncRead + Unpin>(
reader: &mut MessageReader<T>,
writer: mpsc::Sender<Message>,
connections: ConnectionTable,
port_sender: mpsc::Sender<Vec<message::PortDesc>>,
events: mpsc::Sender<ui::UIEvent>,
) -> Result<()> {
let mut listeners: HashMap<u16, oneshot::Sender<()>> = HashMap::new();
info!("> Processing packets...");
info!("Running");
loop {
let message = reader.read().await?;
// info!("> packet {:?}", message); // TODO: Smaller
@ -294,17 +292,17 @@ async fn client_read<T: AsyncRead + Unpin>(
r = client_listen(port, writer, connections) => r,
_ = stop => Ok(()),
};
if let Err(_e) = result {
error!("> Error listening on port {port}: {_e:?}");
if let Err(e) = result {
error!("Error listening on port {port}: {e:?}");
} else {
info!("> Stopped listening on port {port}");
info!("Stopped listening on port {port}");
}
});
}
}
listeners = new_listeners;
if let Err(_) = port_sender.send(ports).await {
if let Err(_) = events.send(ui::UIEvent::Ports(ports)).await {
// TODO: Log
}
}
@ -313,7 +311,10 @@ async fn client_read<T: AsyncRead + Unpin>(
}
}
async fn client_pipe_stderr<Debug: AsyncBufRead + Unpin>(debug: &mut Debug) {
async fn client_pipe_stderr<Debug: AsyncBufRead + Unpin>(
debug: &mut Debug,
events: mpsc::Sender<ui::UIEvent>,
) {
loop {
let mut line = String::new();
match debug.read_line(&mut line).await {
@ -325,7 +326,9 @@ async fn client_pipe_stderr<Debug: AsyncBufRead + Unpin>(debug: &mut Debug) {
warn!("stderr stream closed");
break;
}
_ => info!("[Server] {}", line.trim()),
_ => {
_ = events.send(ui::UIEvent::ServerLine(line)).await;
}
}
}
}
@ -333,7 +336,7 @@ async fn client_pipe_stderr<Debug: AsyncBufRead + Unpin>(debug: &mut Debug) {
async fn client_main<Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin>(
reader: &mut MessageReader<Reader>,
writer: &mut MessageWriter<Writer>,
port_sender: mpsc::Sender<Vec<message::PortDesc>>,
events: mpsc::Sender<ui::UIEvent>,
) -> Result<()> {
// Wait for the server's announcement.
if let Message::Hello(major, minor, _) = reader.read().await? {
@ -344,10 +347,6 @@ async fn client_main<Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin>(
bail!("Expected a hello message from the remote server");
}
// Kick things off with a listing of the ports...
// eprintln!("> Sending initial list command...");
// writer.write(Message::Refresh).await?;
let connections = ConnectionTable::new();
// And now really get into it...
@ -355,7 +354,7 @@ async fn client_main<Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin>(
let refresher = msg_sender.clone(); // Special for loop.
let writing = pump_write(&mut msg_receiver, writer);
let reading = client_read(reader, msg_sender, connections, port_sender);
let reading = client_read(reader, msg_sender, connections, events);
tokio::pin!(reading);
tokio::pin!(writing);
@ -428,8 +427,10 @@ async fn spawn_ssh(server: &str) -> Result<tokio::process::Child, std::io::Error
cmd.spawn()
}
async fn client_connect_loop(remote: &str, port_sender: mpsc::Sender<Vec<message::PortDesc>>) {
async fn client_connect_loop(remote: &str, events: mpsc::Sender<ui::UIEvent>) {
loop {
_ = events.send(ui::UIEvent::Disconnected).await;
let mut child = spawn_ssh(remote).await.expect("failed to spawn");
let mut stderr = BufReader::new(
@ -452,18 +453,22 @@ async fn client_connect_loop(remote: &str, port_sender: mpsc::Sender<Vec<message
);
if let Err(e) = client_sync(&mut reader).await {
eprintln!("Error synchronizing: {:?}", e);
return;
error!("Error synchronizing: {:?}", e);
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
continue;
}
_ = events.send(ui::UIEvent::Connected).await;
let mut writer = MessageWriter::new(BufWriter::new(writer));
let mut reader = MessageReader::new(reader);
let sec = events.clone();
tokio::spawn(async move {
client_pipe_stderr(&mut stderr).await;
client_pipe_stderr(&mut stderr, sec).await;
});
if let Err(e) = client_main(&mut reader, &mut writer, port_sender.clone()).await {
if let Err(e) = client_main(&mut reader, &mut writer, events.clone()).await {
error!("Server disconnected with error: {:?}", e);
} else {
warn!("Disconnected from server, reconnecting...");
@ -472,15 +477,13 @@ async fn client_connect_loop(remote: &str, port_sender: mpsc::Sender<Vec<message
}
pub async fn run_client(remote: &str) {
let (log_sender, mut log_receiver) = mpsc::channel(1024);
_ = log::set_boxed_logger(ui::Logger::new(log_sender));
let (event_sender, mut event_receiver) = mpsc::channel(1024);
_ = log::set_boxed_logger(ui::Logger::new(event_sender.clone()));
log::set_max_level(LevelFilter::Info);
let (port_sender, mut port_receiver) = mpsc::channel(2);
// Start the reconnect loop.
tokio::select! {
_ = ui::run_ui(&mut port_receiver, &mut log_receiver) => (),
_ = client_connect_loop(remote, port_sender) => ()
_ = ui::run_ui(&mut event_receiver) => (),
_ = client_connect_loop(remote, event_sender) => ()
}
}

128
src/ui.rs
View file

@ -4,7 +4,7 @@ use crossterm::{
cursor::MoveTo,
event::{Event, EventStream, KeyCode, KeyEvent},
execute, queue,
style::Stylize,
style::{Color, PrintStyledContent, Stylize},
terminal::{
disable_raw_mode, enable_raw_mode, size, Clear, ClearType, DisableLineWrap, EnableLineWrap,
EnterAlternateScreen, LeaveAlternateScreen,
@ -17,13 +17,21 @@ use std::io::{stdout, Write};
use tokio::sync::mpsc;
use tokio_stream::StreamExt;
pub enum UIEvent {
Connected,
Disconnected,
ServerLine(String),
LogLine(log::Level, String),
Ports(Vec<PortDesc>),
}
#[derive(Debug, Clone)]
pub struct Logger {
line_sender: mpsc::Sender<String>,
line_sender: mpsc::Sender<UIEvent>,
}
impl Logger {
pub fn new(line_sender: mpsc::Sender<String>) -> Box<Logger> {
pub fn new(line_sender: mpsc::Sender<UIEvent>) -> Box<Logger> {
Box::new(Logger { line_sender })
}
}
@ -35,41 +43,38 @@ impl log::Log for Logger {
fn log(&self, record: &Record) {
if self.enabled(record.metadata()) {
let line = format!("{} - {}", record.level(), record.args());
_ = self.line_sender.try_send(line);
let line = format!("{}", record.args());
_ = self
.line_sender
.try_send(UIEvent::LogLine(record.level(), line));
}
}
fn flush(&self) {}
}
pub async fn run_ui(
port_receiver: &mut mpsc::Receiver<Vec<PortDesc>>,
log_receiver: &mut mpsc::Receiver<String>,
) -> Result<()> {
pub async fn run_ui(events: &mut mpsc::Receiver<UIEvent>) -> Result<()> {
enable_raw_mode()?;
let result = run_ui_core(port_receiver, log_receiver).await;
let result = run_ui_core(events).await;
execute!(stdout(), EnableLineWrap, LeaveAlternateScreen)?;
disable_raw_mode()?;
result
}
async fn run_ui_core(
port_receiver: &mut mpsc::Receiver<Vec<PortDesc>>,
log: &mut mpsc::Receiver<String>,
) -> Result<()> {
async fn run_ui_core(events: &mut mpsc::Receiver<UIEvent>) -> Result<()> {
let mut stdout = stdout();
execute!(stdout, EnterAlternateScreen, DisableLineWrap)?;
let mut events = EventStream::new();
let mut console_events = EventStream::new();
let mut connected = false;
let mut selection = 0;
let mut show_logs = false;
let mut lines: VecDeque<String> = VecDeque::with_capacity(1024);
let mut ports: Option<Vec<PortDesc>> = None;
loop {
tokio::select! {
ev = events.next() => {
ev = console_events.next() => {
match ev {
Some(Ok(Event::Key(ev))) => {
match ev {
@ -107,59 +112,76 @@ async fn run_ui_core(
None => (), // ....no events? what?
}
}
pr = port_receiver.recv() => {
match pr {
Some(mut p) => {
ev = events.recv() => {
match ev {
Some(UIEvent::Disconnected) => {
connected = false;
}
Some(UIEvent::Connected) => {
connected = true;
}
Some(UIEvent::Ports(mut p)) => {
p.sort_by(|a, b| a.port.partial_cmp(&b.port).unwrap());
ports = Some(p);
}
None => break,
}
}
l = log.recv() => {
match l {
Some(line) => {
if lines.len() > 1024 {
Some(UIEvent::ServerLine(line)) => {
while lines.len() >= 1024 {
lines.pop_front();
}
lines.push_back(line);
},
lines.push_back(format!("[SERVER] {line}"));
}
Some(UIEvent::LogLine(_level, line)) => {
while lines.len() >= 1024 {
lines.pop_front();
}
lines.push_back(format!("[CLIENT] {line}"));
}
None => break,
}
}
}
let (columns, rows) = size()?;
let columns: usize = columns.into();
queue!(stdout, Clear(ClearType::All), MoveTo(0, 0))?;
if connected {
// List of open ports
// How wide are all the things?
let padding = 1;
let port_width = 5; // 5 characters for 16-bit number
// List of open ports
// How wide are all the things?
let columns: usize = columns.into();
let padding = 1;
let port_width = 5; // 5 characters for 16-bit number
let description_width = columns - (padding + padding + port_width + padding);
let description_width = columns - (padding + padding + port_width + padding);
print!(
"{}",
format!(
" {port:>port_width$} {description:<description_width$}\r\n",
port = "port",
description = "description"
)
.negative()
);
if let Some(ports) = &mut ports {
let max_ports: usize = if show_logs { (rows / 2) - 1 } else { rows - 2 }.into();
for (index, port) in ports.into_iter().take(max_ports).enumerate() {
print!(
"{} {:port_width$} {:description_width$}\r\n",
if index == selection { "\u{2B46}" } else { " " },
port.port,
port.desc
);
print!(
"{}",
format!(
" {port:>port_width$} {description:<description_width$}\r\n",
port = "port",
description = "description"
)
.negative()
);
if let Some(ports) = &mut ports {
let max_ports: usize = if show_logs { (rows / 2) - 1 } else { rows - 2 }.into();
for (index, port) in ports.into_iter().take(max_ports).enumerate() {
print!(
"{} {:port_width$} {:description_width$}\r\n",
if index == selection { "\u{2B46}" } else { " " },
port.port,
port.desc
);
}
}
} else {
queue!(
stdout,
PrintStyledContent(
format!("{:^columns$}", "Not Connected")
.with(Color::Black)
.on(Color::Red)
)
)?;
}
// Log