From 774b7b12fb69581bd32128048ba40fa20d13b286 Mon Sep 17 00:00:00 2001 From: l-7-l <571234647@qq.com> Date: Thu, 5 Sep 2024 15:20:58 +0800 Subject: [PATCH 1/2] feat: query config listeners info --- src/config/core.rs | 109 ++++++++++++++++++++++++++++++---- src/config/dal.rs | 19 +++++- src/console/api.rs | 6 +- src/console/config_api.rs | 37 +++++++++++- src/console/model/mod.rs | 1 + src/console/model/paginate.rs | 24 ++++++++ src/openapi/config/api.rs | 57 +++++++++++++----- 7 files changed, 221 insertions(+), 32 deletions(-) create mode 100644 src/console/model/paginate.rs diff --git a/src/config/core.rs b/src/config/core.rs index 14e54200..efd714fa 100644 --- a/src/config/core.rs +++ b/src/config/core.rs @@ -9,6 +9,7 @@ use std::sync::Arc; use std::sync::Weak; use std::time::Duration; +use crate::console::model::paginate::PaginateQuery; use crate::raft::store::ClientRequest; use crate::raft::NacosRaft; use crate::utils::get_md5; @@ -21,6 +22,8 @@ use actix::prelude::*; use super::config_subscribe::Subscriber; use super::dal::ConfigHistoryParam; +use super::dal::ConfigListenerDo; +use super::dal::QueryListeners; use crate::config::config_index::{ConfigQueryParam, TenantIndex}; use crate::config::config_type::ConfigType; use crate::config::model::{ @@ -297,14 +300,48 @@ pub enum ListenerResult { DATA(Vec), } +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(transparent)] +pub struct AppName(String); + +impl AppName { + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl Default for AppName { + fn default() -> Self { + Self("unknown".to_owned()) + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct ConfigListenerInfo { + /// 客户端设置的应用名字,默认值为 unknown + /// appName set by the client, the default value is unknown + pub name: AppName, + /// 客户端 ip 地址 + /// client remote ip address + pub ip: String, + /// 客户端订阅的版本,当前取值为请求 User-Agent + /// The client version. The current value is user agent + pub version: String, +} + type ListenerSenderType = tokio::sync::oneshot::Sender; //type ListenerReceiverType = tokio::sync::oneshot::Receiver; +pub struct ConfigListenerChannel { + sender: ListenerSenderType, + info: ConfigListenerInfo, +} + pub(crate) struct ConfigListener { version: u64, listener: HashMap>, time_listener: BTreeMap>, - sender_map: HashMap, + channels: HashMap, } impl ConfigListener { @@ -313,11 +350,11 @@ impl ConfigListener { version: 0, listener: Default::default(), time_listener: Default::default(), - sender_map: Default::default(), + channels: Default::default(), } } - fn add(&mut self, items: Vec, sender: ListenerSenderType, time: i64) { + fn add(&mut self, items: Vec, channel: ConfigListenerChannel, time: i64) { self.version += 1; for item in &items { let key = item.key.clone(); @@ -330,7 +367,7 @@ impl ConfigListener { } }; } - self.sender_map.insert(self.version, sender); + self.channels.insert(self.version, channel); let once_listener = OnceListener { version: self.version, //time, @@ -349,8 +386,11 @@ impl ConfigListener { fn notify(&mut self, key: ConfigKey) { if let Some(list) = self.listener.remove(&key) { for v in list { - if let Some(sender) = self.sender_map.remove(&v) { - sender.send(ListenerResult::DATA(vec![key.clone()])).ok(); + if let Some(sender) = self.channels.remove(&v) { + sender + .sender + .send(ListenerResult::DATA(vec![key.clone()])) + .ok(); } } } @@ -364,8 +404,8 @@ impl ConfigListener { keys.push(*key); for item in list { let v = item.version; - if let Some(sender) = self.sender_map.remove(&v) { - sender.send(ListenerResult::NULL).ok(); + if let Some(channel) = self.channels.remove(&v) { + channel.sender.send(ListenerResult::NULL).ok(); } } } else { @@ -378,7 +418,7 @@ impl ConfigListener { } pub(crate) fn get_listener_client_size(&self) -> usize { - self.sender_map.len() + self.channels.len() } pub(crate) fn get_listener_key_size(&self) -> usize { @@ -531,6 +571,31 @@ impl ConfigActor { Ok(()) } + pub fn get_config_listeners( + &self, + config_key: &ConfigKey, + pagiante: &PaginateQuery, + ) -> (usize, Vec) { + let ids = self.listener.listener.get(config_key).unwrap(); + let cursors = &ids[pagiante.page_no - 1..pagiante.page_size]; + + let subscribers = cursors + .iter() + .copied() + .filter_map(|id| { + self.listener + .channels + .get(&id) + .map(|channel| ConfigListenerDo { + id, + info: channel.info.clone(), + }) + }) + .collect::>(); + + (ids.len(), subscribers) + } + pub fn get_config_info_page(&self, param: &ConfigQueryParam) -> (usize, Vec) { let (size, list) = self.tenant_index.query_config_page(param); @@ -653,7 +718,13 @@ pub enum ConfigCmd { GET(ConfigKey), QueryPageInfo(Box), QueryHistoryPageInfo(Box), - LISTENER(Vec, ListenerSenderType, i64), + Listener( + Vec, + ListenerSenderType, + ConfigListenerInfo, + i64, + ), + QueryListeners(QueryListeners), Subscribe(Vec, Arc), RemoveSubscribe(Vec, Arc), RemoveSubscribeClient(Arc), @@ -685,6 +756,7 @@ pub enum ConfigResult { ChangeKey(Vec), ConfigInfoPage(usize, Vec), ConfigHistoryInfoPage(usize, Vec), + ConfigListenerInfoPage(usize, Vec), } impl Actor for ConfigActor { @@ -727,7 +799,13 @@ impl Handler for ConfigActor { }); } } - ConfigCmd::LISTENER(items, sender, time) => { + ConfigCmd::QueryListeners(cmd) => { + let (total, subscribers) = + self.get_config_listeners(&cmd.config_key, &cmd.paginate); + + return Ok(ConfigResult::ConfigListenerInfoPage(total, subscribers)); + } + ConfigCmd::Listener(items, sender, subscribe_info, time) => { let mut changes = vec![]; for item in &items { if let Some(v) = self.cache.get(&item.key) { @@ -742,7 +820,14 @@ impl Handler for ConfigActor { sender.send(ListenerResult::DATA(changes)).ok(); return Ok(ConfigResult::NULL); } else { - self.listener.add(items, sender, time); + self.listener.add( + items, + ConfigListenerChannel { + sender, + info: subscribe_info, + }, + time, + ); return Ok(ConfigResult::NULL); } } diff --git a/src/config/dal.rs b/src/config/dal.rs index 72ac5392..a23d04d4 100644 --- a/src/config/dal.rs +++ b/src/config/dal.rs @@ -3,10 +3,13 @@ use rusqlite::{Connection, Row}; use serde::{Deserialize, Serialize}; use std::rc::Rc; -use crate::common::rusqlite_utils::{ - get_row_value, sqlite_execute, sqlite_fetch, sqlite_fetch_count, +use crate::{ + common::rusqlite_utils::{get_row_value, sqlite_execute, sqlite_fetch, sqlite_fetch_count}, + console::model::paginate::PaginateQuery, }; +use super::core::{ConfigKey, ConfigListenerInfo}; + #[derive(Debug, Default, Serialize, Deserialize)] pub struct ConfigDO { pub id: Option, @@ -418,3 +421,15 @@ impl ConfigHistoryDao { self.fetch_count(&sql, &args) } } + +pub struct QueryListeners { + pub config_key: ConfigKey, + pub paginate: PaginateQuery, +} + +#[derive(Debug, Serialize)] +pub struct ConfigListenerDo { + pub id: u64, + #[serde(flatten)] + pub info: ConfigListenerInfo, +} diff --git a/src/console/api.rs b/src/console/api.rs index c29655c8..3e5da016 100644 --- a/src/console/api.rs +++ b/src/console/api.rs @@ -12,7 +12,7 @@ use crate::openapi::naming::service::{query_service, remove_service, update_serv //use crate::console::raft_api::{raft_add_learner, raft_change_membership, raft_init, raft_metrics, raft_read, raft_write}; use super::cluster_api::query_cluster_info; -use super::config_api::query_config_list; +use super::config_api::{query_config_list, query_config_listener_list}; use super::{ config_api::{download_config, import_config, query_history_config_page}, connection_api::query_grpc_connection, @@ -111,6 +111,10 @@ pub fn console_api_config(config: &mut web::ServiceConfig) { .route(web::delete().to(remove_namespace)), ) .service(web::resource("/configs").route(web::get().to(query_config_list))) + .service( + web::resource("/configs/:id/listeners") + .route(web::get().to(query_config_listener_list)), + ) .service(web::resource("/config/import").route(web::post().to(import_config))) .service(web::resource("/config/download").route(web::get().to(download_config))) .service( diff --git a/src/console/config_api.rs b/src/console/config_api.rs index 8fe70eea..c57cc99a 100644 --- a/src/console/config_api.rs +++ b/src/console/config_api.rs @@ -8,13 +8,14 @@ use actix_multipart::form::tempfile::TempFile; use actix_multipart::form::text::Text; use actix_multipart::form::MultipartForm; use actix_multipart::Multipart; -use actix_web::{http::header, web, Error, HttpRequest, HttpResponse, Responder}; +use actix_web::{error, http::header, web, Error, HttpRequest, HttpResponse, Responder, Result}; use zip::write::FileOptions; use crate::common::appdata::AppShareData; use crate::config::core::{ ConfigActor, ConfigAsyncCmd, ConfigCmd, ConfigInfoDto, ConfigKey, ConfigResult, }; +use crate::config::dal::QueryListeners; use crate::config::ConfigUtils; use crate::console::model::config_model::{ OpsConfigOptQueryListResponse, OpsConfigQueryListRequest, @@ -27,6 +28,7 @@ use uuid::Uuid; use zip::{ZipArchive, ZipWriter}; use super::model::config_model::OpsConfigImportInfo; +use super::model::paginate::{PaginateQuery, PaginateResponse}; use super::model::PageResult; pub async fn query_config_list( @@ -165,7 +167,7 @@ fn zip_file(mut zip: ZipWriter<&mut File>, list: Vec) -> anyhow:: .compression_method(zip::CompressionMethod::Stored) .unix_permissions(0o755); zip.start_file( - &format!("{}/{}", &item.group.as_str(), &item.data_id.as_str()), + format!("{}/{}", item.group.as_str(), item.data_id.as_str()), options, )?; zip.write_all(item.content.as_ref().unwrap().as_bytes())?; @@ -212,3 +214,34 @@ pub async fn download_config( Err(err) => HttpResponse::InternalServerError().body(err.to_string()), } } + +pub async fn query_config_listener_list( + query: web::Query, + path: web::Path, + config_addr: web::Data>, +) -> Result { + let config_key = ConfigKey::from(path.as_str()); + + config_key.is_valid().map_err(error::ErrorBadRequest)?; + + let cmd = ConfigCmd::QueryListeners(QueryListeners { + config_key, + paginate: query.into_inner(), + }); + + let result = config_addr + .send(cmd) + .await + .map_err(error::ErrorInternalServerError)? + .map_err(error::ErrorInternalServerError)?; + + match result { + ConfigResult::ConfigListenerInfoPage(count, subscribers) => { + Ok(web::Json(PaginateResponse { + count, + list: subscribers, + })) + } + _ => Err(error::ErrorInternalServerError("config result error")), + } +} diff --git a/src/console/model/mod.rs b/src/console/model/mod.rs index 7cf4b571..a76d73c6 100644 --- a/src/console/model/mod.rs +++ b/src/console/model/mod.rs @@ -3,6 +3,7 @@ pub mod config_model; pub mod login_model; pub mod metrics_model; pub mod naming_model; +pub mod paginate; pub mod raft_model; pub mod user_model; diff --git a/src/console/model/paginate.rs b/src/console/model/paginate.rs new file mode 100644 index 00000000..2acc58c1 --- /dev/null +++ b/src/console/model/paginate.rs @@ -0,0 +1,24 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct PaginateQuery { + pub page_no: usize, + pub page_size: usize, +} + +impl Default for PaginateQuery { + fn default() -> Self { + Self { + page_no: 1, + page_size: 0xffff_ffff, + } + } +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct PaginateResponse { + pub count: usize, + pub list: Vec, +} diff --git a/src/openapi/config/api.rs b/src/openapi/config/api.rs index 868d2768..c34962a8 100644 --- a/src/openapi/config/api.rs +++ b/src/openapi/config/api.rs @@ -1,4 +1,3 @@ -use std::cmp::{max, min}; use std::collections::HashMap; use std::sync::Arc; @@ -14,7 +13,8 @@ use crate::common::web_utils::get_req_body; use crate::config::config_index::ConfigQueryParam; use crate::config::config_type::ConfigType; use crate::config::core::{ - ConfigActor, ConfigCmd, ConfigInfoDto, ConfigKey, ConfigResult, ListenerItem, ListenerResult, + AppName, ConfigActor, ConfigCmd, ConfigInfoDto, ConfigKey, ConfigListenerInfo, ConfigResult, + ListenerItem, ListenerResult, }; use crate::config::utils::param_utils; use crate::config::ConfigUtils; @@ -363,15 +363,19 @@ async fn do_search_config( } #[derive(Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] pub struct ListenerParams { #[serde(rename(serialize = "Listening-Configs", deserialize = "Listening-Configs"))] configs: Option, + #[serde(default)] + app_name: AppName, } impl ListenerParams { pub fn select_option(&self, o: &Self) -> Self { Self { configs: select_option_by_clone(&self.configs, &o.configs), + app_name: self.app_name.clone(), } } @@ -386,25 +390,21 @@ pub(super) async fn listener_config( a: web::Query, payload: web::Payload, config_addr: web::Data>, -) -> impl Responder { +) -> Result { let body = match get_req_body(payload).await { Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } + Err(err) => return Err(actix_web::error::ErrorInternalServerError(err.to_string())), }; let b = match serde_urlencoded::from_bytes(&body) { Ok(v) => v, - Err(err) => { - return HttpResponse::InternalServerError().body(err.to_string()); - } + Err(err) => return Err(actix_web::error::ErrorInternalServerError(err.to_string())), }; let list = a.select_option(&b).to_items(); if list.is_empty() { //println!("listener_config error: listener item len == 0"); - return HttpResponse::NoContent() + return Ok(HttpResponse::NoContent() .content_type("text/html; charset=utf-8") - .body("error:listener empty"); + .body("error:listener empty")); } let (tx, rx) = tokio::sync::oneshot::channel(); let current_time = Local::now().timestamp_millis(); @@ -412,15 +412,39 @@ pub(super) async fn listener_config( if let Some(_timeout) = _req.headers().get("Long-Pulling-Timeout") { match _timeout.to_str().unwrap().parse::() { Ok(v) => { - time_out = current_time + min(max(10000, v), 120000) - 500; + time_out = current_time + v.clamp(10000, 120000) - 500; } Err(_) => { time_out = 0; } } } + + let version = _req + .headers() + .get("User-Agent") + .and_then(|x| x.to_str().ok()) + .ok_or(actix_web::error::ErrorNotAcceptable( + "error:User-Agent required", + ))? + .to_string(); + + let ip = _req + .connection_info() + .realip_remote_addr() + .ok_or(actix_web::error::ErrorNotAcceptable( + "error:parse ip failed", + ))? + .to_string(); + + let subscribe_info = ConfigListenerInfo { + name: b.app_name, + ip, + version, + }; + //println!("timeout header:{:?},time_out:{}",_req.headers().get("Long-Pulling-Timeout") ,time_out); - let cmd = ConfigCmd::LISTENER(list, tx, time_out); + let cmd = ConfigCmd::Listener(list, tx, subscribe_info, time_out); let _ = config_addr.send(cmd).await; let res = rx.await.unwrap(); let v = match res { @@ -437,7 +461,10 @@ pub(super) async fn listener_config( } ListenerResult::NULL => "".to_owned(), }; - HttpResponse::Ok() + + let res = HttpResponse::Ok() .content_type("text/html; charset=utf-8") - .body(v) + .body(v); + + Ok(res) } From dbcfa5f9d24afd355f7cc2b9d3d30eb99dfe53cf Mon Sep 17 00:00:00 2001 From: l-7-l <571234647@qq.com> Date: Thu, 5 Sep 2024 15:49:35 +0800 Subject: [PATCH 2/2] fix: set a min value of 1 for the page_no in paginate --- src/console/model/paginate.rs | 41 ++++++++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 3 deletions(-) diff --git a/src/console/model/paginate.rs b/src/console/model/paginate.rs index 2acc58c1..6006e132 100644 --- a/src/console/model/paginate.rs +++ b/src/console/model/paginate.rs @@ -1,16 +1,51 @@ +use std::ops::Sub; + use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize)] +#[derive(Debug, Clone, Copy, Deserialize)] +#[serde(from = "String")] +pub struct Page(usize); + +impl Page { + pub fn new(page: usize) -> Self { + Self(page.max(1)) + } + pub fn as_usize(&self) -> usize { + self.0 + } +} + +impl Default for Page { + fn default() -> Self { + Self(1) + } +} + +impl Sub for Page { + type Output = usize; + + fn sub(self, rhs: usize) -> Self::Output { + self.0 - rhs + } +} + +impl From for Page { + fn from(page: String) -> Self { + Self(page.parse::().unwrap_or(1)) + } +} + +#[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "camelCase")] pub struct PaginateQuery { - pub page_no: usize, + pub page_no: Page, pub page_size: usize, } impl Default for PaginateQuery { fn default() -> Self { Self { - page_no: 1, + page_no: Page::default(), page_size: 0xffff_ffff, } }