From 0d79ccd068d62384cc734e8d689b0f575cf82f81 Mon Sep 17 00:00:00 2001 From: John Doty Date: Sat, 8 Oct 2022 16:22:38 +0000 Subject: [PATCH] Test reader/writer too --- src/lib.rs | 37 ++++++++++++- src/message.rs | 143 +++++++++++++++++++++++++++++-------------------- 2 files changed, 121 insertions(+), 59 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 8ea6a0d..e0c5b05 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,7 +8,6 @@ use tokio::process; use tokio::sync::mpsc; use tokio::sync::oneshot; -mod error; mod message; mod refresh; @@ -25,6 +24,42 @@ pub enum Error { ConnectionReset, } +impl PartialEq for Error { + fn eq(&self, other: &Error) -> bool { + use Error::*; + match self { + Protocol => match other { + Protocol => true, + _ => false, + }, + ProtocolVersion => match other { + ProtocolVersion => true, + _ => false, + }, + IO(s) => match other { + IO(o) => s.kind() == o.kind(), + _ => false, + }, + MessageIncomplete => match other { + MessageIncomplete => true, + _ => false, + }, + MessageUnknown => match other { + MessageUnknown => true, + _ => false, + }, + MessageCorrupt => match other { + MessageCorrupt => true, + _ => false, + }, + ConnectionReset => match other { + ConnectionReset => true, + _ => false, + }, + } + } +} + async fn pump_write( messages: &mut mpsc::Receiver, writer: &mut MessageWriter, diff --git a/src/message.rs b/src/message.rs index 4d0d8fc..aaa8a9d 100644 --- a/src/message.rs +++ b/src/message.rs @@ -3,13 +3,16 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use std::io::Cursor; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +// ---------------------------------------------------------------------------- +// Messages + #[derive(Debug, PartialEq, Clone)] pub struct PortDesc { pub port: u16, pub desc: String, } -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Clone)] pub enum Message { Ping, // Ignored on both sides, can be used to test connection. Hello(u8, u8, Vec), // Server info announcement: major version, minor version, headers. @@ -129,63 +132,6 @@ impl Message { } } -#[cfg(test)] -mod message_tests { - use crate::message::Message; - use crate::message::Message::*; - use crate::message::PortDesc; - - fn assert_round_trip(message: Message) { - let encoded = message.encode(); - let mut cursor = std::io::Cursor::new(&encoded[..]); - let result = Message::decode(&mut cursor); - assert_eq!(Ok(message), result); - } - - #[test] - fn round_trip() { - assert_round_trip(Ping); - assert_round_trip(Hello( - 0x12, - 0x00, - vec!["One".to_string(), "Two".to_string(), "Three".to_string()], - )); - assert_round_trip(Hello(0x00, 0x01, vec![])); - assert_round_trip(Connect(0x1234567890123456, 0x1234)); - assert_round_trip(Connected(0x1234567890123456)); - assert_round_trip(Close(0x1234567890123456)); - assert_round_trip(Refresh); - assert_round_trip(Ports(vec![])); - assert_round_trip(Ports(vec![ - PortDesc { - port: 8080, - desc: "query-service".to_string(), - }, - PortDesc { - port: 9090, - desc: "metadata-library".to_string(), - }, - ])); - assert_round_trip(Data(0x1234567890123456, vec![1, 2, 3, 4].into())); - } - - #[test] - fn big_port_desc() { - // Strings are capped at 64k let's make a big one! - let char = String::from_utf8(vec![0xe0, 0xa0, 0x83]).unwrap(); - let mut str = String::with_capacity(128 * 1024); - while str.len() < 128 * 1024 { - str.push_str(&char); - } - - let msg = Ports(vec![PortDesc { - port: 8080, - desc: str, - }]); - msg.encode(); - } -} - fn get_u8(cursor: &mut Cursor<&[u8]>) -> Result { if !cursor.has_remaining() { return Err(Error::MessageIncomplete); @@ -294,3 +240,84 @@ impl MessageReader { Message::decode(&mut cursor) } } + +#[cfg(test)] +mod message_tests { + use crate::message::Message::*; + use crate::message::PortDesc; + use crate::message::{Message, MessageReader, MessageWriter}; + + fn assert_round_trip(message: Message) { + let encoded = message.encode(); + let mut cursor = std::io::Cursor::new(&encoded[..]); + let result = Message::decode(&mut cursor); + assert_eq!(Ok(message.clone()), result); + + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("Unable to start tokio runtime"); + + rt.block_on(async move { + let (client, server) = tokio::io::duplex(64); + + let expected = message.clone(); + let write = tokio::spawn(async move { + let mut writer = MessageWriter::new(client); + writer.write(message).await.expect("Write failed"); + }); + + let read = tokio::spawn(async move { + let mut reader = MessageReader::new(server); + let actual = reader.read().await.expect("Read failed"); + assert_eq!(expected, actual); + }); + + write.await.expect("Write proc failed"); + read.await.expect("Read proc failed"); + }); + } + + #[test] + fn round_trip() { + assert_round_trip(Ping); + assert_round_trip(Hello( + 0x12, + 0x00, + vec!["One".to_string(), "Two".to_string(), "Three".to_string()], + )); + assert_round_trip(Hello(0x00, 0x01, vec![])); + assert_round_trip(Connect(0x1234567890123456, 0x1234)); + assert_round_trip(Connected(0x1234567890123456)); + assert_round_trip(Close(0x1234567890123456)); + assert_round_trip(Refresh); + assert_round_trip(Ports(vec![])); + assert_round_trip(Ports(vec![ + PortDesc { + port: 8080, + desc: "query-service".to_string(), + }, + PortDesc { + port: 9090, + desc: "metadata-library".to_string(), + }, + ])); + assert_round_trip(Data(0x1234567890123456, vec![1, 2, 3, 4].into())); + } + + #[test] + fn big_port_desc() { + // Strings are capped at 64k let's make a big one! + let char = String::from_utf8(vec![0xe0, 0xa0, 0x83]).unwrap(); + let mut str = String::with_capacity(128 * 1024); + while str.len() < 128 * 1024 { + str.push_str(&char); + } + + let msg = Ports(vec![PortDesc { + port: 8080, + desc: str, + }]); + msg.encode(); + } +}