Vendor things

This commit is contained in:
John Doty 2024-03-08 11:03:01 -08:00
parent 5deceec006
commit 977e3c17e5
19434 changed files with 10682014 additions and 0 deletions

View file

@ -0,0 +1,637 @@
//! Bounded channel based on a preallocated array.
//!
//! This flavor has a fixed, positive capacity.
//!
//! The implementation is based on Dmitry Vyukov's bounded MPMC queue.
//!
//! Source:
//! - <http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue>
//! - <https://docs.google.com/document/d/1yIAYmbvL3JxOKOjuCyon7JhW4cSv1wy5hC0ApeGMV9s/pub>
use std::boxed::Box;
use std::cell::UnsafeCell;
use std::mem::{self, MaybeUninit};
use std::ptr;
use std::sync::atomic::{self, AtomicUsize, Ordering};
use std::time::Instant;
use crossbeam_utils::{Backoff, CachePadded};
use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::SyncWaker;
/// A slot in a channel.
struct Slot<T> {
/// The current stamp.
stamp: AtomicUsize,
/// The message in this slot.
msg: UnsafeCell<MaybeUninit<T>>,
}
/// The token type for the array flavor.
#[derive(Debug)]
pub(crate) struct ArrayToken {
/// Slot to read from or write to.
slot: *const u8,
/// Stamp to store into the slot after reading or writing.
stamp: usize,
}
impl Default for ArrayToken {
#[inline]
fn default() -> Self {
ArrayToken {
slot: ptr::null(),
stamp: 0,
}
}
}
/// Bounded channel based on a preallocated array.
pub(crate) struct Channel<T> {
/// The head of the channel.
///
/// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
/// packed into a single `usize`. The lower bits represent the index, while the upper bits
/// represent the lap. The mark bit in the head is always zero.
///
/// Messages are popped from the head of the channel.
head: CachePadded<AtomicUsize>,
/// The tail of the channel.
///
/// This value is a "stamp" consisting of an index into the buffer, a mark bit, and a lap, but
/// packed into a single `usize`. The lower bits represent the index, while the upper bits
/// represent the lap. The mark bit indicates that the channel is disconnected.
///
/// Messages are pushed into the tail of the channel.
tail: CachePadded<AtomicUsize>,
/// The buffer holding slots.
buffer: Box<[Slot<T>]>,
/// The channel capacity.
cap: usize,
/// A stamp with the value of `{ lap: 1, mark: 0, index: 0 }`.
one_lap: usize,
/// If this bit is set in the tail, that means the channel is disconnected.
mark_bit: usize,
/// Senders waiting while the channel is full.
senders: SyncWaker,
/// Receivers waiting while the channel is empty and not disconnected.
receivers: SyncWaker,
}
impl<T> Channel<T> {
/// Creates a bounded channel of capacity `cap`.
pub(crate) fn with_capacity(cap: usize) -> Self {
assert!(cap > 0, "capacity must be positive");
// Compute constants `mark_bit` and `one_lap`.
let mark_bit = (cap + 1).next_power_of_two();
let one_lap = mark_bit * 2;
// Head is initialized to `{ lap: 0, mark: 0, index: 0 }`.
let head = 0;
// Tail is initialized to `{ lap: 0, mark: 0, index: 0 }`.
let tail = 0;
// Allocate a buffer of `cap` slots initialized
// with stamps.
let buffer: Box<[Slot<T>]> = (0..cap)
.map(|i| {
// Set the stamp to `{ lap: 0, mark: 0, index: i }`.
Slot {
stamp: AtomicUsize::new(i),
msg: UnsafeCell::new(MaybeUninit::uninit()),
}
})
.collect();
Channel {
buffer,
cap,
one_lap,
mark_bit,
head: CachePadded::new(AtomicUsize::new(head)),
tail: CachePadded::new(AtomicUsize::new(tail)),
senders: SyncWaker::new(),
receivers: SyncWaker::new(),
}
}
/// Returns a receiver handle to the channel.
pub(crate) fn receiver(&self) -> Receiver<'_, T> {
Receiver(self)
}
/// Returns a sender handle to the channel.
pub(crate) fn sender(&self) -> Sender<'_, T> {
Sender(self)
}
/// Attempts to reserve a slot for sending a message.
fn start_send(&self, token: &mut Token) -> bool {
let backoff = Backoff::new();
let mut tail = self.tail.load(Ordering::Relaxed);
loop {
// Check if the channel is disconnected.
if tail & self.mark_bit != 0 {
token.array.slot = ptr::null();
token.array.stamp = 0;
return true;
}
// Deconstruct the tail.
let index = tail & (self.mark_bit - 1);
let lap = tail & !(self.one_lap - 1);
// Inspect the corresponding slot.
debug_assert!(index < self.buffer.len());
let slot = unsafe { self.buffer.get_unchecked(index) };
let stamp = slot.stamp.load(Ordering::Acquire);
// If the tail and the stamp match, we may attempt to push.
if tail == stamp {
let new_tail = if index + 1 < self.cap {
// Same lap, incremented index.
// Set to `{ lap: lap, mark: 0, index: index + 1 }`.
tail + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
lap.wrapping_add(self.one_lap)
};
// Try moving the tail.
match self.tail.compare_exchange_weak(
tail,
new_tail,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Ok(_) => {
// Prepare the token for the follow-up call to `write`.
token.array.slot = slot as *const Slot<T> as *const u8;
token.array.stamp = tail + 1;
return true;
}
Err(t) => {
tail = t;
backoff.spin();
}
}
} else if stamp.wrapping_add(self.one_lap) == tail + 1 {
atomic::fence(Ordering::SeqCst);
let head = self.head.load(Ordering::Relaxed);
// If the head lags one lap behind the tail as well...
if head.wrapping_add(self.one_lap) == tail {
// ...then the channel is full.
return false;
}
backoff.spin();
tail = self.tail.load(Ordering::Relaxed);
} else {
// Snooze because we need to wait for the stamp to get updated.
backoff.snooze();
tail = self.tail.load(Ordering::Relaxed);
}
}
}
/// Writes a message into the channel.
pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
// If there is no slot, the channel is disconnected.
if token.array.slot.is_null() {
return Err(msg);
}
let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>();
// Write the message into the slot and update the stamp.
slot.msg.get().write(MaybeUninit::new(msg));
slot.stamp.store(token.array.stamp, Ordering::Release);
// Wake a sleeping receiver.
self.receivers.notify();
Ok(())
}
/// Attempts to reserve a slot for receiving a message.
fn start_recv(&self, token: &mut Token) -> bool {
let backoff = Backoff::new();
let mut head = self.head.load(Ordering::Relaxed);
loop {
// Deconstruct the head.
let index = head & (self.mark_bit - 1);
let lap = head & !(self.one_lap - 1);
// Inspect the corresponding slot.
debug_assert!(index < self.buffer.len());
let slot = unsafe { self.buffer.get_unchecked(index) };
let stamp = slot.stamp.load(Ordering::Acquire);
// If the the stamp is ahead of the head by 1, we may attempt to pop.
if head + 1 == stamp {
let new = if index + 1 < self.cap {
// Same lap, incremented index.
// Set to `{ lap: lap, mark: 0, index: index + 1 }`.
head + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ lap: lap.wrapping_add(1), mark: 0, index: 0 }`.
lap.wrapping_add(self.one_lap)
};
// Try moving the head.
match self.head.compare_exchange_weak(
head,
new,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Ok(_) => {
// Prepare the token for the follow-up call to `read`.
token.array.slot = slot as *const Slot<T> as *const u8;
token.array.stamp = head.wrapping_add(self.one_lap);
return true;
}
Err(h) => {
head = h;
backoff.spin();
}
}
} else if stamp == head {
atomic::fence(Ordering::SeqCst);
let tail = self.tail.load(Ordering::Relaxed);
// If the tail equals the head, that means the channel is empty.
if (tail & !self.mark_bit) == head {
// If the channel is disconnected...
if tail & self.mark_bit != 0 {
// ...then receive an error.
token.array.slot = ptr::null();
token.array.stamp = 0;
return true;
} else {
// Otherwise, the receive operation is not ready.
return false;
}
}
backoff.spin();
head = self.head.load(Ordering::Relaxed);
} else {
// Snooze because we need to wait for the stamp to get updated.
backoff.snooze();
head = self.head.load(Ordering::Relaxed);
}
}
}
/// Reads a message from the channel.
pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
if token.array.slot.is_null() {
// The channel is disconnected.
return Err(());
}
let slot: &Slot<T> = &*token.array.slot.cast::<Slot<T>>();
// Read the message from the slot and update the stamp.
let msg = slot.msg.get().read().assume_init();
slot.stamp.store(token.array.stamp, Ordering::Release);
// Wake a sleeping sender.
self.senders.notify();
Ok(msg)
}
/// Attempts to send a message into the channel.
pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
let token = &mut Token::default();
if self.start_send(token) {
unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
} else {
Err(TrySendError::Full(msg))
}
}
/// Sends a message into the channel.
pub(crate) fn send(
&self,
msg: T,
deadline: Option<Instant>,
) -> Result<(), SendTimeoutError<T>> {
let token = &mut Token::default();
loop {
// Try sending a message several times.
let backoff = Backoff::new();
loop {
if self.start_send(token) {
let res = unsafe { self.write(token, msg) };
return res.map_err(SendTimeoutError::Disconnected);
}
if backoff.is_completed() {
break;
} else {
backoff.snooze();
}
}
if let Some(d) = deadline {
if Instant::now() >= d {
return Err(SendTimeoutError::Timeout(msg));
}
}
Context::with(|cx| {
// Prepare for blocking until a receiver wakes us up.
let oper = Operation::hook(token);
self.senders.register(oper, cx);
// Has the channel become ready just now?
if !self.is_full() || self.is_disconnected() {
let _ = cx.try_select(Selected::Aborted);
}
// Block the current thread.
let sel = cx.wait_until(deadline);
match sel {
Selected::Waiting => unreachable!(),
Selected::Aborted | Selected::Disconnected => {
self.senders.unregister(oper).unwrap();
}
Selected::Operation(_) => {}
}
});
}
}
/// Attempts to receive a message without blocking.
pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
let token = &mut Token::default();
if self.start_recv(token) {
unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
} else {
Err(TryRecvError::Empty)
}
}
/// Receives a message from the channel.
pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
let token = &mut Token::default();
loop {
// Try receiving a message several times.
let backoff = Backoff::new();
loop {
if self.start_recv(token) {
let res = unsafe { self.read(token) };
return res.map_err(|_| RecvTimeoutError::Disconnected);
}
if backoff.is_completed() {
break;
} else {
backoff.snooze();
}
}
if let Some(d) = deadline {
if Instant::now() >= d {
return Err(RecvTimeoutError::Timeout);
}
}
Context::with(|cx| {
// Prepare for blocking until a sender wakes us up.
let oper = Operation::hook(token);
self.receivers.register(oper, cx);
// Has the channel become ready just now?
if !self.is_empty() || self.is_disconnected() {
let _ = cx.try_select(Selected::Aborted);
}
// Block the current thread.
let sel = cx.wait_until(deadline);
match sel {
Selected::Waiting => unreachable!(),
Selected::Aborted | Selected::Disconnected => {
self.receivers.unregister(oper).unwrap();
// If the channel was disconnected, we still have to check for remaining
// messages.
}
Selected::Operation(_) => {}
}
});
}
}
/// Returns the current number of messages inside the channel.
pub(crate) fn len(&self) -> usize {
loop {
// Load the tail, then load the head.
let tail = self.tail.load(Ordering::SeqCst);
let head = self.head.load(Ordering::SeqCst);
// If the tail didn't change, we've got consistent values to work with.
if self.tail.load(Ordering::SeqCst) == tail {
let hix = head & (self.mark_bit - 1);
let tix = tail & (self.mark_bit - 1);
return if hix < tix {
tix - hix
} else if hix > tix {
self.cap - hix + tix
} else if (tail & !self.mark_bit) == head {
0
} else {
self.cap
};
}
}
}
/// Returns the capacity of the channel.
pub(crate) fn capacity(&self) -> Option<usize> {
Some(self.cap)
}
/// Disconnects the channel and wakes up all blocked senders and receivers.
///
/// Returns `true` if this call disconnected the channel.
pub(crate) fn disconnect(&self) -> bool {
let tail = self.tail.fetch_or(self.mark_bit, Ordering::SeqCst);
if tail & self.mark_bit == 0 {
self.senders.disconnect();
self.receivers.disconnect();
true
} else {
false
}
}
/// Returns `true` if the channel is disconnected.
pub(crate) fn is_disconnected(&self) -> bool {
self.tail.load(Ordering::SeqCst) & self.mark_bit != 0
}
/// Returns `true` if the channel is empty.
pub(crate) fn is_empty(&self) -> bool {
let head = self.head.load(Ordering::SeqCst);
let tail = self.tail.load(Ordering::SeqCst);
// Is the tail equal to the head?
//
// Note: If the head changes just before we load the tail, that means there was a moment
// when the channel was not empty, so it is safe to just return `false`.
(tail & !self.mark_bit) == head
}
/// Returns `true` if the channel is full.
pub(crate) fn is_full(&self) -> bool {
let tail = self.tail.load(Ordering::SeqCst);
let head = self.head.load(Ordering::SeqCst);
// Is the head lagging one lap behind tail?
//
// Note: If the tail changes just before we load the head, that means there was a moment
// when the channel was not full, so it is safe to just return `false`.
head.wrapping_add(self.one_lap) == tail & !self.mark_bit
}
}
impl<T> Drop for Channel<T> {
fn drop(&mut self) {
if mem::needs_drop::<T>() {
// Get the index of the head.
let head = *self.head.get_mut();
let tail = *self.tail.get_mut();
let hix = head & (self.mark_bit - 1);
let tix = tail & (self.mark_bit - 1);
let len = if hix < tix {
tix - hix
} else if hix > tix {
self.cap - hix + tix
} else if (tail & !self.mark_bit) == head {
0
} else {
self.cap
};
// Loop over all slots that hold a message and drop them.
for i in 0..len {
// Compute the index of the next slot holding a message.
let index = if hix + i < self.cap {
hix + i
} else {
hix + i - self.cap
};
unsafe {
debug_assert!(index < self.buffer.len());
let slot = self.buffer.get_unchecked_mut(index);
(*slot.msg.get()).assume_init_drop();
}
}
}
}
}
/// Receiver handle to a channel.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
/// Sender handle to a channel.
pub(crate) struct Sender<'a, T>(&'a Channel<T>);
impl<T> SelectHandle for Receiver<'_, T> {
fn try_select(&self, token: &mut Token) -> bool {
self.0.start_recv(token)
}
fn deadline(&self) -> Option<Instant> {
None
}
fn register(&self, oper: Operation, cx: &Context) -> bool {
self.0.receivers.register(oper, cx);
self.is_ready()
}
fn unregister(&self, oper: Operation) {
self.0.receivers.unregister(oper);
}
fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
self.try_select(token)
}
fn is_ready(&self) -> bool {
!self.0.is_empty() || self.0.is_disconnected()
}
fn watch(&self, oper: Operation, cx: &Context) -> bool {
self.0.receivers.watch(oper, cx);
self.is_ready()
}
fn unwatch(&self, oper: Operation) {
self.0.receivers.unwatch(oper);
}
}
impl<T> SelectHandle for Sender<'_, T> {
fn try_select(&self, token: &mut Token) -> bool {
self.0.start_send(token)
}
fn deadline(&self) -> Option<Instant> {
None
}
fn register(&self, oper: Operation, cx: &Context) -> bool {
self.0.senders.register(oper, cx);
self.is_ready()
}
fn unregister(&self, oper: Operation) {
self.0.senders.unregister(oper);
}
fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
self.try_select(token)
}
fn is_ready(&self) -> bool {
!self.0.is_full() || self.0.is_disconnected()
}
fn watch(&self, oper: Operation, cx: &Context) -> bool {
self.0.senders.watch(oper, cx);
self.is_ready()
}
fn unwatch(&self, oper: Operation) {
self.0.senders.unwatch(oper);
}
}

View file

@ -0,0 +1,197 @@
//! Channel that delivers a message at a certain moment in time.
//!
//! Messages cannot be sent into this kind of channel; they are materialized on demand.
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Instant;
use crate::context::Context;
use crate::err::{RecvTimeoutError, TryRecvError};
use crate::select::{Operation, SelectHandle, Token};
use crate::utils;
/// Result of a receive operation.
pub(crate) type AtToken = Option<Instant>;
/// Channel that delivers a message at a certain moment in time
pub(crate) struct Channel {
/// The instant at which the message will be delivered.
delivery_time: Instant,
/// `true` if the message has been received.
received: AtomicBool,
}
impl Channel {
/// Creates a channel that delivers a message at a certain instant in time.
#[inline]
pub(crate) fn new_deadline(when: Instant) -> Self {
Channel {
delivery_time: when,
received: AtomicBool::new(false),
}
}
/// Attempts to receive a message without blocking.
#[inline]
pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
// We use relaxed ordering because this is just an optional optimistic check.
if self.received.load(Ordering::Relaxed) {
// The message has already been received.
return Err(TryRecvError::Empty);
}
if Instant::now() < self.delivery_time {
// The message was not delivered yet.
return Err(TryRecvError::Empty);
}
// Try receiving the message if it is still available.
if !self.received.swap(true, Ordering::SeqCst) {
// Success! Return delivery time as the message.
Ok(self.delivery_time)
} else {
// The message was already received.
Err(TryRecvError::Empty)
}
}
/// Receives a message from the channel.
#[inline]
pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
// We use relaxed ordering because this is just an optional optimistic check.
if self.received.load(Ordering::Relaxed) {
// The message has already been received.
utils::sleep_until(deadline);
return Err(RecvTimeoutError::Timeout);
}
// Wait until the message is received or the deadline is reached.
loop {
let now = Instant::now();
let deadline = match deadline {
// Check if we can receive the next message.
_ if now >= self.delivery_time => break,
// Check if the timeout deadline has been reached.
Some(d) if now >= d => return Err(RecvTimeoutError::Timeout),
// Sleep until one of the above happens
Some(d) if d < self.delivery_time => d,
_ => self.delivery_time,
};
thread::sleep(deadline - now);
}
// Try receiving the message if it is still available.
if !self.received.swap(true, Ordering::SeqCst) {
// Success! Return the message, which is the instant at which it was delivered.
Ok(self.delivery_time)
} else {
// The message was already received. Block forever.
utils::sleep_until(None);
unreachable!()
}
}
/// Reads a message from the channel.
#[inline]
pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
token.at.ok_or(())
}
/// Returns `true` if the channel is empty.
#[inline]
pub(crate) fn is_empty(&self) -> bool {
// We use relaxed ordering because this is just an optional optimistic check.
if self.received.load(Ordering::Relaxed) {
return true;
}
// If the delivery time hasn't been reached yet, the channel is empty.
if Instant::now() < self.delivery_time {
return true;
}
// The delivery time has been reached. The channel is empty only if the message has already
// been received.
self.received.load(Ordering::SeqCst)
}
/// Returns `true` if the channel is full.
#[inline]
pub(crate) fn is_full(&self) -> bool {
!self.is_empty()
}
/// Returns the number of messages in the channel.
#[inline]
pub(crate) fn len(&self) -> usize {
if self.is_empty() {
0
} else {
1
}
}
/// Returns the capacity of the channel.
#[inline]
pub(crate) fn capacity(&self) -> Option<usize> {
Some(1)
}
}
impl SelectHandle for Channel {
#[inline]
fn try_select(&self, token: &mut Token) -> bool {
match self.try_recv() {
Ok(msg) => {
token.at = Some(msg);
true
}
Err(TryRecvError::Disconnected) => {
token.at = None;
true
}
Err(TryRecvError::Empty) => false,
}
}
#[inline]
fn deadline(&self) -> Option<Instant> {
// We use relaxed ordering because this is just an optional optimistic check.
if self.received.load(Ordering::Relaxed) {
None
} else {
Some(self.delivery_time)
}
}
#[inline]
fn register(&self, _oper: Operation, _cx: &Context) -> bool {
self.is_ready()
}
#[inline]
fn unregister(&self, _oper: Operation) {}
#[inline]
fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
self.try_select(token)
}
#[inline]
fn is_ready(&self) -> bool {
!self.is_empty()
}
#[inline]
fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
self.is_ready()
}
#[inline]
fn unwatch(&self, _oper: Operation) {}
}

View file

@ -0,0 +1,755 @@
//! Unbounded channel implemented as a linked list.
use std::boxed::Box;
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering};
use std::time::Instant;
use crossbeam_utils::{Backoff, CachePadded};
use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::SyncWaker;
// TODO(stjepang): Once we bump the minimum required Rust version to 1.28 or newer, re-apply the
// following changes by @kleimkuhler:
//
// 1. https://github.com/crossbeam-rs/crossbeam-channel/pull/100
// 2. https://github.com/crossbeam-rs/crossbeam-channel/pull/101
// Bits indicating the state of a slot:
// * If a message has been written into the slot, `WRITE` is set.
// * If a message has been read from the slot, `READ` is set.
// * If the block is being destroyed, `DESTROY` is set.
const WRITE: usize = 1;
const READ: usize = 2;
const DESTROY: usize = 4;
// Each block covers one "lap" of indices.
const LAP: usize = 32;
// The maximum number of messages a block can hold.
const BLOCK_CAP: usize = LAP - 1;
// How many lower bits are reserved for metadata.
const SHIFT: usize = 1;
// Has two different purposes:
// * If set in head, indicates that the block is not the last one.
// * If set in tail, indicates that the channel is disconnected.
const MARK_BIT: usize = 1;
/// A slot in a block.
struct Slot<T> {
/// The message.
msg: UnsafeCell<MaybeUninit<T>>,
/// The state of the slot.
state: AtomicUsize,
}
impl<T> Slot<T> {
const UNINIT: Self = Self {
msg: UnsafeCell::new(MaybeUninit::uninit()),
state: AtomicUsize::new(0),
};
/// Waits until a message is written into the slot.
fn wait_write(&self) {
let backoff = Backoff::new();
while self.state.load(Ordering::Acquire) & WRITE == 0 {
backoff.snooze();
}
}
}
/// A block in a linked list.
///
/// Each block in the list can hold up to `BLOCK_CAP` messages.
struct Block<T> {
/// The next block in the linked list.
next: AtomicPtr<Block<T>>,
/// Slots for messages.
slots: [Slot<T>; BLOCK_CAP],
}
impl<T> Block<T> {
/// Creates an empty block.
fn new() -> Block<T> {
Self {
next: AtomicPtr::new(ptr::null_mut()),
slots: [Slot::UNINIT; BLOCK_CAP],
}
}
/// Waits until the next pointer is set.
fn wait_next(&self) -> *mut Block<T> {
let backoff = Backoff::new();
loop {
let next = self.next.load(Ordering::Acquire);
if !next.is_null() {
return next;
}
backoff.snooze();
}
}
/// Sets the `DESTROY` bit in slots starting from `start` and destroys the block.
unsafe fn destroy(this: *mut Block<T>, start: usize) {
// It is not necessary to set the `DESTROY` bit in the last slot because that slot has
// begun destruction of the block.
for i in start..BLOCK_CAP - 1 {
let slot = (*this).slots.get_unchecked(i);
// Mark the `DESTROY` bit if a thread is still using the slot.
if slot.state.load(Ordering::Acquire) & READ == 0
&& slot.state.fetch_or(DESTROY, Ordering::AcqRel) & READ == 0
{
// If a thread is still using the slot, it will continue destruction of the block.
return;
}
}
// No thread is using the block, now it is safe to destroy it.
drop(Box::from_raw(this));
}
}
/// A position in a channel.
#[derive(Debug)]
struct Position<T> {
/// The index in the channel.
index: AtomicUsize,
/// The block in the linked list.
block: AtomicPtr<Block<T>>,
}
/// The token type for the list flavor.
#[derive(Debug)]
pub(crate) struct ListToken {
/// The block of slots.
block: *const u8,
/// The offset into the block.
offset: usize,
}
impl Default for ListToken {
#[inline]
fn default() -> Self {
ListToken {
block: ptr::null(),
offset: 0,
}
}
}
/// Unbounded channel implemented as a linked list.
///
/// Each message sent into the channel is assigned a sequence number, i.e. an index. Indices are
/// represented as numbers of type `usize` and wrap on overflow.
///
/// Consecutive messages are grouped into blocks in order to put less pressure on the allocator and
/// improve cache efficiency.
pub(crate) struct Channel<T> {
/// The head of the channel.
head: CachePadded<Position<T>>,
/// The tail of the channel.
tail: CachePadded<Position<T>>,
/// Receivers waiting while the channel is empty and not disconnected.
receivers: SyncWaker,
/// Indicates that dropping a `Channel<T>` may drop messages of type `T`.
_marker: PhantomData<T>,
}
impl<T> Channel<T> {
/// Creates a new unbounded channel.
pub(crate) fn new() -> Self {
Channel {
head: CachePadded::new(Position {
block: AtomicPtr::new(ptr::null_mut()),
index: AtomicUsize::new(0),
}),
tail: CachePadded::new(Position {
block: AtomicPtr::new(ptr::null_mut()),
index: AtomicUsize::new(0),
}),
receivers: SyncWaker::new(),
_marker: PhantomData,
}
}
/// Returns a receiver handle to the channel.
pub(crate) fn receiver(&self) -> Receiver<'_, T> {
Receiver(self)
}
/// Returns a sender handle to the channel.
pub(crate) fn sender(&self) -> Sender<'_, T> {
Sender(self)
}
/// Attempts to reserve a slot for sending a message.
fn start_send(&self, token: &mut Token) -> bool {
let backoff = Backoff::new();
let mut tail = self.tail.index.load(Ordering::Acquire);
let mut block = self.tail.block.load(Ordering::Acquire);
let mut next_block = None;
loop {
// Check if the channel is disconnected.
if tail & MARK_BIT != 0 {
token.list.block = ptr::null();
return true;
}
// Calculate the offset of the index into the block.
let offset = (tail >> SHIFT) % LAP;
// If we reached the end of the block, wait until the next one is installed.
if offset == BLOCK_CAP {
backoff.snooze();
tail = self.tail.index.load(Ordering::Acquire);
block = self.tail.block.load(Ordering::Acquire);
continue;
}
// If we're going to have to install the next block, allocate it in advance in order to
// make the wait for other threads as short as possible.
if offset + 1 == BLOCK_CAP && next_block.is_none() {
next_block = Some(Box::new(Block::<T>::new()));
}
// If this is the first message to be sent into the channel, we need to allocate the
// first block and install it.
if block.is_null() {
let new = Box::into_raw(Box::new(Block::<T>::new()));
if self
.tail
.block
.compare_exchange(block, new, Ordering::Release, Ordering::Relaxed)
.is_ok()
{
self.head.block.store(new, Ordering::Release);
block = new;
} else {
next_block = unsafe { Some(Box::from_raw(new)) };
tail = self.tail.index.load(Ordering::Acquire);
block = self.tail.block.load(Ordering::Acquire);
continue;
}
}
let new_tail = tail + (1 << SHIFT);
// Try advancing the tail forward.
match self.tail.index.compare_exchange_weak(
tail,
new_tail,
Ordering::SeqCst,
Ordering::Acquire,
) {
Ok(_) => unsafe {
// If we've reached the end of the block, install the next one.
if offset + 1 == BLOCK_CAP {
let next_block = Box::into_raw(next_block.unwrap());
self.tail.block.store(next_block, Ordering::Release);
self.tail.index.fetch_add(1 << SHIFT, Ordering::Release);
(*block).next.store(next_block, Ordering::Release);
}
token.list.block = block as *const u8;
token.list.offset = offset;
return true;
},
Err(t) => {
tail = t;
block = self.tail.block.load(Ordering::Acquire);
backoff.spin();
}
}
}
}
/// Writes a message into the channel.
pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
// If there is no slot, the channel is disconnected.
if token.list.block.is_null() {
return Err(msg);
}
// Write the message into the slot.
let block = token.list.block.cast::<Block<T>>();
let offset = token.list.offset;
let slot = (*block).slots.get_unchecked(offset);
slot.msg.get().write(MaybeUninit::new(msg));
slot.state.fetch_or(WRITE, Ordering::Release);
// Wake a sleeping receiver.
self.receivers.notify();
Ok(())
}
/// Attempts to reserve a slot for receiving a message.
fn start_recv(&self, token: &mut Token) -> bool {
let backoff = Backoff::new();
let mut head = self.head.index.load(Ordering::Acquire);
let mut block = self.head.block.load(Ordering::Acquire);
loop {
// Calculate the offset of the index into the block.
let offset = (head >> SHIFT) % LAP;
// If we reached the end of the block, wait until the next one is installed.
if offset == BLOCK_CAP {
backoff.snooze();
head = self.head.index.load(Ordering::Acquire);
block = self.head.block.load(Ordering::Acquire);
continue;
}
let mut new_head = head + (1 << SHIFT);
if new_head & MARK_BIT == 0 {
atomic::fence(Ordering::SeqCst);
let tail = self.tail.index.load(Ordering::Relaxed);
// If the tail equals the head, that means the channel is empty.
if head >> SHIFT == tail >> SHIFT {
// If the channel is disconnected...
if tail & MARK_BIT != 0 {
// ...then receive an error.
token.list.block = ptr::null();
return true;
} else {
// Otherwise, the receive operation is not ready.
return false;
}
}
// If head and tail are not in the same block, set `MARK_BIT` in head.
if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP {
new_head |= MARK_BIT;
}
}
// The block can be null here only if the first message is being sent into the channel.
// In that case, just wait until it gets initialized.
if block.is_null() {
backoff.snooze();
head = self.head.index.load(Ordering::Acquire);
block = self.head.block.load(Ordering::Acquire);
continue;
}
// Try moving the head index forward.
match self.head.index.compare_exchange_weak(
head,
new_head,
Ordering::SeqCst,
Ordering::Acquire,
) {
Ok(_) => unsafe {
// If we've reached the end of the block, move to the next one.
if offset + 1 == BLOCK_CAP {
let next = (*block).wait_next();
let mut next_index = (new_head & !MARK_BIT).wrapping_add(1 << SHIFT);
if !(*next).next.load(Ordering::Relaxed).is_null() {
next_index |= MARK_BIT;
}
self.head.block.store(next, Ordering::Release);
self.head.index.store(next_index, Ordering::Release);
}
token.list.block = block as *const u8;
token.list.offset = offset;
return true;
},
Err(h) => {
head = h;
block = self.head.block.load(Ordering::Acquire);
backoff.spin();
}
}
}
}
/// Reads a message from the channel.
pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
if token.list.block.is_null() {
// The channel is disconnected.
return Err(());
}
// Read the message.
let block = token.list.block as *mut Block<T>;
let offset = token.list.offset;
let slot = (*block).slots.get_unchecked(offset);
slot.wait_write();
let msg = slot.msg.get().read().assume_init();
// Destroy the block if we've reached the end, or if another thread wanted to destroy but
// couldn't because we were busy reading from the slot.
if offset + 1 == BLOCK_CAP {
Block::destroy(block, 0);
} else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 {
Block::destroy(block, offset + 1);
}
Ok(msg)
}
/// Attempts to send a message into the channel.
pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
self.send(msg, None).map_err(|err| match err {
SendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg),
SendTimeoutError::Timeout(_) => unreachable!(),
})
}
/// Sends a message into the channel.
pub(crate) fn send(
&self,
msg: T,
_deadline: Option<Instant>,
) -> Result<(), SendTimeoutError<T>> {
let token = &mut Token::default();
assert!(self.start_send(token));
unsafe {
self.write(token, msg)
.map_err(SendTimeoutError::Disconnected)
}
}
/// Attempts to receive a message without blocking.
pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
let token = &mut Token::default();
if self.start_recv(token) {
unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
} else {
Err(TryRecvError::Empty)
}
}
/// Receives a message from the channel.
pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
let token = &mut Token::default();
loop {
// Try receiving a message several times.
let backoff = Backoff::new();
loop {
if self.start_recv(token) {
unsafe {
return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
}
}
if backoff.is_completed() {
break;
} else {
backoff.snooze();
}
}
if let Some(d) = deadline {
if Instant::now() >= d {
return Err(RecvTimeoutError::Timeout);
}
}
// Prepare for blocking until a sender wakes us up.
Context::with(|cx| {
let oper = Operation::hook(token);
self.receivers.register(oper, cx);
// Has the channel become ready just now?
if !self.is_empty() || self.is_disconnected() {
let _ = cx.try_select(Selected::Aborted);
}
// Block the current thread.
let sel = cx.wait_until(deadline);
match sel {
Selected::Waiting => unreachable!(),
Selected::Aborted | Selected::Disconnected => {
self.receivers.unregister(oper).unwrap();
// If the channel was disconnected, we still have to check for remaining
// messages.
}
Selected::Operation(_) => {}
}
});
}
}
/// Returns the current number of messages inside the channel.
pub(crate) fn len(&self) -> usize {
loop {
// Load the tail index, then load the head index.
let mut tail = self.tail.index.load(Ordering::SeqCst);
let mut head = self.head.index.load(Ordering::SeqCst);
// If the tail index didn't change, we've got consistent indices to work with.
if self.tail.index.load(Ordering::SeqCst) == tail {
// Erase the lower bits.
tail &= !((1 << SHIFT) - 1);
head &= !((1 << SHIFT) - 1);
// Fix up indices if they fall onto block ends.
if (tail >> SHIFT) & (LAP - 1) == LAP - 1 {
tail = tail.wrapping_add(1 << SHIFT);
}
if (head >> SHIFT) & (LAP - 1) == LAP - 1 {
head = head.wrapping_add(1 << SHIFT);
}
// Rotate indices so that head falls into the first block.
let lap = (head >> SHIFT) / LAP;
tail = tail.wrapping_sub((lap * LAP) << SHIFT);
head = head.wrapping_sub((lap * LAP) << SHIFT);
// Remove the lower bits.
tail >>= SHIFT;
head >>= SHIFT;
// Return the difference minus the number of blocks between tail and head.
return tail - head - tail / LAP;
}
}
}
/// Returns the capacity of the channel.
pub(crate) fn capacity(&self) -> Option<usize> {
None
}
/// Disconnects senders and wakes up all blocked receivers.
///
/// Returns `true` if this call disconnected the channel.
pub(crate) fn disconnect_senders(&self) -> bool {
let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
if tail & MARK_BIT == 0 {
self.receivers.disconnect();
true
} else {
false
}
}
/// Disconnects receivers.
///
/// Returns `true` if this call disconnected the channel.
pub(crate) fn disconnect_receivers(&self) -> bool {
let tail = self.tail.index.fetch_or(MARK_BIT, Ordering::SeqCst);
if tail & MARK_BIT == 0 {
// If receivers are dropped first, discard all messages to free
// memory eagerly.
self.discard_all_messages();
true
} else {
false
}
}
/// Discards all messages.
///
/// This method should only be called when all receivers are dropped.
fn discard_all_messages(&self) {
let backoff = Backoff::new();
let mut tail = self.tail.index.load(Ordering::Acquire);
loop {
let offset = (tail >> SHIFT) % LAP;
if offset != BLOCK_CAP {
break;
}
// New updates to tail will be rejected by MARK_BIT and aborted unless it's
// at boundary. We need to wait for the updates take affect otherwise there
// can be memory leaks.
backoff.snooze();
tail = self.tail.index.load(Ordering::Acquire);
}
let mut head = self.head.index.load(Ordering::Acquire);
let mut block = self.head.block.swap(ptr::null_mut(), Ordering::AcqRel);
// If we're going to be dropping messages we need to synchronize with initialization
if head >> SHIFT != tail >> SHIFT {
// The block can be null here only if a sender is in the process of initializing the
// channel while another sender managed to send a message by inserting it into the
// semi-initialized channel and advanced the tail.
// In that case, just wait until it gets initialized.
while block.is_null() {
backoff.snooze();
block = self.head.block.load(Ordering::Acquire);
}
}
unsafe {
// Drop all messages between head and tail and deallocate the heap-allocated blocks.
while head >> SHIFT != tail >> SHIFT {
let offset = (head >> SHIFT) % LAP;
if offset < BLOCK_CAP {
// Drop the message in the slot.
let slot = (*block).slots.get_unchecked(offset);
slot.wait_write();
(*slot.msg.get()).assume_init_drop();
} else {
(*block).wait_next();
// Deallocate the block and move to the next one.
let next = (*block).next.load(Ordering::Acquire);
drop(Box::from_raw(block));
block = next;
}
head = head.wrapping_add(1 << SHIFT);
}
// Deallocate the last remaining block.
if !block.is_null() {
drop(Box::from_raw(block));
}
}
head &= !MARK_BIT;
self.head.index.store(head, Ordering::Release);
}
/// Returns `true` if the channel is disconnected.
pub(crate) fn is_disconnected(&self) -> bool {
self.tail.index.load(Ordering::SeqCst) & MARK_BIT != 0
}
/// Returns `true` if the channel is empty.
pub(crate) fn is_empty(&self) -> bool {
let head = self.head.index.load(Ordering::SeqCst);
let tail = self.tail.index.load(Ordering::SeqCst);
head >> SHIFT == tail >> SHIFT
}
/// Returns `true` if the channel is full.
pub(crate) fn is_full(&self) -> bool {
false
}
}
impl<T> Drop for Channel<T> {
fn drop(&mut self) {
let mut head = *self.head.index.get_mut();
let mut tail = *self.tail.index.get_mut();
let mut block = *self.head.block.get_mut();
// Erase the lower bits.
head &= !((1 << SHIFT) - 1);
tail &= !((1 << SHIFT) - 1);
unsafe {
// Drop all messages between head and tail and deallocate the heap-allocated blocks.
while head != tail {
let offset = (head >> SHIFT) % LAP;
if offset < BLOCK_CAP {
// Drop the message in the slot.
let slot = (*block).slots.get_unchecked(offset);
(*slot.msg.get()).assume_init_drop();
} else {
// Deallocate the block and move to the next one.
let next = *(*block).next.get_mut();
drop(Box::from_raw(block));
block = next;
}
head = head.wrapping_add(1 << SHIFT);
}
// Deallocate the last remaining block.
if !block.is_null() {
drop(Box::from_raw(block));
}
}
}
}
/// Receiver handle to a channel.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
/// Sender handle to a channel.
pub(crate) struct Sender<'a, T>(&'a Channel<T>);
impl<T> SelectHandle for Receiver<'_, T> {
fn try_select(&self, token: &mut Token) -> bool {
self.0.start_recv(token)
}
fn deadline(&self) -> Option<Instant> {
None
}
fn register(&self, oper: Operation, cx: &Context) -> bool {
self.0.receivers.register(oper, cx);
self.is_ready()
}
fn unregister(&self, oper: Operation) {
self.0.receivers.unregister(oper);
}
fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
self.try_select(token)
}
fn is_ready(&self) -> bool {
!self.0.is_empty() || self.0.is_disconnected()
}
fn watch(&self, oper: Operation, cx: &Context) -> bool {
self.0.receivers.watch(oper, cx);
self.is_ready()
}
fn unwatch(&self, oper: Operation) {
self.0.receivers.unwatch(oper);
}
}
impl<T> SelectHandle for Sender<'_, T> {
fn try_select(&self, token: &mut Token) -> bool {
self.0.start_send(token)
}
fn deadline(&self) -> Option<Instant> {
None
}
fn register(&self, _oper: Operation, _cx: &Context) -> bool {
self.is_ready()
}
fn unregister(&self, _oper: Operation) {}
fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
self.try_select(token)
}
fn is_ready(&self) -> bool {
true
}
fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
self.is_ready()
}
fn unwatch(&self, _oper: Operation) {}
}

View file

@ -0,0 +1,17 @@
//! Channel flavors.
//!
//! There are six flavors:
//!
//! 1. `at` - Channel that delivers a message after a certain amount of time.
//! 2. `array` - Bounded channel based on a preallocated array.
//! 3. `list` - Unbounded channel implemented as a linked list.
//! 4. `never` - Channel that never delivers messages.
//! 5. `tick` - Channel that delivers messages periodically.
//! 6. `zero` - Zero-capacity channel.
pub(crate) mod array;
pub(crate) mod at;
pub(crate) mod list;
pub(crate) mod never;
pub(crate) mod tick;
pub(crate) mod zero;

View file

@ -0,0 +1,110 @@
//! Channel that never delivers messages.
//!
//! Messages cannot be sent into this kind of channel.
use std::marker::PhantomData;
use std::time::Instant;
use crate::context::Context;
use crate::err::{RecvTimeoutError, TryRecvError};
use crate::select::{Operation, SelectHandle, Token};
use crate::utils;
/// This flavor doesn't need a token.
pub(crate) type NeverToken = ();
/// Channel that never delivers messages.
pub(crate) struct Channel<T> {
_marker: PhantomData<T>,
}
impl<T> Channel<T> {
/// Creates a channel that never delivers messages.
#[inline]
pub(crate) fn new() -> Self {
Channel {
_marker: PhantomData,
}
}
/// Attempts to receive a message without blocking.
#[inline]
pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
Err(TryRecvError::Empty)
}
/// Receives a message from the channel.
#[inline]
pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
utils::sleep_until(deadline);
Err(RecvTimeoutError::Timeout)
}
/// Reads a message from the channel.
#[inline]
pub(crate) unsafe fn read(&self, _token: &mut Token) -> Result<T, ()> {
Err(())
}
/// Returns `true` if the channel is empty.
#[inline]
pub(crate) fn is_empty(&self) -> bool {
true
}
/// Returns `true` if the channel is full.
#[inline]
pub(crate) fn is_full(&self) -> bool {
true
}
/// Returns the number of messages in the channel.
#[inline]
pub(crate) fn len(&self) -> usize {
0
}
/// Returns the capacity of the channel.
#[inline]
pub(crate) fn capacity(&self) -> Option<usize> {
Some(0)
}
}
impl<T> SelectHandle for Channel<T> {
#[inline]
fn try_select(&self, _token: &mut Token) -> bool {
false
}
#[inline]
fn deadline(&self) -> Option<Instant> {
None
}
#[inline]
fn register(&self, _oper: Operation, _cx: &Context) -> bool {
self.is_ready()
}
#[inline]
fn unregister(&self, _oper: Operation) {}
#[inline]
fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
self.try_select(token)
}
#[inline]
fn is_ready(&self) -> bool {
false
}
#[inline]
fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
self.is_ready()
}
#[inline]
fn unwatch(&self, _oper: Operation) {}
}

View file

@ -0,0 +1,167 @@
//! Channel that delivers messages periodically.
//!
//! Messages cannot be sent into this kind of channel; they are materialized on demand.
use std::thread;
use std::time::{Duration, Instant};
use crossbeam_utils::atomic::AtomicCell;
use crate::context::Context;
use crate::err::{RecvTimeoutError, TryRecvError};
use crate::select::{Operation, SelectHandle, Token};
/// Result of a receive operation.
pub(crate) type TickToken = Option<Instant>;
/// Channel that delivers messages periodically.
pub(crate) struct Channel {
/// The instant at which the next message will be delivered.
delivery_time: AtomicCell<Instant>,
/// The time interval in which messages get delivered.
duration: Duration,
}
impl Channel {
/// Creates a channel that delivers messages periodically.
#[inline]
pub(crate) fn new(delivery_time: Instant, dur: Duration) -> Self {
Channel {
delivery_time: AtomicCell::new(delivery_time),
duration: dur,
}
}
/// Attempts to receive a message without blocking.
#[inline]
pub(crate) fn try_recv(&self) -> Result<Instant, TryRecvError> {
loop {
let now = Instant::now();
let delivery_time = self.delivery_time.load();
if now < delivery_time {
return Err(TryRecvError::Empty);
}
if self
.delivery_time
.compare_exchange(delivery_time, now + self.duration)
.is_ok()
{
return Ok(delivery_time);
}
}
}
/// Receives a message from the channel.
#[inline]
pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<Instant, RecvTimeoutError> {
loop {
let delivery_time = self.delivery_time.load();
let now = Instant::now();
if let Some(d) = deadline {
if d < delivery_time {
if now < d {
thread::sleep(d - now);
}
return Err(RecvTimeoutError::Timeout);
}
}
if self
.delivery_time
.compare_exchange(delivery_time, delivery_time.max(now) + self.duration)
.is_ok()
{
if now < delivery_time {
thread::sleep(delivery_time - now);
}
return Ok(delivery_time);
}
}
}
/// Reads a message from the channel.
#[inline]
pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<Instant, ()> {
token.tick.ok_or(())
}
/// Returns `true` if the channel is empty.
#[inline]
pub(crate) fn is_empty(&self) -> bool {
Instant::now() < self.delivery_time.load()
}
/// Returns `true` if the channel is full.
#[inline]
pub(crate) fn is_full(&self) -> bool {
!self.is_empty()
}
/// Returns the number of messages in the channel.
#[inline]
pub(crate) fn len(&self) -> usize {
if self.is_empty() {
0
} else {
1
}
}
/// Returns the capacity of the channel.
#[inline]
pub(crate) fn capacity(&self) -> Option<usize> {
Some(1)
}
}
impl SelectHandle for Channel {
#[inline]
fn try_select(&self, token: &mut Token) -> bool {
match self.try_recv() {
Ok(msg) => {
token.tick = Some(msg);
true
}
Err(TryRecvError::Disconnected) => {
token.tick = None;
true
}
Err(TryRecvError::Empty) => false,
}
}
#[inline]
fn deadline(&self) -> Option<Instant> {
Some(self.delivery_time.load())
}
#[inline]
fn register(&self, _oper: Operation, _cx: &Context) -> bool {
self.is_ready()
}
#[inline]
fn unregister(&self, _oper: Operation) {}
#[inline]
fn accept(&self, token: &mut Token, _cx: &Context) -> bool {
self.try_select(token)
}
#[inline]
fn is_ready(&self) -> bool {
!self.is_empty()
}
#[inline]
fn watch(&self, _oper: Operation, _cx: &Context) -> bool {
self.is_ready()
}
#[inline]
fn unwatch(&self, _oper: Operation) {}
}

View file

@ -0,0 +1,496 @@
//! Zero-capacity channel.
//!
//! This kind of channel is also known as *rendezvous* channel.
use std::boxed::Box;
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::time::Instant;
use std::{fmt, ptr};
use crossbeam_utils::Backoff;
use crate::context::Context;
use crate::err::{RecvTimeoutError, SendTimeoutError, TryRecvError, TrySendError};
use crate::select::{Operation, SelectHandle, Selected, Token};
use crate::waker::Waker;
/// A pointer to a packet.
pub(crate) struct ZeroToken(*mut ());
impl Default for ZeroToken {
fn default() -> Self {
Self(ptr::null_mut())
}
}
impl fmt::Debug for ZeroToken {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&(self.0 as usize), f)
}
}
/// A slot for passing one message from a sender to a receiver.
struct Packet<T> {
/// Equals `true` if the packet is allocated on the stack.
on_stack: bool,
/// Equals `true` once the packet is ready for reading or writing.
ready: AtomicBool,
/// The message.
msg: UnsafeCell<Option<T>>,
}
impl<T> Packet<T> {
/// Creates an empty packet on the stack.
fn empty_on_stack() -> Packet<T> {
Packet {
on_stack: true,
ready: AtomicBool::new(false),
msg: UnsafeCell::new(None),
}
}
/// Creates an empty packet on the heap.
fn empty_on_heap() -> Box<Packet<T>> {
Box::new(Packet {
on_stack: false,
ready: AtomicBool::new(false),
msg: UnsafeCell::new(None),
})
}
/// Creates a packet on the stack, containing a message.
fn message_on_stack(msg: T) -> Packet<T> {
Packet {
on_stack: true,
ready: AtomicBool::new(false),
msg: UnsafeCell::new(Some(msg)),
}
}
/// Waits until the packet becomes ready for reading or writing.
fn wait_ready(&self) {
let backoff = Backoff::new();
while !self.ready.load(Ordering::Acquire) {
backoff.snooze();
}
}
}
/// Inner representation of a zero-capacity channel.
struct Inner {
/// Senders waiting to pair up with a receive operation.
senders: Waker,
/// Receivers waiting to pair up with a send operation.
receivers: Waker,
/// Equals `true` when the channel is disconnected.
is_disconnected: bool,
}
/// Zero-capacity channel.
pub(crate) struct Channel<T> {
/// Inner representation of the channel.
inner: Mutex<Inner>,
/// Indicates that dropping a `Channel<T>` may drop values of type `T`.
_marker: PhantomData<T>,
}
impl<T> Channel<T> {
/// Constructs a new zero-capacity channel.
pub(crate) fn new() -> Self {
Channel {
inner: Mutex::new(Inner {
senders: Waker::new(),
receivers: Waker::new(),
is_disconnected: false,
}),
_marker: PhantomData,
}
}
/// Returns a receiver handle to the channel.
pub(crate) fn receiver(&self) -> Receiver<'_, T> {
Receiver(self)
}
/// Returns a sender handle to the channel.
pub(crate) fn sender(&self) -> Sender<'_, T> {
Sender(self)
}
/// Attempts to reserve a slot for sending a message.
fn start_send(&self, token: &mut Token) -> bool {
let mut inner = self.inner.lock().unwrap();
// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
token.zero.0 = operation.packet;
true
} else if inner.is_disconnected {
token.zero.0 = ptr::null_mut();
true
} else {
false
}
}
/// Writes a message into the packet.
pub(crate) unsafe fn write(&self, token: &mut Token, msg: T) -> Result<(), T> {
// If there is no packet, the channel is disconnected.
if token.zero.0.is_null() {
return Err(msg);
}
let packet = &*(token.zero.0 as *const Packet<T>);
packet.msg.get().write(Some(msg));
packet.ready.store(true, Ordering::Release);
Ok(())
}
/// Attempts to pair up with a sender.
fn start_recv(&self, token: &mut Token) -> bool {
let mut inner = self.inner.lock().unwrap();
// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
token.zero.0 = operation.packet;
true
} else if inner.is_disconnected {
token.zero.0 = ptr::null_mut();
true
} else {
false
}
}
/// Reads a message from the packet.
pub(crate) unsafe fn read(&self, token: &mut Token) -> Result<T, ()> {
// If there is no packet, the channel is disconnected.
if token.zero.0.is_null() {
return Err(());
}
let packet = &*(token.zero.0 as *const Packet<T>);
if packet.on_stack {
// The message has been in the packet from the beginning, so there is no need to wait
// for it. However, after reading the message, we need to set `ready` to `true` in
// order to signal that the packet can be destroyed.
let msg = packet.msg.get().replace(None).unwrap();
packet.ready.store(true, Ordering::Release);
Ok(msg)
} else {
// Wait until the message becomes available, then read it and destroy the
// heap-allocated packet.
packet.wait_ready();
let msg = packet.msg.get().replace(None).unwrap();
drop(Box::from_raw(token.zero.0.cast::<Packet<T>>()));
Ok(msg)
}
}
/// Attempts to send a message into the channel.
pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
let token = &mut Token::default();
let mut inner = self.inner.lock().unwrap();
// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
token.zero.0 = operation.packet;
drop(inner);
unsafe {
self.write(token, msg).ok().unwrap();
}
Ok(())
} else if inner.is_disconnected {
Err(TrySendError::Disconnected(msg))
} else {
Err(TrySendError::Full(msg))
}
}
/// Sends a message into the channel.
pub(crate) fn send(
&self,
msg: T,
deadline: Option<Instant>,
) -> Result<(), SendTimeoutError<T>> {
let token = &mut Token::default();
let mut inner = self.inner.lock().unwrap();
// If there's a waiting receiver, pair up with it.
if let Some(operation) = inner.receivers.try_select() {
token.zero.0 = operation.packet;
drop(inner);
unsafe {
self.write(token, msg).ok().unwrap();
}
return Ok(());
}
if inner.is_disconnected {
return Err(SendTimeoutError::Disconnected(msg));
}
Context::with(|cx| {
// Prepare for blocking until a receiver wakes us up.
let oper = Operation::hook(token);
let mut packet = Packet::<T>::message_on_stack(msg);
inner
.senders
.register_with_packet(oper, &mut packet as *mut Packet<T> as *mut (), cx);
inner.receivers.notify();
drop(inner);
// Block the current thread.
let sel = cx.wait_until(deadline);
match sel {
Selected::Waiting => unreachable!(),
Selected::Aborted => {
self.inner.lock().unwrap().senders.unregister(oper).unwrap();
let msg = unsafe { packet.msg.get().replace(None).unwrap() };
Err(SendTimeoutError::Timeout(msg))
}
Selected::Disconnected => {
self.inner.lock().unwrap().senders.unregister(oper).unwrap();
let msg = unsafe { packet.msg.get().replace(None).unwrap() };
Err(SendTimeoutError::Disconnected(msg))
}
Selected::Operation(_) => {
// Wait until the message is read, then drop the packet.
packet.wait_ready();
Ok(())
}
}
})
}
/// Attempts to receive a message without blocking.
pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
let token = &mut Token::default();
let mut inner = self.inner.lock().unwrap();
// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
token.zero.0 = operation.packet;
drop(inner);
unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
} else if inner.is_disconnected {
Err(TryRecvError::Disconnected)
} else {
Err(TryRecvError::Empty)
}
}
/// Receives a message from the channel.
pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
let token = &mut Token::default();
let mut inner = self.inner.lock().unwrap();
// If there's a waiting sender, pair up with it.
if let Some(operation) = inner.senders.try_select() {
token.zero.0 = operation.packet;
drop(inner);
unsafe {
return self.read(token).map_err(|_| RecvTimeoutError::Disconnected);
}
}
if inner.is_disconnected {
return Err(RecvTimeoutError::Disconnected);
}
Context::with(|cx| {
// Prepare for blocking until a sender wakes us up.
let oper = Operation::hook(token);
let mut packet = Packet::<T>::empty_on_stack();
inner.receivers.register_with_packet(
oper,
&mut packet as *mut Packet<T> as *mut (),
cx,
);
inner.senders.notify();
drop(inner);
// Block the current thread.
let sel = cx.wait_until(deadline);
match sel {
Selected::Waiting => unreachable!(),
Selected::Aborted => {
self.inner
.lock()
.unwrap()
.receivers
.unregister(oper)
.unwrap();
Err(RecvTimeoutError::Timeout)
}
Selected::Disconnected => {
self.inner
.lock()
.unwrap()
.receivers
.unregister(oper)
.unwrap();
Err(RecvTimeoutError::Disconnected)
}
Selected::Operation(_) => {
// Wait until the message is provided, then read it.
packet.wait_ready();
unsafe { Ok(packet.msg.get().replace(None).unwrap()) }
}
}
})
}
/// Disconnects the channel and wakes up all blocked senders and receivers.
///
/// Returns `true` if this call disconnected the channel.
pub(crate) fn disconnect(&self) -> bool {
let mut inner = self.inner.lock().unwrap();
if !inner.is_disconnected {
inner.is_disconnected = true;
inner.senders.disconnect();
inner.receivers.disconnect();
true
} else {
false
}
}
/// Returns the current number of messages inside the channel.
pub(crate) fn len(&self) -> usize {
0
}
/// Returns the capacity of the channel.
pub(crate) fn capacity(&self) -> Option<usize> {
Some(0)
}
/// Returns `true` if the channel is empty.
pub(crate) fn is_empty(&self) -> bool {
true
}
/// Returns `true` if the channel is full.
pub(crate) fn is_full(&self) -> bool {
true
}
}
/// Receiver handle to a channel.
pub(crate) struct Receiver<'a, T>(&'a Channel<T>);
/// Sender handle to a channel.
pub(crate) struct Sender<'a, T>(&'a Channel<T>);
impl<T> SelectHandle for Receiver<'_, T> {
fn try_select(&self, token: &mut Token) -> bool {
self.0.start_recv(token)
}
fn deadline(&self) -> Option<Instant> {
None
}
fn register(&self, oper: Operation, cx: &Context) -> bool {
let packet = Box::into_raw(Packet::<T>::empty_on_heap());
let mut inner = self.0.inner.lock().unwrap();
inner
.receivers
.register_with_packet(oper, packet.cast::<()>(), cx);
inner.senders.notify();
inner.senders.can_select() || inner.is_disconnected
}
fn unregister(&self, oper: Operation) {
if let Some(operation) = self.0.inner.lock().unwrap().receivers.unregister(oper) {
unsafe {
drop(Box::from_raw(operation.packet.cast::<Packet<T>>()));
}
}
}
fn accept(&self, token: &mut Token, cx: &Context) -> bool {
token.zero.0 = cx.wait_packet();
true
}
fn is_ready(&self) -> bool {
let inner = self.0.inner.lock().unwrap();
inner.senders.can_select() || inner.is_disconnected
}
fn watch(&self, oper: Operation, cx: &Context) -> bool {
let mut inner = self.0.inner.lock().unwrap();
inner.receivers.watch(oper, cx);
inner.senders.can_select() || inner.is_disconnected
}
fn unwatch(&self, oper: Operation) {
let mut inner = self.0.inner.lock().unwrap();
inner.receivers.unwatch(oper);
}
}
impl<T> SelectHandle for Sender<'_, T> {
fn try_select(&self, token: &mut Token) -> bool {
self.0.start_send(token)
}
fn deadline(&self) -> Option<Instant> {
None
}
fn register(&self, oper: Operation, cx: &Context) -> bool {
let packet = Box::into_raw(Packet::<T>::empty_on_heap());
let mut inner = self.0.inner.lock().unwrap();
inner
.senders
.register_with_packet(oper, packet.cast::<()>(), cx);
inner.receivers.notify();
inner.receivers.can_select() || inner.is_disconnected
}
fn unregister(&self, oper: Operation) {
if let Some(operation) = self.0.inner.lock().unwrap().senders.unregister(oper) {
unsafe {
drop(Box::from_raw(operation.packet.cast::<Packet<T>>()));
}
}
}
fn accept(&self, token: &mut Token, cx: &Context) -> bool {
token.zero.0 = cx.wait_packet();
true
}
fn is_ready(&self) -> bool {
let inner = self.0.inner.lock().unwrap();
inner.receivers.can_select() || inner.is_disconnected
}
fn watch(&self, oper: Operation, cx: &Context) -> bool {
let mut inner = self.0.inner.lock().unwrap();
inner.senders.watch(oper, cx);
inner.receivers.can_select() || inner.is_disconnected
}
fn unwatch(&self, oper: Operation) {
let mut inner = self.0.inner.lock().unwrap();
inner.senders.unwatch(oper);
}
}