Skip to content

Commit d4fb606

Browse files
committedMar 19, 2025
g3tiles: add backend alive-connection rpc method
1 parent 53b6f03 commit d4fb606

File tree

17 files changed

+181
-2
lines changed

17 files changed

+181
-2
lines changed
 

‎g3tiles/proto/build.rs

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ fn main() {
2020
.file("schema/types.capnp")
2121
.file("schema/proc.capnp")
2222
.file("schema/server.capnp")
23+
.file("schema/backend.capnp")
2324
.run()
2425
.unwrap();
2526
}

‎g3tiles/proto/schema/backend.capnp

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
@0xb3f076019fd2cc68;
2+
3+
interface BackendControl {
4+
aliveConnection @0 () -> (count :UInt64);
5+
}

‎g3tiles/proto/schema/proc.capnp

+3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ using Types = import "types.capnp";
44

55
using Server = import "server.capnp";
66

7+
using Backend = import "backend.capnp";
8+
79
interface ProcControl {
810
#
911

@@ -24,4 +26,5 @@ interface ProcControl {
2426

2527
reloadBackend @9 (name :Text) -> (result :Types.OperationResult);
2628
listBackend @10 () -> (result :List(Text));
29+
getBackend @13 (name: Text) -> (backend :Types.FetchResult(Backend.BackendControl));
2730
}

‎g3tiles/proto/src/lib.rs

+4
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,7 @@ pub mod proc_capnp {
2828
pub mod server_capnp {
2929
include!(concat!(env!("OUT_DIR"), "/server_capnp.rs"));
3030
}
31+
32+
pub mod backend_capnp {
33+
include!(concat!(env!("OUT_DIR"), "/backend_capnp.rs"));
34+
}

‎g3tiles/src/backend/dummy_close.rs

+4
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ impl Backend for DummyCloseBackend {
8585
Ok(())
8686
}
8787

88+
fn alive_connection(&self) -> u64 {
89+
0
90+
}
91+
8892
async fn stream_connect(&self, _task_notes: &ServerTaskNotes) -> StreamConnectResult {
8993
Err(StreamConnectError::UpstreamNotResolved)
9094
}

‎g3tiles/src/backend/keyless_quic/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,10 @@ impl Backend for KeylessQuicBackend {
196196
Ok(())
197197
}
198198

199+
fn alive_connection(&self) -> u64 {
200+
self.pool_handle.alive_connection()
201+
}
202+
199203
async fn keyless(&self, req: KeylessRequest) -> KeylessResponse {
200204
let err = KeylessInternalErrorResponse::new(req.header());
201205
if !self.config.wait_new_channel && self.stats.alive_channel() <= 0 {

‎g3tiles/src/backend/keyless_tcp/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,10 @@ impl Backend for KeylessTcpBackend {
207207
Ok(())
208208
}
209209

210+
fn alive_connection(&self) -> u64 {
211+
self.pool_handle.alive_connection()
212+
}
213+
210214
async fn keyless(&self, req: KeylessRequest) -> KeylessResponse {
211215
let err = KeylessInternalErrorResponse::new(req.header());
212216
if !self.config.wait_new_channel && self.stats.alive_channel() <= 0 {

‎g3tiles/src/backend/mod.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ mod stream_tcp;
3535

3636
mod ops;
3737
pub use ops::load_all;
38-
pub(crate) use ops::{reload, update_dependency_to_discover};
38+
pub(crate) use ops::{get_backend, reload, update_dependency_to_discover};
3939

4040
mod registry;
4141
pub(crate) use registry::{get_names, get_or_insert_default};
@@ -53,6 +53,8 @@ pub(crate) trait Backend {
5353
fn discover(&self) -> &NodeName;
5454
fn update_discover(&self) -> anyhow::Result<()>;
5555

56+
fn alive_connection(&self) -> u64;
57+
5658
async fn stream_connect(&self, _task_notes: &ServerTaskNotes) -> StreamConnectResult {
5759
Err(StreamConnectError::UpstreamNotResolved) // TODO
5860
}

‎g3tiles/src/backend/ops.rs

+7
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,13 @@ pub async fn load_all() -> anyhow::Result<()> {
6969
Ok(())
7070
}
7171

72+
pub(crate) fn get_backend(name: &NodeName) -> anyhow::Result<ArcBackend> {
73+
match registry::get(name) {
74+
Some(backend) => Ok(backend),
75+
None => Err(anyhow!("no backend named {name} found")),
76+
}
77+
}
78+
7279
pub(crate) async fn reload(
7380
name: &NodeName,
7481
position: Option<YamlDocPosition>,

‎g3tiles/src/backend/stream_tcp.rs

+5
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,11 @@ impl Backend for StreamTcpBackend {
179179
Ok(())
180180
}
181181

182+
fn alive_connection(&self) -> u64 {
183+
// TODO add alive connection stats
184+
0
185+
}
186+
182187
async fn stream_connect(&self, task_notes: &ServerTaskNotes) -> StreamConnectResult {
183188
let Some(next_addr) = self.select_peer(task_notes) else {
184189
return Err(StreamConnectError::UpstreamNotResolved);

‎g3tiles/src/control/capnp/backend.rs

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2025 ByteDance and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
use capnp::capability::Promise;
18+
19+
use g3_types::metrics::NodeName;
20+
21+
use g3tiles_proto::backend_capnp::backend_control;
22+
23+
use crate::backend::ArcBackend;
24+
25+
pub(super) struct BackendControlImpl {
26+
backend: ArcBackend,
27+
}
28+
29+
impl BackendControlImpl {
30+
pub(super) fn new_client(name: &str) -> anyhow::Result<backend_control::Client> {
31+
let name = unsafe { NodeName::new_unchecked(name) };
32+
let backend = crate::backend::get_backend(&name)?;
33+
Ok(capnp_rpc::new_client(BackendControlImpl { backend }))
34+
}
35+
}
36+
37+
impl backend_control::Server for BackendControlImpl {
38+
fn alive_connection(
39+
&mut self,
40+
_params: backend_control::AliveConnectionParams,
41+
mut results: backend_control::AliveConnectionResults,
42+
) -> Promise<(), capnp::Error> {
43+
let alive_count = self.backend.alive_connection();
44+
results.get().set_count(alive_count);
45+
Promise::ok(())
46+
}
47+
}

‎g3tiles/src/control/capnp/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ mod common;
2020
use common::set_operation_result;
2121
mod proc;
2222

23+
mod backend;
2324
mod server;
2425

2526
pub fn stop_working_thread() {

‎g3tiles/src/control/capnp/proc.rs

+14
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use capnp_rpc::pry;
1919

2020
use g3_types::metrics::NodeName;
2121

22+
use g3tiles_proto::backend_capnp::backend_control;
2223
use g3tiles_proto::proc_capnp::proc_control;
2324
use g3tiles_proto::server_capnp::server_control;
2425
use g3tiles_proto::types_capnp::fetch_result;
@@ -185,6 +186,19 @@ impl proc_control::Server for ProcControlImpl {
185186
}
186187
Promise::ok(())
187188
}
189+
190+
fn get_backend(
191+
&mut self,
192+
params: proc_control::GetBackendParams,
193+
mut results: proc_control::GetBackendResults,
194+
) -> Promise<(), capnp::Error> {
195+
let backend = pry!(pry!(pry!(params.get()).get_name()).to_str());
196+
pry!(set_fetch_result::<backend_control::Owned>(
197+
results.get().init_backend(),
198+
super::backend::BackendControlImpl::new_client(backend),
199+
));
200+
Promise::ok(())
201+
}
188202
}
189203

190204
fn set_fetch_result<'a, T>(

‎g3tiles/src/module/keyless/backend/pool.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ enum KeylessPoolCmd {
5555
#[derive(Clone)]
5656
pub(crate) struct KeylessConnectionPoolHandle {
5757
cmd_sender: mpsc::Sender<KeylessPoolCmd>,
58+
stats: Arc<ConnectionPoolStats>,
5859
}
5960

6061
impl KeylessConnectionPoolHandle {
@@ -69,6 +70,10 @@ impl KeylessConnectionPoolHandle {
6970
pub(crate) fn request_new_connection(&self) {
7071
let _ = self.cmd_sender.try_send(KeylessPoolCmd::NewConnection);
7172
}
73+
74+
pub(crate) fn alive_connection(&self) -> u64 {
75+
self.stats.alive_count() as u64
76+
}
7277
}
7378

7479
pub(crate) struct KeylessConnectionPool<C: KeylessUpstreamConnection> {
@@ -123,11 +128,12 @@ where
123128
keyless_request_receiver,
124129
graceful_close_wait,
125130
);
131+
let stats = pool.stats.clone();
126132
let (cmd_sender, cmd_receiver) = mpsc::channel(CMD_CHANNEL_SIZE);
127133
tokio::spawn(async move {
128134
pool.into_running(cmd_receiver).await;
129135
});
130-
KeylessConnectionPoolHandle { cmd_sender }
136+
KeylessConnectionPoolHandle { cmd_sender, stats }
131137
}
132138

133139
async fn into_running(mut self, mut cmd_receiver: mpsc::Receiver<KeylessPoolCmd>) {

‎g3tiles/utils/ctl/src/backend.rs

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2025 ByteDance and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
use clap::{Arg, ArgMatches, Command};
18+
use futures_util::future::TryFutureExt;
19+
20+
use g3_ctl::CommandResult;
21+
22+
use g3tiles_proto::backend_capnp::backend_control;
23+
use g3tiles_proto::proc_capnp::proc_control;
24+
25+
pub const COMMAND: &str = "backend";
26+
27+
const COMMAND_ARG_NAME: &str = "name";
28+
29+
const SUBCOMMAND_ALIVE_CONNECTION: &str = "alive-connection";
30+
31+
pub fn command() -> Command {
32+
Command::new(COMMAND)
33+
.arg(Arg::new(COMMAND_ARG_NAME).required(true).num_args(1))
34+
.subcommand_required(true)
35+
.subcommand(Command::new(SUBCOMMAND_ALIVE_CONNECTION))
36+
}
37+
38+
async fn alive_connection(client: &backend_control::Client) -> CommandResult<()> {
39+
let req = client.alive_connection_request();
40+
let rsp = req.send().promise.await?;
41+
let count = rsp.get()?.get_count();
42+
println!("{count}");
43+
Ok(())
44+
}
45+
46+
pub async fn run(client: &proc_control::Client, args: &ArgMatches) -> CommandResult<()> {
47+
let name = args.get_one::<String>(COMMAND_ARG_NAME).unwrap();
48+
49+
let (subcommand, _) = args.subcommand().unwrap();
50+
match subcommand {
51+
SUBCOMMAND_ALIVE_CONNECTION => {
52+
super::proc::get_backend(client, name)
53+
.and_then(|backend| async move { alive_connection(&backend).await })
54+
.await
55+
}
56+
_ => unreachable!(),
57+
}
58+
}

‎g3tiles/utils/ctl/src/main.rs

+3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use g3tiles_proto::proc_capnp::proc_control;
2424
mod common;
2525
mod proc;
2626

27+
mod backend;
2728
mod server;
2829

2930
fn build_cli_args() -> Command {
@@ -39,6 +40,7 @@ fn build_cli_args() -> Command {
3940
.subcommand(proc::commands::reload_discover())
4041
.subcommand(proc::commands::reload_backend())
4142
.subcommand(server::command())
43+
.subcommand(backend::command())
4244
}
4345

4446
#[tokio::main(flavor = "current_thread")]
@@ -74,6 +76,7 @@ async fn main() -> anyhow::Result<()> {
7476
proc::COMMAND_RELOAD_DISCOVER => proc::reload_discover(&proc_control, args).await,
7577
proc::COMMAND_RELOAD_BACKEND => proc::reload_backend(&proc_control, args).await,
7678
server::COMMAND => server::run(&proc_control, args).await,
79+
backend::COMMAND => backend::run(&proc_control, args).await,
7780
_ => Err(CommandError::Cli(anyhow!(
7881
"unsupported command {subcommand}"
7982
))),

‎g3tiles/utils/ctl/src/proc.rs

+11
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use clap::ArgMatches;
1818

1919
use g3_ctl::CommandResult;
2020

21+
use g3tiles_proto::backend_capnp::backend_control;
2122
use g3tiles_proto::proc_capnp::proc_control;
2223
use g3tiles_proto::server_capnp::server_control;
2324

@@ -199,3 +200,13 @@ pub(crate) async fn get_server(
199200
let rsp = req.send().promise.await?;
200201
parse_fetch_result(rsp.get()?.get_server()?)
201202
}
203+
204+
pub(crate) async fn get_backend(
205+
client: &proc_control::Client,
206+
name: &str,
207+
) -> CommandResult<backend_control::Client> {
208+
let mut req = client.get_backend_request();
209+
req.get().set_name(name);
210+
let rsp = req.send().promise.await?;
211+
parse_fetch_result(rsp.get()?.get_backend()?)
212+
}

0 commit comments

Comments
 (0)
Please sign in to comment.