Split client and server into modules

This commit is contained in:
John Doty 2022-10-17 19:57:28 -07:00
parent a3727812a2
commit 86ea099b2c
5 changed files with 447 additions and 443 deletions

58
src/server/mod.rs Normal file
View file

@ -0,0 +1,58 @@
use crate::message::{Message, MessageReader, MessageWriter};
use anyhow::Result;
use log::{error, warn};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, BufWriter};
mod refresh;
async fn server_main<Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin>(
reader: &mut MessageReader<Reader>,
writer: &mut MessageWriter<Writer>,
) -> Result<()> {
// The first message we send must be an announcement.
writer.write(Message::Hello(0, 1, vec![])).await?;
loop {
use Message::*;
match reader.read().await? {
Ping => (),
Refresh => {
let ports = match refresh::get_entries() {
Ok(ports) => ports,
Err(e) => {
error!("Error scanning: {:?}", e);
vec![]
}
};
if let Err(e) = writer.write(Message::Ports(ports)).await {
// Writer has been closed for some reason, we can just
// quit.... I hope everything is OK?
warn!("Warning: Error sending: {:?}", e);
}
}
message => panic!("Unsupported: {:?}", message),
};
}
}
pub async fn run_server() {
let reader = BufReader::new(tokio::io::stdin());
let mut writer = BufWriter::new(tokio::io::stdout());
// Write the 8-byte synchronization marker.
writer
.write_u64(0x00_00_00_00_00_00_00_00)
.await
.expect("Error writing marker");
if let Err(e) = writer.flush().await {
eprintln!("Error writing sync marker: {:?}", e);
return;
}
let mut writer = MessageWriter::new(writer);
let mut reader = MessageReader::new(reader);
if let Err(e) = server_main(&mut reader, &mut writer).await {
eprintln!("Error: {:?}", e);
}
}

65
src/server/refresh.rs Normal file
View file

@ -0,0 +1,65 @@
use crate::message::PortDesc;
use anyhow::Result;
#[cfg(not(target_os = "linux"))]
pub fn get_entries() -> Result<Vec<PortDesc>> {
use anyhow::bail;
bail!("Not supported on this operating system");
}
#[cfg(target_os = "linux")]
pub fn get_entries() -> Result<Vec<PortDesc>> {
use procfs::process::FDTarget;
use std::collections::HashMap;
let all_procs = procfs::process::all_processes()?;
// build up a map between socket inodes and process stat info. Ignore any
// error we encounter as it probably means we have no access to that
// process or something.
let mut map: HashMap<u64, String> = HashMap::new();
for p in all_procs {
if let Ok(process) = p {
if !process.is_alive() {
continue; // Ignore zombies.
}
if let (Ok(fds), Ok(cmd)) = (process.fd(), process.cmdline()) {
for fd in fds {
if let Ok(fd) = fd {
if let FDTarget::Socket(inode) = fd.target {
map.insert(inode, cmd.join(" "));
}
}
}
}
}
}
let mut h: HashMap<u16, PortDesc> = HashMap::new();
// Go through all the listening IPv4 and IPv6 sockets and take the first
// instance of listening on each port *if* the address is loopback or
// unspecified. (TODO: Do we want this restriction really?)
let tcp = procfs::net::tcp()?;
let tcp6 = procfs::net::tcp6()?;
for tcp_entry in tcp.into_iter().chain(tcp6) {
if tcp_entry.state == procfs::net::TcpState::Listen
&& (tcp_entry.local_address.ip().is_loopback()
|| tcp_entry.local_address.ip().is_unspecified())
&& !h.contains_key(&tcp_entry.local_address.port())
{
if let Some(cmd) = map.get(&tcp_entry.inode) {
h.insert(
tcp_entry.local_address.port(),
PortDesc {
port: tcp_entry.local_address.port(),
desc: cmd.clone(),
},
);
}
}
}
Ok(h.into_values().collect())
}