Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP]feat: query config listeners info #134

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 97 additions & 12 deletions src/config/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::sync::Arc;
use std::sync::Weak;
use std::time::Duration;

use crate::console::model::paginate::PaginateQuery;
use crate::raft::store::ClientRequest;
use crate::raft::NacosRaft;
use crate::utils::get_md5;
Expand All @@ -21,6 +22,8 @@ use actix::prelude::*;

use super::config_subscribe::Subscriber;
use super::dal::ConfigHistoryParam;
use super::dal::ConfigListenerDo;
use super::dal::QueryListeners;
use crate::config::config_index::{ConfigQueryParam, TenantIndex};
use crate::config::config_type::ConfigType;
use crate::config::model::{
Expand Down Expand Up @@ -297,14 +300,48 @@ pub enum ListenerResult {
DATA(Vec<ConfigKey>),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(transparent)]
pub struct AppName(String);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AppName 位置需要改动吗? 需要添加校验规则吗?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AppName后面注册中心应该也会用,位置可以考虑放在common/model中;
校验规则应该不需要,取值上可以把考虑只取符合规则的值,如果不合规就当做unknow。

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

好的


impl AppName {
pub fn as_str(&self) -> &str {
&self.0
}
}

impl Default for AppName {
fn default() -> Self {
Self("unknown".to_owned())
}
}

#[derive(Debug, Clone, Serialize)]
pub struct ConfigListenerInfo {
/// 客户端设置的应用名字,默认值为 unknown
/// appName set by the client, the default value is unknown
pub name: AppName,
/// 客户端 ip 地址
/// client remote ip address
pub ip: String,
/// 客户端订阅的版本,当前取值为请求 User-Agent
/// The client version. The current value is user agent
pub version: String,
}

type ListenerSenderType = tokio::sync::oneshot::Sender<ListenerResult>;
//type ListenerReceiverType = tokio::sync::oneshot::Receiver<ListenerResult>;

pub struct ConfigListenerChannel {
sender: ListenerSenderType,
info: ConfigListenerInfo,
}

pub(crate) struct ConfigListener {
version: u64,
listener: HashMap<ConfigKey, Vec<u64>>,
time_listener: BTreeMap<i64, Vec<OnceListener>>,
sender_map: HashMap<u64, ListenerSenderType>,
channels: HashMap<u64, ConfigListenerChannel>,
}

impl ConfigListener {
Expand All @@ -313,11 +350,11 @@ impl ConfigListener {
version: 0,
listener: Default::default(),
time_listener: Default::default(),
sender_map: Default::default(),
channels: Default::default(),
}
}

fn add(&mut self, items: Vec<ListenerItem>, sender: ListenerSenderType, time: i64) {
fn add(&mut self, items: Vec<ListenerItem>, channel: ConfigListenerChannel, time: i64) {
self.version += 1;
for item in &items {
let key = item.key.clone();
Expand All @@ -330,7 +367,7 @@ impl ConfigListener {
}
};
}
self.sender_map.insert(self.version, sender);
self.channels.insert(self.version, channel);
let once_listener = OnceListener {
version: self.version,
//time,
Expand All @@ -349,8 +386,11 @@ impl ConfigListener {
fn notify(&mut self, key: ConfigKey) {
if let Some(list) = self.listener.remove(&key) {
for v in list {
if let Some(sender) = self.sender_map.remove(&v) {
sender.send(ListenerResult::DATA(vec![key.clone()])).ok();
if let Some(sender) = self.channels.remove(&v) {
sender
.sender
.send(ListenerResult::DATA(vec![key.clone()]))
.ok();
}
}
}
Expand All @@ -364,8 +404,8 @@ impl ConfigListener {
keys.push(*key);
for item in list {
let v = item.version;
if let Some(sender) = self.sender_map.remove(&v) {
sender.send(ListenerResult::NULL).ok();
if let Some(channel) = self.channels.remove(&v) {
channel.sender.send(ListenerResult::NULL).ok();
}
}
} else {
Expand All @@ -378,7 +418,7 @@ impl ConfigListener {
}

pub(crate) fn get_listener_client_size(&self) -> usize {
self.sender_map.len()
self.channels.len()
}

pub(crate) fn get_listener_key_size(&self) -> usize {
Expand Down Expand Up @@ -531,6 +571,31 @@ impl ConfigActor {
Ok(())
}

pub fn get_config_listeners(
&self,
config_key: &ConfigKey,
pagiante: &PaginateQuery,
) -> (usize, Vec<ConfigListenerDo>) {
let ids = self.listener.listener.get(config_key).unwrap();
let cursors = &ids[pagiante.page_no - 1..pagiante.page_size];
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

page_no 的最小值是1


let subscribers = cursors
.iter()
.copied()
.filter_map(|id| {
self.listener
.channels
.get(&id)
.map(|channel| ConfigListenerDo {
id,
info: channel.info.clone(),
})
})
.collect::<Vec<ConfigListenerDo>>();

(ids.len(), subscribers)
}

pub fn get_config_info_page(&self, param: &ConfigQueryParam) -> (usize, Vec<ConfigInfoDto>) {
let (size, list) = self.tenant_index.query_config_page(param);

Expand Down Expand Up @@ -653,7 +718,13 @@ pub enum ConfigCmd {
GET(ConfigKey),
QueryPageInfo(Box<ConfigQueryParam>),
QueryHistoryPageInfo(Box<ConfigHistoryParam>),
LISTENER(Vec<ListenerItem>, ListenerSenderType, i64),
Listener(
Vec<ListenerItem>,
ListenerSenderType,
ConfigListenerInfo,
i64,
),
QueryListeners(QueryListeners),
Subscribe(Vec<ListenerItem>, Arc<String>),
RemoveSubscribe(Vec<ListenerItem>, Arc<String>),
RemoveSubscribeClient(Arc<String>),
Expand Down Expand Up @@ -685,6 +756,7 @@ pub enum ConfigResult {
ChangeKey(Vec<ConfigKey>),
ConfigInfoPage(usize, Vec<ConfigInfoDto>),
ConfigHistoryInfoPage(usize, Vec<ConfigHistoryInfoDto>),
ConfigListenerInfoPage(usize, Vec<ConfigListenerDo>),
}

impl Actor for ConfigActor {
Expand Down Expand Up @@ -727,7 +799,13 @@ impl Handler<ConfigCmd> for ConfigActor {
});
}
}
ConfigCmd::LISTENER(items, sender, time) => {
ConfigCmd::QueryListeners(cmd) => {
let (total, subscribers) =
self.get_config_listeners(&cmd.config_key, &cmd.paginate);

return Ok(ConfigResult::ConfigListenerInfoPage(total, subscribers));
}
ConfigCmd::Listener(items, sender, subscribe_info, time) => {
let mut changes = vec![];
for item in &items {
if let Some(v) = self.cache.get(&item.key) {
Expand All @@ -742,7 +820,14 @@ impl Handler<ConfigCmd> for ConfigActor {
sender.send(ListenerResult::DATA(changes)).ok();
return Ok(ConfigResult::NULL);
} else {
self.listener.add(items, sender, time);
self.listener.add(
items,
ConfigListenerChannel {
sender,
info: subscribe_info,
},
time,
);
return Ok(ConfigResult::NULL);
}
}
Expand Down
19 changes: 17 additions & 2 deletions src/config/dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ use rusqlite::{Connection, Row};
use serde::{Deserialize, Serialize};
use std::rc::Rc;

use crate::common::rusqlite_utils::{
get_row_value, sqlite_execute, sqlite_fetch, sqlite_fetch_count,
use crate::{
common::rusqlite_utils::{get_row_value, sqlite_execute, sqlite_fetch, sqlite_fetch_count},
console::model::paginate::PaginateQuery,
};

use super::core::{ConfigKey, ConfigListenerInfo};

#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ConfigDO {
pub id: Option<i64>,
Expand Down Expand Up @@ -418,3 +421,15 @@ impl ConfigHistoryDao {
self.fetch_count(&sql, &args)
}
}

pub struct QueryListeners {
pub config_key: ConfigKey,
pub paginate: PaginateQuery,
}

#[derive(Debug, Serialize)]
pub struct ConfigListenerDo {
pub id: u64,
#[serde(flatten)]
pub info: ConfigListenerInfo,
}
3 changes: 2 additions & 1 deletion src/console/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::openapi::naming::service::{query_service, remove_service, update_serv
//use crate::console::raft_api::{raft_add_learner, raft_change_membership, raft_init, raft_metrics, raft_read, raft_write};

use super::cluster_api::query_cluster_info;
use super::config_api::query_config_list;
use super::config_api::{query_config_list, query_config_listener_list};
use super::{
config_api::{download_config, import_config, query_history_config_page},
connection_api::query_grpc_connection,
Expand Down Expand Up @@ -138,6 +138,7 @@ pub fn console_api_config_v1(config: &mut web::ServiceConfig) {
.route(web::delete().to(remove_namespace)),
)
.service(web::resource("/configs").route(web::get().to(query_config_list)))
.service(web::resource("/configs/:id/listeners").route(web::get().to(query_config_listener_list)))
.service(web::resource("/config/import").route(web::post().to(import_config)))
.service(web::resource("/config/download").route(web::get().to(download_config)))
.service(
Expand Down
37 changes: 35 additions & 2 deletions src/console/config_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ use actix_multipart::form::tempfile::TempFile;
use actix_multipart::form::text::Text;
use actix_multipart::form::MultipartForm;
use actix_multipart::Multipart;
use actix_web::{http::header, web, Error, HttpRequest, HttpResponse, Responder};
use actix_web::{error, http::header, web, Error, HttpRequest, HttpResponse, Responder, Result};
use zip::write::FileOptions;

use crate::common::appdata::AppShareData;
use crate::config::core::{
ConfigActor, ConfigAsyncCmd, ConfigCmd, ConfigInfoDto, ConfigKey, ConfigResult,
};
use crate::config::dal::QueryListeners;
use crate::config::ConfigUtils;
use crate::console::model::config_model::{
OpsConfigOptQueryListResponse, OpsConfigQueryListRequest,
Expand All @@ -27,6 +28,7 @@ use uuid::Uuid;
use zip::{ZipArchive, ZipWriter};

use super::model::config_model::OpsConfigImportInfo;
use super::model::paginate::{PaginateQuery, PaginateResponse};
use super::model::PageResult;

pub async fn query_config_list(
Expand Down Expand Up @@ -165,7 +167,7 @@ fn zip_file(mut zip: ZipWriter<&mut File>, list: Vec<ConfigInfoDto>) -> anyhow::
.compression_method(zip::CompressionMethod::Stored)
.unix_permissions(0o755);
zip.start_file(
&format!("{}/{}", &item.group.as_str(), &item.data_id.as_str()),
format!("{}/{}", item.group.as_str(), item.data_id.as_str()),
options,
)?;
zip.write_all(item.content.as_ref().unwrap().as_bytes())?;
Expand Down Expand Up @@ -212,3 +214,34 @@ pub async fn download_config(
Err(err) => HttpResponse::InternalServerError().body(err.to_string()),
}
}

pub async fn query_config_listener_list(
query: web::Query<PaginateQuery>,
path: web::Path<String>,
config_addr: web::Data<Addr<ConfigActor>>,
) -> Result<impl Responder> {
let config_key = ConfigKey::from(path.as_str());

config_key.is_valid().map_err(error::ErrorBadRequest)?;

let cmd = ConfigCmd::QueryListeners(QueryListeners {
config_key,
paginate: query.into_inner(),
});

let result = config_addr
.send(cmd)
.await
.map_err(error::ErrorInternalServerError)?
.map_err(error::ErrorInternalServerError)?;

match result {
ConfigResult::ConfigListenerInfoPage(count, subscribers) => {
Ok(web::Json(PaginateResponse {
count,
list: subscribers,
}))
}
_ => Err(error::ErrorInternalServerError("config result error")),
}
}
1 change: 1 addition & 0 deletions src/console/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod config_model;
pub mod login_model;
pub mod metrics_model;
pub mod naming_model;
pub mod paginate;
pub mod raft_model;
pub mod user_model;

Expand Down
59 changes: 59 additions & 0 deletions src/console/model/paginate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use std::ops::Sub;

use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Copy, Deserialize)]
#[serde(from = "String")]
pub struct Page(usize);

impl Page {
pub fn new(page: usize) -> Self {
Self(page.max(1))
}
pub fn as_usize(&self) -> usize {
self.0
}
}

impl Default for Page {
fn default() -> Self {
Self(1)
}
}

impl Sub<usize> for Page {
type Output = usize;

fn sub(self, rhs: usize) -> Self::Output {
self.0 - rhs
}
}

impl From<String> for Page {
fn from(page: String) -> Self {
Self(page.parse::<usize>().unwrap_or(1))
}
}

#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PaginateQuery {
pub page_no: Page,
pub page_size: usize,
}

impl Default for PaginateQuery {
fn default() -> Self {
Self {
page_no: Page::default(),
page_size: 0xffff_ffff,
}
}
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PaginateResponse<T> {
pub count: usize,
pub list: Vec<T>,
}
Loading