diff --git a/oden-js/src/context.rs b/oden-js/src/context.rs index b11b82df..d939dd08 100644 --- a/oden-js/src/context.rs +++ b/oden-js/src/context.rs @@ -279,23 +279,8 @@ impl ContextRef { } /// Construct a new promise. - pub fn new_promise(&self) -> Result { - unsafe { - let mut resolving_funcs: [sys::JSValue; 2] = - [sys::JS_MakeUndefined(), sys::JS_MakeUndefined()]; - let val = - sys::JS_NewPromiseCapability(self.ctx, &mut resolving_funcs as *mut sys::JSValue); - - if sys::JS_ValueGetTag(val) == sys::JS_TAG_EXCEPTION { - Err(self.exception_error()) - } else { - Ok(Promise::new( - Value::from_raw(val, self), - Value::from_raw(resolving_funcs[0], self), - Value::from_raw(resolving_funcs[1], self), - )) - } - } + pub fn new_promise(&self) -> Result<(Value, Promise)> { + self.get_runtime().new_promise(self) } /// Construct a new exception object, suitable for throwing. diff --git a/oden-js/src/lib.rs b/oden-js/src/lib.rs index 1784abb6..6d6397c5 100644 --- a/oden-js/src/lib.rs +++ b/oden-js/src/lib.rs @@ -1,5 +1,5 @@ use oden_js_sys as sys; -use std::ffi::{CString, NulError}; +use std::ffi::NulError; use thiserror::Error; mod atom; diff --git a/oden-js/src/promise.rs b/oden-js/src/promise.rs index 152085b2..f071e6b8 100644 --- a/oden-js/src/promise.rs +++ b/oden-js/src/promise.rs @@ -1,34 +1,69 @@ -use crate::{ContextRef, Value, ValueRef}; +use crate::{ContextRef, ValueResult}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::mpsc::Sender; -#[derive(Debug, Clone)] +#[derive(Eq, PartialEq, Hash, Debug, Clone, Copy)] +pub(crate) struct PromiseHandle(u64); + +impl PromiseHandle { + pub fn new() -> Self { + static NEXT_ID: AtomicU64 = AtomicU64::new(0); + PromiseHandle(NEXT_ID.fetch_add(1, Ordering::SeqCst)) + } +} + +pub type PromiseResult = Box ValueResult + Send + 'static>; + +pub(crate) enum PromiseEvent { + Resolved(PromiseResult), + Rejected(PromiseResult), +} + +/// A Promise is a small, thread-safe marker which represents a pending +/// promise inside the JS runtime. This is how you complete the promise: you +/// must either call `resolve` or `reject` before you drop the promise. +#[derive(Debug)] pub struct Promise { - pub object: Value, - pub resolve_fn: Value, - pub reject_fn: Value, + complete: bool, + handle: PromiseHandle, + channel: Sender<(PromiseHandle, PromiseEvent)>, } impl Promise { - pub(crate) fn new(object: Value, resolve_fn: Value, reject_fn: Value) -> Self { + pub(crate) fn new( + handle: PromiseHandle, + channel: Sender<(PromiseHandle, PromiseEvent)>, + ) -> Self { Promise { - object, - resolve_fn, - reject_fn, + complete: false, + handle, + channel, } } - pub fn dup(&self, ctx: &ContextRef) -> Self { - Promise { - object: self.object.dup(ctx), - resolve_fn: self.resolve_fn.dup(ctx), - reject_fn: self.reject_fn.dup(ctx), - } + pub fn resolve(mut self, value: T) + where + T: Fn(&ContextRef) -> ValueResult + Send + 'static, + { + let _ = self + .channel + .send((self.handle, PromiseEvent::Resolved(Box::new(value)))); + self.complete = true; } - pub fn resolve(self, context: &ContextRef, value: &ValueRef) { - let _ = self.resolve_fn.call(context, &[value]); - } - - pub fn reject(self, context: &ContextRef, value: &ValueRef) { - let _ = self.reject_fn.call(context, &[value]); + pub fn reject(mut self, value: T) + where + T: Fn(&ContextRef) -> ValueResult + Send + 'static, + { + let _ = self + .channel + .send((self.handle, PromiseEvent::Rejected(Box::new(value)))); + self.complete = true; + } +} + +impl Drop for Promise { + fn drop(&mut self) { + assert!(self.complete); } } diff --git a/oden-js/src/runtime.rs b/oden-js/src/runtime.rs index 0e92e095..1719f3a0 100644 --- a/oden-js/src/runtime.rs +++ b/oden-js/src/runtime.rs @@ -1,16 +1,76 @@ -use crate::module::loader::{load_module, DefaultModuleLoader, ModuleLoader}; -use crate::ContextRef; +use crate::{ + module::loader::{load_module, DefaultModuleLoader, ModuleLoader}, + promise::{PromiseEvent, PromiseHandle}, + ContextRef, Promise, Result, Value, +}; use oden_js_sys as sys; -use std::cell::RefCell; +use std::cell::{RefCell, RefMut}; +use std::collections::HashMap; use std::ffi::CStr; +use std::sync::mpsc::{channel, Receiver, Sender}; use std::sync::Arc; +// NOTE: This is DEEPLY unsafe, but the runtime lifetime dominates the +// lifetime of all of this stuff, so we really *can* hold on to them. +struct PromiseEntry { + context: *mut sys::JSContext, + resolve: sys::JSValue, + reject: sys::JSValue, +} + +impl PromiseEntry { + // NOTE: Takes ownership of resolve and reject. + fn new(context: *mut sys::JSContext, resolve: sys::JSValue, reject: sys::JSValue) -> Self { + PromiseEntry { + context: unsafe { sys::JS_DupContext(context) }, + resolve, + reject, + } + } +} + +impl Drop for PromiseEntry { + fn drop(&mut self) { + unsafe { + sys::JS_FreeValue(self.context, self.resolve); + sys::JS_FreeValue(self.context, self.reject); + sys::JS_FreeContext(self.context); + } + } +} + struct PrivateState { refs: u64, loader: Arc>, + promise_send: Sender<(PromiseHandle, PromiseEvent)>, + promise_recv: Receiver<(PromiseHandle, PromiseEvent)>, + promise_table: HashMap, // ! } impl PrivateState { + pub fn new(loader: T) -> Box> + where + T: ModuleLoader + 'static, + { + let (send, recv) = channel(); + Box::new(RefCell::new(PrivateState { + refs: 1, + loader: Arc::new(Box::new(loader)), + promise_send: send, + promise_recv: recv, + promise_table: HashMap::new(), + })) + } + + pub unsafe fn from_rt_mut(rt: *mut sys::JSRuntime) -> RefMut<'static, PrivateState> { + unsafe { + let ptr = sys::JS_GetRuntimeOpaque(rt) as *const RefCell; + ptr.as_ref() + .expect("We already know this runtime is one of ours!") + .borrow_mut() + } + } + unsafe extern "C" fn module_loader( ctx: *mut sys::JSContext, path: *const i8, @@ -45,10 +105,7 @@ impl Runtime { } pub fn with_loader(loader: TLoader) -> Runtime { - let state = Box::new(RefCell::new(PrivateState { - refs: 1, - loader: Arc::new(Box::new(loader)), - })); + let state = PrivateState::new(loader); let rt = unsafe { let rt = sys::JS_NewRuntime(); let state = Box::into_raw(state) as *mut _; @@ -60,12 +117,7 @@ impl Runtime { } pub(crate) fn from_raw(rt: *mut sys::JSRuntime) -> Self { - let mut state = unsafe { - let ptr = sys::JS_GetRuntimeOpaque(rt) as *const RefCell; - ptr.as_ref() - .expect("We already know this runtime is one of ours!") - .borrow_mut() - }; + let mut state = unsafe { PrivateState::from_rt_mut(rt) }; state.refs += 1; Runtime { rt } } @@ -94,6 +146,83 @@ impl Runtime { sys::JS_RunGC(self.rt); } } + + /// Construct a new promise. + pub fn new_promise(&self, context: &ContextRef) -> Result<(Value, Promise)> { + unsafe { + let mut state = PrivateState::from_rt_mut(self.rt); + let mut resolving_funcs: [sys::JSValue; 2] = + [sys::JS_MakeUndefined(), sys::JS_MakeUndefined()]; + let val = sys::JS_NewPromiseCapability( + context.ctx, + &mut resolving_funcs as *mut sys::JSValue, + ); + + if sys::JS_ValueGetTag(val) == sys::JS_TAG_EXCEPTION { + return Err(context.exception_error()); + } + let handle = PromiseHandle::new(); + state.promise_table.insert( + handle, + PromiseEntry::new(context.ctx, resolving_funcs[0], resolving_funcs[1]), + ); + + let promise = Promise::new(handle, state.promise_send.clone()); + drop(state); // Need to drop state so that the value constructor can addref. + Ok((Value::from_raw(val, context), promise)) + } + } + + /// Process all pending async jobs. This includes all promise resolutions. + fn process_promise_completions(&self) { + // TODO: This could be more robust if we buffered all the completed + // promise entries and then dropped the borrow of the state, so + // that we never invoked user code while borrowing our private + // state mutably. + let mut state = unsafe { PrivateState::from_rt_mut(self.rt) }; + while let Ok((handle, evt)) = state.promise_recv.try_recv() { + if let Some(entry) = state.promise_table.remove(&handle) { + let ctx = ContextRef::from_raw(entry.context); + let (callback, value) = match evt { + PromiseEvent::Resolved(v) => (entry.resolve, v), + PromiseEvent::Rejected(v) => (entry.reject, v), + }; + + // Convert the result into a JS value, which we can only + // really do while we are on this thread. + let value = value(&ctx).expect("Should be able to convert promise result to value"); + + // Call the particular callback and make sure it doesn't throw. + ctx.check_exception(unsafe { + let mut args = [value.val]; + sys::JS_Call( + entry.context, + callback, + sys::JS_MakeUndefined(), + 1, + args.as_mut_ptr(), + ) + }) + .expect("Exception thrown by promise callback"); + } + } + } + + /// Process all pending async jobs. This includes all promise resolutions. + pub fn process_all_jobs(&self) -> Result<()> { + self.process_promise_completions(); + + loop { + let mut ctx1: *mut sys::JSContext = std::ptr::null_mut(); + let err = unsafe { sys::JS_ExecutePendingJob(self.rt, &mut ctx1) }; + if err == 0 { + break; + } else if err < 0 { + return Err(ContextRef::from_raw(ctx1).exception_error()); + } + } + Ok(()) + } } impl Clone for Runtime { diff --git a/src/script.rs b/src/script.rs index 496b6a5d..fe03f228 100644 --- a/src/script.rs +++ b/src/script.rs @@ -1,11 +1,9 @@ use oden_js::{ module::loader::{ModuleLoader, ModuleSource}, - Context, ContextRef, Promise, Result, Runtime, Value, ValueResult, + Context, ContextRef, Result, Runtime, Value, }; -use std::collections::HashMap; use std::ffi::OsStr; use std::path::Path; -use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::mpsc::{channel, Receiver}; pub mod graphics; @@ -15,6 +13,7 @@ mod typescript; use typescript::transpile_to_javascript; pub mod assets; +pub mod io; struct Loader {} @@ -39,21 +38,6 @@ impl ModuleLoader for Loader { } } -#[derive(Eq, PartialEq, Hash, Debug, Clone, Copy)] -pub struct PromiseHandle(u64); - -impl PromiseHandle { - pub fn new() -> Self { - static NEXT_ID: AtomicU64 = AtomicU64::new(0); - PromiseHandle(NEXT_ID.fetch_add(1, Ordering::SeqCst)) - } -} - -pub enum ScriptEvent { - AddPromise(PromiseHandle, Promise), - CompletePromise(PromiseHandle, Box ValueResult>), -} - pub struct ScriptContext { context: Context, init: Value, @@ -63,9 +47,6 @@ pub struct ScriptContext { gfx: graphics::GraphicsAPI, _assets: assets::AssetsAPI, gfx_receive: Receiver, - - script_receive: Receiver, - promises: HashMap, } impl ScriptContext { @@ -78,12 +59,12 @@ impl ScriptContext { context.add_intrinsic_operators(); let (gfx_send, gfx_receive) = channel(); - let (script_send, script_receive) = channel(); let gfx = graphics::GraphicsAPI::define(&context, gfx_send.clone()) .expect("Graphics module should load without error"); let assets = assets::AssetsAPI::define(&context, gfx_send.clone()) .expect("Assets module should load without error"); + let _io = io::IoAPI::define(&context).expect("IO module should load without error"); let module = context .import_module("./src/main.ts", "") @@ -110,9 +91,6 @@ impl ScriptContext { gfx_receive, _assets: assets, - - script_receive, - promises: HashMap::new(), } } @@ -127,31 +105,8 @@ impl ScriptContext { } pub fn update(&mut self) { - // Handle any promises that have completed before calling update. - while let Ok(event) = self.script_receive.try_recv() { - match event { - // TODO: Capture debugging information. - ScriptEvent::AddPromise(handle, promise) => { - self.promises.insert(handle, promise); - } - ScriptEvent::CompletePromise(handle, value_producer) => { - if let Some(promise) = self.promises.remove(&handle) { - let result = value_producer(&self.context); - match result { - Ok(v) => { - promise.resolve(&self.context, &v); - } - Err(e) => { - let error = e.to_js_error(&self.context); - promise.reject(&self.context, &error); - } - } - } - } - } - } - - // Tell the runtime to process all pending "jobs". + // Tell the runtime to process all pending "jobs". This includes + // promise completions. self.context .process_all_jobs() .expect("Error processing async jobs"); diff --git a/src/script/io.rs b/src/script/io.rs new file mode 100644 index 00000000..3d9aaa9a --- /dev/null +++ b/src/script/io.rs @@ -0,0 +1,97 @@ +use oden_js::{module::native::NativeModuleBuilder, ContextRef, ValueResult}; +use std::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::{Arc, Mutex}; +use std::thread; + +type Job = Box; + +struct ThreadPoolWorker { + _thread: thread::JoinHandle<()>, +} + +impl ThreadPoolWorker { + fn new(queue: Arc>>) -> Self { + let thread = thread::spawn(move || loop { + let r = { + let locked = queue.lock(); + locked.expect("Should not be orphaning the lock").recv() + }; + if let Ok(item) = r { + item(); + } else { + break; + } + }); + ThreadPoolWorker { _thread: thread } + } +} + +struct ThreadPool { + _workers: Vec, + sender: Sender, +} + +impl ThreadPool { + fn new(size: usize) -> Self { + let (sender, receiver) = channel(); + + let receiver = Arc::new(Mutex::new(receiver)); + + let mut workers = vec![]; + for _ in 0..size { + workers.push(ThreadPoolWorker::new(receiver.clone())); + } + + ThreadPool { + _workers: workers, + sender, + } + } + + fn execute(&self, job: Job) { + let _ = self.sender.send(job); + } +} + +struct IoImpl { + thread_pool: ThreadPool, +} + +impl IoImpl { + fn new() -> Self { + IoImpl { + thread_pool: ThreadPool::new(4), + } + } + + fn load(&self, context: &ContextRef, path: &str) -> ValueResult { + let (value, promise) = context.new_promise()?; + + let path = path.to_string(); + self.thread_pool.execute(Box::new(move || { + // TODO: Actually read the path. + let path = path; + promise.resolve(move |ctx: &ContextRef| ctx.new_string(&path)); + })); + + Ok(value) + } +} + +pub struct IoAPI {} + +impl IoAPI { + pub fn define(ctx: &ContextRef) -> oden_js::Result { + let io = Arc::new(IoImpl::new()); + let mut builder = NativeModuleBuilder::new(ctx); + { + let io = io.clone(); + builder.export( + "load", + ctx.new_fn(move |ctx: &ContextRef, p: String| io.load(ctx, &p))?, + )?; + } + builder.build("io-core")?; + Ok(IoAPI {}) + } +}