Enable/disable ports

This commit is contained in:
John Doty 2022-10-14 11:55:29 -07:00
parent e184bba39e
commit 3844200118
2 changed files with 253 additions and 194 deletions

View file

@ -3,7 +3,6 @@ use connection::ConnectionTable;
use log::LevelFilter;
use log::{error, info, warn};
use message::{Message, MessageReader, MessageWriter};
use std::collections::HashMap;
use std::net::{Ipv4Addr, SocketAddrV4};
use tokio::io::{
AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader,
@ -234,12 +233,9 @@ async fn client_listen(
async fn client_read<T: AsyncRead + Unpin>(
reader: &mut MessageReader<T>,
writer: mpsc::Sender<Message>,
connections: ConnectionTable,
events: mpsc::Sender<ui::UIEvent>,
) -> Result<()> {
let mut listeners: HashMap<u16, oneshot::Sender<()>> = HashMap::new();
info!("Running");
loop {
let message = reader.read().await?;
@ -267,41 +263,6 @@ async fn client_read<T: AsyncRead + Unpin>(
});
}
Ports(ports) => {
let mut new_listeners = HashMap::new();
for port in &ports {
let port = port.port;
if let Some(l) = listeners.remove(&port) {
if !l.is_closed() {
// `l` here is, of course, the channel that we
// use to tell the listener task to stop (see the
// spawn call below). If it isn't closed then
// that means a spawn task is still running so we
// should just let it keep running and re-use the
// existing listener.
new_listeners.insert(port, l);
}
}
if !new_listeners.contains_key(&port) {
let (l, stop) = oneshot::channel();
new_listeners.insert(port, l);
let (writer, connections) = (writer.clone(), connections.clone());
tokio::spawn(async move {
let result = tokio::select! {
r = client_listen(port, writer, connections) => r,
_ = stop => Ok(()),
};
if let Err(e) = result {
error!("Error listening on port {port}: {e:?}");
} else {
info!("Stopped listening on port {port}");
}
});
}
}
listeners = new_listeners;
if let Err(_) = events.send(ui::UIEvent::Ports(ports)).await {
// TODO: Log
}
@ -351,10 +312,16 @@ async fn client_main<Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin>(
// And now really get into it...
let (msg_sender, mut msg_receiver) = mpsc::channel(32);
let refresher = msg_sender.clone(); // Special for loop.
_ = events
.send(ui::UIEvent::Connected(
msg_sender.clone(),
connections.clone(),
))
.await;
let writing = pump_write(&mut msg_receiver, writer);
let reading = client_read(reader, msg_sender, connections, events);
let reading = client_read(reader, connections, events);
tokio::pin!(reading);
tokio::pin!(writing);
@ -364,7 +331,7 @@ async fn client_main<Reader: AsyncRead + Unpin, Writer: AsyncWrite + Unpin>(
result = async {
loop {
use tokio::time::{sleep, Duration};
if let Err(e) = refresher.send(Message::Refresh).await {
if let Err(e) = msg_sender.send(Message::Refresh).await {
break Err::<(), _>(e);
}
sleep(Duration::from_millis(500)).await;
@ -458,8 +425,6 @@ async fn client_connect_loop(remote: &str, events: mpsc::Sender<ui::UIEvent>) {
continue;
}
_ = events.send(ui::UIEvent::Connected).await;
let mut writer = MessageWriter::new(BufWriter::new(writer));
let mut reader = MessageReader::new(reader);
@ -477,13 +442,15 @@ async fn client_connect_loop(remote: &str, events: mpsc::Sender<ui::UIEvent>) {
}
pub async fn run_client(remote: &str) {
let (event_sender, mut event_receiver) = mpsc::channel(1024);
let (event_sender, event_receiver) = mpsc::channel(1024);
_ = log::set_boxed_logger(ui::Logger::new(event_sender.clone()));
log::set_max_level(LevelFilter::Info);
let mut ui = ui::UI::new(event_receiver);
// Start the reconnect loop.
tokio::select! {
_ = ui::run_ui(&mut event_receiver) => (),
_ = ui.run() => (),
_ = client_connect_loop(remote, event_sender) => ()
}
}