Start writing I guess
This commit is contained in:
parent
bf4cdcfb6a
commit
3157ff3cbb
1 changed files with 30 additions and 4 deletions
|
|
@ -5,15 +5,31 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||||
use tokio::net::TcpStream;
|
use tokio::net::TcpStream;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
|
const MAX_PACKET: usize = u16::max_value() as usize;
|
||||||
|
|
||||||
/// Read from a socket and convert the reads into Messages to put into the
|
/// Read from a socket and convert the reads into Messages to put into the
|
||||||
/// queue until the socket is closed for reading or an error occurs.
|
/// queue until the socket is closed for reading or an error occurs. We read
|
||||||
|
/// at most 2^16-1 bytes at a time, accepting the overhead of multiple reads
|
||||||
|
/// to keep one writer from clogging the pipe for everybody else. Each read
|
||||||
|
/// is converted into a [Message::Data] message that is sent to the `writer`
|
||||||
|
/// channel.
|
||||||
|
///
|
||||||
|
/// Once we're done reading (either because of a connection error or a clean
|
||||||
|
/// shutdown) we send a [Message::Close] message on the channel before
|
||||||
|
/// returning.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// If an error occurs reading from `read` we return [Error::IO]. If the
|
||||||
|
/// message channel is closed before we can send to it then we return
|
||||||
|
/// [Error::ConnectionReset].
|
||||||
|
///
|
||||||
async fn connection_read<T: AsyncRead + Unpin>(
|
async fn connection_read<T: AsyncRead + Unpin>(
|
||||||
channel: u64,
|
channel: u64,
|
||||||
read: &mut T,
|
read: &mut T,
|
||||||
writer: &mut mpsc::Sender<Message>,
|
writer: &mut mpsc::Sender<Message>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let result = loop {
|
let result = loop {
|
||||||
let mut buffer = BytesMut::with_capacity(64 * 1024);
|
let mut buffer = BytesMut::with_capacity(MAX_PACKET);
|
||||||
if let Err(e) = read.read_buf(&mut buffer).await {
|
if let Err(e) = read.read_buf(&mut buffer).await {
|
||||||
break Err(Error::IO(e));
|
break Err(Error::IO(e));
|
||||||
}
|
}
|
||||||
|
|
@ -37,7 +53,11 @@ async fn connection_read<T: AsyncRead + Unpin>(
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get messages from a queue and write them out to a socket until there are
|
/// Get messages from a queue and write them out to a socket until there are
|
||||||
/// no more messages in the queue or the write breaks for some reason.
|
/// no more messages in the queue or a write fails for some reason.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
/// If a write fails this returns `Error::IO`.
|
||||||
|
///
|
||||||
async fn connection_write<T: AsyncWrite + Unpin>(
|
async fn connection_write<T: AsyncWrite + Unpin>(
|
||||||
data: &mut mpsc::Receiver<Bytes>,
|
data: &mut mpsc::Receiver<Bytes>,
|
||||||
write: &mut T,
|
write: &mut T,
|
||||||
|
|
@ -51,7 +71,13 @@ async fn connection_write<T: AsyncWrite + Unpin>(
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle a connection, from the socket to the multiplexer and from the
|
/// Handle a connection, from the socket to the multiplexer and from the
|
||||||
/// multiplexer to the socket.
|
/// multiplexer to the socket. Keeps running until both the read and write
|
||||||
|
/// side are closed. In natural circumstances, we expect the write side to
|
||||||
|
/// close when the `data` sender is dropped from the connection table (see
|
||||||
|
/// `ConnectionTable`), and we expect the read side to close when the
|
||||||
|
/// socket's read half closes (which will cause a `Close` to be sent which
|
||||||
|
/// should drop the `data` sender on the other side, etc...).
|
||||||
|
///
|
||||||
pub async fn process(
|
pub async fn process(
|
||||||
channel: u64,
|
channel: u64,
|
||||||
stream: &mut TcpStream,
|
stream: &mut TcpStream,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue