diff --git a/Cargo.lock b/Cargo.lock index 38ab56d782..28f8f7ea4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -155,6 +155,12 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" +[[package]] +name = "arrayvec" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" + [[package]] name = "async-stream" version = "0.3.6" @@ -205,14 +211,13 @@ checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" [[package]] name = "attohttpc" -version = "0.16.3" +version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdb8867f378f33f78a811a8eb9bf108ad99430d7aad43315dd9319c827ef6247" +checksum = "8d9a9bf8b79a749ee0b911b91b671cc2b6c670bdbc7e3dfd537576ddc94bb2a2" dependencies = [ "http 0.2.12", "log", "url", - "wildmatch", ] [[package]] @@ -350,6 +355,22 @@ version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" +[[package]] +name = "bitcoin-io" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "340e09e8399c7bd8912f495af6aa58bea0c9214773417ffaa8f6460f93aaee56" + +[[package]] +name = "bitcoin_hashes" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb18c03d0db0247e147a21a6faafd5a7eb851c743db062de72018b6b7e8e4d16" +dependencies = [ + "bitcoin-io", + "hex-conservative", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -641,6 +662,7 @@ dependencies = [ "ckb-logger", "ckb-spawn", "tokio", + "wasm-bindgen-futures", ] [[package]] @@ -1675,6 +1697,9 @@ dependencies = [ [[package]] name = "ckb-systemtime" version = "0.119.0-pre" +dependencies = [ + "web-time", +] [[package]] name = "ckb-test-chain-utils" @@ -2749,6 +2774,10 @@ name = "futures-timer" version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" +dependencies = [ + "gloo-timers", + "send_wrapper", +] [[package]] name = "futures-util" @@ -2847,6 +2876,18 @@ dependencies = [ "walkdir", ] +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "goblin" version = "0.2.3" @@ -3020,6 +3061,15 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hex-conservative" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5313b072ce3c597065a808dbf612c4c8e8590bdbf8b579508bf7a762c5eae6cd" +dependencies = [ + "arrayvec", +] + [[package]] name = "hkdf" version = "0.12.4" @@ -3277,10 +3327,10 @@ dependencies = [ ] [[package]] -name = "igd" -version = "0.12.1" +name = "igd-next" +version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "556b5a75cd4adb7c4ea21c64af1c48cefb2ce7d43dc4352c720a1fe47c21f355" +checksum = "76b0d7d4541def58a37bf8efc559683f21edce7c82f0d866c93ac21f7e098f93" dependencies = [ "attohttpc", "log", @@ -3578,7 +3628,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -5066,10 +5116,12 @@ dependencies = [ [[package]] name = "secp256k1" -version = "0.29.1" +version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9465315bc9d4566e1724f0fffcbcc446268cb522e60f9a27bcded6b19c108113" +checksum = "b50c5943d326858130af85e049f2661ba3c78b26589b8ab98e65e80ae44a1252" dependencies = [ + "bitcoin_hashes", + "rand 0.8.5", "secp256k1-sys", ] @@ -5111,6 +5163,12 @@ version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" +[[package]] +name = "send_wrapper" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f638d531eccd6e23b980caf34876660d38e265409d8e99b397ab71eb3612fad0" + [[package]] name = "sentry" version = "0.34.0" @@ -5809,20 +5867,20 @@ dependencies = [ [[package]] name = "tentacle" -version = "0.6.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d335523ec132a2bbefbaf403b52eba047fb50bc83bed2d0b1d22c119bae2fec1" +checksum = "457c5771b5b59538fb07d70d5db3d6b9f94f45e5c1ea1197165aa050f2de4f4d" dependencies = [ "async-trait", "bytes", "futures", - "igd", + "futures-timer", + "igd-next", "js-sys", "libc", "log", "molecule", "nohash-hasher", - "once_cell", "parking_lot 0.12.3", "rand 0.8.5", "socket2", @@ -5830,6 +5888,7 @@ dependencies = [ "tentacle-secio", "thiserror", "tokio", + "tokio-tungstenite", "tokio-util", "tokio-yamux", "wasm-bindgen", @@ -5852,9 +5911,9 @@ dependencies = [ [[package]] name = "tentacle-secio" -version = "0.6.2" +version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cac8b23a7879426a4961acea6ae66287f7fe9a934d131a722cbb88f145e97fea" +checksum = "99df015b8649588f2958d4853eee221860f95d2721995857e9dde1462ceb3dc4" dependencies = [ "bs58", "bytes", @@ -6070,7 +6129,6 @@ dependencies = [ "bytes", "libc", "mio", - "parking_lot 0.12.3", "pin-project-lite", "signal-hook-registry", "socket2", @@ -6149,16 +6207,18 @@ dependencies = [ [[package]] name = "tokio-yamux" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2ed88a04bfbf9e70343a5748a423200ee0591c55e7e487d784a55ee8af17db9" +checksum = "208cecd45a38868bfc0a45aac52cb1aea4583c6b801bf57f351e9d531b23cb86" dependencies = [ "bytes", "futures", + "futures-timer", "log", "nohash-hasher", "tokio", "tokio-util", + "web-time", ] [[package]] @@ -6776,12 +6836,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7219d36b6eac893fa81e84ebe06485e7dcbb616177469b142df14f1f4deb1311" -[[package]] -name = "wildmatch" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f44b95f62d34113cf558c93511ac93027e03e9c29a60dd0fd70e6e025c7270a" - [[package]] name = "winapi" version = "0.3.9" diff --git a/db/Cargo.toml b/db/Cargo.toml index 13ead9bea4..28a53c54f5 100644 --- a/db/Cargo.toml +++ b/db/Cargo.toml @@ -13,7 +13,9 @@ ckb-app-config = { path = "../util/app-config", version = "= 0.119.0-pre" } ckb-logger = { path = "../util/logger", version = "= 0.119.0-pre" } ckb-error = { path = "../error", version = "= 0.119.0-pre" } libc = "0.2" -rocksdb = { package = "ckb-rocksdb", version ="=0.21.1", features = ["snappy"], default-features = false } +rocksdb = { package = "ckb-rocksdb", version = "=0.21.1", features = [ + "snappy", +], default-features = false } ckb-db-schema = { path = "../db-schema", version = "= 0.119.0-pre" } [dev-dependencies] diff --git a/network/Cargo.toml b/network/Cargo.toml index 90cf548193..4e7591e960 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -25,7 +25,7 @@ bs58 = { version = "0.5.0", optional = true } sentry = { version = "0.34.0", optional = true } faster-hex = { version = "0.6", optional = true } ckb-hash = { path = "../util/hash", version = "= 0.119.0-pre" } -secp256k1 = { version = "0.29", features = ["recovery"], optional = true } +secp256k1 = { version = "0.30", features = ["recovery"], optional = true } trust-dns-resolver = { version = "0.23", optional = true } snap = "1" ckb-types = { path = "../util/types", version = "= 0.119.0-pre" } @@ -33,15 +33,24 @@ ipnetwork = "0.20" serde_json = "1.0" bloom-filters = "0.1" ckb-spawn = { path = "../util/spawn", version = "= 0.119.0-pre" } -socket2 = "0.5" bitflags = "1.0" +p2p = { version = "0.6.2", package = "tentacle", default-features = false } -p2p = { version = "0.6.1", package = "tentacle", features = [ +[target.'cfg(not(target_family = "wasm"))'.dependencies] +p2p = { version = "0.6.2", package = "tentacle", features = [ "upnp", "parking_lot", "openssl-vendored", + "ws", +] } +socket2 = "0.5" + +[target.'cfg(target_family = "wasm")'.dependencies] +p2p = { version = "0.6.2", package = "tentacle", default-features = false, features = [ + "wasm-timer", ] } + [features] with_sentry = ["sentry"] with_dns_seeding = ["bs58", "faster-hex", "trust-dns-resolver", "secp256k1"] @@ -55,6 +64,7 @@ num_cpus = "1.10" ckb-systemtime = { path = "../util/systemtime", version = "= 0.119.0-pre", features = [ "enable_faketime", ] } +ckb-app-config = { path = "../util/app-config", version = "= 0.119.0-pre" } [[bench]] name = "peer_store" diff --git a/network/src/lib.rs b/network/src/lib.rs index 5a524198e6..9a24bd6d69 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -32,8 +32,8 @@ pub use crate::{ peer_registry::PeerRegistry, peer_store::Score, protocols::{ - identify::Flags, support_protocols::SupportProtocols, CKBProtocol, CKBProtocolContext, - CKBProtocolHandler, PeerIndex, + identify::Flags, support_protocols::SupportProtocols, BoxedCKBProtocolContext, CKBProtocol, + CKBProtocolContext, CKBProtocolHandler, PeerIndex, }, }; pub use p2p::{ diff --git a/network/src/network.rs b/network/src/network.rs index 734c8815ae..8082348005 100644 --- a/network/src/network.rs +++ b/network/src/network.rs @@ -22,6 +22,7 @@ use ckb_app_config::{default_support_all_protocols, NetworkConfig, SupportProtoc use ckb_logger::{debug, error, info, trace, warn}; use ckb_spawn::Spawn; use ckb_stop_handler::{broadcast_exit_signals, new_tokio_exit_rx, CancellationToken}; +use ckb_systemtime::{Duration, Instant}; use ckb_util::{Condvar, Mutex, RwLock}; use futures::{channel::mpsc::Sender, Future}; use ipnetwork::IpNetwork; @@ -55,8 +56,7 @@ use std::{ atomic::{AtomicBool, Ordering}, Arc, }, - thread, - time::{Duration, Instant}, + thread, usize, }; use tokio::{self, sync::oneshot}; @@ -92,6 +92,7 @@ pub struct NetworkState { impl NetworkState { /// Init from config pub fn from_config(config: NetworkConfig) -> Result { + #[cfg(not(target_family = "wasm"))] config.create_dir_if_not_exists()?; let local_private_key = config.fetch_private_key()?; let local_peer_id = local_private_key.peer_id(); @@ -890,7 +891,6 @@ impl NetworkService { }; service_builder = service_builder .handshake_type(network_state.local_private_key.clone().into()) - .upnp(config.upnp) .yamux_config(yamux_config) .forever(true) .max_connection_number(1024) @@ -898,30 +898,16 @@ impl NetworkService { .set_channel_size(config.channel_size()) .timeout(Duration::from_secs(5)); + #[cfg(not(target_family = "wasm"))] + { + service_builder = service_builder.upnp(config.upnp); + } + #[cfg(target_os = "linux")] let p2p_service = { if config.reuse_port_on_linux { let iter = config.listen_addresses.iter(); - #[derive(Clone, Copy, Debug, Eq, PartialEq)] - enum TransportType { - Ws, - Tcp, - } - - fn find_type(addr: &Multiaddr) -> TransportType { - let mut iter = addr.iter(); - - iter.find_map(|proto| { - if let p2p::multiaddr::Protocol::Ws = proto { - Some(TransportType::Ws) - } else { - None - } - }) - .unwrap_or(TransportType::Tcp) - } - #[derive(Clone, Copy, Debug, Eq, PartialEq)] enum BindType { None, @@ -947,38 +933,45 @@ impl NetworkService { } let mut init = BindType::None; - for addr in iter { + for multi_addr in iter { if init.is_ready() { break; } - match find_type(addr) { + match find_type(multi_addr) { // wait ckb enable ws support - TransportType::Ws => (), - TransportType::Tcp => { + TransportType::Tcp | TransportType::Ws => { // only bind once - if matches!(init, BindType::Tcp) { + if matches!(init, BindType::Tcp) || matches!(init, BindType::Ws) { continue; } - if let Some(addr) = multiaddr_to_socketaddr(addr) { + if let Some(addr) = multiaddr_to_socketaddr(multi_addr) { use p2p::service::TcpSocket; let domain = socket2::Domain::for_address(addr); - service_builder = - service_builder.tcp_config(move |socket: TcpSocket| { - let socket_ref = socket2::SockRef::from(&socket); - #[cfg(all( - unix, - not(target_os = "solaris"), - not(target_os = "illumos") - ))] - socket_ref.set_reuse_port(true)?; - - socket_ref.set_reuse_address(true)?; - if socket_ref.domain()? == domain { - socket_ref.bind(&addr.into())?; - } - Ok(socket) - }); - init.transform(TransportType::Tcp) + let bind_fn = move |socket: TcpSocket| { + let socket_ref = socket2::SockRef::from(&socket); + #[cfg(all( + unix, + not(target_os = "solaris"), + not(target_os = "illumos") + ))] + socket_ref.set_reuse_port(true)?; + + socket_ref.set_reuse_address(true)?; + if socket_ref.domain()? == domain { + socket_ref.bind(&addr.into())?; + } + Ok(socket) + }; + service_builder = match find_type(multi_addr) { + TransportType::Tcp => { + init.transform(TransportType::Tcp); + service_builder.tcp_config(bind_fn) + } + TransportType::Ws => { + init.transform(TransportType::Ws); + service_builder.tcp_config_on_ws(bind_fn) + } + }; } } } @@ -1094,11 +1087,13 @@ impl NetworkService { .unzip(); let receiver: CancellationToken = new_tokio_exit_rx(); + #[cfg(not(target_family = "wasm"))] let (start_sender, start_receiver) = mpsc::channel(); { let network_state = Arc::clone(&network_state); let p2p_control: ServiceAsyncControl = p2p_control.clone().into(); handle.spawn_task(async move { + #[cfg(not(target_family = "wasm"))] for addr in &config.listen_addresses { match p2p_service.listen(addr.to_owned()).await { Ok(listen_address) => { @@ -1121,8 +1116,9 @@ impl NetworkService { } }; } + #[cfg(not(target_family = "wasm"))] start_sender.send(Ok(())).unwrap(); - tokio::spawn(async move { p2p_service.run().await }); + p2p::runtime::spawn(async move { p2p_service.run().await }); tokio::select! { _ = receiver.cancelled() => { info!("NetworkService receive exit signal, start shutdown..."); @@ -1150,7 +1146,7 @@ impl NetworkService { } }); } - + #[cfg(not(target_family = "wasm"))] if let Ok(Err(e)) = start_receiver.recv() { return Err(e); } @@ -1419,3 +1415,17 @@ pub(crate) async fn async_disconnect_with_message( } control.disconnect(peer_index).await } + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub(crate) enum TransportType { + Ws, + Tcp, +} + +pub(crate) fn find_type(addr: &Multiaddr) -> TransportType { + if addr.iter().any(|proto| matches!(proto, Protocol::Ws)) { + TransportType::Ws + } else { + TransportType::Tcp + } +} diff --git a/network/src/peer.rs b/network/src/peer.rs index 48ae7196eb..fe7ee3dfb3 100644 --- a/network/src/peer.rs +++ b/network/src/peer.rs @@ -2,9 +2,9 @@ use crate::network_group::Group; use crate::{ multiaddr::Multiaddr, protocols::identify::Flags, ProtocolId, ProtocolVersion, SessionType, }; +use ckb_systemtime::{Duration, Instant}; use p2p::SessionId; use std::collections::HashMap; -use std::time::{Duration, Instant}; /// Peer info from identify protocol message #[derive(Clone, Debug)] diff --git a/network/src/peer_registry.rs b/network/src/peer_registry.rs index 9f3e55536b..dcc97d160f 100644 --- a/network/src/peer_registry.rs +++ b/network/src/peer_registry.rs @@ -6,6 +6,7 @@ use crate::{ extract_peer_id, Peer, PeerId, SessionType, }; use ckb_logger::debug; +use ckb_systemtime::Instant; use p2p::{multiaddr::Multiaddr, SessionId}; use rand::seq::SliceRandom; use rand::thread_rng; @@ -146,7 +147,7 @@ impl PeerRegistry { &mut candidate_peers, EVICTION_PROTECT_PEERS, |peer1, peer2| { - let now = std::time::Instant::now(); + let now = Instant::now(); let peer1_last_message = peer1 .last_ping_protocol_message_received_at .map(|t| now.saturating_duration_since(t).as_secs()) diff --git a/network/src/peer_store/addr_manager.rs b/network/src/peer_store/addr_manager.rs index b4b5179ba3..127b5e75a1 100644 --- a/network/src/peer_store/addr_manager.rs +++ b/network/src/peer_store/addr_manager.rs @@ -1,4 +1,6 @@ //! Address manager +#[cfg(target_family = "wasm")] +use crate::network::{find_type, TransportType}; use crate::peer_store::types::AddrInfo; use p2p::{multiaddr::Multiaddr, utils::multiaddr_to_socketaddr}; use rand::Rng; @@ -49,6 +51,10 @@ impl AddrManager { let mut addr_infos = Vec::with_capacity(count); let mut rng = rand::thread_rng(); let now_ms = ckb_systemtime::unix_time_as_millis(); + #[cfg(target_family = "wasm")] + let filter = |peer_addr: &AddrInfo| { + filter(peer_addr) && matches!(find_type(&peer_addr.addr), TransportType::Ws) + }; for i in 0..self.random_ids.len() { // reuse the for loop to shuffle random ids // https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle diff --git a/network/src/peer_store/peer_store_impl.rs b/network/src/peer_store/peer_store_impl.rs index 4c076066a2..0dd50d6e1e 100644 --- a/network/src/peer_store/peer_store_impl.rs +++ b/network/src/peer_store/peer_store_impl.rs @@ -1,3 +1,4 @@ +use crate::network::{find_type, TransportType}; use crate::{ errors::{PeerStoreError, Result}, extract_peer_id, multiaddr_to_socketaddr, @@ -64,6 +65,10 @@ impl PeerStore { if self.ban_list.is_addr_banned(&addr) { return Ok(()); } + #[cfg(target_family = "wasm")] + if !matches!(find_type(&addr), TransportType::Ws) { + return Ok(()); + } self.check_purge()?; let score = self.score_config.default_score; self.addr_manager @@ -165,20 +170,24 @@ impl PeerStore { let now_ms = ckb_systemtime::unix_time_as_millis(); let peers = &self.connected_peers; let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS); + + let filter = |peer_addr: &AddrInfo| { + extract_peer_id(&peer_addr.addr) + .map(|peer_id| !peers.contains_key(&peer_id)) + .unwrap_or_default() + && peer_addr + .connected(|t| t > addr_expired_ms && t <= now_ms.saturating_sub(DIAL_INTERVAL)) + && required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags)) + }; + + // Any protocol expect websocket + #[cfg(not(target_family = "wasm"))] + let filter = |peer_addr: &AddrInfo| { + filter(peer_addr) && !matches!(find_type(&peer_addr.addr), TransportType::Ws) + }; + // get addrs that can attempt. - self.addr_manager - .fetch_random(count, |peer_addr: &AddrInfo| { - extract_peer_id(&peer_addr.addr) - .map(|peer_id| !peers.contains_key(&peer_id)) - .unwrap_or_default() - && peer_addr.connected(|t| { - t > addr_expired_ms && t <= now_ms.saturating_sub(DIAL_INTERVAL) - }) - && required_flags_filter( - required_flags, - Flags::from_bits_truncate(peer_addr.flags), - ) - }) + self.addr_manager.fetch_random(count, filter) } /// Get peers for feeler connection, this method randomly return peer addrs that we never @@ -192,14 +201,16 @@ impl PeerStore { let now_ms = ckb_systemtime::unix_time_as_millis(); let addr_expired_ms = now_ms.saturating_sub(ADDR_TRY_TIMEOUT_MS); let peers = &self.connected_peers; - self.addr_manager - .fetch_random(count, |peer_addr: &AddrInfo| { - extract_peer_id(&peer_addr.addr) - .map(|peer_id| !peers.contains_key(&peer_id)) - .unwrap_or_default() - && !peer_addr.tried_in_last_minute(now_ms) - && !peer_addr.connected(|t| t > addr_expired_ms) - }) + + let filter = |peer_addr: &AddrInfo| { + extract_peer_id(&peer_addr.addr) + .map(|peer_id| !peers.contains_key(&peer_id)) + .unwrap_or_default() + && !peer_addr.tried_in_last_minute(now_ms) + && !peer_addr.connected(|t| t > addr_expired_ms) + }; + + self.addr_manager.fetch_random(count, filter) } /// Return valid addrs that success connected, used for discovery. @@ -209,12 +220,14 @@ impl PeerStore { let now_ms = ckb_systemtime::unix_time_as_millis(); let addr_expired_ms = now_ms.saturating_sub(ADDR_TIMEOUT_MS); + + let filter = |peer_addr: &AddrInfo| { + required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags)) + && peer_addr.connected(|t| t > addr_expired_ms) + }; + // get success connected addrs. - self.addr_manager - .fetch_random(count, |peer_addr: &AddrInfo| { - required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags)) - && peer_addr.connected(|t| t > addr_expired_ms) - }) + self.addr_manager.fetch_random(count, filter) } /// Ban an addr diff --git a/network/src/protocols/discovery/mod.rs b/network/src/protocols/discovery/mod.rs index c08012ba1b..573fad175a 100644 --- a/network/src/protocols/discovery/mod.rs +++ b/network/src/protocols/discovery/mod.rs @@ -1,10 +1,7 @@ -use std::{ - collections::HashMap, - sync::Arc, - time::{Duration, Instant}, -}; +use std::{collections::HashMap, sync::Arc}; use ckb_logger::{debug, error, trace, warn}; +use ckb_systemtime::{Duration, Instant}; use p2p::{ async_trait, bytes, context::{ProtocolContext, ProtocolContextMutRef, SessionContext}, diff --git a/network/src/protocols/discovery/state.rs b/network/src/protocols/discovery/state.rs index cba4d41d14..a7c7dca091 100644 --- a/network/src/protocols/discovery/state.rs +++ b/network/src/protocols/discovery/state.rs @@ -1,4 +1,4 @@ -use std::time::{Duration, Instant}; +use ckb_systemtime::{Duration, Instant}; use ckb_logger::debug; use p2p::{ diff --git a/network/src/protocols/identify/mod.rs b/network/src/protocols/identify/mod.rs index d2fefde131..e534dda78f 100644 --- a/network/src/protocols/identify/mod.rs +++ b/network/src/protocols/identify/mod.rs @@ -1,9 +1,9 @@ use std::borrow::Cow; use std::collections::HashMap; use std::sync::{atomic::Ordering, Arc}; -use std::time::{Duration, Instant}; use ckb_logger::{debug, error, trace, warn}; +use ckb_systemtime::{Duration, Instant}; use p2p::{ async_trait, bytes::Bytes, diff --git a/network/src/protocols/mod.rs b/network/src/protocols/mod.rs index f2da17e4a0..80a117e73b 100644 --- a/network/src/protocols/mod.rs +++ b/network/src/protocols/mod.rs @@ -128,38 +128,36 @@ pub trait CKBProtocolContext: Send { } } +/// type alias of dyn ckb protocol context +pub type BoxedCKBProtocolContext = Arc; + /// Abstract protocol handle base on tentacle service handle #[async_trait] pub trait CKBProtocolHandler: Sync + Send { /// Init action on service run - async fn init(&mut self, nc: Arc); + async fn init(&mut self, nc: BoxedCKBProtocolContext); /// Called when opening protocol async fn connected( &mut self, - _nc: Arc, + _nc: BoxedCKBProtocolContext, _peer_index: PeerIndex, _version: &str, ) { } /// Called when closing protocol - async fn disconnected( - &mut self, - _nc: Arc, - _peer_index: PeerIndex, - ) { - } + async fn disconnected(&mut self, _nc: BoxedCKBProtocolContext, _peer_index: PeerIndex) {} /// Called when the corresponding protocol message is received async fn received( &mut self, - _nc: Arc, + _nc: BoxedCKBProtocolContext, _peer_index: PeerIndex, _data: Bytes, ) { } /// Called when the Service receives the notify task - async fn notify(&mut self, _nc: Arc, _token: u64) {} + async fn notify(&mut self, _nc: BoxedCKBProtocolContext, _token: u64) {} /// Behave like `Stream::poll` - async fn poll(&mut self, _nc: Arc) -> Option<()> { + async fn poll(&mut self, _nc: BoxedCKBProtocolContext) -> Option<()> { None } } @@ -648,6 +646,6 @@ impl Future for BlockingFutureTask { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - tokio::task::block_in_place(|| self.task.poll_unpin(cx)) + p2p::runtime::block_in_place(|| self.task.poll_unpin(cx)) } } diff --git a/network/src/protocols/ping.rs b/network/src/protocols/ping.rs index 984dbf2a15..5f692b46b1 100644 --- a/network/src/protocols/ping.rs +++ b/network/src/protocols/ping.rs @@ -18,9 +18,10 @@ use std::{ collections::{HashMap, HashSet}, str, sync::Arc, - time::{Duration, Instant}, }; +use ckb_systemtime::{Duration, Instant}; + const SEND_PING_TOKEN: u64 = 0; const CHECK_TIMEOUT_TOKEN: u64 = 1; const CONTROL_CHANNEL_BUFFER_SIZE: usize = 2; diff --git a/network/src/services/dns_seeding/seed_record.rs b/network/src/services/dns_seeding/seed_record.rs index 20ec1affcc..92bce2ca74 100644 --- a/network/src/services/dns_seeding/seed_record.rs +++ b/network/src/services/dns_seeding/seed_record.rs @@ -99,7 +99,7 @@ impl SeedRecord { return Err(SeedRecordError::InvalidRecord); } - let recid = RecoveryId::from_i32(i32::from(sig[64])) + let recid = RecoveryId::try_from(i32::from(sig[64])) .map_err(|_| SeedRecordError::InvalidSignature)?; let signature = RecoverableSignature::from_compact(&sig[0..64], recid) .map_err(|_| SeedRecordError::InvalidSignature)?; diff --git a/network/src/services/dump_peer_store.rs b/network/src/services/dump_peer_store.rs index 735ace3a71..12e3fd5d36 100644 --- a/network/src/services/dump_peer_store.rs +++ b/network/src/services/dump_peer_store.rs @@ -1,13 +1,13 @@ use crate::NetworkState; use ckb_logger::{debug, warn}; -use futures::Future; +use futures::{Future, StreamExt}; +use p2p::runtime::{Interval, MissedTickBehavior}; use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, time::Duration, }; -use tokio::time::{Instant, Interval, MissedTickBehavior}; const DEFAULT_DUMP_INTERVAL: Duration = Duration::from_secs(3600); // 1 hour @@ -27,6 +27,7 @@ impl DumpPeerStoreService { fn dump_peer_store(&self) { let path = self.network_state.config.peer_store_path(); + #[cfg(not(target_family = "wasm"))] self.network_state.with_peer_store_mut(|peer_store| { if let Err(err) = peer_store.dump_to_dir(&path) { warn!("Dump peer store error, path: {:?} error: {}", path, err); @@ -50,17 +51,20 @@ impl Future for DumpPeerStoreService { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if self.interval.is_none() { self.interval = { - let mut interval = tokio::time::interval_at( - Instant::now() + DEFAULT_DUMP_INTERVAL, - DEFAULT_DUMP_INTERVAL, - ); + let mut interval = Interval::new(DEFAULT_DUMP_INTERVAL); // The dump peer store service does not need to urgently compensate for the missed wake, // just delay behavior is enough interval.set_missed_tick_behavior(MissedTickBehavior::Delay); Some(interval) } } - while self.interval.as_mut().unwrap().poll_tick(cx).is_ready() { + while self + .interval + .as_mut() + .unwrap() + .poll_next_unpin(cx) + .is_ready() + { self.dump_peer_store() } Poll::Pending diff --git a/network/src/services/outbound_peer.rs b/network/src/services/outbound_peer.rs index 6498ec6894..10dca1c33c 100644 --- a/network/src/services/outbound_peer.rs +++ b/network/src/services/outbound_peer.rs @@ -4,7 +4,8 @@ use crate::{ }; use ckb_logger::trace; use ckb_systemtime::unix_time_as_millis; -use futures::Future; +use futures::{Future, StreamExt}; +use p2p::runtime::{Interval, MissedTickBehavior}; use p2p::{multiaddr::MultiAddr, service::ServiceControl}; use rand::prelude::IteratorRandom; use std::{ @@ -13,7 +14,6 @@ use std::{ task::{Context, Poll}, time::Duration, }; -use tokio::time::{Interval, MissedTickBehavior}; const FEELER_CONNECTION_COUNT: usize = 10; @@ -155,14 +155,20 @@ impl Future for OutboundPeerService { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if self.interval.is_none() { self.interval = { - let mut interval = tokio::time::interval(self.try_connect_interval); + let mut interval = Interval::new(self.try_connect_interval); // The outbound service does not need to urgently compensate for the missed wake, // just skip behavior is enough interval.set_missed_tick_behavior(MissedTickBehavior::Skip); Some(interval) } } - while self.interval.as_mut().unwrap().poll_tick(cx).is_ready() { + while self + .interval + .as_mut() + .unwrap() + .poll_next_unpin(cx) + .is_ready() + { // keep whitelist peer on connected self.try_dial_whitelist(); // ensure feeler work at any time diff --git a/network/src/services/protocol_type_checker.rs b/network/src/services/protocol_type_checker.rs index 9a469a7d92..c41ba14147 100644 --- a/network/src/services/protocol_type_checker.rs +++ b/network/src/services/protocol_type_checker.rs @@ -10,15 +10,15 @@ /// Other protocols will be closed after a timeout. use crate::{network::disconnect_with_message, NetworkState, Peer, ProtocolId, SupportProtocols}; use ckb_logger::debug; -use futures::Future; +use ckb_systemtime::{Duration, Instant}; +use futures::{Future, StreamExt}; +use p2p::runtime::{Interval, MissedTickBehavior}; use p2p::service::ServiceControl; use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, - time::{Duration, Instant}, }; -use tokio::time::{Interval, MissedTickBehavior}; const TIMEOUT: Duration = Duration::from_secs(10); const CHECK_INTERVAL: Duration = Duration::from_secs(30); @@ -129,14 +129,20 @@ impl Future for ProtocolTypeCheckerService { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if self.interval.is_none() { self.interval = { - let mut interval = tokio::time::interval(CHECK_INTERVAL); + let mut interval = Interval::new(CHECK_INTERVAL); // The protocol type checker service does not need to urgently compensate for the missed wake, // just skip behavior is enough interval.set_missed_tick_behavior(MissedTickBehavior::Skip); Some(interval) } } - while self.interval.as_mut().unwrap().poll_tick(cx).is_ready() { + while self + .interval + .as_mut() + .unwrap() + .poll_next_unpin(cx) + .is_ready() + { self.check_protocol_type(); } Poll::Pending diff --git a/resource/src/lib.rs b/resource/src/lib.rs index 188b060381..b69b644b0f 100644 --- a/resource/src/lib.rs +++ b/resource/src/lib.rs @@ -33,7 +33,7 @@ use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::fmt; use std::fs; -use std::io::{self, BufReader, Read}; +use std::io::{self, BufReader, Cursor, Read}; use std::path::{Path, PathBuf}; use ckb_system_scripts::BUNDLED_CELL; @@ -71,6 +71,11 @@ pub enum Resource { /// The file path to the resource. file: PathBuf, }, + /// A resource that init by user custom + Raw { + /// raw data + raw: String, + }, } impl fmt::Display for Resource { @@ -78,6 +83,7 @@ impl fmt::Display for Resource { match self { Resource::Bundled { bundled } => write!(f, "Bundled({bundled})"), Resource::FileSystem { file } => write!(f, "FileSystem({})", file.display()), + Resource::Raw { raw } => write!(f, "Raw({})", raw), } } } @@ -93,6 +99,11 @@ impl Resource { Resource::FileSystem { file } } + /// Creates a reference to the resource resident in the memory. + pub fn raw(raw: String) -> Resource { + Resource::Raw { raw } + } + /// Creates the CKB config file resource from the file system. /// /// It searches the file name `CKB_CONFIG_FILE_NAME` in the directory `root_dir`. @@ -156,6 +167,7 @@ impl Resource { SourceFiles::new(&BUNDLED_CELL, &BUNDLED).is_available(bundled) } Resource::FileSystem { file } => file.exists(), + Resource::Raw { .. } => true, } } @@ -185,6 +197,7 @@ impl Resource { match self { Resource::Bundled { bundled } => SourceFiles::new(&BUNDLED_CELL, &BUNDLED).get(bundled), Resource::FileSystem { file } => Ok(Cow::Owned(fs::read(file)?)), + Resource::Raw { raw } => Ok(Cow::Owned(raw.to_owned().into_bytes())), } } @@ -195,6 +208,9 @@ impl Resource { SourceFiles::new(&BUNDLED_CELL, &BUNDLED).read(bundled) } Resource::FileSystem { file } => Ok(Box::new(BufReader::new(fs::File::open(file)?))), + Resource::Raw { raw } => Ok(Box::new(BufReader::new(Cursor::new( + raw.to_owned().into_bytes(), + )))), } } diff --git a/script/Cargo.toml b/script/Cargo.toml index 5c63d38216..0edf0152f6 100644 --- a/script/Cargo.toml +++ b/script/Cargo.toml @@ -28,6 +28,9 @@ ckb-logger = { path = "../util/logger", version = "= 0.119.0-pre", optional = tr serde = { version = "1.0", features = ["derive"] } ckb-error = { path = "../error", version = "= 0.119.0-pre" } ckb-chain-spec = { path = "../spec", version = "= 0.119.0-pre" } +tokio = { version = "1.35.0", features = [] } + +[target.'cfg(not(target_family = "wasm"))'.dependencies] tokio = { version = "1.35.0", features = ["rt-multi-thread"] } [dev-dependencies] diff --git a/script/src/verify.rs b/script/src/verify.rs index d20afd5ceb..09c674f76c 100644 --- a/script/src/verify.rs +++ b/script/src/verify.rs @@ -39,6 +39,7 @@ use std::{ collections::{BTreeMap, HashMap}, sync::RwLock, }; +#[cfg(not(target_family = "wasm"))] use tokio::sync::{ oneshot, watch::{self, Receiver}, @@ -689,6 +690,7 @@ where /// Performing a resumable verification on the transaction scripts with signal channel, /// if `Suspend` comes from `command_rx`, the process will be hang up until `Resume` comes, /// otherwise, it will return until the verification is completed. + #[cfg(not(target_family = "wasm"))] pub async fn resumable_verify_with_signal( &self, limit_cycles: Cycle, @@ -1072,6 +1074,7 @@ where } } + #[cfg(not(target_family = "wasm"))] async fn verify_group_with_signal( &self, group: &ScriptGroup, @@ -1163,6 +1166,7 @@ where } } + #[cfg(not(target_family = "wasm"))] async fn chunk_run_with_signal( &self, script_group: &ScriptGroup, diff --git a/spec/Cargo.toml b/spec/Cargo.toml index 157e9b140c..2aa3408865 100644 --- a/spec/Cargo.toml +++ b/spec/Cargo.toml @@ -25,6 +25,8 @@ ckb-error = { path = "../error", version = "= 0.119.0-pre" } ckb-traits = { path = "../traits", version = "= 0.119.0-pre" } ckb-logger = { path = "../util/logger", version = "= 0.119.0-pre" } +[target.'cfg(not(target_family = "wasm"))'.dependencies] +cacache = { version = "13.0.0", default-features = false, features = ["mmap"] } [dev-dependencies] tempfile.workspace = true diff --git a/spec/src/versionbits/mod.rs b/spec/src/versionbits/mod.rs index 5c62a9834a..f9cb5af1fe 100644 --- a/spec/src/versionbits/mod.rs +++ b/spec/src/versionbits/mod.rs @@ -149,6 +149,7 @@ pub struct Cache { impl Cache { /// Reads the entire contents of a cache file synchronously into a bytes vector, /// looking the data up by key. + #[cfg(not(target_family = "wasm"))] pub fn get(&self, key: &Byte32) -> Option { match cacache::read_sync(&self.path, Self::encode_key(key)) { Ok(bytes) => Some(Self::decode_value(bytes)), @@ -160,7 +161,15 @@ impl Cache { } } + /// Soft Versionbit only work on tx-pool/block_assembler, it will not work on wasm, + /// so it can unimplemented + #[cfg(target_family = "wasm")] + pub fn get(&self, key: &Byte32) -> Option { + unimplemented!() + } + /// Writes data to the cache synchronously + #[cfg(not(target_family = "wasm"))] pub fn insert(&self, key: &Byte32, value: ThresholdState) { if let Err(e) = cacache::write_sync(&self.path, Self::encode_key(key), Self::encode_value(value)) @@ -169,6 +178,13 @@ impl Cache { } } + /// Soft Versionbit only work on tx-pool/block_assembler, it will not work on wasm, + /// so it can unimplemented + #[cfg(target_family = "wasm")] + pub fn insert(&self, key: &Byte32, value: ThresholdState) { + unimplemented!() + } + fn decode_value(value: Vec) -> ThresholdState { ThresholdState::from_u8(value[0]) } diff --git a/util/app-config/src/configs/network.rs b/util/app-config/src/configs/network.rs index b30ff1689f..82ca7d4e75 100644 --- a/util/app-config/src/configs/network.rs +++ b/util/app-config/src/configs/network.rs @@ -297,6 +297,7 @@ impl Config { } /// Reads the private key from file or generates one if the file does not exist. + #[cfg(not(target_family = "wasm"))] pub fn fetch_private_key(&self) -> Result { match self.read_secret_key()? { Some(key) => Ok(key), @@ -307,6 +308,12 @@ impl Config { } } + #[cfg(target_family = "wasm")] + pub fn fetch_private_key(&self) -> Result { + let random_key_pair = generate_random_key(); + secio::SecioKeyPair::secp256k1_raw_key(&random_key_pair).map_err(Into::into) + } + /// Gets the list of whitelist peers. pub fn whitelist_peers(&self) -> Vec { self.whitelist_peers.clone() diff --git a/util/crypto/Cargo.toml b/util/crypto/Cargo.toml index 17b010eab8..702c4dbfb5 100644 --- a/util/crypto/Cargo.toml +++ b/util/crypto/Cargo.toml @@ -10,7 +10,7 @@ repository = "https://github.com/nervosnetwork/ckb" [dependencies] ckb-fixed-hash = { path = "../fixed-hash", version = "= 0.119.0-pre" } -secp256k1 = { version = "0.29", features = ["recovery"], optional = true } +secp256k1 = { version = "0.30", features = ["recovery"], optional = true } thiserror = "1.0.22" rand = { version = "0.8", features = ["small_rng"] } faster-hex = "0.6" diff --git a/util/crypto/src/secp/signature.rs b/util/crypto/src/secp/signature.rs index a5a0e45dd9..1e7e64b265 100644 --- a/util/crypto/src/secp/signature.rs +++ b/util/crypto/src/secp/signature.rs @@ -36,7 +36,7 @@ impl Signature { pub fn from_compact(rec_id: RecoveryId, ret: [u8; 64]) -> Self { let mut data = [0; 65]; data[0..64].copy_from_slice(&ret[0..64]); - data[64] = rec_id.to_i32() as u8; + data[64] = Into::::into(rec_id) as u8; Signature(data) } @@ -79,7 +79,7 @@ impl Signature { /// Converts compact signature to a recoverable signature pub fn to_recoverable(&self) -> Result { - let recovery_id = RecoveryId::from_i32(i32::from(self.0[64]))?; + let recovery_id = RecoveryId::try_from(i32::from(self.0[64]))?; Ok(RecoverableSignature::from_compact( &self.0[0..64], recovery_id, diff --git a/util/fixed-hash/core/Cargo.toml b/util/fixed-hash/core/Cargo.toml index b08197f184..69af156315 100644 --- a/util/fixed-hash/core/Cargo.toml +++ b/util/fixed-hash/core/Cargo.toml @@ -15,6 +15,5 @@ faster-hex = "0.6" schemars = { version = "0.8.19", package = "ckb_schemars" } - [dev-dependencies] serde_json = "1.0" diff --git a/util/hash/Cargo.toml b/util/hash/Cargo.toml index 021a3cd9ee..d930f110c6 100644 --- a/util/hash/Cargo.toml +++ b/util/hash/Cargo.toml @@ -10,12 +10,14 @@ repository = "https://github.com/nervosnetwork/ckb" [features] default = ["blake2b-ref", "blake2b-rs"] -ckb-contract = ["blake2b-ref"] # This feature is used for CKB contract development +ckb-contract = [ + "blake2b-ref", +] # This feature is used for CKB contract development -[target.'cfg(not(target_arch = "wasm32"))'.dependencies] +[target.'cfg(not(target_family = "wasm"))'.dependencies] blake2b-rs = { version = "0.2", optional = true } -[target.'cfg(target_arch = "wasm32")'.dependencies] +[target.'cfg(target_family = "wasm")'.dependencies] blake2b-ref = { version = "0.3", optional = true } [dependencies] diff --git a/util/hash/src/lib.rs b/util/hash/src/lib.rs index b988f041f5..530794e0c5 100644 --- a/util/hash/src/lib.rs +++ b/util/hash/src/lib.rs @@ -12,10 +12,10 @@ #[cfg(feature = "ckb-contract")] pub use blake2b_ref::{Blake2b, Blake2bBuilder}; -#[cfg(all(not(feature = "ckb-contract"), target_arch = "wasm32"))] +#[cfg(all(not(feature = "ckb-contract"), target_family = "wasm"))] pub use blake2b_ref::{Blake2b, Blake2bBuilder}; -#[cfg(all(not(feature = "ckb-contract"), not(target_arch = "wasm32")))] +#[cfg(all(not(feature = "ckb-contract"), not(target_family = "wasm")))] pub use blake2b_rs::{Blake2b, Blake2bBuilder}; #[doc(hidden)] diff --git a/util/jsonrpc-types/src/blockchain.rs b/util/jsonrpc-types/src/blockchain.rs index e5812d1d9e..abdf5ce939 100644 --- a/util/jsonrpc-types/src/blockchain.rs +++ b/util/jsonrpc-types/src/blockchain.rs @@ -25,7 +25,7 @@ use std::fmt; /// when the low 1 bit is 0, it indicates the data, /// and then it relies on the high 7 bits to indicate /// that the data actually corresponds to the version. -#[derive(Default, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Debug, JsonSchema)] +#[derive(Default, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash, Debug, JsonSchema)] #[serde(rename_all = "snake_case")] pub enum ScriptHashType { /// Type "data" matches script code via cell data hash, and run the script code in v0 CKB VM. diff --git a/util/runtime/Cargo.toml b/util/runtime/Cargo.toml index fbf1d7539b..be589d9e94 100644 --- a/util/runtime/Cargo.toml +++ b/util/runtime/Cargo.toml @@ -9,6 +9,13 @@ homepage = "https://github.com/nervosnetwork/ckb" repository = "https://github.com/nervosnetwork/ckb" [dependencies] -tokio = { version = "1", features = ["full"] } +tokio = { version = "1", features = ["rt", "sync"] } ckb-logger = { path = "../logger", version = "= 0.119.0-pre" } -ckb-spawn = { path = "../spawn", version = "= 0.119.0-pre" } +ckb-spawn = { path = "../spawn", version = "= 0.119.0-pre" } + +[target.'cfg(not(target_family = "wasm"))'.dependencies] +tokio = { version = "1", features = ["rt-multi-thread"] } + + +[target.'cfg(target_family = "wasm")'.dependencies] +wasm-bindgen-futures = "0.4" diff --git a/util/runtime/src/brower.rs b/util/runtime/src/brower.rs new file mode 100644 index 0000000000..18c88678b9 --- /dev/null +++ b/util/runtime/src/brower.rs @@ -0,0 +1,27 @@ +use ckb_spawn::Spawn; +use std::future::Future; +use wasm_bindgen_futures::spawn_local; + +#[derive(Debug, Clone)] +pub struct Handle {} + +impl Handle { + /// Spawns a future onto the runtime. + /// + /// This spawns the given future onto the runtime's executor + pub fn spawn(&self, future: F) + where + F: Future + 'static, + { + spawn_local(async move { future.await }) + } +} + +impl Spawn for Handle { + fn spawn_task(&self, future: F) + where + F: Future + 'static, + { + self.spawn(future); + } +} diff --git a/util/runtime/src/lib.rs b/util/runtime/src/lib.rs index 5b9809bea0..89ca13d117 100644 --- a/util/runtime/src/lib.rs +++ b/util/runtime/src/lib.rs @@ -1,159 +1,15 @@ //! Utilities for tokio runtime. -use ckb_spawn::Spawn; -use core::future::Future; -use std::sync::atomic::{AtomicU32, Ordering}; -use std::thread::available_parallelism; -use tokio::runtime::Builder; -use tokio::runtime::Handle as TokioHandle; - -use tokio::task::JoinHandle; - pub use tokio; pub use tokio::runtime::Runtime; -use tokio::sync::mpsc::{Receiver, Sender}; - -// Handle is a newtype wrap and unwrap tokio::Handle, it is workaround with Rust Orphan Rules. -// We need `Handle` impl ckb spawn trait decouple tokio dependence - -/// Handle to the runtime. -#[derive(Debug, Clone)] -pub struct Handle { - pub(crate) inner: TokioHandle, - guard: Option>, -} - -impl Handle { - /// Create a new Handle - pub fn new(inner: TokioHandle, guard: Option>) -> Self { - Self { inner, guard } - } - - /// Drop the guard - pub fn drop_guard(&mut self) { - let _ = self.guard.take(); - } -} - -impl Handle { - /// Enter the runtime context. This allows you to construct types that must - /// have an executor available on creation such as [`tokio::time::Sleep`] or [`tokio::net::TcpStream`]. - /// It will also allow you to call methods such as [`tokio::spawn`]. - pub fn enter(&self, f: F) -> R - where - F: FnOnce() -> R, - { - let _enter = self.inner.enter(); - f() - } - - /// Spawns a future onto the runtime. - /// - /// This spawns the given future onto the runtime's executor - pub fn spawn(&self, future: F) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - let tokio_task_guard = self.guard.clone(); - - self.inner.spawn(async move { - // move tokio_task_guard into the spawned future - // so that it will be dropped when the future is finished - let _guard = tokio_task_guard; - future.await - }) - } - - /// Run a future to completion on the Tokio runtime from a synchronous context. - pub fn block_on(&self, future: F) -> F::Output { - self.inner.block_on(future) - } - - /// Spawns a future onto the runtime blocking pool. - /// - /// This spawns the given future onto the runtime's blocking executor - pub fn spawn_blocking(&self, f: F) -> JoinHandle - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - self.inner.spawn_blocking(f) - } - - /// Transform to inner tokio handler - pub fn into_inner(self) -> TokioHandle { - self.inner - } -} - -/// Create a new runtime with unique name. -fn new_runtime(worker_num: Option) -> Runtime { - Builder::new_multi_thread() - .enable_all() - .worker_threads(worker_num.unwrap_or_else(|| available_parallelism().unwrap().into())) - .thread_name_fn(|| { - static ATOMIC_ID: AtomicU32 = AtomicU32::new(0); - let id = ATOMIC_ID - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| { - // A long thread name will cut to 15 characters in debug tools. - // Such as "top", "htop", "gdb" and so on. - // It's a kernel limit. - // - // So if we want to see the whole name in debug tools, - // this number should have 6 digits at most, - // since the prefix uses 9 characters in below code. - // - // There still has a issue: - // When id wraps around, we couldn't know whether the old id - // is released or not. - // But we can ignore this, because it's almost impossible. - if n >= 999_999 { - Some(0) - } else { - Some(n + 1) - } - }) - .expect("impossible since the above closure must return Some(number)"); - format!("GlobalRt-{id}") - }) - .build() - .expect("ckb runtime initialized") -} - -/// Create new threaded_scheduler tokio Runtime, return `Runtime` -pub fn new_global_runtime(worker_num: Option) -> (Handle, Receiver<()>, Runtime) { - let runtime = new_runtime(worker_num); - let handle = runtime.handle().clone(); - let (guard, handle_stop_rx): (Sender<()>, Receiver<()>) = tokio::sync::mpsc::channel::<()>(1); - - (Handle::new(handle, Some(guard)), handle_stop_rx, runtime) -} - -/// Create new threaded_scheduler tokio Runtime, return `Handle` and background thread join handle, -/// NOTICE: This is only used in testing -pub fn new_background_runtime() -> Handle { - let runtime = new_runtime(None); - let handle = runtime.handle().clone(); - let (guard, mut handle_stop_rx): (Sender<()>, Receiver<()>) = - tokio::sync::mpsc::channel::<()>(1); - let _thread = std::thread::Builder::new() - .name("GlobalRtBuilder".to_string()) - .spawn(move || { - let ret = runtime.block_on(async move { handle_stop_rx.recv().await }); - ckb_logger::debug!("Global runtime finished {:?}", ret); - }) - .expect("tokio runtime started"); +#[cfg(not(target_family = "wasm"))] +pub use native::*; - Handle::new(handle, Some(guard)) -} +#[cfg(target_family = "wasm")] +pub use brower::*; -impl Spawn for Handle { - fn spawn_task(&self, future: F) - where - F: Future + Send + 'static, - { - self.spawn(future); - } -} +#[cfg(target_family = "wasm")] +mod brower; +#[cfg(not(target_family = "wasm"))] +mod native; diff --git a/util/runtime/src/native.rs b/util/runtime/src/native.rs new file mode 100644 index 0000000000..7c7a728f78 --- /dev/null +++ b/util/runtime/src/native.rs @@ -0,0 +1,153 @@ +use ckb_spawn::Spawn; +use core::future::Future; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::thread::available_parallelism; +use tokio::runtime::{Builder, Handle as TokioHandle, Runtime}; + +use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::task::JoinHandle; + +// Handle is a newtype wrap and unwrap tokio::Handle, it is workaround with Rust Orphan Rules. +// We need `Handle` impl ckb spawn trait decouple tokio dependence + +/// Handle to the runtime. +#[derive(Debug, Clone)] +pub struct Handle { + pub(crate) inner: TokioHandle, + guard: Option>, +} + +impl Handle { + /// Create a new Handle + pub fn new(inner: TokioHandle, guard: Option>) -> Self { + Self { inner, guard } + } + + /// Drop the guard + pub fn drop_guard(&mut self) { + let _ = self.guard.take(); + } +} + +impl Handle { + /// Enter the runtime context. This allows you to construct types that must + /// have an executor available on creation such as [`tokio::time::Sleep`] or [`tokio::net::TcpStream`]. + /// It will also allow you to call methods such as [`tokio::spawn`]. + pub fn enter(&self, f: F) -> R + where + F: FnOnce() -> R, + { + let _enter = self.inner.enter(); + f() + } + + /// Spawns a future onto the runtime. + /// + /// This spawns the given future onto the runtime's executor + pub fn spawn(&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let tokio_task_guard = self.guard.clone(); + + self.inner.spawn(async move { + // move tokio_task_guard into the spawned future + // so that it will be dropped when the future is finished + let _guard = tokio_task_guard; + future.await + }) + } + + /// Run a future to completion on the Tokio runtime from a synchronous context. + pub fn block_on(&self, future: F) -> F::Output { + self.inner.block_on(future) + } + + /// Spawns a future onto the runtime blocking pool. + /// + /// This spawns the given future onto the runtime's blocking executor + pub fn spawn_blocking(&self, f: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + self.inner.spawn_blocking(f) + } + + /// Transform to inner tokio handler + pub fn into_inner(self) -> TokioHandle { + self.inner + } +} + +/// Create a new runtime with unique name. +fn new_runtime(worker_num: Option) -> Runtime { + Builder::new_multi_thread() + .enable_all() + .worker_threads(worker_num.unwrap_or_else(|| available_parallelism().unwrap().into())) + .thread_name_fn(|| { + static ATOMIC_ID: AtomicU32 = AtomicU32::new(0); + let id = ATOMIC_ID + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| { + // A long thread name will cut to 15 characters in debug tools. + // Such as "top", "htop", "gdb" and so on. + // It's a kernel limit. + // + // So if we want to see the whole name in debug tools, + // this number should have 6 digits at most, + // since the prefix uses 9 characters in below code. + // + // There still has a issue: + // When id wraps around, we couldn't know whether the old id + // is released or not. + // But we can ignore this, because it's almost impossible. + if n >= 999_999 { + Some(0) + } else { + Some(n + 1) + } + }) + .expect("impossible since the above closure must return Some(number)"); + format!("GlobalRt-{id}") + }) + .build() + .expect("ckb runtime initialized") +} + +/// Create new threaded_scheduler tokio Runtime, return `Runtime` +pub fn new_global_runtime(worker_num: Option) -> (Handle, Receiver<()>, Runtime) { + let runtime = new_runtime(worker_num); + let handle = runtime.handle().clone(); + let (guard, handle_stop_rx): (Sender<()>, Receiver<()>) = tokio::sync::mpsc::channel::<()>(1); + + (Handle::new(handle, Some(guard)), handle_stop_rx, runtime) +} + +/// Create new threaded_scheduler tokio Runtime, return `Handle` and background thread join handle, +/// NOTICE: This is only used in testing +pub fn new_background_runtime() -> Handle { + let runtime = new_runtime(None); + let handle = runtime.handle().clone(); + + let (guard, mut handle_stop_rx): (Sender<()>, Receiver<()>) = + tokio::sync::mpsc::channel::<()>(1); + let _thread = std::thread::Builder::new() + .name("GlobalRtBuilder".to_string()) + .spawn(move || { + let ret = runtime.block_on(async move { handle_stop_rx.recv().await }); + ckb_logger::debug!("Global runtime finished {:?}", ret); + }) + .expect("tokio runtime started"); + + Handle::new(handle, Some(guard)) +} + +impl Spawn for Handle { + fn spawn_task(&self, future: F) + where + F: Future + Send + 'static, + { + self.spawn(future); + } +} diff --git a/util/spawn/src/lib.rs b/util/spawn/src/lib.rs index 83864084bb..8d5bcabf23 100644 --- a/util/spawn/src/lib.rs +++ b/util/spawn/src/lib.rs @@ -5,9 +5,18 @@ use core::future::Future; /// `Spawn` abstract async runtime, spawns a future onto the runtime +#[cfg(not(target_family = "wasm"))] pub trait Spawn { /// This spawns the given future onto the runtime's executor fn spawn_task(&self, task: F) where F: Future + Send + 'static; } + +#[cfg(target_family = "wasm")] +pub trait Spawn { + /// This spawns the given future onto the runtime's executor + fn spawn_task(&self, task: F) + where + F: Future + 'static; +} diff --git a/util/stop-handler/Cargo.toml b/util/stop-handler/Cargo.toml index 60d24bd9a3..88a35c7603 100644 --- a/util/stop-handler/Cargo.toml +++ b/util/stop-handler/Cargo.toml @@ -10,7 +10,7 @@ repository = "https://github.com/nervosnetwork/ckb" [dependencies] ckb-logger = { path = "../logger", version = "= 0.119.0-pre" } -tokio = { version = "1", features = ["sync", "rt-multi-thread"] } +tokio = { version = "1", features = ["sync"] } ckb-channel = { path = "../channel", version = "= 0.119.0-pre" } ckb-util = { path = "..", version = "= 0.119.0-pre" } ckb-async-runtime = { path = "../runtime", version = "= 0.119.0-pre" } diff --git a/util/systemtime/Cargo.toml b/util/systemtime/Cargo.toml index 90cb7f31a3..44d7d46f45 100644 --- a/util/systemtime/Cargo.toml +++ b/util/systemtime/Cargo.toml @@ -10,6 +10,9 @@ repository = "https://github.com/nervosnetwork/ckb" [dependencies] +[target.'cfg(all(target_family = "wasm", target_os = "unknown"))'.dependencies] +web-time = "1.1.0" + [dev-dependencies] [features] diff --git a/util/systemtime/src/lib.rs b/util/systemtime/src/lib.rs index aed840fa23..f33082aa39 100644 --- a/util/systemtime/src/lib.rs +++ b/util/systemtime/src/lib.rs @@ -4,7 +4,10 @@ mod test_realtime; #[cfg(feature = "enable_faketime")] use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use std::time::Duration; +#[cfg(not(target_family = "wasm"))] +pub use std::time::{Duration, Instant, SystemTime}; +#[cfg(all(target_family = "wasm", target_os = "unknown"))] +pub use web_time::{Duration, Instant, SystemTime}; // Store faketime timestamp here #[cfg(feature = "enable_faketime")] @@ -16,8 +19,8 @@ static FAKETIME_ENABLED: AtomicBool = AtomicBool::new(false); // Get real system's timestamp in millis fn system_time_as_millis() -> u64 { - let duration = std::time::SystemTime::now() - .duration_since(std::time::SystemTime::UNIX_EPOCH) + let duration = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) .expect("SystemTime before UNIX EPOCH!"); duration.as_secs() * 1000 + u64::from(duration.subsec_millis()) } diff --git a/verification/Cargo.toml b/verification/Cargo.toml index 4565967bc4..3c0fd6e390 100644 --- a/verification/Cargo.toml +++ b/verification/Cargo.toml @@ -23,7 +23,7 @@ derive_more = { version = "1", default-features = false, features = [ "display", ] } ckb-verification-traits = { path = "./traits", version = "= 0.119.0-pre" } -tokio = { version = "1", features = ["sync", "process"] } +tokio = { version = "1", features = ["sync", "macros"] } [dev-dependencies] ckb-test-chain-utils = { path = "../util/test-chain-utils", version = "= 0.119.0-pre" } diff --git a/verification/src/transaction_verifier.rs b/verification/src/transaction_verifier.rs index b6317cebaa..62be30fcfb 100644 --- a/verification/src/transaction_verifier.rs +++ b/verification/src/transaction_verifier.rs @@ -5,7 +5,9 @@ use ckb_chain_spec::consensus::Consensus; use ckb_dao::DaoCalculator; use ckb_dao_utils::DaoError; use ckb_error::Error; -use ckb_script::{ChunkCommand, TransactionScriptsVerifier, TransactionSnapshot}; +#[cfg(not(target_family = "wasm"))] +use ckb_script::ChunkCommand; +use ckb_script::{TransactionScriptsVerifier, TransactionSnapshot}; use ckb_traits::{ CellDataProvider, EpochProvider, ExtensionProvider, HeaderFieldsProvider, HeaderProvider, }; @@ -175,6 +177,7 @@ where /// Perform context-dependent verification with command /// The verification will be interrupted when receiving a Suspend command + #[cfg(not(target_family = "wasm"))] pub async fn verify_with_pause( &self, max_cycles: Cycle,