Compare commits

...
Sign in to create a new pull request.

3 commits

Author SHA1 Message Date
Brandon W Maister
05c0a0a4de make server logging show messages in the frontend
now you can use the log crate and get messages in the frontend.
2024-07-31 15:35:11 -07:00
Brandon W Maister
6c10d8eece feat: Discover docker ports as well
If processes are running in a container then the fwd process
can't read their internal FDs without the CAP_SYS_ADMIN property
which is equivalent to sudo. Even with sudo, I think you need to do
a lot of work to be able to read them -- spawning a process within
the cgroup, doing work there, and then communicating back.

This just uses the docker api to populate some default ports, which
later get overwritten if fwd can find a native process.

The Docker port scan takes about 1.5ms, and the full port scan takes
40+ms, so this adds basically no overhead.
2024-07-31 15:32:45 -07:00
Brandon W Maister
66da323481 feat: Show errored ports as an error state 2024-07-31 15:32:04 -07:00
7 changed files with 838 additions and 140 deletions

771
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -15,9 +15,11 @@ bench = false
[dependencies]
anyhow = "1.0"
bollard = "0.17.0"
bytes = "1"
copypasta = "0.10.1"
crossterm = { version = "0.25", features = ["event-stream"] }
env_logger = { version = "0.11.5", default-features = false }
home = "0.5.4"
indoc = "1"
log = { version = "0.4", features = ["std"] }

View file

@ -323,6 +323,7 @@ async fn client_main<Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin>(
async fn spawn_ssh(
server: &str,
sudo: bool,
log_filter: &str,
) -> Result<(tokio::process::Child, u16), std::io::Error> {
let socks_port = {
let listener = TcpListener::bind("127.0.0.1:0").await?;
@ -337,7 +338,9 @@ async fn spawn_ssh(
if sudo {
cmd.arg("sudo");
}
cmd.arg("fwd").arg("--server");
cmd.arg(format!("FWD_LOG={log_filter}"))
.arg("fwd")
.arg("--server");
cmd.stdout(std::process::Stdio::piped());
cmd.stdin(std::process::Stdio::piped());
@ -367,13 +370,15 @@ fn is_sigint(status: std::process::ExitStatus) -> bool {
async fn client_connect_loop(
remote: &str,
sudo: bool,
log_filter: &str,
events: mpsc::Sender<ui::UIEvent>,
) {
loop {
_ = events.send(ui::UIEvent::Disconnected).await;
let (mut child, socks_port) =
spawn_ssh(remote, sudo).await.expect("failed to spawn");
let (mut child, socks_port) = spawn_ssh(remote, sudo, log_filter)
.await
.expect("failed to spawn");
let mut stderr = child
.stderr
@ -431,7 +436,7 @@ async fn client_connect_loop(
}
}
pub async fn run_client(remote: &str, sudo: bool) {
pub async fn run_client(remote: &str, sudo: bool, log_filter: &str) {
let (event_sender, event_receiver) = mpsc::channel(1024);
_ = log::set_boxed_logger(ui::Logger::new(event_sender.clone()));
log::set_max_level(LevelFilter::Info);
@ -449,7 +454,7 @@ pub async fn run_client(remote: &str, sudo: bool) {
// Start the reconnect loop.
tokio::select! {
_ = ui.run() => (),
_ = client_connect_loop(remote, sudo, event_sender) => ()
_ = client_connect_loop(remote, sudo, log_filter, event_sender) => ()
}
}

View file

@ -10,16 +10,19 @@ use crossterm::{
},
};
use log::{error, info, Level, Metadata, Record};
use std::collections::vec_deque::VecDeque;
use std::collections::{HashMap, HashSet};
use std::io::stdout;
use std::{
collections::vec_deque::VecDeque,
sync::{Arc, Mutex},
};
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio_stream::StreamExt;
use tui::{
backend::{Backend, CrosstermBackend},
layout::{Constraint, Direction, Layout, Margin, Rect},
style::{Color, Style},
style::{Color, Modifier, Style},
widgets::{
Block, Borders, List, ListItem, ListState, Row, Table, TableState,
},
@ -67,9 +70,22 @@ impl log::Log for Logger {
fn flush(&self) {}
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum State {
Enabled,
Broken,
Disabled,
}
impl State {
fn boxed(self) -> Arc<Mutex<State>> {
Arc::new(Mutex::new(self))
}
}
#[derive(Debug)]
struct Listener {
enabled: bool,
state: std::sync::Arc<std::sync::Mutex<State>>,
stop: Option<oneshot::Sender<()>>,
desc: Option<PortDesc>,
}
@ -80,7 +96,15 @@ impl Listener {
desc: PortDesc,
enabled: bool,
) -> Listener {
let mut listener = Listener { enabled, stop: None, desc: Some(desc) };
let mut listener = Listener {
state: if enabled {
State::Enabled.boxed()
} else {
State::Disabled.boxed()
},
stop: None,
desc: Some(desc),
};
if enabled {
listener.start(socks_port);
}
@ -88,15 +112,19 @@ impl Listener {
}
pub fn enabled(&self) -> bool {
self.enabled
*self.state.lock().unwrap() == State::Enabled
}
fn state(&self) -> State {
*self.state.lock().unwrap()
}
pub fn set_enabled(&mut self, socks_port: Option<u16>, enabled: bool) {
if enabled {
self.enabled = true;
self.state = State::Enabled.boxed();
self.start(socks_port);
} else {
self.enabled = false;
self.state = State::Enabled.boxed();
self.stop = None;
}
}
@ -112,19 +140,22 @@ impl Listener {
}
pub fn start(&mut self, socks_port: Option<u16>) {
if self.enabled {
if self.enabled() {
if let (Some(desc), Some(socks_port), None) =
(&self.desc, socks_port, &self.stop)
{
info!("Starting port {port} to {socks_port}", port = desc.port);
let (l, stop) = oneshot::channel();
let port = desc.port;
let state = self.state.clone();
tokio::spawn(async move {
let result = tokio::select! {
r = client_listen(port, socks_port) => r,
_ = stop => Ok(()),
};
if let Err(e) = result {
let mut sg = state.lock().unwrap();
*sg = State::Broken;
error!("Error listening on port {port}: {e:?}");
} else {
info!("Stopped listening on port {port}");
@ -243,6 +274,8 @@ impl UI {
fn render_ports<B: Backend>(&mut self, frame: &mut Frame<B>, size: Rect) {
let enabled_port_style = Style::default();
let disabled_port_style = Style::default().fg(Color::DarkGray);
let broken_port_style =
Style::default().fg(Color::Red).add_modifier(Modifier::DIM);
let mut rows = Vec::new();
let ports = self.get_ui_ports();
@ -250,20 +283,21 @@ impl UI {
ports.iter().map(|p| format!("{p}")).collect();
for (index, port) in ports.into_iter().enumerate() {
let listener = self.ports.get(&port).unwrap();
let (symbol, style) = match listener.state() {
State::Enabled => ("", enabled_port_style),
State::Broken => ("", broken_port_style),
State::Disabled => ("", disabled_port_style),
};
rows.push(
Row::new(vec![
if listener.enabled { "" } else { "" },
symbol,
&port_strings[index][..],
match &listener.desc {
Some(port_desc) => &port_desc.desc,
None => "",
},
])
.style(if listener.enabled {
enabled_port_style
} else {
disabled_port_style
}),
.style(style),
);
}
@ -357,7 +391,11 @@ impl UI {
}
fn enable_disable_port(&mut self, port: u16) {
if let Some(listener) = self.ports.get_mut(&port) {
let state = self.ports.get(&port).map(Listener::state);
if state == Some(State::Broken) {
// try turning it off and on again, it will at least get logs visible
self.ports.remove(&port);
} else if let Some(listener) = self.ports.get_mut(&port) {
listener.set_enabled(self.socks_port, !listener.enabled());
}
}

View file

@ -26,6 +26,10 @@ Options:
run as root), but requires sudo access on the server and
*might* end up forwarding ports that you do not want
forwarded (e.g., port 22 for sshd, or port 53 for systemd.)
--log-filter FILTER
Set remote server's log level. Default is `warn`. Supports
all of Rust's env_logger filter syntax, e.g.
`--log-filter=fwd::trace`.
"});
}
@ -34,7 +38,7 @@ enum Args {
Help,
Version,
Server,
Client(String, bool),
Client(String, bool, String),
Browse(String),
Clip(String),
Error,
@ -43,9 +47,11 @@ enum Args {
fn parse_args(args: Vec<String>) -> Args {
let mut server = None;
let mut sudo = None;
let mut log_filter = None;
let mut rest = Vec::new();
for arg in args.into_iter().skip(1) {
let mut arg_iter = args.into_iter().skip(1);
while let Some(arg) = arg_iter.next() {
if arg == "--help" || arg == "-?" || arg == "-h" {
return Args::Help;
} else if arg == "--version" {
@ -54,6 +60,14 @@ fn parse_args(args: Vec<String>) -> Args {
server = Some(true)
} else if arg == "--sudo" || arg == "-s" {
sudo = Some(true)
} else if arg.starts_with("--log-filter") {
if arg.contains('=') {
log_filter = Some(arg.split('=').nth(1).unwrap().to_owned());
} else if let Some(arg) = arg_iter.next() {
log_filter = Some(arg);
} else {
return Args::Error;
}
} else {
rest.push(arg)
}
@ -71,20 +85,32 @@ fn parse_args(args: Vec<String>) -> Args {
if rest.len() == 2 {
Args::Browse(rest[1].to_string())
} else if rest.len() == 1 {
Args::Client(rest[0].to_string(), sudo.unwrap_or(false))
Args::Client(
rest[0].to_string(),
sudo.unwrap_or(false),
log_filter.unwrap_or("warn".to_owned()),
)
} else {
Args::Error
}
} else if rest[0] == "clip" {
if rest.len() == 1 {
Args::Client(rest[0].to_string(), sudo.unwrap_or(false))
Args::Client(
rest[0].to_string(),
sudo.unwrap_or(false),
log_filter.unwrap_or("warn".to_owned()),
)
} else if rest.len() == 2 {
Args::Clip(rest[1].to_string())
} else {
Args::Error
}
} else if rest.len() == 1 {
Args::Client(rest[0].to_string(), sudo.unwrap_or(false))
Args::Client(
rest[0].to_string(),
sudo.unwrap_or(false),
log_filter.unwrap_or("warn".to_owned()),
)
} else {
Args::Error
}
@ -124,8 +150,8 @@ async fn main() {
Args::Clip(file) => {
clip_file(file).await;
}
Args::Client(server, sudo) => {
fwd::run_client(&server, sudo).await;
Args::Client(server, sudo, log_filter) => {
fwd::run_client(&server, sudo, &log_filter).await;
}
Args::Error => {
usage();
@ -175,14 +201,14 @@ mod tests {
#[test]
fn client() {
assert_arg_parse!(&["foo.com"], Args::Client(_, false));
assert_arg_parse!(&["a"], Args::Client(_, false));
assert_arg_parse!(&["browse"], Args::Client(_, false));
assert_arg_parse!(&["clip"], Args::Client(_, false));
assert_arg_parse!(&["foo.com", "--sudo"], Args::Client(_, true));
assert_arg_parse!(&["a", "-s"], Args::Client(_, true));
assert_arg_parse!(&["-s", "browse"], Args::Client(_, true));
assert_arg_parse!(&["-s", "clip"], Args::Client(_, true));
assert_arg_parse!(&["foo.com"], Args::Client(_, false, _));
assert_arg_parse!(&["a"], Args::Client(_, false, _));
assert_arg_parse!(&["browse"], Args::Client(_, false, _));
assert_arg_parse!(&["clip"], Args::Client(_, false, _));
assert_arg_parse!(&["foo.com", "--sudo"], Args::Client(_, true, _));
assert_arg_parse!(&["a", "-s"], Args::Client(_, true, _));
assert_arg_parse!(&["-s", "browse"], Args::Client(_, true, _));
assert_arg_parse!(&["-s", "clip"], Args::Client(_, true, _));
}
#[test]

View file

@ -32,7 +32,7 @@ async fn server_loop<Reader: AsyncRead + Unpin>(
match reader.read().await? {
Ping => (),
Refresh => {
let ports = match refresh::get_entries() {
let ports = match refresh::get_entries().await {
Ok(ports) => ports,
Err(e) => {
error!("Error scanning: {:?}", e);
@ -82,6 +82,10 @@ async fn server_main<
}
pub async fn run_server() {
env_logger::Builder::from_env(
env_logger::Env::new().filter_or("FWD_LOG", "warn"),
)
.init();
let stdin = tokio::io::stdin();
let stdout = tokio::io::stdout();
if let Err(e) = server_main(stdin, stdout).await {

View file

@ -1,14 +1,17 @@
use crate::message::PortDesc;
use anyhow::Result;
#[cfg(target_os = "linux")]
use std::collections::HashMap;
#[cfg(not(target_os = "linux"))]
pub fn get_entries() -> Result<Vec<PortDesc>> {
pub async fn get_entries() -> Result<Vec<PortDesc>> {
use anyhow::bail;
bail!("Not supported on this operating system");
}
#[cfg(target_os = "linux")]
pub fn get_entries() -> Result<Vec<PortDesc>> {
pub async fn get_entries() -> Result<Vec<PortDesc>> {
let start = std::time::Instant::now();
use procfs::process::FDTarget;
use std::collections::HashMap;
@ -31,8 +34,13 @@ pub fn get_entries() -> Result<Vec<PortDesc>> {
}
}
}
log::trace!("procfs elapsed={:?}", start.elapsed());
let mut h: HashMap<u16, PortDesc> = HashMap::new();
let mut h = if let Ok(ports) = find_docker_ports().await {
ports
} else {
HashMap::new()
};
// Go through all the listening IPv4 and IPv6 sockets and take the first
// instance of listening on each port *if* the address is loopback or
@ -57,5 +65,49 @@ pub fn get_entries() -> Result<Vec<PortDesc>> {
}
}
Ok(h.into_values().collect())
let vals = h.into_values().collect();
log::trace!("total portscan elapsed={:?}", start.elapsed());
Ok(vals)
}
#[cfg(target_os = "linux")]
async fn find_docker_ports(
) -> Result<HashMap<u16, PortDesc>, bollard::errors::Error> {
use bollard::container::ListContainersOptions;
use bollard::Docker;
let start = std::time::Instant::now();
let client = Docker::connect_with_defaults()?;
log::trace!("docker connect elapsed={:?}", start.elapsed());
let port_start = std::time::Instant::now();
let mut port_to_name = HashMap::new();
let opts: ListContainersOptions<String> =
ListContainersOptions { all: false, ..Default::default() };
for container in client.list_containers(Some(opts)).await? {
let name = container
.names
.into_iter()
.flatten()
.next()
.unwrap_or_else(|| "<unknown docker>".to_owned());
for port in container.ports.iter().flatten() {
if let Some(public_port) = port.public_port {
let private_port = port.private_port;
port_to_name.insert(
public_port,
PortDesc {
port: public_port,
desc: format!("{name} (docker->{private_port})"),
},
);
}
}
}
log::trace!(
"docker port elapsed={:?} total docker elapsed={:?}",
port_start.elapsed(),
start.elapsed()
);
Ok(port_to_name)
}