Test reader/writer too

This commit is contained in:
John Doty 2022-10-08 16:22:38 +00:00
parent 5ab189461d
commit 0d79ccd068
2 changed files with 121 additions and 59 deletions

View file

@ -8,7 +8,6 @@ use tokio::process;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
mod error;
mod message;
mod refresh;
@ -25,6 +24,42 @@ pub enum Error {
ConnectionReset,
}
impl PartialEq for Error {
fn eq(&self, other: &Error) -> bool {
use Error::*;
match self {
Protocol => match other {
Protocol => true,
_ => false,
},
ProtocolVersion => match other {
ProtocolVersion => true,
_ => false,
},
IO(s) => match other {
IO(o) => s.kind() == o.kind(),
_ => false,
},
MessageIncomplete => match other {
MessageIncomplete => true,
_ => false,
},
MessageUnknown => match other {
MessageUnknown => true,
_ => false,
},
MessageCorrupt => match other {
MessageCorrupt => true,
_ => false,
},
ConnectionReset => match other {
ConnectionReset => true,
_ => false,
},
}
}
}
async fn pump_write<T: AsyncWrite + Unpin>(
messages: &mut mpsc::Receiver<Message>,
writer: &mut MessageWriter<T>,

View file

@ -3,13 +3,16 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::io::Cursor;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
// ----------------------------------------------------------------------------
// Messages
#[derive(Debug, PartialEq, Clone)]
pub struct PortDesc {
pub port: u16,
pub desc: String,
}
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone)]
pub enum Message {
Ping, // Ignored on both sides, can be used to test connection.
Hello(u8, u8, Vec<String>), // Server info announcement: major version, minor version, headers.
@ -129,63 +132,6 @@ impl Message {
}
}
#[cfg(test)]
mod message_tests {
use crate::message::Message;
use crate::message::Message::*;
use crate::message::PortDesc;
fn assert_round_trip(message: Message) {
let encoded = message.encode();
let mut cursor = std::io::Cursor::new(&encoded[..]);
let result = Message::decode(&mut cursor);
assert_eq!(Ok(message), result);
}
#[test]
fn round_trip() {
assert_round_trip(Ping);
assert_round_trip(Hello(
0x12,
0x00,
vec!["One".to_string(), "Two".to_string(), "Three".to_string()],
));
assert_round_trip(Hello(0x00, 0x01, vec![]));
assert_round_trip(Connect(0x1234567890123456, 0x1234));
assert_round_trip(Connected(0x1234567890123456));
assert_round_trip(Close(0x1234567890123456));
assert_round_trip(Refresh);
assert_round_trip(Ports(vec![]));
assert_round_trip(Ports(vec![
PortDesc {
port: 8080,
desc: "query-service".to_string(),
},
PortDesc {
port: 9090,
desc: "metadata-library".to_string(),
},
]));
assert_round_trip(Data(0x1234567890123456, vec![1, 2, 3, 4].into()));
}
#[test]
fn big_port_desc() {
// Strings are capped at 64k let's make a big one!
let char = String::from_utf8(vec![0xe0, 0xa0, 0x83]).unwrap();
let mut str = String::with_capacity(128 * 1024);
while str.len() < 128 * 1024 {
str.push_str(&char);
}
let msg = Ports(vec![PortDesc {
port: 8080,
desc: str,
}]);
msg.encode();
}
}
fn get_u8(cursor: &mut Cursor<&[u8]>) -> Result<u8, Error> {
if !cursor.has_remaining() {
return Err(Error::MessageIncomplete);
@ -294,3 +240,84 @@ impl<T: AsyncRead + Unpin> MessageReader<T> {
Message::decode(&mut cursor)
}
}
#[cfg(test)]
mod message_tests {
use crate::message::Message::*;
use crate::message::PortDesc;
use crate::message::{Message, MessageReader, MessageWriter};
fn assert_round_trip(message: Message) {
let encoded = message.encode();
let mut cursor = std::io::Cursor::new(&encoded[..]);
let result = Message::decode(&mut cursor);
assert_eq!(Ok(message.clone()), result);
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("Unable to start tokio runtime");
rt.block_on(async move {
let (client, server) = tokio::io::duplex(64);
let expected = message.clone();
let write = tokio::spawn(async move {
let mut writer = MessageWriter::new(client);
writer.write(message).await.expect("Write failed");
});
let read = tokio::spawn(async move {
let mut reader = MessageReader::new(server);
let actual = reader.read().await.expect("Read failed");
assert_eq!(expected, actual);
});
write.await.expect("Write proc failed");
read.await.expect("Read proc failed");
});
}
#[test]
fn round_trip() {
assert_round_trip(Ping);
assert_round_trip(Hello(
0x12,
0x00,
vec!["One".to_string(), "Two".to_string(), "Three".to_string()],
));
assert_round_trip(Hello(0x00, 0x01, vec![]));
assert_round_trip(Connect(0x1234567890123456, 0x1234));
assert_round_trip(Connected(0x1234567890123456));
assert_round_trip(Close(0x1234567890123456));
assert_round_trip(Refresh);
assert_round_trip(Ports(vec![]));
assert_round_trip(Ports(vec![
PortDesc {
port: 8080,
desc: "query-service".to_string(),
},
PortDesc {
port: 9090,
desc: "metadata-library".to_string(),
},
]));
assert_round_trip(Data(0x1234567890123456, vec![1, 2, 3, 4].into()));
}
#[test]
fn big_port_desc() {
// Strings are capped at 64k let's make a big one!
let char = String::from_utf8(vec![0xe0, 0xa0, 0x83]).unwrap();
let mut str = String::with_capacity(128 * 1024);
while str.len() < 128 * 1024 {
str.push_str(&char);
}
let msg = Ports(vec![PortDesc {
port: 8080,
desc: str,
}]);
msg.encode();
}
}