Compare commits

..

No commits in common. "05c0a0a4dec3aee74ac51ae9524b2d1ce85914f7" and "633594459145053bdd72f3771f558e4089833cf1" have entirely different histories.

7 changed files with 136 additions and 834 deletions

763
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -15,11 +15,9 @@ bench = false
[dependencies] [dependencies]
anyhow = "1.0" anyhow = "1.0"
bollard = "0.17.0"
bytes = "1" bytes = "1"
copypasta = "0.10.1" copypasta = "0.10.1"
crossterm = { version = "0.25", features = ["event-stream"] } crossterm = { version = "0.25", features = ["event-stream"] }
env_logger = { version = "0.11.5", default-features = false }
home = "0.5.4" home = "0.5.4"
indoc = "1" indoc = "1"
log = { version = "0.4", features = ["std"] } log = { version = "0.4", features = ["std"] }

View file

@ -323,7 +323,6 @@ async fn client_main<Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin>(
async fn spawn_ssh( async fn spawn_ssh(
server: &str, server: &str,
sudo: bool, sudo: bool,
log_filter: &str,
) -> Result<(tokio::process::Child, u16), std::io::Error> { ) -> Result<(tokio::process::Child, u16), std::io::Error> {
let socks_port = { let socks_port = {
let listener = TcpListener::bind("127.0.0.1:0").await?; let listener = TcpListener::bind("127.0.0.1:0").await?;
@ -338,9 +337,7 @@ async fn spawn_ssh(
if sudo { if sudo {
cmd.arg("sudo"); cmd.arg("sudo");
} }
cmd.arg(format!("FWD_LOG={log_filter}")) cmd.arg("fwd").arg("--server");
.arg("fwd")
.arg("--server");
cmd.stdout(std::process::Stdio::piped()); cmd.stdout(std::process::Stdio::piped());
cmd.stdin(std::process::Stdio::piped()); cmd.stdin(std::process::Stdio::piped());
@ -370,15 +367,13 @@ fn is_sigint(status: std::process::ExitStatus) -> bool {
async fn client_connect_loop( async fn client_connect_loop(
remote: &str, remote: &str,
sudo: bool, sudo: bool,
log_filter: &str,
events: mpsc::Sender<ui::UIEvent>, events: mpsc::Sender<ui::UIEvent>,
) { ) {
loop { loop {
_ = events.send(ui::UIEvent::Disconnected).await; _ = events.send(ui::UIEvent::Disconnected).await;
let (mut child, socks_port) = spawn_ssh(remote, sudo, log_filter) let (mut child, socks_port) =
.await spawn_ssh(remote, sudo).await.expect("failed to spawn");
.expect("failed to spawn");
let mut stderr = child let mut stderr = child
.stderr .stderr
@ -436,7 +431,7 @@ async fn client_connect_loop(
} }
} }
pub async fn run_client(remote: &str, sudo: bool, log_filter: &str) { pub async fn run_client(remote: &str, sudo: bool) {
let (event_sender, event_receiver) = mpsc::channel(1024); let (event_sender, event_receiver) = mpsc::channel(1024);
_ = log::set_boxed_logger(ui::Logger::new(event_sender.clone())); _ = log::set_boxed_logger(ui::Logger::new(event_sender.clone()));
log::set_max_level(LevelFilter::Info); log::set_max_level(LevelFilter::Info);
@ -454,7 +449,7 @@ pub async fn run_client(remote: &str, sudo: bool, log_filter: &str) {
// Start the reconnect loop. // Start the reconnect loop.
tokio::select! { tokio::select! {
_ = ui.run() => (), _ = ui.run() => (),
_ = client_connect_loop(remote, sudo, log_filter, event_sender) => () _ = client_connect_loop(remote, sudo, event_sender) => ()
} }
} }

View file

@ -10,19 +10,16 @@ use crossterm::{
}, },
}; };
use log::{error, info, Level, Metadata, Record}; use log::{error, info, Level, Metadata, Record};
use std::collections::vec_deque::VecDeque;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::io::stdout; use std::io::stdout;
use std::{
collections::vec_deque::VecDeque,
sync::{Arc, Mutex},
};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tui::{ use tui::{
backend::{Backend, CrosstermBackend}, backend::{Backend, CrosstermBackend},
layout::{Constraint, Direction, Layout, Margin, Rect}, layout::{Constraint, Direction, Layout, Margin, Rect},
style::{Color, Modifier, Style}, style::{Color, Style},
widgets::{ widgets::{
Block, Borders, List, ListItem, ListState, Row, Table, TableState, Block, Borders, List, ListItem, ListState, Row, Table, TableState,
}, },
@ -70,22 +67,9 @@ impl log::Log for Logger {
fn flush(&self) {} fn flush(&self) {}
} }
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum State {
Enabled,
Broken,
Disabled,
}
impl State {
fn boxed(self) -> Arc<Mutex<State>> {
Arc::new(Mutex::new(self))
}
}
#[derive(Debug)] #[derive(Debug)]
struct Listener { struct Listener {
state: std::sync::Arc<std::sync::Mutex<State>>, enabled: bool,
stop: Option<oneshot::Sender<()>>, stop: Option<oneshot::Sender<()>>,
desc: Option<PortDesc>, desc: Option<PortDesc>,
} }
@ -96,15 +80,7 @@ impl Listener {
desc: PortDesc, desc: PortDesc,
enabled: bool, enabled: bool,
) -> Listener { ) -> Listener {
let mut listener = Listener { let mut listener = Listener { enabled, stop: None, desc: Some(desc) };
state: if enabled {
State::Enabled.boxed()
} else {
State::Disabled.boxed()
},
stop: None,
desc: Some(desc),
};
if enabled { if enabled {
listener.start(socks_port); listener.start(socks_port);
} }
@ -112,19 +88,15 @@ impl Listener {
} }
pub fn enabled(&self) -> bool { pub fn enabled(&self) -> bool {
*self.state.lock().unwrap() == State::Enabled self.enabled
}
fn state(&self) -> State {
*self.state.lock().unwrap()
} }
pub fn set_enabled(&mut self, socks_port: Option<u16>, enabled: bool) { pub fn set_enabled(&mut self, socks_port: Option<u16>, enabled: bool) {
if enabled { if enabled {
self.state = State::Enabled.boxed(); self.enabled = true;
self.start(socks_port); self.start(socks_port);
} else { } else {
self.state = State::Enabled.boxed(); self.enabled = false;
self.stop = None; self.stop = None;
} }
} }
@ -140,22 +112,19 @@ impl Listener {
} }
pub fn start(&mut self, socks_port: Option<u16>) { pub fn start(&mut self, socks_port: Option<u16>) {
if self.enabled() { if self.enabled {
if let (Some(desc), Some(socks_port), None) = if let (Some(desc), Some(socks_port), None) =
(&self.desc, socks_port, &self.stop) (&self.desc, socks_port, &self.stop)
{ {
info!("Starting port {port} to {socks_port}", port = desc.port); info!("Starting port {port} to {socks_port}", port = desc.port);
let (l, stop) = oneshot::channel(); let (l, stop) = oneshot::channel();
let port = desc.port; let port = desc.port;
let state = self.state.clone();
tokio::spawn(async move { tokio::spawn(async move {
let result = tokio::select! { let result = tokio::select! {
r = client_listen(port, socks_port) => r, r = client_listen(port, socks_port) => r,
_ = stop => Ok(()), _ = stop => Ok(()),
}; };
if let Err(e) = result { if let Err(e) = result {
let mut sg = state.lock().unwrap();
*sg = State::Broken;
error!("Error listening on port {port}: {e:?}"); error!("Error listening on port {port}: {e:?}");
} else { } else {
info!("Stopped listening on port {port}"); info!("Stopped listening on port {port}");
@ -274,8 +243,6 @@ impl UI {
fn render_ports<B: Backend>(&mut self, frame: &mut Frame<B>, size: Rect) { fn render_ports<B: Backend>(&mut self, frame: &mut Frame<B>, size: Rect) {
let enabled_port_style = Style::default(); let enabled_port_style = Style::default();
let disabled_port_style = Style::default().fg(Color::DarkGray); let disabled_port_style = Style::default().fg(Color::DarkGray);
let broken_port_style =
Style::default().fg(Color::Red).add_modifier(Modifier::DIM);
let mut rows = Vec::new(); let mut rows = Vec::new();
let ports = self.get_ui_ports(); let ports = self.get_ui_ports();
@ -283,21 +250,20 @@ impl UI {
ports.iter().map(|p| format!("{p}")).collect(); ports.iter().map(|p| format!("{p}")).collect();
for (index, port) in ports.into_iter().enumerate() { for (index, port) in ports.into_iter().enumerate() {
let listener = self.ports.get(&port).unwrap(); let listener = self.ports.get(&port).unwrap();
let (symbol, style) = match listener.state() {
State::Enabled => ("", enabled_port_style),
State::Broken => ("", broken_port_style),
State::Disabled => ("", disabled_port_style),
};
rows.push( rows.push(
Row::new(vec![ Row::new(vec![
symbol, if listener.enabled { "" } else { "" },
&port_strings[index][..], &port_strings[index][..],
match &listener.desc { match &listener.desc {
Some(port_desc) => &port_desc.desc, Some(port_desc) => &port_desc.desc,
None => "", None => "",
}, },
]) ])
.style(style), .style(if listener.enabled {
enabled_port_style
} else {
disabled_port_style
}),
); );
} }
@ -391,11 +357,7 @@ impl UI {
} }
fn enable_disable_port(&mut self, port: u16) { fn enable_disable_port(&mut self, port: u16) {
let state = self.ports.get(&port).map(Listener::state); if let Some(listener) = self.ports.get_mut(&port) {
if state == Some(State::Broken) {
// try turning it off and on again, it will at least get logs visible
self.ports.remove(&port);
} else if let Some(listener) = self.ports.get_mut(&port) {
listener.set_enabled(self.socks_port, !listener.enabled()); listener.set_enabled(self.socks_port, !listener.enabled());
} }
} }

View file

@ -26,10 +26,6 @@ Options:
run as root), but requires sudo access on the server and run as root), but requires sudo access on the server and
*might* end up forwarding ports that you do not want *might* end up forwarding ports that you do not want
forwarded (e.g., port 22 for sshd, or port 53 for systemd.) forwarded (e.g., port 22 for sshd, or port 53 for systemd.)
--log-filter FILTER
Set remote server's log level. Default is `warn`. Supports
all of Rust's env_logger filter syntax, e.g.
`--log-filter=fwd::trace`.
"}); "});
} }
@ -38,7 +34,7 @@ enum Args {
Help, Help,
Version, Version,
Server, Server,
Client(String, bool, String), Client(String, bool),
Browse(String), Browse(String),
Clip(String), Clip(String),
Error, Error,
@ -47,11 +43,9 @@ enum Args {
fn parse_args(args: Vec<String>) -> Args { fn parse_args(args: Vec<String>) -> Args {
let mut server = None; let mut server = None;
let mut sudo = None; let mut sudo = None;
let mut log_filter = None;
let mut rest = Vec::new(); let mut rest = Vec::new();
let mut arg_iter = args.into_iter().skip(1); for arg in args.into_iter().skip(1) {
while let Some(arg) = arg_iter.next() {
if arg == "--help" || arg == "-?" || arg == "-h" { if arg == "--help" || arg == "-?" || arg == "-h" {
return Args::Help; return Args::Help;
} else if arg == "--version" { } else if arg == "--version" {
@ -60,14 +54,6 @@ fn parse_args(args: Vec<String>) -> Args {
server = Some(true) server = Some(true)
} else if arg == "--sudo" || arg == "-s" { } else if arg == "--sudo" || arg == "-s" {
sudo = Some(true) sudo = Some(true)
} else if arg.starts_with("--log-filter") {
if arg.contains('=') {
log_filter = Some(arg.split('=').nth(1).unwrap().to_owned());
} else if let Some(arg) = arg_iter.next() {
log_filter = Some(arg);
} else {
return Args::Error;
}
} else { } else {
rest.push(arg) rest.push(arg)
} }
@ -85,32 +71,20 @@ fn parse_args(args: Vec<String>) -> Args {
if rest.len() == 2 { if rest.len() == 2 {
Args::Browse(rest[1].to_string()) Args::Browse(rest[1].to_string())
} else if rest.len() == 1 { } else if rest.len() == 1 {
Args::Client( Args::Client(rest[0].to_string(), sudo.unwrap_or(false))
rest[0].to_string(),
sudo.unwrap_or(false),
log_filter.unwrap_or("warn".to_owned()),
)
} else { } else {
Args::Error Args::Error
} }
} else if rest[0] == "clip" { } else if rest[0] == "clip" {
if rest.len() == 1 { if rest.len() == 1 {
Args::Client( Args::Client(rest[0].to_string(), sudo.unwrap_or(false))
rest[0].to_string(),
sudo.unwrap_or(false),
log_filter.unwrap_or("warn".to_owned()),
)
} else if rest.len() == 2 { } else if rest.len() == 2 {
Args::Clip(rest[1].to_string()) Args::Clip(rest[1].to_string())
} else { } else {
Args::Error Args::Error
} }
} else if rest.len() == 1 { } else if rest.len() == 1 {
Args::Client( Args::Client(rest[0].to_string(), sudo.unwrap_or(false))
rest[0].to_string(),
sudo.unwrap_or(false),
log_filter.unwrap_or("warn".to_owned()),
)
} else { } else {
Args::Error Args::Error
} }
@ -150,8 +124,8 @@ async fn main() {
Args::Clip(file) => { Args::Clip(file) => {
clip_file(file).await; clip_file(file).await;
} }
Args::Client(server, sudo, log_filter) => { Args::Client(server, sudo) => {
fwd::run_client(&server, sudo, &log_filter).await; fwd::run_client(&server, sudo).await;
} }
Args::Error => { Args::Error => {
usage(); usage();
@ -201,14 +175,14 @@ mod tests {
#[test] #[test]
fn client() { fn client() {
assert_arg_parse!(&["foo.com"], Args::Client(_, false, _)); assert_arg_parse!(&["foo.com"], Args::Client(_, false));
assert_arg_parse!(&["a"], Args::Client(_, false, _)); assert_arg_parse!(&["a"], Args::Client(_, false));
assert_arg_parse!(&["browse"], Args::Client(_, false, _)); assert_arg_parse!(&["browse"], Args::Client(_, false));
assert_arg_parse!(&["clip"], Args::Client(_, false, _)); assert_arg_parse!(&["clip"], Args::Client(_, false));
assert_arg_parse!(&["foo.com", "--sudo"], Args::Client(_, true, _)); assert_arg_parse!(&["foo.com", "--sudo"], Args::Client(_, true));
assert_arg_parse!(&["a", "-s"], Args::Client(_, true, _)); assert_arg_parse!(&["a", "-s"], Args::Client(_, true));
assert_arg_parse!(&["-s", "browse"], Args::Client(_, true, _)); assert_arg_parse!(&["-s", "browse"], Args::Client(_, true));
assert_arg_parse!(&["-s", "clip"], Args::Client(_, true, _)); assert_arg_parse!(&["-s", "clip"], Args::Client(_, true));
} }
#[test] #[test]

View file

@ -32,7 +32,7 @@ async fn server_loop<Reader: AsyncRead + Unpin>(
match reader.read().await? { match reader.read().await? {
Ping => (), Ping => (),
Refresh => { Refresh => {
let ports = match refresh::get_entries().await { let ports = match refresh::get_entries() {
Ok(ports) => ports, Ok(ports) => ports,
Err(e) => { Err(e) => {
error!("Error scanning: {:?}", e); error!("Error scanning: {:?}", e);
@ -82,10 +82,6 @@ async fn server_main<
} }
pub async fn run_server() { pub async fn run_server() {
env_logger::Builder::from_env(
env_logger::Env::new().filter_or("FWD_LOG", "warn"),
)
.init();
let stdin = tokio::io::stdin(); let stdin = tokio::io::stdin();
let stdout = tokio::io::stdout(); let stdout = tokio::io::stdout();
if let Err(e) = server_main(stdin, stdout).await { if let Err(e) = server_main(stdin, stdout).await {

View file

@ -1,17 +1,14 @@
use crate::message::PortDesc; use crate::message::PortDesc;
use anyhow::Result; use anyhow::Result;
#[cfg(target_os = "linux")]
use std::collections::HashMap;
#[cfg(not(target_os = "linux"))] #[cfg(not(target_os = "linux"))]
pub async fn get_entries() -> Result<Vec<PortDesc>> { pub fn get_entries() -> Result<Vec<PortDesc>> {
use anyhow::bail; use anyhow::bail;
bail!("Not supported on this operating system"); bail!("Not supported on this operating system");
} }
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
pub async fn get_entries() -> Result<Vec<PortDesc>> { pub fn get_entries() -> Result<Vec<PortDesc>> {
let start = std::time::Instant::now();
use procfs::process::FDTarget; use procfs::process::FDTarget;
use std::collections::HashMap; use std::collections::HashMap;
@ -34,13 +31,8 @@ pub async fn get_entries() -> Result<Vec<PortDesc>> {
} }
} }
} }
log::trace!("procfs elapsed={:?}", start.elapsed());
let mut h = if let Ok(ports) = find_docker_ports().await { let mut h: HashMap<u16, PortDesc> = HashMap::new();
ports
} else {
HashMap::new()
};
// Go through all the listening IPv4 and IPv6 sockets and take the first // Go through all the listening IPv4 and IPv6 sockets and take the first
// instance of listening on each port *if* the address is loopback or // instance of listening on each port *if* the address is loopback or
@ -65,49 +57,5 @@ pub async fn get_entries() -> Result<Vec<PortDesc>> {
} }
} }
let vals = h.into_values().collect(); Ok(h.into_values().collect())
log::trace!("total portscan elapsed={:?}", start.elapsed());
Ok(vals)
}
#[cfg(target_os = "linux")]
async fn find_docker_ports(
) -> Result<HashMap<u16, PortDesc>, bollard::errors::Error> {
use bollard::container::ListContainersOptions;
use bollard::Docker;
let start = std::time::Instant::now();
let client = Docker::connect_with_defaults()?;
log::trace!("docker connect elapsed={:?}", start.elapsed());
let port_start = std::time::Instant::now();
let mut port_to_name = HashMap::new();
let opts: ListContainersOptions<String> =
ListContainersOptions { all: false, ..Default::default() };
for container in client.list_containers(Some(opts)).await? {
let name = container
.names
.into_iter()
.flatten()
.next()
.unwrap_or_else(|| "<unknown docker>".to_owned());
for port in container.ports.iter().flatten() {
if let Some(public_port) = port.public_port {
let private_port = port.private_port;
port_to_name.insert(
public_port,
PortDesc {
port: public_port,
desc: format!("{name} (docker->{private_port})"),
},
);
}
}
}
log::trace!(
"docker port elapsed={:?} total docker elapsed={:?}",
port_start.elapsed(),
start.elapsed()
);
Ok(port_to_name)
} }