diff --git a/Cargo.lock b/Cargo.lock index 3d3945b3f..39c394771 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4087,6 +4087,15 @@ version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +[[package]] +name = "once_map" +version = "0.0.1" +dependencies = [ + "dashmap", + "futures", + "tokio", +] + [[package]] name = "oorandom" version = "11.1.3" diff --git a/Cargo.toml b/Cargo.toml index 59b223104..b5a77bef3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "plugins/cairo-lang-macro-attributes", "plugins/cairo-lang-macro-stable", "utils/create-output-dir", + "utils/once-map", "utils/scarb-proc-macro-server-types", "utils/scarb-build-metadata", "utils/scarb-stable-hash", @@ -65,6 +66,7 @@ clap = { version = "4", features = ["derive", "env", "string"] } console = "0.15" convert_case = "0.6.0" darling = "0.20" +dashmap = "6" data-encoding = "2" deno_task_shell = ">=0.13" derive_builder = ">=0.12" @@ -75,7 +77,7 @@ expect-test = "1.5" flate2 = { version = "1.0.34", default-features = false, features = ["zlib"] } fs4 = { version = "0.7", features = ["tokio"] } fs_extra = "1" -futures = { version = "0.3", default-features = false, features = ["std", "async-await"] } +futures = { version = "0.3", default-features = false, features = ["std", "async-await", "executor"] } gix = ">=0.55" gix-path = "0.10" glob = "0.3" diff --git a/utils/once-map/Cargo.toml b/utils/once-map/Cargo.toml new file mode 100644 index 000000000..4b29bf48a --- /dev/null +++ b/utils/once-map/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "once_map" +version = "0.0.1" +publish = false +edition.workspace = true +homepage.workspace = true +repository.workspace = true +authors.workspace = true +license.workspace = true + +[dependencies] +dashmap.workspace = true +tokio.workspace = true +futures.workspace = true diff --git a/utils/once-map/src/lib.rs b/utils/once-map/src/lib.rs new file mode 100644 index 000000000..7f5f35b33 --- /dev/null +++ b/utils/once-map/src/lib.rs @@ -0,0 +1,134 @@ +// This file has been copied verbatim from `uv` repository. +// Original code location: https://github.com/astral-sh/uv/blob/main/crates/uv-once-map/src/lib.rs + +use std::borrow::Borrow; +use std::hash::{BuildHasher, Hash, RandomState}; +use std::pin::pin; +use std::sync::Arc; + +use dashmap::DashMap; +use tokio::sync::Notify; + +/// Run tasks only once and store the results in a parallel hash map. +/// +/// We often have jobs `Fn(K) -> V` that we only want to run once and memoize, e.g. network +/// requests for metadata. When multiple tasks start the same query in parallel, e.g. through source +/// dist builds, we want to wait until the other task is done and get a reference to the same +/// result. +/// +/// Note that this always clones the value out of the underlying map. Because +/// of this, it's common to wrap the `V` in an `Arc` to make cloning cheap. +pub struct OnceMap { + pub items: DashMap, H>, +} + +impl OnceMap { + /// Register that you want to start a job. + /// + /// If this method returns `true`, you need to start a job and call [`OnceMap::done`] eventually + /// or other tasks will hang. If it returns `false`, this job is already in progress and you + /// can [`OnceMap::wait`] for the result. + pub fn register(&self, key: K) -> bool { + let entry = self.items.entry(key); + match entry { + dashmap::mapref::entry::Entry::Occupied(_) => false, + dashmap::mapref::entry::Entry::Vacant(entry) => { + entry.insert(Value::Waiting(Arc::new(Notify::new()))); + true + } + } + } + + /// Submit the result of a job you registered. + pub fn done(&self, key: K, value: V) { + if let Some(Value::Waiting(notify)) = self.items.insert(key, Value::Filled(value)) { + notify.notify_waiters(); + } + } + + /// Wait for the result of a job that is running. + /// + /// Will hang if [`OnceMap::done`] isn't called for this key. + pub async fn wait(&self, key: &K) -> Option { + let notify = { + let entry = self.items.get(key)?; + match entry.value() { + Value::Filled(value) => return Some(value.clone()), + Value::Waiting(notify) => notify.clone(), + } + }; + + // Register the waiter for calls to `notify_waiters`. + let notification = pin!(notify.notified()); + + // Make sure the value wasn't inserted in-between us checking the map and registering the waiter. + if let Value::Filled(value) = self.items.get(key).expect("map is append-only").value() { + return Some(value.clone()); + }; + + // Wait until the value is inserted. + notification.await; + + let entry = self.items.get(key).expect("map is append-only"); + match entry.value() { + Value::Filled(value) => Some(value.clone()), + Value::Waiting(_) => unreachable!("notify was called"), + } + } + + /// Wait for the result of a job that is running, in a blocking context. + /// + /// Will hang if [`OnceMap::done`] isn't called for this key. + pub fn wait_blocking(&self, key: &K) -> Option { + let notify = { + let entry = self.items.get(key)?; + match entry.value() { + Value::Filled(value) => return Some(value.clone()), + Value::Waiting(notify) => notify.clone(), + } + }; + + // Register the waiter for calls to `notify_waiters`. + let notification = pin!(notify.notified()); + + // Make sure the value wasn't inserted in-between us checking the map and registering the waiter. + if let Value::Filled(value) = self.items.get(key).expect("map is append-only").value() { + return Some(value.clone()); + }; + + // Wait until the value is inserted. + futures::executor::block_on(notification); + + let entry = self.items.get(key).expect("map is append-only"); + match entry.value() { + Value::Filled(value) => Some(value.clone()), + Value::Waiting(_) => unreachable!("notify was called"), + } + } + + /// Return the result of a previous job, if any. + pub fn get(&self, key: &Q) -> Option + where + K: Borrow, + { + let entry = self.items.get(key)?; + match entry.value() { + Value::Filled(value) => Some(value.clone()), + Value::Waiting(_) => None, + } + } +} + +impl Default for OnceMap { + fn default() -> Self { + Self { + items: DashMap::with_hasher(H::default()), + } + } +} + +#[derive(Debug)] +pub enum Value { + Waiting(Arc), + Filled(V), +}