From 68cca8cb7d814a9e791a486e795ad46fe67cbac5 Mon Sep 17 00:00:00 2001 From: John Doty Date: Wed, 12 Oct 2022 16:48:06 -0700 Subject: [PATCH] Logging and reconnect loop --- Cargo.lock | 1 + Cargo.toml | 1 + src/lib.rs | 127 +++++++++++++++++++++++++++++++++++------------------ src/ui.rs | 66 ++++++++++++++++++++++++++-- 4 files changed, 149 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d8238b2..044c4a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -165,6 +165,7 @@ dependencies = [ "anyhow", "bytes", "crossterm", + "log", "procfs", "thiserror", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 5427046..67963ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ crossterm = { version = "0.25", features = ["event-stream"] } thiserror = "1.0" tokio = { version = "1", features = ["full"] } tokio-stream = "0.1" +log = { version = "0.4", features = ["std"] } [target.'cfg(target_os="linux")'.dependencies] procfs = "0.14.1" diff --git a/src/lib.rs b/src/lib.rs index dce6a3c..4e8ebac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,14 @@ use anyhow::{bail, Result}; use connection::ConnectionTable; +use log::LevelFilter; +use log::{error, info, warn}; use message::{Message, MessageReader, MessageWriter}; use std::collections::HashMap; use std::net::{Ipv4Addr, SocketAddrV4}; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}; +use tokio::io::{ + AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, + BufWriter, +}; use tokio::net::{TcpListener, TcpStream}; use tokio::process; use tokio::sync::mpsc; @@ -61,7 +66,7 @@ async fn server_handle_connection( let mut writer = writer.clone(); connection::process(channel, &mut stream, &mut data, &mut writer).await; - // eprintln!("< Done server!"); + info!("< Done server!"); } } } @@ -71,7 +76,7 @@ async fn server_read( writer: mpsc::Sender, connections: ConnectionTable, ) -> Result<()> { - // eprintln!("< Processing packets..."); + // info!("< Processing packets..."); loop { let message = reader.read().await?; @@ -106,13 +111,13 @@ async fn server_read( let ports = match refresh::get_entries() { Ok(ports) => ports, Err(_e) => { - // eprintln!("< Error scanning: {:?}", _e); + error!("< Error scanning: {:?}", _e); vec![] } }; if let Err(_e) = writer.send(Message::Ports(ports)).await { // Writer has been closed for some reason, we can just quit.... I hope everything is OK? - // eprintln!("< Warning: Error sending: {:?}", _e); + warn!("< Warning: Error sending: {:?}", _e); } }); } @@ -163,7 +168,7 @@ async fn server_main( } async fn client_sync(reader: &mut Read) -> Result<(), tokio::io::Error> { - // eprintln!("> Waiting for synchronization marker..."); + info!("> Waiting for synchronization marker..."); // Run these two loops in parallel; the copy of stdin should stop when // we've seen the marker from the client. If the pipe closes for whatever @@ -202,9 +207,9 @@ async fn client_handle_connection( let mut writer = writer.clone(); connection::process(channel, socket, &mut data, &mut writer).await; - // eprintln!("> Done client!"); + info!("> Done client!"); } else { - // eprintln!("> Failed to connect to remote"); + error!("> Failed to connect to remote"); } } } @@ -217,14 +222,10 @@ async fn client_listen( loop { let listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port)).await?; loop { - // eprintln!("LISTENING ON PORT {port}"); - // The second item contains the IP and port of the new // connection, but we don't care. let (mut socket, _) = listener.accept().await?; - // eprintln!("GOT ONE!"); - let (writer, connections) = (writer.clone(), connections.clone()); tokio::spawn(async move { client_handle_connection(port, writer, connections, &mut socket).await; @@ -241,9 +242,10 @@ async fn client_read( ) -> Result<()> { let mut listeners: HashMap> = HashMap::new(); - // eprintln!("> Processing packets..."); + info!("> Processing packets..."); loop { let message = reader.read().await?; + info!("> packet {:?}", message); use Message::*; match message { @@ -293,9 +295,9 @@ async fn client_read( _ = stop => Ok(()), }; if let Err(_e) = result { - // eprintln!("> Error listening on port {port}: {_e:?}"); + error!("> Error listening on port {port}: {_e:?}"); } else { - // eprintln!("> Stopped listening on port {port}"); + info!("> Stopped listening on port {port}"); } }); } @@ -311,9 +313,27 @@ async fn client_read( } } +async fn client_pipe_stderr(debug: &mut Debug) { + loop { + let mut line = String::new(); + match debug.read_line(&mut line).await { + Err(e) => { + error!("Error reading stderr from server: {:?}", e); + break; + } + Ok(0) => { + warn!("stderr stream closed"); + break; + } + _ => info!("[Server] {}", line.trim()), + } + } +} + async fn client_main( reader: &mut MessageReader, writer: &mut MessageWriter, + port_sender: mpsc::Sender>, ) -> Result<()> { // Wait for the server's announcement. if let Message::Hello(major, minor, _) = reader.read().await? { @@ -328,11 +348,8 @@ async fn client_main( // eprintln!("> Sending initial list command..."); // writer.write(Message::Refresh).await?; - let (port_sender, mut port_receiver) = mpsc::channel(2); let connections = ConnectionTable::new(); - let mut ui = tokio::spawn(async move { ui::run_ui(&mut port_receiver).await }); - // And now really get into it... let (msg_sender, mut msg_receiver) = mpsc::channel(32); let refresher = msg_sender.clone(); // Special for loop. @@ -345,10 +362,6 @@ async fn client_main( let (mut done_writing, mut done_reading) = (false, false); while !(done_reading && done_writing) { tokio::select! { - _ = &mut ui => { - // UI said to quit. - break; - } result = async { loop { use tokio::time::{sleep, Duration}; @@ -411,33 +424,63 @@ async fn spawn_ssh(server: &str) -> Result>) { + loop { + let mut child = spawn_ssh(remote).await.expect("failed to spawn"); - let writer = child - .stdin - .take() - .expect("child did not have a handle to stdin"); + let mut stderr = BufReader::new( + child + .stderr + .take() + .expect("child did not have a handle to stderr"), + ); - let mut reader = BufReader::new( - child - .stdout + let writer = child + .stdin .take() - .expect("child did not have a handle to stdout"), - ); + .expect("child did not have a handle to stdin"); - if let Err(e) = client_sync(&mut reader).await { - eprintln!("Error synchronizing: {:?}", e); - return; - } + let mut reader = BufReader::new( + child + .stdout + .take() + .expect("child did not have a handle to stdout"), + ); - let mut writer = MessageWriter::new(BufWriter::new(writer)); - let mut reader = MessageReader::new(reader); - if let Err(e) = client_main(&mut reader, &mut writer).await { - eprintln!("Error: {:?}", e); + if let Err(e) = client_sync(&mut reader).await { + eprintln!("Error synchronizing: {:?}", e); + return; + } + + let mut writer = MessageWriter::new(BufWriter::new(writer)); + let mut reader = MessageReader::new(reader); + + tokio::spawn(async move { + client_pipe_stderr(&mut stderr).await; + }); + + if let Err(e) = client_main(&mut reader, &mut writer, port_sender.clone()).await { + error!("Server disconnected with error: {:?}", e); + } else { + warn!("Disconnected from server, reconnecting..."); + } + } +} + +pub async fn run_client(remote: &str) { + let (log_sender, mut log_receiver) = mpsc::channel(1024); + _ = log::set_boxed_logger(ui::Logger::new(log_sender)); + log::set_max_level(LevelFilter::Info); + + let (port_sender, mut port_receiver) = mpsc::channel(2); + + // Start the reconnect loop. + tokio::select! { + _ = ui::run_ui(&mut port_receiver, &mut log_receiver) => (), + _ = client_connect_loop(remote, port_sender) => () } } diff --git a/src/ui.rs b/src/ui.rs index 412d487..51f5eac 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -10,24 +10,59 @@ use crossterm::{ EnterAlternateScreen, LeaveAlternateScreen, }, }; +use log::{Level, LevelFilter, Metadata, Record}; +use std::collections::vec_deque::VecDeque; use std::io::{stdout, Write}; use tokio::sync::mpsc; use tokio_stream::StreamExt; -pub async fn run_ui(port_receiver: &mut mpsc::Receiver>) -> Result<()> { +#[derive(Debug, Clone)] +pub struct Logger { + line_sender: mpsc::Sender, +} + +impl Logger { + pub fn new(line_sender: mpsc::Sender) -> Box { + Box::new(Logger { line_sender }) + } +} + +impl log::Log for Logger { + fn enabled(&self, metadata: &Metadata) -> bool { + metadata.level() <= Level::Info + } + + fn log(&self, record: &Record) { + if self.enabled(record.metadata()) { + let line = format!("{} - {}", record.level(), record.args()); + _ = self.line_sender.try_send(line); + } + } + + fn flush(&self) {} +} + +pub async fn run_ui( + port_receiver: &mut mpsc::Receiver>, + log_receiver: &mut mpsc::Receiver, +) -> Result<()> { enable_raw_mode()?; - let result = run_ui_core(port_receiver).await; + let result = run_ui_core(port_receiver, log_receiver).await; execute!(stdout(), EnableLineWrap, LeaveAlternateScreen)?; disable_raw_mode()?; result } -async fn run_ui_core(port_receiver: &mut mpsc::Receiver>) -> Result<()> { +async fn run_ui_core( + port_receiver: &mut mpsc::Receiver>, + log: &mut mpsc::Receiver, +) -> Result<()> { let mut stdout = stdout(); execute!(stdout, EnterAlternateScreen, DisableLineWrap)?; let mut events = EventStream::new(); + let mut lines: VecDeque = VecDeque::with_capacity(1024); let mut ports = None; loop { tokio::select! { @@ -51,6 +86,17 @@ async fn run_ui_core(port_receiver: &mut mpsc::Receiver>) -> Resul None => break, } } + l = log.recv() => { + match l { + Some(line) => { + if lines.len() > 1024 { + lines.pop_front(); + } + lines.push_back(line); + }, + None => break, + } + } } let (columns, rows) = size()?; @@ -77,7 +123,7 @@ async fn run_ui_core(port_receiver: &mut mpsc::Receiver>) -> Resul ); if let Some(ports) = &mut ports { ports.sort_by(|a, b| a.port.partial_cmp(&b.port).unwrap()); - for port in ports { + for port in ports.into_iter().take(((rows / 2) - 1).into()) { print!( " {:port_width$} {:url_width$} {:description_width$}\r\n", port.port, @@ -87,6 +133,18 @@ async fn run_ui_core(port_receiver: &mut mpsc::Receiver>) -> Resul } } + let hr: usize = ((rows / 2) - 1).into(); + let start: usize = if lines.len() > hr { + lines.len() - hr + } else { + 0 + }; + + queue!(stdout, MoveTo(0, rows / 2))?; + for line in lines.range(start..) { + print!("{}\r\n", line); + } + queue!(stdout, MoveTo(0, rows - 1))?; print!( "{}",