Handle transfer-encoding chunked in docker responses
Yeah, OK, thanks HTTP.
This commit is contained in:
parent
665fccf753
commit
542127f723
1 changed files with 138 additions and 7 deletions
|
|
@ -20,6 +20,7 @@ Host: localhost\r\n\
|
||||||
User-Agent: fwd/1.0\r\n\
|
User-Agent: fwd/1.0\r\n\
|
||||||
Accept: */*\r\n\
|
Accept: */*\r\n\
|
||||||
\r\n";
|
\r\n";
|
||||||
|
|
||||||
let mut stream = tokio::io::BufStream::new(stream);
|
let mut stream = tokio::io::BufStream::new(stream);
|
||||||
stream.write_all(DOCKER_LIST_CONTAINERS).await?;
|
stream.write_all(DOCKER_LIST_CONTAINERS).await?;
|
||||||
stream.flush().await?;
|
stream.flush().await?;
|
||||||
|
|
@ -27,14 +28,15 @@ Accept: */*\r\n\
|
||||||
// Check the HTTP response.
|
// Check the HTTP response.
|
||||||
let mut line = String::new();
|
let mut line = String::new();
|
||||||
stream.read_line(&mut line).await?;
|
stream.read_line(&mut line).await?;
|
||||||
trace!("[docker] {}", line.trim_end());
|
trace!("[docker] {}", &line.trim_end());
|
||||||
let parts: Vec<&str> = line.split(" ").collect();
|
let parts: Vec<&str> = line.split(" ").collect();
|
||||||
if parts.len() < 2 || parts[1] != "200" {
|
if parts.len() < 2 || parts[1] != "200" {
|
||||||
bail!("Error response from docker: {line}");
|
bail!("Error response from docker: {line:?}");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process the headers; all we really care about is content-length.
|
// Process the headers; all we really care about is content-length or content-encoding.
|
||||||
let mut content_length: usize = 0;
|
let mut content_length: Option<usize> = None;
|
||||||
|
let mut chunked = false;
|
||||||
loop {
|
loop {
|
||||||
line.clear();
|
line.clear();
|
||||||
stream.read_line(&mut line).await?;
|
stream.read_line(&mut line).await?;
|
||||||
|
|
@ -44,13 +46,55 @@ Accept: */*\r\n\
|
||||||
}
|
}
|
||||||
line.make_ascii_lowercase();
|
line.make_ascii_lowercase();
|
||||||
if let Some(rest) = line.strip_prefix("content-length: ") {
|
if let Some(rest) = line.strip_prefix("content-length: ") {
|
||||||
content_length = rest.trim().parse()?;
|
content_length = Some(rest.trim().parse()?);
|
||||||
|
}
|
||||||
|
if let Some(rest) = line.strip_prefix("transfer-encoding: ") {
|
||||||
|
chunked = rest.trim() == "chunked";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read the JSON response.
|
// Read the JSON response.
|
||||||
let mut response_buffer = vec![0; content_length];
|
let mut response_buffer = vec![0; content_length.unwrap_or(0)];
|
||||||
|
if content_length.is_some() {
|
||||||
stream.read_exact(&mut response_buffer).await?;
|
stream.read_exact(&mut response_buffer).await?;
|
||||||
|
} else if chunked {
|
||||||
|
// Docker will send a chunked encoding if the response seems too big to do
|
||||||
|
// all at once. I don't know the heuristic it uses but we need to deal with
|
||||||
|
// it. Fortunately chunked encoding is not too bad?
|
||||||
|
loop {
|
||||||
|
line.clear();
|
||||||
|
stream.read_line(&mut line).await?;
|
||||||
|
// This is the hex length of the thing.
|
||||||
|
let Some(chunk_length) = line.split(";").next() else {
|
||||||
|
bail!("Can't make sense of chunk length line: {line:?}");
|
||||||
|
};
|
||||||
|
let Ok(chunk_length) =
|
||||||
|
usize::from_str_radix(chunk_length.trim(), 16)
|
||||||
|
else {
|
||||||
|
bail!("Cannot interpret chunk length '{chunk_length}' as hex (Full line: {line:?})");
|
||||||
|
};
|
||||||
|
if chunk_length > 0 {
|
||||||
|
let old_length = response_buffer.len();
|
||||||
|
let new_length = old_length + chunk_length;
|
||||||
|
response_buffer.resize(new_length, 0);
|
||||||
|
stream
|
||||||
|
.read_exact(&mut response_buffer[old_length..new_length])
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut eol: [u8; 2] = [0, 0];
|
||||||
|
stream.read_exact(&mut eol).await?;
|
||||||
|
if eol[0] != b'\r' || eol[1] != b'\n' {
|
||||||
|
bail!("Mal-formed end-of-chunk marker from server");
|
||||||
|
}
|
||||||
|
if chunk_length == 0 {
|
||||||
|
break; // All done.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
trace!("Docker did not send a content_length, just reading to the end");
|
||||||
|
stream.read_to_end(&mut response_buffer).await?;
|
||||||
|
}
|
||||||
|
|
||||||
if log::log_enabled!(log::Level::Trace) {
|
if log::log_enabled!(log::Level::Trace) {
|
||||||
match std::str::from_utf8(&response_buffer) {
|
match std::str::from_utf8(&response_buffer) {
|
||||||
|
|
@ -895,4 +939,91 @@ mod test {
|
||||||
let input = b"\"\\";
|
let input = b"\"\\";
|
||||||
let _ = JsonValue::parse(input);
|
let _ = JsonValue::parse(input);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn accept_and_send_single_response(
|
||||||
|
listener: tokio::net::TcpListener,
|
||||||
|
response: &[u8],
|
||||||
|
) {
|
||||||
|
println!("[server] Awaiting connection...");
|
||||||
|
let (stream, _) = listener
|
||||||
|
.accept()
|
||||||
|
.await
|
||||||
|
.expect("Unable to accept connection");
|
||||||
|
let mut stream = tokio::io::BufStream::new(stream);
|
||||||
|
|
||||||
|
println!("[server] Reading request...");
|
||||||
|
let mut line = String::new();
|
||||||
|
loop {
|
||||||
|
line.clear();
|
||||||
|
stream
|
||||||
|
.read_line(&mut line)
|
||||||
|
.await
|
||||||
|
.expect("Unable to read line in server");
|
||||||
|
if line.trim().is_empty() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("[server] Sending response...");
|
||||||
|
stream
|
||||||
|
.write_all(response)
|
||||||
|
.await
|
||||||
|
.expect("Unable to write response");
|
||||||
|
stream.flush().await.expect("Unable to flush");
|
||||||
|
println!("[server] Done.");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
pub async fn docker_chunked_transfer_encoding() {
|
||||||
|
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
|
||||||
|
.await
|
||||||
|
.expect("Unable to create listener on localhost");
|
||||||
|
let port = listener.local_addr().unwrap().port();
|
||||||
|
|
||||||
|
let mut set = tokio::task::JoinSet::new();
|
||||||
|
set.spawn(async move {
|
||||||
|
const RESPONSE: &[u8] = b"\
|
||||||
|
HTTP/1.1 200 OK\r\n\
|
||||||
|
Transfer-Encoding: chunked\r\n\
|
||||||
|
\r\n\
|
||||||
|
4\r\nWiki\r\n7\r\npedia i\r\nB\r\nn \r\nchunks.\r\n0\r\n\r\n";
|
||||||
|
|
||||||
|
accept_and_send_single_response(listener, RESPONSE).await;
|
||||||
|
});
|
||||||
|
|
||||||
|
let addr = format!("127.0.0.1:{port}");
|
||||||
|
let stream = tokio::net::TcpStream::connect(&addr)
|
||||||
|
.await
|
||||||
|
.expect("Unable to connect");
|
||||||
|
let response = list_containers_with_connection(stream)
|
||||||
|
.await
|
||||||
|
.expect("Unable to get response");
|
||||||
|
assert_eq!(&response, b"Wikipedia in \r\nchunks.");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
pub async fn docker_with_no_content_length() {
|
||||||
|
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
|
||||||
|
.await
|
||||||
|
.expect("Unable to create listener on localhost");
|
||||||
|
let port = listener.local_addr().unwrap().port();
|
||||||
|
|
||||||
|
let mut set = tokio::task::JoinSet::new();
|
||||||
|
set.spawn(async move {
|
||||||
|
const RESPONSE: &[u8] = b"\
|
||||||
|
HTTP/1.1 200 OK\r\n\
|
||||||
|
\r\n\
|
||||||
|
[\"Booo this is some data\"]\r\n";
|
||||||
|
accept_and_send_single_response(listener, RESPONSE).await;
|
||||||
|
});
|
||||||
|
|
||||||
|
let addr = format!("127.0.0.1:{port}");
|
||||||
|
let stream = tokio::net::TcpStream::connect(&addr)
|
||||||
|
.await
|
||||||
|
.expect("Unable to connect");
|
||||||
|
let response = list_containers_with_connection(stream)
|
||||||
|
.await
|
||||||
|
.expect("Unable to get response");
|
||||||
|
assert_eq!(&response, b"[\"Booo this is some data\"]\r\n");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue