Vendor dependencies

Let's see how I like this workflow.
This commit is contained in:
John Doty 2022-12-19 08:27:18 -08:00
parent 34d1830413
commit 9c435dc440
7500 changed files with 1665121 additions and 99 deletions

View file

@ -0,0 +1,107 @@
#![allow(clippy::diverging_sub_expression)]
use std::rc::Rc;
#[allow(dead_code)]
type BoxStream<T> = std::pin::Pin<Box<dyn tokio_stream::Stream<Item = T>>>;
#[allow(dead_code)]
fn require_send<T: Send>(_t: &T) {}
#[allow(dead_code)]
fn require_sync<T: Sync>(_t: &T) {}
#[allow(dead_code)]
fn require_unpin<T: Unpin>(_t: &T) {}
#[allow(dead_code)]
struct Invalid;
trait AmbiguousIfSend<A> {
fn some_item(&self) {}
}
impl<T: ?Sized> AmbiguousIfSend<()> for T {}
impl<T: ?Sized + Send> AmbiguousIfSend<Invalid> for T {}
trait AmbiguousIfSync<A> {
fn some_item(&self) {}
}
impl<T: ?Sized> AmbiguousIfSync<()> for T {}
impl<T: ?Sized + Sync> AmbiguousIfSync<Invalid> for T {}
trait AmbiguousIfUnpin<A> {
fn some_item(&self) {}
}
impl<T: ?Sized> AmbiguousIfUnpin<()> for T {}
impl<T: ?Sized + Unpin> AmbiguousIfUnpin<Invalid> for T {}
macro_rules! into_todo {
($typ:ty) => {{
let x: $typ = todo!();
x
}};
}
macro_rules! async_assert_fn {
($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Send & Sync) => {
#[allow(unreachable_code)]
#[allow(unused_variables)]
const _: fn() = || {
let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
require_send(&f);
require_sync(&f);
};
};
($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Send & !Sync) => {
#[allow(unreachable_code)]
#[allow(unused_variables)]
const _: fn() = || {
let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
require_send(&f);
AmbiguousIfSync::some_item(&f);
};
};
($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Send & Sync) => {
#[allow(unreachable_code)]
#[allow(unused_variables)]
const _: fn() = || {
let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
AmbiguousIfSend::some_item(&f);
require_sync(&f);
};
};
($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Send & !Sync) => {
#[allow(unreachable_code)]
#[allow(unused_variables)]
const _: fn() = || {
let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
AmbiguousIfSend::some_item(&f);
AmbiguousIfSync::some_item(&f);
};
};
($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): !Unpin) => {
#[allow(unreachable_code)]
#[allow(unused_variables)]
const _: fn() = || {
let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
AmbiguousIfUnpin::some_item(&f);
};
};
($($f:ident $(< $($generic:ty),* > )? )::+($($arg:ty),*): Unpin) => {
#[allow(unreachable_code)]
#[allow(unused_variables)]
const _: fn() = || {
let f = $($f $(::<$($generic),*>)? )::+( $( into_todo!($arg) ),* );
require_unpin(&f);
};
};
}
async_assert_fn!(tokio_stream::empty<Rc<u8>>(): Send & Sync);
async_assert_fn!(tokio_stream::pending<Rc<u8>>(): Send & Sync);
async_assert_fn!(tokio_stream::iter(std::vec::IntoIter<u8>): Send & Sync);
async_assert_fn!(tokio_stream::StreamExt::next(&mut BoxStream<()>): !Unpin);
async_assert_fn!(tokio_stream::StreamExt::try_next(&mut BoxStream<Result<(), ()>>): !Unpin);
async_assert_fn!(tokio_stream::StreamExt::all(&mut BoxStream<()>, fn(())->bool): !Unpin);
async_assert_fn!(tokio_stream::StreamExt::any(&mut BoxStream<()>, fn(())->bool): !Unpin);
async_assert_fn!(tokio_stream::StreamExt::fold(&mut BoxStream<()>, (), fn((), ())->()): !Unpin);
async_assert_fn!(tokio_stream::StreamExt::collect<Vec<()>>(&mut BoxStream<()>): !Unpin);

View file

@ -0,0 +1,84 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]
use tokio::time;
use tokio_stream::{self as stream, StreamExt};
use tokio_test::assert_pending;
use tokio_test::task;
use futures::FutureExt;
use std::time::Duration;
#[tokio::test(start_paused = true)]
async fn usage() {
let iter = vec![1, 2, 3].into_iter();
let stream0 = stream::iter(iter);
let iter = vec![4].into_iter();
let stream1 =
stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));
let chunk_stream = stream0
.chain(stream1)
.chunks_timeout(4, Duration::from_secs(2));
let mut chunk_stream = task::spawn(chunk_stream);
assert_pending!(chunk_stream.poll_next());
time::advance(Duration::from_secs(2)).await;
assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
assert_pending!(chunk_stream.poll_next());
time::advance(Duration::from_secs(2)).await;
assert_eq!(chunk_stream.next().await, Some(vec![4]));
}
#[tokio::test(start_paused = true)]
async fn full_chunk_with_timeout() {
let iter = vec![1, 2].into_iter();
let stream0 = stream::iter(iter);
let iter = vec![3].into_iter();
let stream1 =
stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(1)).map(move |_| n));
let iter = vec![4].into_iter();
let stream2 =
stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(3)).map(move |_| n));
let chunk_stream = stream0
.chain(stream1)
.chain(stream2)
.chunks_timeout(3, Duration::from_secs(2));
let mut chunk_stream = task::spawn(chunk_stream);
assert_pending!(chunk_stream.poll_next());
time::advance(Duration::from_secs(2)).await;
assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
assert_pending!(chunk_stream.poll_next());
time::advance(Duration::from_secs(2)).await;
assert_eq!(chunk_stream.next().await, Some(vec![4]));
}
#[tokio::test]
#[ignore]
async fn real_time() {
let iter = vec![1, 2, 3, 4].into_iter();
let stream0 = stream::iter(iter);
let iter = vec![5].into_iter();
let stream1 =
stream::iter(iter).then(move |n| time::sleep(Duration::from_secs(5)).map(move |_| n));
let chunk_stream = stream0
.chain(stream1)
.chunks_timeout(3, Duration::from_secs(2));
let mut chunk_stream = task::spawn(chunk_stream);
assert_eq!(chunk_stream.next().await, Some(vec![1, 2, 3]));
assert_eq!(chunk_stream.next().await, Some(vec![4]));
assert_eq!(chunk_stream.next().await, Some(vec![5]));
}

View file

@ -0,0 +1,100 @@
use tokio_stream::{self as stream, Stream, StreamExt};
use tokio_test::{assert_pending, assert_ready, task};
mod support {
pub(crate) mod mpsc;
}
use support::mpsc;
#[tokio::test]
async fn basic_usage() {
let one = stream::iter(vec![1, 2, 3]);
let two = stream::iter(vec![4, 5, 6]);
let mut stream = one.chain(two);
assert_eq!(stream.size_hint(), (6, Some(6)));
assert_eq!(stream.next().await, Some(1));
assert_eq!(stream.size_hint(), (5, Some(5)));
assert_eq!(stream.next().await, Some(2));
assert_eq!(stream.size_hint(), (4, Some(4)));
assert_eq!(stream.next().await, Some(3));
assert_eq!(stream.size_hint(), (3, Some(3)));
assert_eq!(stream.next().await, Some(4));
assert_eq!(stream.size_hint(), (2, Some(2)));
assert_eq!(stream.next().await, Some(5));
assert_eq!(stream.size_hint(), (1, Some(1)));
assert_eq!(stream.next().await, Some(6));
assert_eq!(stream.size_hint(), (0, Some(0)));
assert_eq!(stream.next().await, None);
assert_eq!(stream.size_hint(), (0, Some(0)));
assert_eq!(stream.next().await, None);
}
#[tokio::test]
async fn pending_first() {
let (tx1, rx1) = mpsc::unbounded_channel_stream();
let (tx2, rx2) = mpsc::unbounded_channel_stream();
let mut stream = task::spawn(rx1.chain(rx2));
assert_eq!(stream.size_hint(), (0, None));
assert_pending!(stream.poll_next());
tx2.send(2).unwrap();
assert!(!stream.is_woken());
assert_pending!(stream.poll_next());
tx1.send(1).unwrap();
assert!(stream.is_woken());
assert_eq!(Some(1), assert_ready!(stream.poll_next()));
assert_pending!(stream.poll_next());
drop(tx1);
assert_eq!(stream.size_hint(), (0, None));
assert!(stream.is_woken());
assert_eq!(Some(2), assert_ready!(stream.poll_next()));
assert_eq!(stream.size_hint(), (0, None));
drop(tx2);
assert_eq!(stream.size_hint(), (0, None));
assert_eq!(None, assert_ready!(stream.poll_next()));
}
#[test]
fn size_overflow() {
struct Monster;
impl tokio_stream::Stream for Monster {
type Item = ();
fn poll_next(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<()>> {
panic!()
}
fn size_hint(&self) -> (usize, Option<usize>) {
(usize::MAX, Some(usize::MAX))
}
}
let m1 = Monster;
let m2 = Monster;
let m = m1.chain(m2);
assert_eq!(m.size_hint(), (usize::MAX, None));
}

View file

@ -0,0 +1,146 @@
use tokio_stream::{self as stream, StreamExt};
use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task};
mod support {
pub(crate) mod mpsc;
}
use support::mpsc;
#[allow(clippy::let_unit_value)]
#[tokio::test]
async fn empty_unit() {
// Drains the stream.
let mut iter = vec![(), (), ()].into_iter();
let _: () = stream::iter(&mut iter).collect().await;
assert!(iter.next().is_none());
}
#[tokio::test]
async fn empty_vec() {
let coll: Vec<u32> = stream::empty().collect().await;
assert!(coll.is_empty());
}
#[tokio::test]
async fn empty_box_slice() {
let coll: Box<[u32]> = stream::empty().collect().await;
assert!(coll.is_empty());
}
#[tokio::test]
async fn empty_string() {
let coll: String = stream::empty::<&str>().collect().await;
assert!(coll.is_empty());
}
#[tokio::test]
async fn empty_result() {
let coll: Result<Vec<u32>, &str> = stream::empty().collect().await;
assert_eq!(Ok(vec![]), coll);
}
#[tokio::test]
async fn collect_vec_items() {
let (tx, rx) = mpsc::unbounded_channel_stream();
let mut fut = task::spawn(rx.collect::<Vec<i32>>());
assert_pending!(fut.poll());
tx.send(1).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());
tx.send(2).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());
drop(tx);
assert!(fut.is_woken());
let coll = assert_ready!(fut.poll());
assert_eq!(vec![1, 2], coll);
}
#[tokio::test]
async fn collect_string_items() {
let (tx, rx) = mpsc::unbounded_channel_stream();
let mut fut = task::spawn(rx.collect::<String>());
assert_pending!(fut.poll());
tx.send("hello ".to_string()).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());
tx.send("world".to_string()).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());
drop(tx);
assert!(fut.is_woken());
let coll = assert_ready!(fut.poll());
assert_eq!("hello world", coll);
}
#[tokio::test]
async fn collect_str_items() {
let (tx, rx) = mpsc::unbounded_channel_stream();
let mut fut = task::spawn(rx.collect::<String>());
assert_pending!(fut.poll());
tx.send("hello ").unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());
tx.send("world").unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());
drop(tx);
assert!(fut.is_woken());
let coll = assert_ready!(fut.poll());
assert_eq!("hello world", coll);
}
#[tokio::test]
async fn collect_results_ok() {
let (tx, rx) = mpsc::unbounded_channel_stream();
let mut fut = task::spawn(rx.collect::<Result<String, &str>>());
assert_pending!(fut.poll());
tx.send(Ok("hello ")).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());
tx.send(Ok("world")).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());
drop(tx);
assert!(fut.is_woken());
let coll = assert_ready_ok!(fut.poll());
assert_eq!("hello world", coll);
}
#[tokio::test]
async fn collect_results_err() {
let (tx, rx) = mpsc::unbounded_channel_stream();
let mut fut = task::spawn(rx.collect::<Result<String, &str>>());
assert_pending!(fut.poll());
tx.send(Ok("hello ")).unwrap();
assert!(fut.is_woken());
assert_pending!(fut.poll());
tx.send(Err("oh no")).unwrap();
assert!(fut.is_woken());
let err = assert_ready_err!(fut.poll());
assert_eq!("oh no", err);
}

View file

@ -0,0 +1,11 @@
use tokio_stream::{self as stream, Stream, StreamExt};
#[tokio::test]
async fn basic_usage() {
let mut stream = stream::empty::<i32>();
for _ in 0..2 {
assert_eq!(stream.size_hint(), (0, Some(0)));
assert_eq!(None, stream.next().await);
}
}

View file

@ -0,0 +1,50 @@
use tokio_stream::{Stream, StreamExt};
use std::pin::Pin;
use std::task::{Context, Poll};
// a stream which alternates between Some and None
struct Alternate {
state: i32,
}
impl Stream for Alternate {
type Item = i32;
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<i32>> {
let val = self.state;
self.state += 1;
// if it's even, Some(i32), else None
if val % 2 == 0 {
Poll::Ready(Some(val))
} else {
Poll::Ready(None)
}
}
}
#[tokio::test]
async fn basic_usage() {
let mut stream = Alternate { state: 0 };
// the stream goes back and forth
assert_eq!(stream.next().await, Some(0));
assert_eq!(stream.next().await, None);
assert_eq!(stream.next().await, Some(2));
assert_eq!(stream.next().await, None);
// however, once it is fused
let mut stream = stream.fuse();
assert_eq!(stream.size_hint(), (0, None));
assert_eq!(stream.next().await, Some(4));
assert_eq!(stream.size_hint(), (0, None));
assert_eq!(stream.next().await, None);
// it will always return `None` after the first time.
assert_eq!(stream.size_hint(), (0, Some(0)));
assert_eq!(stream.next().await, None);
assert_eq!(stream.size_hint(), (0, Some(0)));
}

View file

@ -0,0 +1,18 @@
use tokio_stream as stream;
use tokio_test::task;
use std::iter;
#[tokio::test]
async fn coop() {
let mut stream = task::spawn(stream::iter(iter::repeat(1)));
for _ in 0..10_000 {
if stream.poll_next().is_pending() {
assert!(stream.is_woken());
return;
}
}
panic!("did not yield");
}

View file

@ -0,0 +1,83 @@
use tokio_stream::{self as stream, Stream, StreamExt};
use tokio_test::task;
use tokio_test::{assert_pending, assert_ready};
mod support {
pub(crate) mod mpsc;
}
use support::mpsc;
#[tokio::test]
async fn merge_sync_streams() {
let mut s = stream::iter(vec![0, 2, 4, 6]).merge(stream::iter(vec![1, 3, 5]));
for i in 0..7 {
let rem = 7 - i;
assert_eq!(s.size_hint(), (rem, Some(rem)));
assert_eq!(Some(i), s.next().await);
}
assert!(s.next().await.is_none());
}
#[tokio::test]
async fn merge_async_streams() {
let (tx1, rx1) = mpsc::unbounded_channel_stream();
let (tx2, rx2) = mpsc::unbounded_channel_stream();
let mut rx = task::spawn(rx1.merge(rx2));
assert_eq!(rx.size_hint(), (0, None));
assert_pending!(rx.poll_next());
tx1.send(1).unwrap();
assert!(rx.is_woken());
assert_eq!(Some(1), assert_ready!(rx.poll_next()));
assert_pending!(rx.poll_next());
tx2.send(2).unwrap();
assert!(rx.is_woken());
assert_eq!(Some(2), assert_ready!(rx.poll_next()));
assert_pending!(rx.poll_next());
drop(tx1);
assert!(rx.is_woken());
assert_pending!(rx.poll_next());
tx2.send(3).unwrap();
assert!(rx.is_woken());
assert_eq!(Some(3), assert_ready!(rx.poll_next()));
assert_pending!(rx.poll_next());
drop(tx2);
assert!(rx.is_woken());
assert_eq!(None, assert_ready!(rx.poll_next()));
}
#[test]
fn size_overflow() {
struct Monster;
impl tokio_stream::Stream for Monster {
type Item = ();
fn poll_next(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<()>> {
panic!()
}
fn size_hint(&self) -> (usize, Option<usize>) {
(usize::MAX, Some(usize::MAX))
}
}
let m1 = Monster;
let m2 = Monster;
let m = m1.merge(m2);
assert_eq!(m.size_hint(), (usize::MAX, None));
}

View file

@ -0,0 +1,12 @@
use tokio_stream::{self as stream, Stream, StreamExt};
#[tokio::test]
async fn basic_usage() {
let mut one = stream::once(1);
assert_eq!(one.size_hint(), (1, Some(1)));
assert_eq!(Some(1), one.next().await);
assert_eq!(one.size_hint(), (0, Some(0)));
assert_eq!(None, one.next().await);
}

View file

@ -0,0 +1,55 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "time", not(target_os = "wasi")))] // Wasi does not support panic recovery
use parking_lot::{const_mutex, Mutex};
use std::error::Error;
use std::panic;
use std::sync::Arc;
use tokio::time::Duration;
use tokio_stream::{self as stream, StreamExt};
fn test_panic<Func: FnOnce() + panic::UnwindSafe>(func: Func) -> Option<String> {
static PANIC_MUTEX: Mutex<()> = const_mutex(());
{
let _guard = PANIC_MUTEX.lock();
let panic_file: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
let prev_hook = panic::take_hook();
{
let panic_file = panic_file.clone();
panic::set_hook(Box::new(move |panic_info| {
let panic_location = panic_info.location().unwrap();
panic_file
.lock()
.clone_from(&Some(panic_location.file().to_string()));
}));
}
let result = panic::catch_unwind(func);
// Return to the previously set panic hook (maybe default) so that we get nice error
// messages in the tests.
panic::set_hook(prev_hook);
if result.is_err() {
panic_file.lock().clone()
} else {
None
}
}
}
#[test]
fn stream_chunks_timeout_panic_caller() -> Result<(), Box<dyn Error>> {
let panic_location_file = test_panic(|| {
let iter = vec![1, 2, 3].into_iter();
let stream0 = stream::iter(iter);
let _chunk_stream = stream0.chunks_timeout(0, Duration::from_secs(2));
});
// The panic location should be in this file
assert_eq!(&panic_location_file.unwrap(), file!());
Ok(())
}

View file

@ -0,0 +1,14 @@
use tokio_stream::{self as stream, Stream, StreamExt};
use tokio_test::{assert_pending, task};
#[tokio::test]
async fn basic_usage() {
let mut stream = stream::pending::<i32>();
for _ in 0..2 {
assert_eq!(stream.size_hint(), (0, None));
let mut next = task::spawn(async { stream.next().await });
assert_pending!(next.poll());
}
}

View file

@ -0,0 +1,387 @@
use tokio_stream::{self as stream, pending, Stream, StreamExt, StreamMap};
use tokio_test::{assert_ok, assert_pending, assert_ready, task};
mod support {
pub(crate) mod mpsc;
}
use support::mpsc;
use std::pin::Pin;
macro_rules! assert_ready_some {
($($t:tt)*) => {
match assert_ready!($($t)*) {
Some(v) => v,
None => panic!("expected `Some`, got `None`"),
}
};
}
macro_rules! assert_ready_none {
($($t:tt)*) => {
match assert_ready!($($t)*) {
None => {}
Some(v) => panic!("expected `None`, got `Some({:?})`", v),
}
};
}
#[tokio::test]
async fn empty() {
let mut map = StreamMap::<&str, stream::Pending<()>>::new();
assert_eq!(map.len(), 0);
assert!(map.is_empty());
assert!(map.next().await.is_none());
assert!(map.next().await.is_none());
assert!(map.remove("foo").is_none());
}
#[tokio::test]
async fn single_entry() {
let mut map = task::spawn(StreamMap::new());
let (tx, rx) = mpsc::unbounded_channel_stream();
let rx = Box::pin(rx);
assert_ready_none!(map.poll_next());
assert!(map.insert("foo", rx).is_none());
assert!(map.contains_key("foo"));
assert!(!map.contains_key("bar"));
assert_eq!(map.len(), 1);
assert!(!map.is_empty());
assert_pending!(map.poll_next());
assert_ok!(tx.send(1));
assert!(map.is_woken());
let (k, v) = assert_ready_some!(map.poll_next());
assert_eq!(k, "foo");
assert_eq!(v, 1);
assert_pending!(map.poll_next());
assert_ok!(tx.send(2));
assert!(map.is_woken());
let (k, v) = assert_ready_some!(map.poll_next());
assert_eq!(k, "foo");
assert_eq!(v, 2);
assert_pending!(map.poll_next());
drop(tx);
assert!(map.is_woken());
assert_ready_none!(map.poll_next());
}
#[tokio::test]
async fn multiple_entries() {
let mut map = task::spawn(StreamMap::new());
let (tx1, rx1) = mpsc::unbounded_channel_stream();
let (tx2, rx2) = mpsc::unbounded_channel_stream();
let rx1 = Box::pin(rx1);
let rx2 = Box::pin(rx2);
map.insert("foo", rx1);
map.insert("bar", rx2);
assert_pending!(map.poll_next());
assert_ok!(tx1.send(1));
assert!(map.is_woken());
let (k, v) = assert_ready_some!(map.poll_next());
assert_eq!(k, "foo");
assert_eq!(v, 1);
assert_pending!(map.poll_next());
assert_ok!(tx2.send(2));
assert!(map.is_woken());
let (k, v) = assert_ready_some!(map.poll_next());
assert_eq!(k, "bar");
assert_eq!(v, 2);
assert_pending!(map.poll_next());
assert_ok!(tx1.send(3));
assert_ok!(tx2.send(4));
assert!(map.is_woken());
// Given the randomization, there is no guarantee what order the values will
// be received in.
let mut v = (0..2)
.map(|_| assert_ready_some!(map.poll_next()))
.collect::<Vec<_>>();
assert_pending!(map.poll_next());
v.sort_unstable();
assert_eq!(v[0].0, "bar");
assert_eq!(v[0].1, 4);
assert_eq!(v[1].0, "foo");
assert_eq!(v[1].1, 3);
drop(tx1);
assert!(map.is_woken());
assert_pending!(map.poll_next());
drop(tx2);
assert_ready_none!(map.poll_next());
}
#[tokio::test]
async fn insert_remove() {
let mut map = task::spawn(StreamMap::new());
let (tx, rx) = mpsc::unbounded_channel_stream();
let rx = Box::pin(rx);
assert_ready_none!(map.poll_next());
assert!(map.insert("foo", rx).is_none());
let rx = map.remove("foo").unwrap();
assert_ok!(tx.send(1));
assert!(!map.is_woken());
assert_ready_none!(map.poll_next());
assert!(map.insert("bar", rx).is_none());
let v = assert_ready_some!(map.poll_next());
assert_eq!(v.0, "bar");
assert_eq!(v.1, 1);
assert!(map.remove("bar").is_some());
assert_ready_none!(map.poll_next());
assert!(map.is_empty());
assert_eq!(0, map.len());
}
#[tokio::test]
async fn replace() {
let mut map = task::spawn(StreamMap::new());
let (tx1, rx1) = mpsc::unbounded_channel_stream();
let (tx2, rx2) = mpsc::unbounded_channel_stream();
let rx1 = Box::pin(rx1);
let rx2 = Box::pin(rx2);
assert!(map.insert("foo", rx1).is_none());
assert_pending!(map.poll_next());
let _rx1 = map.insert("foo", rx2).unwrap();
assert_pending!(map.poll_next());
tx1.send(1).unwrap();
assert_pending!(map.poll_next());
tx2.send(2).unwrap();
assert!(map.is_woken());
let v = assert_ready_some!(map.poll_next());
assert_eq!(v.0, "foo");
assert_eq!(v.1, 2);
}
#[test]
fn size_hint_with_upper() {
let mut map = StreamMap::new();
map.insert("a", stream::iter(vec![1]));
map.insert("b", stream::iter(vec![1, 2]));
map.insert("c", stream::iter(vec![1, 2, 3]));
assert_eq!(3, map.len());
assert!(!map.is_empty());
let size_hint = map.size_hint();
assert_eq!(size_hint, (6, Some(6)));
}
#[test]
fn size_hint_without_upper() {
let mut map = StreamMap::new();
map.insert("a", pin_box(stream::iter(vec![1])));
map.insert("b", pin_box(stream::iter(vec![1, 2])));
map.insert("c", pin_box(pending()));
let size_hint = map.size_hint();
assert_eq!(size_hint, (3, None));
}
#[test]
fn new_capacity_zero() {
let map = StreamMap::<&str, stream::Pending<()>>::new();
assert_eq!(0, map.capacity());
assert!(map.keys().next().is_none());
}
#[test]
fn with_capacity() {
let map = StreamMap::<&str, stream::Pending<()>>::with_capacity(10);
assert!(10 <= map.capacity());
assert!(map.keys().next().is_none());
}
#[test]
fn iter_keys() {
let mut map = StreamMap::new();
map.insert("a", pending::<i32>());
map.insert("b", pending());
map.insert("c", pending());
let mut keys = map.keys().collect::<Vec<_>>();
keys.sort_unstable();
assert_eq!(&keys[..], &[&"a", &"b", &"c"]);
}
#[test]
fn iter_values() {
let mut map = StreamMap::new();
map.insert("a", stream::iter(vec![1]));
map.insert("b", stream::iter(vec![1, 2]));
map.insert("c", stream::iter(vec![1, 2, 3]));
let mut size_hints = map.values().map(|s| s.size_hint().0).collect::<Vec<_>>();
size_hints.sort_unstable();
assert_eq!(&size_hints[..], &[1, 2, 3]);
}
#[test]
fn iter_values_mut() {
let mut map = StreamMap::new();
map.insert("a", stream::iter(vec![1]));
map.insert("b", stream::iter(vec![1, 2]));
map.insert("c", stream::iter(vec![1, 2, 3]));
let mut size_hints = map
.values_mut()
.map(|s: &mut _| s.size_hint().0)
.collect::<Vec<_>>();
size_hints.sort_unstable();
assert_eq!(&size_hints[..], &[1, 2, 3]);
}
#[test]
fn clear() {
let mut map = task::spawn(StreamMap::new());
map.insert("a", stream::iter(vec![1]));
map.insert("b", stream::iter(vec![1, 2]));
map.insert("c", stream::iter(vec![1, 2, 3]));
assert_ready_some!(map.poll_next());
map.clear();
assert_ready_none!(map.poll_next());
assert!(map.is_empty());
}
#[test]
fn contains_key_borrow() {
let mut map = StreamMap::new();
map.insert("foo".to_string(), pending::<()>());
assert!(map.contains_key("foo"));
}
#[test]
fn one_ready_many_none() {
// Run a few times because of randomness
for _ in 0..100 {
let mut map = task::spawn(StreamMap::new());
map.insert(0, pin_box(stream::empty()));
map.insert(1, pin_box(stream::empty()));
map.insert(2, pin_box(stream::once("hello")));
map.insert(3, pin_box(stream::pending()));
let v = assert_ready_some!(map.poll_next());
assert_eq!(v, (2, "hello"));
}
}
#[cfg(not(target_os = "wasi"))]
proptest::proptest! {
#[test]
fn fuzz_pending_complete_mix(kinds: Vec<bool>) {
use std::task::{Context, Poll};
struct DidPoll<T> {
did_poll: bool,
inner: T,
}
impl<T: Stream + Unpin> Stream for DidPoll<T> {
type Item = T::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<T::Item>>
{
self.did_poll = true;
Pin::new(&mut self.inner).poll_next(cx)
}
}
for _ in 0..10 {
let mut map = task::spawn(StreamMap::new());
let mut expect = 0;
for (i, &is_empty) in kinds.iter().enumerate() {
let inner = if is_empty {
pin_box(stream::empty::<()>())
} else {
expect += 1;
pin_box(stream::pending::<()>())
};
let stream = DidPoll {
did_poll: false,
inner,
};
map.insert(i, stream);
}
if expect == 0 {
assert_ready_none!(map.poll_next());
} else {
assert_pending!(map.poll_next());
assert_eq!(expect, map.values().count());
for stream in map.values() {
assert!(stream.did_poll);
}
}
}
}
}
fn pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>> {
Box::pin(s)
}

View file

@ -0,0 +1,109 @@
#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]
use tokio::time::{self, sleep, Duration};
use tokio_stream::{self, StreamExt};
use tokio_test::*;
use futures::stream;
async fn maybe_sleep(idx: i32) -> i32 {
if idx % 2 == 0 {
sleep(ms(200)).await;
}
idx
}
fn ms(n: u64) -> Duration {
Duration::from_millis(n)
}
#[tokio::test]
async fn basic_usage() {
time::pause();
// Items 2 and 4 time out. If we run the stream until it completes,
// we end up with the following items:
//
// [Ok(1), Err(Elapsed), Ok(2), Ok(3), Err(Elapsed), Ok(4)]
let stream = stream::iter(1..=4).then(maybe_sleep).timeout(ms(100));
let mut stream = task::spawn(stream);
// First item completes immediately
assert_ready_eq!(stream.poll_next(), Some(Ok(1)));
// Second item is delayed 200ms, times out after 100ms
assert_pending!(stream.poll_next());
time::advance(ms(150)).await;
let v = assert_ready!(stream.poll_next());
assert!(v.unwrap().is_err());
assert_pending!(stream.poll_next());
time::advance(ms(100)).await;
assert_ready_eq!(stream.poll_next(), Some(Ok(2)));
// Third item is ready immediately
assert_ready_eq!(stream.poll_next(), Some(Ok(3)));
// Fourth item is delayed 200ms, times out after 100ms
assert_pending!(stream.poll_next());
time::advance(ms(60)).await;
assert_pending!(stream.poll_next()); // nothing ready yet
time::advance(ms(60)).await;
let v = assert_ready!(stream.poll_next());
assert!(v.unwrap().is_err()); // timeout!
time::advance(ms(120)).await;
assert_ready_eq!(stream.poll_next(), Some(Ok(4)));
// Done.
assert_ready_eq!(stream.poll_next(), None);
}
#[tokio::test]
async fn return_elapsed_errors_only_once() {
time::pause();
let stream = stream::iter(1..=3).then(maybe_sleep).timeout(ms(50));
let mut stream = task::spawn(stream);
// First item completes immediately
assert_ready_eq!(stream.poll_next(), Some(Ok(1)));
// Second item is delayed 200ms, times out after 50ms. Only one `Elapsed`
// error is returned.
assert_pending!(stream.poll_next());
//
time::advance(ms(51)).await;
let v = assert_ready!(stream.poll_next());
assert!(v.unwrap().is_err()); // timeout!
// deadline elapses again, but no error is returned
time::advance(ms(50)).await;
assert_pending!(stream.poll_next());
time::advance(ms(100)).await;
assert_ready_eq!(stream.poll_next(), Some(Ok(2)));
assert_ready_eq!(stream.poll_next(), Some(Ok(3)));
// Done
assert_ready_eq!(stream.poll_next(), None);
}
#[tokio::test]
async fn no_timeouts() {
let stream = stream::iter(vec![1, 3, 5])
.then(maybe_sleep)
.timeout(ms(100));
let mut stream = task::spawn(stream);
assert_ready_eq!(stream.poll_next(), Some(Ok(1)));
assert_ready_eq!(stream.poll_next(), Some(Ok(3)));
assert_ready_eq!(stream.poll_next(), Some(Ok(5)));
assert_ready_eq!(stream.poll_next(), None);
}

View file

@ -0,0 +1,15 @@
use async_stream::stream;
use tokio::sync::mpsc::{self, UnboundedSender};
use tokio_stream::Stream;
pub fn unbounded_channel_stream<T: Unpin>() -> (UnboundedSender<T>, impl Stream<Item = T>) {
let (tx, mut rx) = mpsc::unbounded_channel();
let stream = stream! {
while let Some(item) = rx.recv().await {
yield item;
}
};
(tx, stream)
}

View file

@ -0,0 +1,28 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "time", feature = "sync", feature = "io-util"))]
use tokio::time;
use tokio_stream::StreamExt;
use tokio_test::*;
use std::time::Duration;
#[tokio::test]
async fn usage() {
time::pause();
let mut stream = task::spawn(futures::stream::repeat(()).throttle(Duration::from_millis(100)));
assert_ready!(stream.poll_next());
assert_pending!(stream.poll_next());
time::advance(Duration::from_millis(90)).await;
assert_pending!(stream.poll_next());
time::advance(Duration::from_millis(101)).await;
assert!(stream.is_woken());
assert_ready!(stream.poll_next());
}

29
vendor/tokio-stream/tests/watch.rs vendored Normal file
View file

@ -0,0 +1,29 @@
#![cfg(feature = "sync")]
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use tokio_stream::StreamExt;
#[tokio::test]
async fn message_not_twice() {
let (tx, rx) = watch::channel("hello");
let mut counter = 0;
let mut stream = WatchStream::new(rx).map(move |payload| {
println!("{}", payload);
if payload == "goodbye" {
counter += 1;
}
if counter >= 2 {
panic!("too many goodbyes");
}
});
let task = tokio::spawn(async move { while stream.next().await.is_some() {} });
// Send goodbye just once
tx.send("goodbye").unwrap();
drop(tx);
task.await.unwrap();
}