Compare commits

...
Sign in to create a new pull request.

3 commits

Author SHA1 Message Date
Brandon W Maister
05c0a0a4de make server logging show messages in the frontend
now you can use the log crate and get messages in the frontend.
2024-07-31 15:35:11 -07:00
Brandon W Maister
6c10d8eece feat: Discover docker ports as well
If processes are running in a container then the fwd process
can't read their internal FDs without the CAP_SYS_ADMIN property
which is equivalent to sudo. Even with sudo, I think you need to do
a lot of work to be able to read them -- spawning a process within
the cgroup, doing work there, and then communicating back.

This just uses the docker api to populate some default ports, which
later get overwritten if fwd can find a native process.

The Docker port scan takes about 1.5ms, and the full port scan takes
40+ms, so this adds basically no overhead.
2024-07-31 15:32:45 -07:00
Brandon W Maister
66da323481 feat: Show errored ports as an error state 2024-07-31 15:32:04 -07:00
7 changed files with 838 additions and 140 deletions

771
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -15,9 +15,11 @@ bench = false
[dependencies] [dependencies]
anyhow = "1.0" anyhow = "1.0"
bollard = "0.17.0"
bytes = "1" bytes = "1"
copypasta = "0.10.1" copypasta = "0.10.1"
crossterm = { version = "0.25", features = ["event-stream"] } crossterm = { version = "0.25", features = ["event-stream"] }
env_logger = { version = "0.11.5", default-features = false }
home = "0.5.4" home = "0.5.4"
indoc = "1" indoc = "1"
log = { version = "0.4", features = ["std"] } log = { version = "0.4", features = ["std"] }

View file

@ -323,6 +323,7 @@ async fn client_main<Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin>(
async fn spawn_ssh( async fn spawn_ssh(
server: &str, server: &str,
sudo: bool, sudo: bool,
log_filter: &str,
) -> Result<(tokio::process::Child, u16), std::io::Error> { ) -> Result<(tokio::process::Child, u16), std::io::Error> {
let socks_port = { let socks_port = {
let listener = TcpListener::bind("127.0.0.1:0").await?; let listener = TcpListener::bind("127.0.0.1:0").await?;
@ -337,7 +338,9 @@ async fn spawn_ssh(
if sudo { if sudo {
cmd.arg("sudo"); cmd.arg("sudo");
} }
cmd.arg("fwd").arg("--server"); cmd.arg(format!("FWD_LOG={log_filter}"))
.arg("fwd")
.arg("--server");
cmd.stdout(std::process::Stdio::piped()); cmd.stdout(std::process::Stdio::piped());
cmd.stdin(std::process::Stdio::piped()); cmd.stdin(std::process::Stdio::piped());
@ -367,13 +370,15 @@ fn is_sigint(status: std::process::ExitStatus) -> bool {
async fn client_connect_loop( async fn client_connect_loop(
remote: &str, remote: &str,
sudo: bool, sudo: bool,
log_filter: &str,
events: mpsc::Sender<ui::UIEvent>, events: mpsc::Sender<ui::UIEvent>,
) { ) {
loop { loop {
_ = events.send(ui::UIEvent::Disconnected).await; _ = events.send(ui::UIEvent::Disconnected).await;
let (mut child, socks_port) = let (mut child, socks_port) = spawn_ssh(remote, sudo, log_filter)
spawn_ssh(remote, sudo).await.expect("failed to spawn"); .await
.expect("failed to spawn");
let mut stderr = child let mut stderr = child
.stderr .stderr
@ -431,7 +436,7 @@ async fn client_connect_loop(
} }
} }
pub async fn run_client(remote: &str, sudo: bool) { pub async fn run_client(remote: &str, sudo: bool, log_filter: &str) {
let (event_sender, event_receiver) = mpsc::channel(1024); let (event_sender, event_receiver) = mpsc::channel(1024);
_ = log::set_boxed_logger(ui::Logger::new(event_sender.clone())); _ = log::set_boxed_logger(ui::Logger::new(event_sender.clone()));
log::set_max_level(LevelFilter::Info); log::set_max_level(LevelFilter::Info);
@ -449,7 +454,7 @@ pub async fn run_client(remote: &str, sudo: bool) {
// Start the reconnect loop. // Start the reconnect loop.
tokio::select! { tokio::select! {
_ = ui.run() => (), _ = ui.run() => (),
_ = client_connect_loop(remote, sudo, event_sender) => () _ = client_connect_loop(remote, sudo, log_filter, event_sender) => ()
} }
} }

View file

@ -10,16 +10,19 @@ use crossterm::{
}, },
}; };
use log::{error, info, Level, Metadata, Record}; use log::{error, info, Level, Metadata, Record};
use std::collections::vec_deque::VecDeque;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::io::stdout; use std::io::stdout;
use std::{
collections::vec_deque::VecDeque,
sync::{Arc, Mutex},
};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
use tui::{ use tui::{
backend::{Backend, CrosstermBackend}, backend::{Backend, CrosstermBackend},
layout::{Constraint, Direction, Layout, Margin, Rect}, layout::{Constraint, Direction, Layout, Margin, Rect},
style::{Color, Style}, style::{Color, Modifier, Style},
widgets::{ widgets::{
Block, Borders, List, ListItem, ListState, Row, Table, TableState, Block, Borders, List, ListItem, ListState, Row, Table, TableState,
}, },
@ -67,9 +70,22 @@ impl log::Log for Logger {
fn flush(&self) {} fn flush(&self) {}
} }
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum State {
Enabled,
Broken,
Disabled,
}
impl State {
fn boxed(self) -> Arc<Mutex<State>> {
Arc::new(Mutex::new(self))
}
}
#[derive(Debug)] #[derive(Debug)]
struct Listener { struct Listener {
enabled: bool, state: std::sync::Arc<std::sync::Mutex<State>>,
stop: Option<oneshot::Sender<()>>, stop: Option<oneshot::Sender<()>>,
desc: Option<PortDesc>, desc: Option<PortDesc>,
} }
@ -80,7 +96,15 @@ impl Listener {
desc: PortDesc, desc: PortDesc,
enabled: bool, enabled: bool,
) -> Listener { ) -> Listener {
let mut listener = Listener { enabled, stop: None, desc: Some(desc) }; let mut listener = Listener {
state: if enabled {
State::Enabled.boxed()
} else {
State::Disabled.boxed()
},
stop: None,
desc: Some(desc),
};
if enabled { if enabled {
listener.start(socks_port); listener.start(socks_port);
} }
@ -88,15 +112,19 @@ impl Listener {
} }
pub fn enabled(&self) -> bool { pub fn enabled(&self) -> bool {
self.enabled *self.state.lock().unwrap() == State::Enabled
}
fn state(&self) -> State {
*self.state.lock().unwrap()
} }
pub fn set_enabled(&mut self, socks_port: Option<u16>, enabled: bool) { pub fn set_enabled(&mut self, socks_port: Option<u16>, enabled: bool) {
if enabled { if enabled {
self.enabled = true; self.state = State::Enabled.boxed();
self.start(socks_port); self.start(socks_port);
} else { } else {
self.enabled = false; self.state = State::Enabled.boxed();
self.stop = None; self.stop = None;
} }
} }
@ -112,19 +140,22 @@ impl Listener {
} }
pub fn start(&mut self, socks_port: Option<u16>) { pub fn start(&mut self, socks_port: Option<u16>) {
if self.enabled { if self.enabled() {
if let (Some(desc), Some(socks_port), None) = if let (Some(desc), Some(socks_port), None) =
(&self.desc, socks_port, &self.stop) (&self.desc, socks_port, &self.stop)
{ {
info!("Starting port {port} to {socks_port}", port = desc.port); info!("Starting port {port} to {socks_port}", port = desc.port);
let (l, stop) = oneshot::channel(); let (l, stop) = oneshot::channel();
let port = desc.port; let port = desc.port;
let state = self.state.clone();
tokio::spawn(async move { tokio::spawn(async move {
let result = tokio::select! { let result = tokio::select! {
r = client_listen(port, socks_port) => r, r = client_listen(port, socks_port) => r,
_ = stop => Ok(()), _ = stop => Ok(()),
}; };
if let Err(e) = result { if let Err(e) = result {
let mut sg = state.lock().unwrap();
*sg = State::Broken;
error!("Error listening on port {port}: {e:?}"); error!("Error listening on port {port}: {e:?}");
} else { } else {
info!("Stopped listening on port {port}"); info!("Stopped listening on port {port}");
@ -243,6 +274,8 @@ impl UI {
fn render_ports<B: Backend>(&mut self, frame: &mut Frame<B>, size: Rect) { fn render_ports<B: Backend>(&mut self, frame: &mut Frame<B>, size: Rect) {
let enabled_port_style = Style::default(); let enabled_port_style = Style::default();
let disabled_port_style = Style::default().fg(Color::DarkGray); let disabled_port_style = Style::default().fg(Color::DarkGray);
let broken_port_style =
Style::default().fg(Color::Red).add_modifier(Modifier::DIM);
let mut rows = Vec::new(); let mut rows = Vec::new();
let ports = self.get_ui_ports(); let ports = self.get_ui_ports();
@ -250,20 +283,21 @@ impl UI {
ports.iter().map(|p| format!("{p}")).collect(); ports.iter().map(|p| format!("{p}")).collect();
for (index, port) in ports.into_iter().enumerate() { for (index, port) in ports.into_iter().enumerate() {
let listener = self.ports.get(&port).unwrap(); let listener = self.ports.get(&port).unwrap();
let (symbol, style) = match listener.state() {
State::Enabled => ("", enabled_port_style),
State::Broken => ("", broken_port_style),
State::Disabled => ("", disabled_port_style),
};
rows.push( rows.push(
Row::new(vec![ Row::new(vec![
if listener.enabled { "" } else { "" }, symbol,
&port_strings[index][..], &port_strings[index][..],
match &listener.desc { match &listener.desc {
Some(port_desc) => &port_desc.desc, Some(port_desc) => &port_desc.desc,
None => "", None => "",
}, },
]) ])
.style(if listener.enabled { .style(style),
enabled_port_style
} else {
disabled_port_style
}),
); );
} }
@ -357,7 +391,11 @@ impl UI {
} }
fn enable_disable_port(&mut self, port: u16) { fn enable_disable_port(&mut self, port: u16) {
if let Some(listener) = self.ports.get_mut(&port) { let state = self.ports.get(&port).map(Listener::state);
if state == Some(State::Broken) {
// try turning it off and on again, it will at least get logs visible
self.ports.remove(&port);
} else if let Some(listener) = self.ports.get_mut(&port) {
listener.set_enabled(self.socks_port, !listener.enabled()); listener.set_enabled(self.socks_port, !listener.enabled());
} }
} }

View file

@ -26,6 +26,10 @@ Options:
run as root), but requires sudo access on the server and run as root), but requires sudo access on the server and
*might* end up forwarding ports that you do not want *might* end up forwarding ports that you do not want
forwarded (e.g., port 22 for sshd, or port 53 for systemd.) forwarded (e.g., port 22 for sshd, or port 53 for systemd.)
--log-filter FILTER
Set remote server's log level. Default is `warn`. Supports
all of Rust's env_logger filter syntax, e.g.
`--log-filter=fwd::trace`.
"}); "});
} }
@ -34,7 +38,7 @@ enum Args {
Help, Help,
Version, Version,
Server, Server,
Client(String, bool), Client(String, bool, String),
Browse(String), Browse(String),
Clip(String), Clip(String),
Error, Error,
@ -43,9 +47,11 @@ enum Args {
fn parse_args(args: Vec<String>) -> Args { fn parse_args(args: Vec<String>) -> Args {
let mut server = None; let mut server = None;
let mut sudo = None; let mut sudo = None;
let mut log_filter = None;
let mut rest = Vec::new(); let mut rest = Vec::new();
for arg in args.into_iter().skip(1) { let mut arg_iter = args.into_iter().skip(1);
while let Some(arg) = arg_iter.next() {
if arg == "--help" || arg == "-?" || arg == "-h" { if arg == "--help" || arg == "-?" || arg == "-h" {
return Args::Help; return Args::Help;
} else if arg == "--version" { } else if arg == "--version" {
@ -54,6 +60,14 @@ fn parse_args(args: Vec<String>) -> Args {
server = Some(true) server = Some(true)
} else if arg == "--sudo" || arg == "-s" { } else if arg == "--sudo" || arg == "-s" {
sudo = Some(true) sudo = Some(true)
} else if arg.starts_with("--log-filter") {
if arg.contains('=') {
log_filter = Some(arg.split('=').nth(1).unwrap().to_owned());
} else if let Some(arg) = arg_iter.next() {
log_filter = Some(arg);
} else {
return Args::Error;
}
} else { } else {
rest.push(arg) rest.push(arg)
} }
@ -71,20 +85,32 @@ fn parse_args(args: Vec<String>) -> Args {
if rest.len() == 2 { if rest.len() == 2 {
Args::Browse(rest[1].to_string()) Args::Browse(rest[1].to_string())
} else if rest.len() == 1 { } else if rest.len() == 1 {
Args::Client(rest[0].to_string(), sudo.unwrap_or(false)) Args::Client(
rest[0].to_string(),
sudo.unwrap_or(false),
log_filter.unwrap_or("warn".to_owned()),
)
} else { } else {
Args::Error Args::Error
} }
} else if rest[0] == "clip" { } else if rest[0] == "clip" {
if rest.len() == 1 { if rest.len() == 1 {
Args::Client(rest[0].to_string(), sudo.unwrap_or(false)) Args::Client(
rest[0].to_string(),
sudo.unwrap_or(false),
log_filter.unwrap_or("warn".to_owned()),
)
} else if rest.len() == 2 { } else if rest.len() == 2 {
Args::Clip(rest[1].to_string()) Args::Clip(rest[1].to_string())
} else { } else {
Args::Error Args::Error
} }
} else if rest.len() == 1 { } else if rest.len() == 1 {
Args::Client(rest[0].to_string(), sudo.unwrap_or(false)) Args::Client(
rest[0].to_string(),
sudo.unwrap_or(false),
log_filter.unwrap_or("warn".to_owned()),
)
} else { } else {
Args::Error Args::Error
} }
@ -124,8 +150,8 @@ async fn main() {
Args::Clip(file) => { Args::Clip(file) => {
clip_file(file).await; clip_file(file).await;
} }
Args::Client(server, sudo) => { Args::Client(server, sudo, log_filter) => {
fwd::run_client(&server, sudo).await; fwd::run_client(&server, sudo, &log_filter).await;
} }
Args::Error => { Args::Error => {
usage(); usage();
@ -175,14 +201,14 @@ mod tests {
#[test] #[test]
fn client() { fn client() {
assert_arg_parse!(&["foo.com"], Args::Client(_, false)); assert_arg_parse!(&["foo.com"], Args::Client(_, false, _));
assert_arg_parse!(&["a"], Args::Client(_, false)); assert_arg_parse!(&["a"], Args::Client(_, false, _));
assert_arg_parse!(&["browse"], Args::Client(_, false)); assert_arg_parse!(&["browse"], Args::Client(_, false, _));
assert_arg_parse!(&["clip"], Args::Client(_, false)); assert_arg_parse!(&["clip"], Args::Client(_, false, _));
assert_arg_parse!(&["foo.com", "--sudo"], Args::Client(_, true)); assert_arg_parse!(&["foo.com", "--sudo"], Args::Client(_, true, _));
assert_arg_parse!(&["a", "-s"], Args::Client(_, true)); assert_arg_parse!(&["a", "-s"], Args::Client(_, true, _));
assert_arg_parse!(&["-s", "browse"], Args::Client(_, true)); assert_arg_parse!(&["-s", "browse"], Args::Client(_, true, _));
assert_arg_parse!(&["-s", "clip"], Args::Client(_, true)); assert_arg_parse!(&["-s", "clip"], Args::Client(_, true, _));
} }
#[test] #[test]

View file

@ -32,7 +32,7 @@ async fn server_loop<Reader: AsyncRead + Unpin>(
match reader.read().await? { match reader.read().await? {
Ping => (), Ping => (),
Refresh => { Refresh => {
let ports = match refresh::get_entries() { let ports = match refresh::get_entries().await {
Ok(ports) => ports, Ok(ports) => ports,
Err(e) => { Err(e) => {
error!("Error scanning: {:?}", e); error!("Error scanning: {:?}", e);
@ -82,6 +82,10 @@ async fn server_main<
} }
pub async fn run_server() { pub async fn run_server() {
env_logger::Builder::from_env(
env_logger::Env::new().filter_or("FWD_LOG", "warn"),
)
.init();
let stdin = tokio::io::stdin(); let stdin = tokio::io::stdin();
let stdout = tokio::io::stdout(); let stdout = tokio::io::stdout();
if let Err(e) = server_main(stdin, stdout).await { if let Err(e) = server_main(stdin, stdout).await {

View file

@ -1,14 +1,17 @@
use crate::message::PortDesc; use crate::message::PortDesc;
use anyhow::Result; use anyhow::Result;
#[cfg(target_os = "linux")]
use std::collections::HashMap;
#[cfg(not(target_os = "linux"))] #[cfg(not(target_os = "linux"))]
pub fn get_entries() -> Result<Vec<PortDesc>> { pub async fn get_entries() -> Result<Vec<PortDesc>> {
use anyhow::bail; use anyhow::bail;
bail!("Not supported on this operating system"); bail!("Not supported on this operating system");
} }
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
pub fn get_entries() -> Result<Vec<PortDesc>> { pub async fn get_entries() -> Result<Vec<PortDesc>> {
let start = std::time::Instant::now();
use procfs::process::FDTarget; use procfs::process::FDTarget;
use std::collections::HashMap; use std::collections::HashMap;
@ -31,8 +34,13 @@ pub fn get_entries() -> Result<Vec<PortDesc>> {
} }
} }
} }
log::trace!("procfs elapsed={:?}", start.elapsed());
let mut h: HashMap<u16, PortDesc> = HashMap::new(); let mut h = if let Ok(ports) = find_docker_ports().await {
ports
} else {
HashMap::new()
};
// Go through all the listening IPv4 and IPv6 sockets and take the first // Go through all the listening IPv4 and IPv6 sockets and take the first
// instance of listening on each port *if* the address is loopback or // instance of listening on each port *if* the address is loopback or
@ -57,5 +65,49 @@ pub fn get_entries() -> Result<Vec<PortDesc>> {
} }
} }
Ok(h.into_values().collect()) let vals = h.into_values().collect();
log::trace!("total portscan elapsed={:?}", start.elapsed());
Ok(vals)
}
#[cfg(target_os = "linux")]
async fn find_docker_ports(
) -> Result<HashMap<u16, PortDesc>, bollard::errors::Error> {
use bollard::container::ListContainersOptions;
use bollard::Docker;
let start = std::time::Instant::now();
let client = Docker::connect_with_defaults()?;
log::trace!("docker connect elapsed={:?}", start.elapsed());
let port_start = std::time::Instant::now();
let mut port_to_name = HashMap::new();
let opts: ListContainersOptions<String> =
ListContainersOptions { all: false, ..Default::default() };
for container in client.list_containers(Some(opts)).await? {
let name = container
.names
.into_iter()
.flatten()
.next()
.unwrap_or_else(|| "<unknown docker>".to_owned());
for port in container.ports.iter().flatten() {
if let Some(public_port) = port.public_port {
let private_port = port.private_port;
port_to_name.insert(
public_port,
PortDesc {
port: public_port,
desc: format!("{name} (docker->{private_port})"),
},
);
}
}
}
log::trace!(
"docker port elapsed={:?} total docker elapsed={:?}",
port_start.elapsed(),
start.elapsed()
);
Ok(port_to_name)
} }