Skip to content

Commit a5df51a

Browse files
authored
CLI tool update (#20)
* update: working cli (v1 runner mode) * update: cli tool * update: fix small v0 multi listener bug + v0 multi listener tests + more cli dev * fix: modify test for v0 multi listener * update: cli tool for all roles, docs * remove 🚧 in cli readme [skip ci]
1 parent c12f7b6 commit a5df51a

File tree

17 files changed

+331
-168
lines changed

17 files changed

+331
-168
lines changed

crates/water/src/runtime/client.rs

+10-10
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ impl WATERClient {
9797
}
9898

9999
/// keep_listen is the function that is called when user wants to accept a newly income connection,
100-
/// it creates a new WASM instance and migrate the previous listener to it. Used by v0 listener and relay for now.
100+
/// it creates a new WASM instance and migrate the previous listener to it. -- v0_plus listener and relay for now.
101101
pub fn keep_listen(&mut self) -> Result<Self, anyhow::Error> {
102102
info!("[HOST] WATERClient keep listening...",);
103103

@@ -128,7 +128,7 @@ impl WATERClient {
128128
self.debug = debug;
129129
}
130130

131-
/// `connect` is the entry point for `Dialer` to connect to a remote address
131+
/// `connect` is the function for `Dialer` to connect to a remote address
132132
pub fn connect(&mut self) -> Result<(), anyhow::Error> {
133133
info!("[HOST] WATERClient connecting ...");
134134

@@ -161,7 +161,7 @@ impl WATERClient {
161161
Ok(())
162162
}
163163

164-
/// `associate` is the entry point for `Relay` to associate with a remote addr
164+
/// `associate` is the function for `Relay` to associate a remote connection
165165
pub fn associate(&mut self) -> Result<(), anyhow::Error> {
166166
info!("[HOST] WATERClient relaying ...");
167167

@@ -176,8 +176,8 @@ impl WATERClient {
176176
Ok(())
177177
}
178178

179-
/// `accept` is the entry point for `Listener` to accept a connection
180-
/// called after `listen`
179+
/// `accept` is the function for `Listener` to accept a connection
180+
/// called after `listen()`
181181
pub fn accept(&mut self) -> Result<(), anyhow::Error> {
182182
info!("[HOST] WATERClient accepting ...");
183183

@@ -192,8 +192,8 @@ impl WATERClient {
192192
Ok(())
193193
}
194194

195-
/// `run_worker` is the entry point for `Runner` to run the entry_fn(a worker in WATM) in a separate thread
196-
/// it will return a `JoinHandle` for the caller to manage the thread -- used by v0 currently
195+
/// `run_worker` is the function to run the entry_fn(a worker in WATM) in a separate thread and return the thread handle
196+
/// it will return a `JoinHandle` for the caller to manage the thread -- used by v0_plus
197197
pub fn run_worker(
198198
&mut self,
199199
) -> Result<std::thread::JoinHandle<Result<(), anyhow::Error>>, anyhow::Error> {
@@ -207,7 +207,7 @@ impl WATERClient {
207207
}
208208
}
209209

210-
/// `execute` is the entry point for `Runner` to run the entry_fn(a worker in WATM) in the current thread
210+
/// `execute` is the function to run the entry_fn(a worker in WATM) in the current thread
211211
/// -- replace the thread running Host when running it <- used by v1 currently
212212
pub fn execute(&mut self) -> Result<(), anyhow::Error> {
213213
info!("[HOST] WATERClient Executing ...");
@@ -229,7 +229,7 @@ impl WATERClient {
229229
Ok(())
230230
}
231231

232-
/// `cancel_with` is the function to set the pipe for canceling later -- v0
232+
/// `cancel_with` is the function to set the cancel pipe for exiting later -- v0_plus
233233
pub fn cancel_with(&mut self) -> Result<(), anyhow::Error> {
234234
info!("[HOST] WATERClient cancel_with ...");
235235

@@ -251,7 +251,7 @@ impl WATERClient {
251251
Ok(())
252252
}
253253

254-
/// `cancel` is the function to cancel the thread running the entry_fn -- v0
254+
/// `cancel` is the function to send thru the cancel_pipe and let the thread running the worker to exit -- v0_plus
255255
pub fn cancel(&mut self) -> Result<(), anyhow::Error> {
256256
info!("[HOST] WATERClient canceling ...");
257257

crates/water/src/runtime/core.rs

+53-54
Original file line numberDiff line numberDiff line change
@@ -75,62 +75,10 @@ impl H2O<Host> {
7575
return Err(anyhow::Error::msg("WATM module version not found"));
7676
}
7777

78-
Self::setup_core(conf, linker, store, module, engine, version)
78+
Self::create_core(conf, linker, store, module, engine, version)
7979
}
8080

81-
/// This function is for migrating the v0 core for listener and relay
82-
/// to handle every new connection will create a new separate core (as v0 spec)
83-
pub fn v0_migrate_core(conf: &WATERConfig, core: &H2O<Host>) -> Result<Self, anyhow::Error> {
84-
info!("[HOST] WATERCore H2O v0_migrating...");
85-
86-
// reseting the listener accepted_fd or the relay's accepted_fd & dial_fd
87-
// when migrating from existed listener / relay
88-
let version = match &core.version {
89-
Version::V0(v0conf) => {
90-
match v0conf {
91-
Some(og_v0_conf) => match og_v0_conf.lock() {
92-
Ok(og_v0_conf) => {
93-
let mut new_v0_conf_inner = og_v0_conf.clone();
94-
// reset the new cloned v0conf
95-
new_v0_conf_inner.reset_listener_or_relay();
96-
97-
Version::V0(Some(Arc::new(Mutex::new(new_v0_conf_inner))))
98-
}
99-
Err(e) => {
100-
return Err(anyhow::anyhow!("Failed to lock v0_conf: {}", e))?;
101-
}
102-
},
103-
None => {
104-
return Err(anyhow::anyhow!("v0_conf is None"))?;
105-
}
106-
}
107-
}
108-
_ => {
109-
return Err(anyhow::anyhow!("This is not a V0 core"))?;
110-
}
111-
};
112-
113-
// NOTE: Some of the followings can reuse the existing core, leave to later explore
114-
let wasm_config = wasmtime::Config::new();
115-
116-
#[cfg(feature = "multithread")]
117-
{
118-
wasm_config.wasm_threads(true);
119-
}
120-
121-
let engine = Engine::new(&wasm_config)?;
122-
let linker: Linker<Host> = Linker::new(&engine);
123-
124-
let module = Module::from_file(&engine, &conf.filepath)?;
125-
126-
let host = Host::default();
127-
let store = Store::new(&engine, host);
128-
129-
Self::setup_core(conf, linker, store, module, engine, Some(version))
130-
}
131-
132-
/// called by init_core() or v0_migrate_core() to setup the core (reduce code duplication)
133-
pub fn setup_core(
81+
pub fn create_core(
13482
conf: &WATERConfig,
13583
mut linker: Linker<Host>,
13684
mut store: Store<Host>,
@@ -213,6 +161,57 @@ impl H2O<Host> {
213161
})
214162
}
215163

164+
// This function is for migrating the v0 core for listener and relay
165+
// to handle every new connection is creating a new separate core (as v0 spec)
166+
pub fn v0_migrate_core(conf: &WATERConfig, core: &H2O<Host>) -> Result<Self, anyhow::Error> {
167+
info!("[HOST] WATERCore H2O v0_migrating...");
168+
169+
// reseting the listener accepted_fd or the relay's accepted_fd & dial_fd
170+
// when migrating from existed listener / relay
171+
let version = match &core.version {
172+
Version::V0(v0conf) => {
173+
match v0conf {
174+
Some(og_v0_conf) => match og_v0_conf.lock() {
175+
Ok(og_v0_conf) => {
176+
let mut new_v0_conf_inner = og_v0_conf.clone();
177+
// reset the new cloned v0conf
178+
new_v0_conf_inner.reset_listener_or_relay();
179+
180+
Version::V0(Some(Arc::new(Mutex::new(new_v0_conf_inner))))
181+
}
182+
Err(e) => {
183+
return Err(anyhow::anyhow!("Failed to lock v0_conf: {}", e))?;
184+
}
185+
},
186+
None => {
187+
return Err(anyhow::anyhow!("v0_conf is None"))?;
188+
}
189+
}
190+
}
191+
_ => {
192+
return Err(anyhow::anyhow!("This is not a V0 core"))?;
193+
}
194+
};
195+
196+
// NOTE: Some of the followings can reuse the existing core, leave to later explore
197+
let wasm_config = wasmtime::Config::new();
198+
199+
#[cfg(feature = "multithread")]
200+
{
201+
wasm_config.wasm_threads(true);
202+
}
203+
204+
let engine = Engine::new(&wasm_config)?;
205+
let linker: Linker<Host> = Linker::new(&engine);
206+
207+
let module = Module::from_file(&engine, &conf.filepath)?;
208+
209+
let host = Host::default();
210+
let store = Store::new(&engine, host);
211+
212+
Self::create_core(conf, linker, store, module, engine, Some(version))
213+
}
214+
216215
pub fn _prepare(&mut self, conf: &WATERConfig) -> Result<(), anyhow::Error> {
217216
self._init(conf.debug)?;
218217
self._process_config(conf)?; // This is for now needed only by v1_preview

crates/water/src/runtime/transport.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ pub trait WATERTransportTrait: Send {
1515
// read from WASM's caller_reader
1616
match caller_io {
1717
Some(ref mut caller_io) => match caller_io.read(buf) {
18-
Ok(n) => Ok(n as i64),
18+
Ok(n) if n > 0 => Ok(n as i64),
19+
Ok(_) => Err(anyhow::Error::msg("Stream closed or read 0 bytes")),
1920
Err(e) => Err(anyhow::Error::msg(format!(
2021
"failed to read from caller_reader: {}",
2122
e

crates/water/src/runtime/v0/config.rs

+9-8
Original file line numberDiff line numberDiff line change
@@ -93,12 +93,14 @@ impl V0Config {
9393
})
9494
}
9595

96+
/// It will connect to the remote addr and set the fd in the V0Config
9697
pub fn connect(&mut self) -> Result<std::net::TcpStream, anyhow::Error> {
9798
let addr = format!("{}:{}", self.remote_addr, self.remote_port);
9899

99100
info!("[HOST] WATERCore V0 connecting to {}", addr);
100101

101102
match &mut self.conn {
103+
// if the V0CRole is Relay, then it will remain as Relay
102104
V0CRole::Relay(_, _, ref mut conn_fd) => {
103105
// now relay has been built, need to dial
104106
if *conn_fd != -1 {
@@ -109,6 +111,7 @@ impl V0Config {
109111
*conn_fd = conn.as_raw_fd();
110112
Ok(conn)
111113
}
114+
// if the V0CRole has not been set, and connect() was called, then it should be a dialer
112115
V0CRole::Unknown => {
113116
let conn = std::net::TcpStream::connect(addr)?;
114117
self.conn = V0CRole::Dialer(conn.as_raw_fd());
@@ -118,6 +121,7 @@ impl V0Config {
118121
}
119122
}
120123

124+
/// It will create a listener and set the fd in the V0Config (for either listener or relay)
121125
pub fn create_listener(&mut self, is_relay: bool) -> Result<(), anyhow::Error> {
122126
let addr = format!("{}:{}", self.loc_addr, self.loc_port);
123127

@@ -133,6 +137,7 @@ impl V0Config {
133137
Ok(())
134138
}
135139

140+
/// It will accept a connection and set the fd in the V0Config (for either listener or relay)
136141
pub fn accept(&mut self) -> Result<std::net::TcpStream, anyhow::Error> {
137142
info!("[HOST] WATERCore V0 accept with conn {:?} ...", self.conn);
138143

@@ -146,7 +151,7 @@ impl V0Config {
146151

147152
let (stream, _) = listener.accept()?;
148153

149-
*listener_fd = listener.into_raw_fd(); // makde sure the listener is not closed after scope
154+
*listener_fd = listener.into_raw_fd(); // made sure the listener is not closed after scope
150155
*accepted_fd = stream.as_raw_fd();
151156

152157
Ok(stream)
@@ -158,14 +163,15 @@ impl V0Config {
158163

159164
let listener = unsafe { std::net::TcpListener::from_raw_fd(*listener_fd) };
160165
let (stream, _) = listener.accept()?;
161-
*listener_fd = listener.into_raw_fd(); // makde sure the listener is not closed after scope
166+
*listener_fd = listener.into_raw_fd(); // made sure the listener is not closed after scope
162167
*accepted_fd = stream.as_raw_fd();
163168
Ok(stream)
164169
}
165170
_ => Err(anyhow::Error::msg("not a listener")),
166171
}
167172
}
168173

174+
/// It will close the connection to remote / accepted connection listened and exit gracefully
169175
pub fn defer(&mut self) {
170176
info!("[HOST] WATERCore V0 defer with conn {:?} ...", self.conn);
171177

@@ -193,6 +199,7 @@ impl V0Config {
193199
}
194200
}
195201

202+
/// It is used for listener and relay only, to reset the accepted connection in the migrated listener / relay
196203
pub fn reset_listener_or_relay(&mut self) {
197204
info!(
198205
"[HOST] WATERCore v0 reset lisener / relay with conn {:?} ...",
@@ -202,21 +209,15 @@ impl V0Config {
202209
match self.conn {
203210
V0CRole::Listener(_, ref mut accepted_fd) => {
204211
if *accepted_fd != -1 {
205-
let accepted_conn = unsafe { std::net::TcpStream::from_raw_fd(*accepted_fd) };
206-
drop(accepted_conn);
207212
*accepted_fd = -1; // set it back to default
208213
}
209214
}
210215
V0CRole::Relay(_, ref mut accepted_fd, ref mut conn_fd) => {
211216
if *accepted_fd != -1 {
212-
let accepted_conn = unsafe { std::net::TcpStream::from_raw_fd(*accepted_fd) };
213-
drop(accepted_conn);
214217
*accepted_fd = -1; // set it back to default
215218
}
216219

217220
if *conn_fd != -1 {
218-
let conn = unsafe { std::net::TcpStream::from_raw_fd(*conn_fd) };
219-
drop(conn);
220221
*conn_fd = -1; // set it back to default
221222
}
222223
}

examples/clients/cli/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,6 @@ clap = { version="4.2.1", features = ["derive"] }
1717
anyhow = "1.0.7"
1818
tracing = "0.1"
1919
tracing-subscriber = "0.3.17"
20+
tokio = { version = "1", features = ["full"] }
2021

2122
water = {path="../../../crates/water", version="0.1.0"}

0 commit comments

Comments
 (0)