diff --git a/src/lib.rs b/src/lib.rs index 093edb8..6b7f97c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,7 +3,6 @@ use connection::ConnectionTable; use log::LevelFilter; use log::{error, info, warn}; use message::{Message, MessageReader, MessageWriter}; -use std::collections::HashMap; use std::net::{Ipv4Addr, SocketAddrV4}; use tokio::io::{ AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader, @@ -234,12 +233,9 @@ async fn client_listen( async fn client_read( reader: &mut MessageReader, - writer: mpsc::Sender, connections: ConnectionTable, events: mpsc::Sender, ) -> Result<()> { - let mut listeners: HashMap> = HashMap::new(); - info!("Running"); loop { let message = reader.read().await?; @@ -267,41 +263,6 @@ async fn client_read( }); } Ports(ports) => { - let mut new_listeners = HashMap::new(); - for port in &ports { - let port = port.port; - if let Some(l) = listeners.remove(&port) { - if !l.is_closed() { - // `l` here is, of course, the channel that we - // use to tell the listener task to stop (see the - // spawn call below). If it isn't closed then - // that means a spawn task is still running so we - // should just let it keep running and re-use the - // existing listener. - new_listeners.insert(port, l); - } - } - - if !new_listeners.contains_key(&port) { - let (l, stop) = oneshot::channel(); - new_listeners.insert(port, l); - - let (writer, connections) = (writer.clone(), connections.clone()); - tokio::spawn(async move { - let result = tokio::select! { - r = client_listen(port, writer, connections) => r, - _ = stop => Ok(()), - }; - if let Err(e) = result { - error!("Error listening on port {port}: {e:?}"); - } else { - info!("Stopped listening on port {port}"); - } - }); - } - } - - listeners = new_listeners; if let Err(_) = events.send(ui::UIEvent::Ports(ports)).await { // TODO: Log } @@ -351,10 +312,16 @@ async fn client_main( // And now really get into it... let (msg_sender, mut msg_receiver) = mpsc::channel(32); - let refresher = msg_sender.clone(); // Special for loop. + + _ = events + .send(ui::UIEvent::Connected( + msg_sender.clone(), + connections.clone(), + )) + .await; let writing = pump_write(&mut msg_receiver, writer); - let reading = client_read(reader, msg_sender, connections, events); + let reading = client_read(reader, connections, events); tokio::pin!(reading); tokio::pin!(writing); @@ -364,7 +331,7 @@ async fn client_main( result = async { loop { use tokio::time::{sleep, Duration}; - if let Err(e) = refresher.send(Message::Refresh).await { + if let Err(e) = msg_sender.send(Message::Refresh).await { break Err::<(), _>(e); } sleep(Duration::from_millis(500)).await; @@ -458,8 +425,6 @@ async fn client_connect_loop(remote: &str, events: mpsc::Sender) { continue; } - _ = events.send(ui::UIEvent::Connected).await; - let mut writer = MessageWriter::new(BufWriter::new(writer)); let mut reader = MessageReader::new(reader); @@ -477,13 +442,15 @@ async fn client_connect_loop(remote: &str, events: mpsc::Sender) { } pub async fn run_client(remote: &str) { - let (event_sender, mut event_receiver) = mpsc::channel(1024); + let (event_sender, event_receiver) = mpsc::channel(1024); _ = log::set_boxed_logger(ui::Logger::new(event_sender.clone())); log::set_max_level(LevelFilter::Info); + let mut ui = ui::UI::new(event_receiver); + // Start the reconnect loop. tokio::select! { - _ = ui::run_ui(&mut event_receiver) => (), + _ = ui.run() => (), _ = client_connect_loop(remote, event_sender) => () } } diff --git a/src/ui.rs b/src/ui.rs index cd42329..0808018 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -1,4 +1,8 @@ -use crate::message::PortDesc; +use crate::client_listen; +use crate::{ + message::{Message, PortDesc}, + ConnectionTable, +}; use anyhow::Result; use crossterm::{ cursor::MoveTo, @@ -10,15 +14,17 @@ use crossterm::{ EnterAlternateScreen, LeaveAlternateScreen, }, }; -use log::{Level, Metadata, Record}; +use log::{error, info, Level, Metadata, Record}; use open; use std::collections::vec_deque::VecDeque; +use std::collections::HashMap; use std::io::{stdout, Write}; use tokio::sync::mpsc; +use tokio::sync::oneshot; use tokio_stream::StreamExt; pub enum UIEvent { - Connected, + Connected(mpsc::Sender, ConnectionTable), Disconnected, ServerLine(String), LogLine(log::Level, String), @@ -53,164 +59,250 @@ impl log::Log for Logger { fn flush(&self) {} } -pub async fn run_ui(events: &mut mpsc::Receiver) -> Result<()> { - enable_raw_mode()?; - let result = run_ui_core(events).await; - execute!(stdout(), EnableLineWrap, LeaveAlternateScreen)?; - disable_raw_mode()?; - result +pub struct UI { + events: mpsc::Receiver, + writer: Option>, + connections: Option, + listeners: HashMap>, } -async fn run_ui_core(events: &mut mpsc::Receiver) -> Result<()> { - let mut stdout = stdout(); - - execute!(stdout, EnterAlternateScreen, DisableLineWrap)?; - let mut console_events = EventStream::new(); - - let mut connected = false; - let mut selection = 0; - let mut show_logs = false; - let mut lines: VecDeque = VecDeque::with_capacity(1024); - let mut ports: Option> = None; - loop { - tokio::select! { - ev = console_events.next() => { - match ev { - Some(Ok(Event::Key(ev))) => { - match ev { - KeyEvent {code:KeyCode::Esc, ..} - | KeyEvent {code:KeyCode::Char('q'), ..} => { break; }, - KeyEvent {code:KeyCode::Char('l'), ..} => { - show_logs = !show_logs; - } - KeyEvent { code:KeyCode::Up, ..} - | KeyEvent { code:KeyCode::Char('j'), ..} => { - if selection > 0 { - selection -= 1; - } - } - KeyEvent { code:KeyCode::Down, ..} - | KeyEvent { code:KeyCode::Char('k'), ..} => { - if let Some(p) = &ports { - if selection != p.len() - 1 { - selection += 1; - } - } - } - KeyEvent { code:KeyCode::Enter, ..} => { - if let Some(p) = &ports { - if selection < p.len() { - _ = open::that(format!("http://127.0.0.1:{}/", p[selection].port)); - } - } - } - _ => () - } - }, - Some(Ok(_)) => (), // Don't care about this event... - Some(Err(_)) => (), // Hmmmmmm.....? - None => (), // ....no events? what? - } - } - ev = events.recv() => { - match ev { - Some(UIEvent::Disconnected) => { - connected = false; - } - Some(UIEvent::Connected) => { - connected = true; - } - Some(UIEvent::Ports(mut p)) => { - p.sort_by(|a, b| a.port.partial_cmp(&b.port).unwrap()); - ports = Some(p); - } - Some(UIEvent::ServerLine(line)) => { - while lines.len() >= 1024 { - lines.pop_front(); - } - lines.push_back(format!("[SERVER] {line}")); - } - Some(UIEvent::LogLine(_level, line)) => { - while lines.len() >= 1024 { - lines.pop_front(); - } - lines.push_back(format!("[CLIENT] {line}")); - } - None => break, - } - } +impl UI { + pub fn new(events: mpsc::Receiver) -> UI { + UI { + events, + writer: None, + connections: None, + listeners: HashMap::new(), } + } - let (columns, rows) = size()?; - let columns: usize = columns.into(); + pub async fn run(&mut self) -> Result<()> { + enable_raw_mode()?; + let result = self.run_core().await; + execute!(stdout(), EnableLineWrap, LeaveAlternateScreen)?; + disable_raw_mode()?; + result + } - queue!(stdout, Clear(ClearType::All), MoveTo(0, 0))?; - if connected { - // List of open ports - // How wide are all the things? - let padding = 1; - let port_width = 5; // 5 characters for 16-bit number + async fn run_core(&mut self) -> Result<()> { + let mut stdout = stdout(); - let description_width = columns - (padding + padding + port_width + padding); + execute!(stdout, EnterAlternateScreen, DisableLineWrap)?; + let mut console_events = EventStream::new(); + let mut connected = false; + let mut selection = 0; + let mut show_logs = false; + let mut lines: VecDeque = VecDeque::with_capacity(1024); + let mut ports: Option> = None; + loop { + tokio::select! { + ev = console_events.next() => { + match ev { + Some(Ok(Event::Key(ev))) => { + match ev { + KeyEvent {code:KeyCode::Esc, ..} + | KeyEvent {code:KeyCode::Char('q'), ..} => { break; }, + KeyEvent {code:KeyCode::Char('l'), ..} => { + show_logs = !show_logs; + } + KeyEvent {code:KeyCode::Char('e'), ..} => { + if let Some(ports) = &ports { + if selection < ports.len() { + let p = ports[selection].port; + self.enable_disable_port(p); + } + } + } + KeyEvent { code:KeyCode::Up, ..} + | KeyEvent { code:KeyCode::Char('j'), ..} => { + if selection > 0 { + selection -= 1; + } + } + KeyEvent { code:KeyCode::Down, ..} + | KeyEvent { code:KeyCode::Char('k'), ..} => { + if let Some(p) = &ports { + if selection != p.len() - 1 { + selection += 1; + } + } + } + KeyEvent { code:KeyCode::Enter, ..} => { + if let Some(p) = &ports { + if selection < p.len() { + _ = open::that(format!("http://127.0.0.1:{}/", p[selection].port)); + } + } + } + _ => () + } + }, + Some(Ok(_)) => (), // Don't care about this event... + Some(Err(_)) => (), // Hmmmmmm.....? + None => (), // ....no events? what? + } + } + ev = self.events.recv() => { + match ev { + Some(UIEvent::Disconnected) => { + self.writer = None; + self.connections = None; + connected = false; + } + Some(UIEvent::Connected(w,t)) => { + self.writer = Some(w); + self.connections = Some(t); + connected = true; + } + Some(UIEvent::Ports(mut p)) => { + p.sort_by(|a, b| a.port.partial_cmp(&b.port).unwrap()); + if selection >= p.len() { + selection = p.len()-1; + } + + let mut new_listeners = HashMap::new(); + for port in &p { + let port = port.port; + if let Some(l) = self.listeners.remove(&port) { + if !l.is_closed() { + // `l` here is, of course, the channel that we + // use to tell the listener task to stop (see the + // spawn call below). If it isn't closed then + // that means a spawn task is still running so we + // should just let it keep running and re-use the + // existing listener. + new_listeners.insert(port, l); + } + } + } + + // This has the side effect of closing any + // listener that didn't get copied over to the + // new listeners table. + self.listeners = new_listeners; + ports = Some(p); + } + Some(UIEvent::ServerLine(line)) => { + while lines.len() >= 1024 { + lines.pop_front(); + } + lines.push_back(format!("[SERVER] {line}")); + } + Some(UIEvent::LogLine(_level, line)) => { + while lines.len() >= 1024 { + lines.pop_front(); + } + lines.push_back(format!("[CLIENT] {line}")); + } + None => break, + } + } + } + + let (columns, rows) = size()?; + let columns: usize = columns.into(); + + queue!(stdout, Clear(ClearType::All), MoveTo(0, 0))?; + if connected { + // List of open ports + // How wide are all the things? + let padding = 1; + let enabled_width = 1; // Just 1 character + let port_width = 5; // 5 characters for 16-bit number + + let description_width = + columns - (padding + padding + enabled_width + padding + port_width + padding); + + print!( + "{}", + format!( + " {enabled:>enabled_width$} {port:>port_width$} {description:enabled_width$} {:port_width$} {:description_width$}\r\n", + if index == selection { "\u{2B46}" } else { " " }, + if self.listeners.contains_key(&port.port) { + "+" + } else { + " " + }, + port.port, + port.desc + ); + } + } + } else { + queue!( + stdout, + PrintStyledContent( + format!("{:^columns$}", "Not Connected") + .with(Color::Black) + .on(Color::Red) + ) + )?; + } + + // Log + if show_logs { + let hr: usize = ((rows / 2) - 2).into(); + let start: usize = if lines.len() > hr { + lines.len() - hr + } else { + 0 + }; + + queue!(stdout, MoveTo(0, rows / 2))?; + print!("{}", format!("{:columns$}", " Log").negative()); + for line in lines.range(start..) { + print!("{}\r\n", line); + } + } + + queue!(stdout, MoveTo(0, rows - 1))?; print!( "{}", format!( - " {port:>port_width$} {description: - browse" + ).negative() ); - if let Some(ports) = &mut ports { - let max_ports: usize = if show_logs { (rows / 2) - 1 } else { rows - 2 }.into(); - for (index, port) in ports.into_iter().take(max_ports).enumerate() { - print!( - "{} {:port_width$} {:description_width$}\r\n", - if index == selection { "\u{2B46}" } else { " " }, - port.port, - port.desc - ); - } - } - } else { - queue!( - stdout, - PrintStyledContent( - format!("{:^columns$}", "Not Connected") - .with(Color::Black) - .on(Color::Red) - ) - )?; + stdout.flush()?; } - // Log - if show_logs { - let hr: usize = ((rows / 2) - 2).into(); - let start: usize = if lines.len() > hr { - lines.len() - hr - } else { - 0 - }; - - queue!(stdout, MoveTo(0, rows / 2))?; - print!("{}", format!("{:columns$}", " Log").negative()); - for line in lines.range(start..) { - print!("{}\r\n", line); - } - } - - queue!(stdout, MoveTo(0, rows - 1))?; - print!( - "{}", - format!( - "{:columns$}", - " q - quit | l - toggle log | \u{2191}/\u{2193} - select port | - browse" - ) - .negative() - ); - stdout.flush()?; + Ok(()) } - Ok(()) + fn enable_disable_port(&mut self, port: u16) { + if let (Some(writer), Some(connections)) = (&self.writer, &self.connections) { + if let Some(_) = self.listeners.remove(&port) { + return; // We disabled the listener. + } + + // We need to enable the listener. + let (l, stop) = oneshot::channel(); + self.listeners.insert(port, l); + + let (writer, connections) = (writer.clone(), connections.clone()); + tokio::spawn(async move { + let result = tokio::select! { + r = client_listen(port, writer, connections) => r, + _ = stop => Ok(()), + }; + if let Err(e) = result { + error!("Error listening on port {port}: {e:?}"); + } else { + info!("Stopped listening on port {port}"); + } + }); + } + } }