Promises, promises

This commit is contained in:
John Doty 2023-06-29 10:10:40 -07:00
parent 17fdee51e6
commit a2dafeea12
6 changed files with 303 additions and 102 deletions

View file

@ -279,23 +279,8 @@ impl ContextRef {
}
/// Construct a new promise.
pub fn new_promise(&self) -> Result<Promise> {
unsafe {
let mut resolving_funcs: [sys::JSValue; 2] =
[sys::JS_MakeUndefined(), sys::JS_MakeUndefined()];
let val =
sys::JS_NewPromiseCapability(self.ctx, &mut resolving_funcs as *mut sys::JSValue);
if sys::JS_ValueGetTag(val) == sys::JS_TAG_EXCEPTION {
Err(self.exception_error())
} else {
Ok(Promise::new(
Value::from_raw(val, self),
Value::from_raw(resolving_funcs[0], self),
Value::from_raw(resolving_funcs[1], self),
))
}
}
pub fn new_promise(&self) -> Result<(Value, Promise)> {
self.get_runtime().new_promise(self)
}
/// Construct a new exception object, suitable for throwing.

View file

@ -1,5 +1,5 @@
use oden_js_sys as sys;
use std::ffi::{CString, NulError};
use std::ffi::NulError;
use thiserror::Error;
mod atom;

View file

@ -1,34 +1,69 @@
use crate::{ContextRef, Value, ValueRef};
use crate::{ContextRef, ValueResult};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::Sender;
#[derive(Debug, Clone)]
#[derive(Eq, PartialEq, Hash, Debug, Clone, Copy)]
pub(crate) struct PromiseHandle(u64);
impl PromiseHandle {
pub fn new() -> Self {
static NEXT_ID: AtomicU64 = AtomicU64::new(0);
PromiseHandle(NEXT_ID.fetch_add(1, Ordering::SeqCst))
}
}
pub type PromiseResult = Box<dyn Fn(&ContextRef) -> ValueResult + Send + 'static>;
pub(crate) enum PromiseEvent {
Resolved(PromiseResult),
Rejected(PromiseResult),
}
/// A Promise is a small, thread-safe marker which represents a pending
/// promise inside the JS runtime. This is how you complete the promise: you
/// must either call `resolve` or `reject` before you drop the promise.
#[derive(Debug)]
pub struct Promise {
pub object: Value,
pub resolve_fn: Value,
pub reject_fn: Value,
complete: bool,
handle: PromiseHandle,
channel: Sender<(PromiseHandle, PromiseEvent)>,
}
impl Promise {
pub(crate) fn new(object: Value, resolve_fn: Value, reject_fn: Value) -> Self {
pub(crate) fn new(
handle: PromiseHandle,
channel: Sender<(PromiseHandle, PromiseEvent)>,
) -> Self {
Promise {
object,
resolve_fn,
reject_fn,
complete: false,
handle,
channel,
}
}
pub fn dup(&self, ctx: &ContextRef) -> Self {
Promise {
object: self.object.dup(ctx),
resolve_fn: self.resolve_fn.dup(ctx),
reject_fn: self.reject_fn.dup(ctx),
}
pub fn resolve<T>(mut self, value: T)
where
T: Fn(&ContextRef) -> ValueResult + Send + 'static,
{
let _ = self
.channel
.send((self.handle, PromiseEvent::Resolved(Box::new(value))));
self.complete = true;
}
pub fn resolve(self, context: &ContextRef, value: &ValueRef) {
let _ = self.resolve_fn.call(context, &[value]);
}
pub fn reject(self, context: &ContextRef, value: &ValueRef) {
let _ = self.reject_fn.call(context, &[value]);
pub fn reject<T>(mut self, value: T)
where
T: Fn(&ContextRef) -> ValueResult + Send + 'static,
{
let _ = self
.channel
.send((self.handle, PromiseEvent::Rejected(Box::new(value))));
self.complete = true;
}
}
impl Drop for Promise {
fn drop(&mut self) {
assert!(self.complete);
}
}

View file

@ -1,16 +1,76 @@
use crate::module::loader::{load_module, DefaultModuleLoader, ModuleLoader};
use crate::ContextRef;
use crate::{
module::loader::{load_module, DefaultModuleLoader, ModuleLoader},
promise::{PromiseEvent, PromiseHandle},
ContextRef, Promise, Result, Value,
};
use oden_js_sys as sys;
use std::cell::RefCell;
use std::cell::{RefCell, RefMut};
use std::collections::HashMap;
use std::ffi::CStr;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
// NOTE: This is DEEPLY unsafe, but the runtime lifetime dominates the
// lifetime of all of this stuff, so we really *can* hold on to them.
struct PromiseEntry {
context: *mut sys::JSContext,
resolve: sys::JSValue,
reject: sys::JSValue,
}
impl PromiseEntry {
// NOTE: Takes ownership of resolve and reject.
fn new(context: *mut sys::JSContext, resolve: sys::JSValue, reject: sys::JSValue) -> Self {
PromiseEntry {
context: unsafe { sys::JS_DupContext(context) },
resolve,
reject,
}
}
}
impl Drop for PromiseEntry {
fn drop(&mut self) {
unsafe {
sys::JS_FreeValue(self.context, self.resolve);
sys::JS_FreeValue(self.context, self.reject);
sys::JS_FreeContext(self.context);
}
}
}
struct PrivateState {
refs: u64,
loader: Arc<Box<dyn ModuleLoader>>,
promise_send: Sender<(PromiseHandle, PromiseEvent)>,
promise_recv: Receiver<(PromiseHandle, PromiseEvent)>,
promise_table: HashMap<PromiseHandle, PromiseEntry>, // !
}
impl PrivateState {
pub fn new<T>(loader: T) -> Box<RefCell<Self>>
where
T: ModuleLoader + 'static,
{
let (send, recv) = channel();
Box::new(RefCell::new(PrivateState {
refs: 1,
loader: Arc::new(Box::new(loader)),
promise_send: send,
promise_recv: recv,
promise_table: HashMap::new(),
}))
}
pub unsafe fn from_rt_mut(rt: *mut sys::JSRuntime) -> RefMut<'static, PrivateState> {
unsafe {
let ptr = sys::JS_GetRuntimeOpaque(rt) as *const RefCell<PrivateState>;
ptr.as_ref()
.expect("We already know this runtime is one of ours!")
.borrow_mut()
}
}
unsafe extern "C" fn module_loader(
ctx: *mut sys::JSContext,
path: *const i8,
@ -45,10 +105,7 @@ impl Runtime {
}
pub fn with_loader<TLoader: ModuleLoader + 'static>(loader: TLoader) -> Runtime {
let state = Box::new(RefCell::new(PrivateState {
refs: 1,
loader: Arc::new(Box::new(loader)),
}));
let state = PrivateState::new(loader);
let rt = unsafe {
let rt = sys::JS_NewRuntime();
let state = Box::into_raw(state) as *mut _;
@ -60,12 +117,7 @@ impl Runtime {
}
pub(crate) fn from_raw(rt: *mut sys::JSRuntime) -> Self {
let mut state = unsafe {
let ptr = sys::JS_GetRuntimeOpaque(rt) as *const RefCell<PrivateState>;
ptr.as_ref()
.expect("We already know this runtime is one of ours!")
.borrow_mut()
};
let mut state = unsafe { PrivateState::from_rt_mut(rt) };
state.refs += 1;
Runtime { rt }
}
@ -94,6 +146,83 @@ impl Runtime {
sys::JS_RunGC(self.rt);
}
}
/// Construct a new promise.
pub fn new_promise(&self, context: &ContextRef) -> Result<(Value, Promise)> {
unsafe {
let mut state = PrivateState::from_rt_mut(self.rt);
let mut resolving_funcs: [sys::JSValue; 2] =
[sys::JS_MakeUndefined(), sys::JS_MakeUndefined()];
let val = sys::JS_NewPromiseCapability(
context.ctx,
&mut resolving_funcs as *mut sys::JSValue,
);
if sys::JS_ValueGetTag(val) == sys::JS_TAG_EXCEPTION {
return Err(context.exception_error());
}
let handle = PromiseHandle::new();
state.promise_table.insert(
handle,
PromiseEntry::new(context.ctx, resolving_funcs[0], resolving_funcs[1]),
);
let promise = Promise::new(handle, state.promise_send.clone());
drop(state); // Need to drop state so that the value constructor can addref.
Ok((Value::from_raw(val, context), promise))
}
}
/// Process all pending async jobs. This includes all promise resolutions.
fn process_promise_completions(&self) {
// TODO: This could be more robust if we buffered all the completed
// promise entries and then dropped the borrow of the state, so
// that we never invoked user code while borrowing our private
// state mutably.
let mut state = unsafe { PrivateState::from_rt_mut(self.rt) };
while let Ok((handle, evt)) = state.promise_recv.try_recv() {
if let Some(entry) = state.promise_table.remove(&handle) {
let ctx = ContextRef::from_raw(entry.context);
let (callback, value) = match evt {
PromiseEvent::Resolved(v) => (entry.resolve, v),
PromiseEvent::Rejected(v) => (entry.reject, v),
};
// Convert the result into a JS value, which we can only
// really do while we are on this thread.
let value = value(&ctx).expect("Should be able to convert promise result to value");
// Call the particular callback and make sure it doesn't throw.
ctx.check_exception(unsafe {
let mut args = [value.val];
sys::JS_Call(
entry.context,
callback,
sys::JS_MakeUndefined(),
1,
args.as_mut_ptr(),
)
})
.expect("Exception thrown by promise callback");
}
}
}
/// Process all pending async jobs. This includes all promise resolutions.
pub fn process_all_jobs(&self) -> Result<()> {
self.process_promise_completions();
loop {
let mut ctx1: *mut sys::JSContext = std::ptr::null_mut();
let err = unsafe { sys::JS_ExecutePendingJob(self.rt, &mut ctx1) };
if err == 0 {
break;
} else if err < 0 {
return Err(ContextRef::from_raw(ctx1).exception_error());
}
}
Ok(())
}
}
impl Clone for Runtime {