Logging and reconnect loop

This commit is contained in:
John Doty 2022-10-12 16:48:06 -07:00
parent 4b2c9811dc
commit 68cca8cb7d
4 changed files with 149 additions and 46 deletions

1
Cargo.lock generated
View file

@ -165,6 +165,7 @@ dependencies = [
"anyhow", "anyhow",
"bytes", "bytes",
"crossterm", "crossterm",
"log",
"procfs", "procfs",
"thiserror", "thiserror",
"tokio", "tokio",

View file

@ -12,6 +12,7 @@ crossterm = { version = "0.25", features = ["event-stream"] }
thiserror = "1.0" thiserror = "1.0"
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1" tokio-stream = "0.1"
log = { version = "0.4", features = ["std"] }
[target.'cfg(target_os="linux")'.dependencies] [target.'cfg(target_os="linux")'.dependencies]
procfs = "0.14.1" procfs = "0.14.1"

View file

@ -1,9 +1,14 @@
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use connection::ConnectionTable; use connection::ConnectionTable;
use log::LevelFilter;
use log::{error, info, warn};
use message::{Message, MessageReader, MessageWriter}; use message::{Message, MessageReader, MessageWriter};
use std::collections::HashMap; use std::collections::HashMap;
use std::net::{Ipv4Addr, SocketAddrV4}; use std::net::{Ipv4Addr, SocketAddrV4};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, BufWriter}; use tokio::io::{
AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader,
BufWriter,
};
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::process; use tokio::process;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -61,7 +66,7 @@ async fn server_handle_connection(
let mut writer = writer.clone(); let mut writer = writer.clone();
connection::process(channel, &mut stream, &mut data, &mut writer).await; connection::process(channel, &mut stream, &mut data, &mut writer).await;
// eprintln!("< Done server!"); info!("< Done server!");
} }
} }
} }
@ -71,7 +76,7 @@ async fn server_read<T: AsyncRead + Unpin>(
writer: mpsc::Sender<Message>, writer: mpsc::Sender<Message>,
connections: ConnectionTable, connections: ConnectionTable,
) -> Result<()> { ) -> Result<()> {
// eprintln!("< Processing packets..."); // info!("< Processing packets...");
loop { loop {
let message = reader.read().await?; let message = reader.read().await?;
@ -106,13 +111,13 @@ async fn server_read<T: AsyncRead + Unpin>(
let ports = match refresh::get_entries() { let ports = match refresh::get_entries() {
Ok(ports) => ports, Ok(ports) => ports,
Err(_e) => { Err(_e) => {
// eprintln!("< Error scanning: {:?}", _e); error!("< Error scanning: {:?}", _e);
vec![] vec![]
} }
}; };
if let Err(_e) = writer.send(Message::Ports(ports)).await { if let Err(_e) = writer.send(Message::Ports(ports)).await {
// Writer has been closed for some reason, we can just quit.... I hope everything is OK? // Writer has been closed for some reason, we can just quit.... I hope everything is OK?
// eprintln!("< Warning: Error sending: {:?}", _e); warn!("< Warning: Error sending: {:?}", _e);
} }
}); });
} }
@ -163,7 +168,7 @@ async fn server_main<Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin>(
} }
async fn client_sync<Read: AsyncRead + Unpin>(reader: &mut Read) -> Result<(), tokio::io::Error> { async fn client_sync<Read: AsyncRead + Unpin>(reader: &mut Read) -> Result<(), tokio::io::Error> {
// eprintln!("> Waiting for synchronization marker..."); info!("> Waiting for synchronization marker...");
// Run these two loops in parallel; the copy of stdin should stop when // Run these two loops in parallel; the copy of stdin should stop when
// we've seen the marker from the client. If the pipe closes for whatever // we've seen the marker from the client. If the pipe closes for whatever
@ -202,9 +207,9 @@ async fn client_handle_connection(
let mut writer = writer.clone(); let mut writer = writer.clone();
connection::process(channel, socket, &mut data, &mut writer).await; connection::process(channel, socket, &mut data, &mut writer).await;
// eprintln!("> Done client!"); info!("> Done client!");
} else { } else {
// eprintln!("> Failed to connect to remote"); error!("> Failed to connect to remote");
} }
} }
} }
@ -217,14 +222,10 @@ async fn client_listen(
loop { loop {
let listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port)).await?; let listener = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port)).await?;
loop { loop {
// eprintln!("LISTENING ON PORT {port}");
// The second item contains the IP and port of the new // The second item contains the IP and port of the new
// connection, but we don't care. // connection, but we don't care.
let (mut socket, _) = listener.accept().await?; let (mut socket, _) = listener.accept().await?;
// eprintln!("GOT ONE!");
let (writer, connections) = (writer.clone(), connections.clone()); let (writer, connections) = (writer.clone(), connections.clone());
tokio::spawn(async move { tokio::spawn(async move {
client_handle_connection(port, writer, connections, &mut socket).await; client_handle_connection(port, writer, connections, &mut socket).await;
@ -241,9 +242,10 @@ async fn client_read<T: AsyncRead + Unpin>(
) -> Result<()> { ) -> Result<()> {
let mut listeners: HashMap<u16, oneshot::Sender<()>> = HashMap::new(); let mut listeners: HashMap<u16, oneshot::Sender<()>> = HashMap::new();
// eprintln!("> Processing packets..."); info!("> Processing packets...");
loop { loop {
let message = reader.read().await?; let message = reader.read().await?;
info!("> packet {:?}", message);
use Message::*; use Message::*;
match message { match message {
@ -293,9 +295,9 @@ async fn client_read<T: AsyncRead + Unpin>(
_ = stop => Ok(()), _ = stop => Ok(()),
}; };
if let Err(_e) = result { if let Err(_e) = result {
// eprintln!("> Error listening on port {port}: {_e:?}"); error!("> Error listening on port {port}: {_e:?}");
} else { } else {
// eprintln!("> Stopped listening on port {port}"); info!("> Stopped listening on port {port}");
} }
}); });
} }
@ -311,9 +313,27 @@ async fn client_read<T: AsyncRead + Unpin>(
} }
} }
async fn client_pipe_stderr<Debug: AsyncBufRead + Unpin>(debug: &mut Debug) {
loop {
let mut line = String::new();
match debug.read_line(&mut line).await {
Err(e) => {
error!("Error reading stderr from server: {:?}", e);
break;
}
Ok(0) => {
warn!("stderr stream closed");
break;
}
_ => info!("[Server] {}", line.trim()),
}
}
}
async fn client_main<Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin>( async fn client_main<Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin>(
reader: &mut MessageReader<Reader>, reader: &mut MessageReader<Reader>,
writer: &mut MessageWriter<Writer>, writer: &mut MessageWriter<Writer>,
port_sender: mpsc::Sender<Vec<message::PortDesc>>,
) -> Result<()> { ) -> Result<()> {
// Wait for the server's announcement. // Wait for the server's announcement.
if let Message::Hello(major, minor, _) = reader.read().await? { if let Message::Hello(major, minor, _) = reader.read().await? {
@ -328,11 +348,8 @@ async fn client_main<Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin>(
// eprintln!("> Sending initial list command..."); // eprintln!("> Sending initial list command...");
// writer.write(Message::Refresh).await?; // writer.write(Message::Refresh).await?;
let (port_sender, mut port_receiver) = mpsc::channel(2);
let connections = ConnectionTable::new(); let connections = ConnectionTable::new();
let mut ui = tokio::spawn(async move { ui::run_ui(&mut port_receiver).await });
// And now really get into it... // And now really get into it...
let (msg_sender, mut msg_receiver) = mpsc::channel(32); let (msg_sender, mut msg_receiver) = mpsc::channel(32);
let refresher = msg_sender.clone(); // Special for loop. let refresher = msg_sender.clone(); // Special for loop.
@ -345,10 +362,6 @@ async fn client_main<Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin>(
let (mut done_writing, mut done_reading) = (false, false); let (mut done_writing, mut done_reading) = (false, false);
while !(done_reading && done_writing) { while !(done_reading && done_writing) {
tokio::select! { tokio::select! {
_ = &mut ui => {
// UI said to quit.
break;
}
result = async { result = async {
loop { loop {
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
@ -411,33 +424,63 @@ async fn spawn_ssh(server: &str) -> Result<tokio::process::Child, std::io::Error
cmd.stdout(std::process::Stdio::piped()); cmd.stdout(std::process::Stdio::piped());
cmd.stdin(std::process::Stdio::piped()); cmd.stdin(std::process::Stdio::piped());
cmd.stderr(std::process::Stdio::piped());
cmd.spawn() cmd.spawn()
} }
pub async fn run_client(remote: &str) { async fn client_connect_loop(remote: &str, port_sender: mpsc::Sender<Vec<message::PortDesc>>) {
// TODO: Drive a reconnect loop loop {
let mut child = spawn_ssh(remote).await.expect("failed to spawn"); let mut child = spawn_ssh(remote).await.expect("failed to spawn");
let writer = child let mut stderr = BufReader::new(
.stdin child
.take() .stderr
.expect("child did not have a handle to stdin"); .take()
.expect("child did not have a handle to stderr"),
);
let mut reader = BufReader::new( let writer = child
child .stdin
.stdout
.take() .take()
.expect("child did not have a handle to stdout"), .expect("child did not have a handle to stdin");
);
if let Err(e) = client_sync(&mut reader).await { let mut reader = BufReader::new(
eprintln!("Error synchronizing: {:?}", e); child
return; .stdout
} .take()
.expect("child did not have a handle to stdout"),
);
let mut writer = MessageWriter::new(BufWriter::new(writer)); if let Err(e) = client_sync(&mut reader).await {
let mut reader = MessageReader::new(reader); eprintln!("Error synchronizing: {:?}", e);
if let Err(e) = client_main(&mut reader, &mut writer).await { return;
eprintln!("Error: {:?}", e); }
let mut writer = MessageWriter::new(BufWriter::new(writer));
let mut reader = MessageReader::new(reader);
tokio::spawn(async move {
client_pipe_stderr(&mut stderr).await;
});
if let Err(e) = client_main(&mut reader, &mut writer, port_sender.clone()).await {
error!("Server disconnected with error: {:?}", e);
} else {
warn!("Disconnected from server, reconnecting...");
}
}
}
pub async fn run_client(remote: &str) {
let (log_sender, mut log_receiver) = mpsc::channel(1024);
_ = log::set_boxed_logger(ui::Logger::new(log_sender));
log::set_max_level(LevelFilter::Info);
let (port_sender, mut port_receiver) = mpsc::channel(2);
// Start the reconnect loop.
tokio::select! {
_ = ui::run_ui(&mut port_receiver, &mut log_receiver) => (),
_ = client_connect_loop(remote, port_sender) => ()
} }
} }

View file

@ -10,24 +10,59 @@ use crossterm::{
EnterAlternateScreen, LeaveAlternateScreen, EnterAlternateScreen, LeaveAlternateScreen,
}, },
}; };
use log::{Level, LevelFilter, Metadata, Record};
use std::collections::vec_deque::VecDeque;
use std::io::{stdout, Write}; use std::io::{stdout, Write};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
pub async fn run_ui(port_receiver: &mut mpsc::Receiver<Vec<PortDesc>>) -> Result<()> { #[derive(Debug, Clone)]
pub struct Logger {
line_sender: mpsc::Sender<String>,
}
impl Logger {
pub fn new(line_sender: mpsc::Sender<String>) -> Box<Logger> {
Box::new(Logger { line_sender })
}
}
impl log::Log for Logger {
fn enabled(&self, metadata: &Metadata) -> bool {
metadata.level() <= Level::Info
}
fn log(&self, record: &Record) {
if self.enabled(record.metadata()) {
let line = format!("{} - {}", record.level(), record.args());
_ = self.line_sender.try_send(line);
}
}
fn flush(&self) {}
}
pub async fn run_ui(
port_receiver: &mut mpsc::Receiver<Vec<PortDesc>>,
log_receiver: &mut mpsc::Receiver<String>,
) -> Result<()> {
enable_raw_mode()?; enable_raw_mode()?;
let result = run_ui_core(port_receiver).await; let result = run_ui_core(port_receiver, log_receiver).await;
execute!(stdout(), EnableLineWrap, LeaveAlternateScreen)?; execute!(stdout(), EnableLineWrap, LeaveAlternateScreen)?;
disable_raw_mode()?; disable_raw_mode()?;
result result
} }
async fn run_ui_core(port_receiver: &mut mpsc::Receiver<Vec<PortDesc>>) -> Result<()> { async fn run_ui_core(
port_receiver: &mut mpsc::Receiver<Vec<PortDesc>>,
log: &mut mpsc::Receiver<String>,
) -> Result<()> {
let mut stdout = stdout(); let mut stdout = stdout();
execute!(stdout, EnterAlternateScreen, DisableLineWrap)?; execute!(stdout, EnterAlternateScreen, DisableLineWrap)?;
let mut events = EventStream::new(); let mut events = EventStream::new();
let mut lines: VecDeque<String> = VecDeque::with_capacity(1024);
let mut ports = None; let mut ports = None;
loop { loop {
tokio::select! { tokio::select! {
@ -51,6 +86,17 @@ async fn run_ui_core(port_receiver: &mut mpsc::Receiver<Vec<PortDesc>>) -> Resul
None => break, None => break,
} }
} }
l = log.recv() => {
match l {
Some(line) => {
if lines.len() > 1024 {
lines.pop_front();
}
lines.push_back(line);
},
None => break,
}
}
} }
let (columns, rows) = size()?; let (columns, rows) = size()?;
@ -77,7 +123,7 @@ async fn run_ui_core(port_receiver: &mut mpsc::Receiver<Vec<PortDesc>>) -> Resul
); );
if let Some(ports) = &mut ports { if let Some(ports) = &mut ports {
ports.sort_by(|a, b| a.port.partial_cmp(&b.port).unwrap()); ports.sort_by(|a, b| a.port.partial_cmp(&b.port).unwrap());
for port in ports { for port in ports.into_iter().take(((rows / 2) - 1).into()) {
print!( print!(
" {:port_width$} {:url_width$} {:description_width$}\r\n", " {:port_width$} {:url_width$} {:description_width$}\r\n",
port.port, port.port,
@ -87,6 +133,18 @@ async fn run_ui_core(port_receiver: &mut mpsc::Receiver<Vec<PortDesc>>) -> Resul
} }
} }
let hr: usize = ((rows / 2) - 1).into();
let start: usize = if lines.len() > hr {
lines.len() - hr
} else {
0
};
queue!(stdout, MoveTo(0, rows / 2))?;
for line in lines.range(start..) {
print!("{}\r\n", line);
}
queue!(stdout, MoveTo(0, rows - 1))?; queue!(stdout, MoveTo(0, rows - 1))?;
print!( print!(
"{}", "{}",