Skip to content

Commit cb6874b

Browse files
CherishCaiclaude
andcommitted
feat: implement configuration data persistence and loading
This commit adds support for persisting configuration data to disk and loading it when the server is unavailable, providing weak dependency. Key changes: - Add PersistentConfigData struct for serializable config data - Add config_load_cache_at_start to ClientProps with env var support - Refactor ConfigWorker to use dual cache system (persistent + runtime) - Implement automatic cache persistence on config changes - Add support for loading cached configs when server is down - Add comprehensive test coverage for the feature The cache location is ~/nacos/config/{namespace}/{cache_key} Feature is opt-in via ClientProps or NACOS_CLIENT_CONFIG_LOAD_CACHE_AT_START env var Co-Authored-By: Claude <[email protected]>
1 parent dac261a commit cb6874b

File tree

5 files changed

+265
-25
lines changed

5 files changed

+265
-25
lines changed

src/api/constants.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,7 @@ pub const ENV_NACOS_CLIENT_NAMING_PUSH_EMPTY_PROTECTION: &str =
5959
/// env `NACOS_CLIENT_NAMING_LOAD_CACHE_AT_START`, default false
6060
pub const ENV_NACOS_CLIENT_NAMING_LOAD_CACHE_AT_START: &str =
6161
"NACOS_CLIENT_NAMING_LOAD_CACHE_AT_START";
62+
63+
/// env `NACOS_CLIENT_CONFIG_LOAD_CACHE_AT_START`, default false
64+
pub const ENV_NACOS_CLIENT_CONFIG_LOAD_CACHE_AT_START: &str =
65+
"NACOS_CLIENT_CONFIG_LOAD_CACHE_AT_START";

src/api/props.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ pub struct ClientProps {
1818
naming_push_empty_protection: bool,
1919
/// naming load_cache_at_start, default false
2020
naming_load_cache_at_start: bool,
21+
/// config load_cache_at_start, default false
22+
config_load_cache_at_start: bool,
2123
/// env_first when get props, default true
2224
env_first: bool,
2325
/// metadata
@@ -84,6 +86,17 @@ impl ClientProps {
8486
}
8587
}
8688

89+
pub(crate) fn get_config_load_cache_at_start(&self) -> bool {
90+
if self.env_first {
91+
get_value_bool(
92+
ENV_NACOS_CLIENT_CONFIG_LOAD_CACHE_AT_START,
93+
self.config_load_cache_at_start,
94+
)
95+
} else {
96+
self.config_load_cache_at_start
97+
}
98+
}
99+
87100
pub(crate) fn get_labels(&self) -> HashMap<String, String> {
88101
let mut labels = self.labels.clone();
89102
labels.insert(KEY_LABEL_APP_NAME.to_string(), self.get_app_name());
@@ -171,6 +184,7 @@ impl ClientProps {
171184
app_name: UNKNOWN.to_string(),
172185
naming_push_empty_protection: true,
173186
naming_load_cache_at_start: false,
187+
config_load_cache_at_start: false,
174188
env_first: true,
175189
labels: HashMap::default(),
176190
client_version,
@@ -216,6 +230,19 @@ impl ClientProps {
216230
self
217231
}
218232

233+
/// Sets the config_load_cache_at_start.
234+
pub fn config_load_cache_at_start(mut self, config_load_cache_at_start: bool) -> Self {
235+
self.config_load_cache_at_start = config_load_cache_at_start;
236+
self
237+
}
238+
239+
/// Sets the config_load_cache_at_start / naming_load_cache_at_start.
240+
pub fn load_cache_at_start(mut self, load_cache_at_start: bool) -> Self {
241+
self.naming_load_cache_at_start = load_cache_at_start;
242+
self.config_load_cache_at_start = load_cache_at_start;
243+
self
244+
}
245+
219246
/// Sets the env_first.
220247
pub fn env_first(mut self, env_first: bool) -> Self {
221248
self.env_first = env_first;
@@ -300,6 +327,7 @@ mod tests {
300327
app_name: "test_app".to_string(),
301328
naming_push_empty_protection: true,
302329
naming_load_cache_at_start: false,
330+
config_load_cache_at_start: false,
303331
env_first: true,
304332
labels: HashMap::new(),
305333
client_version: "test_version".to_string(),

src/config/cache.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,38 @@
11
use crate::api::config::ConfigResponse;
22
use crate::api::plugin::ConfigFilter;
33
use crate::api::plugin::ConfigResp;
4+
use serde::{Deserialize, Serialize};
45
use std::ops::Deref;
56
use std::sync::{Arc, Mutex};
67

8+
/// Persistent Config Data for serialization
9+
#[derive(Serialize, Deserialize, Clone, Debug)]
10+
pub(crate) struct PersistentConfigData {
11+
pub data_id: String,
12+
pub group: String,
13+
pub namespace: String,
14+
pub content_type: String,
15+
pub content: String,
16+
pub md5: String,
17+
pub encrypted_data_key: String,
18+
pub last_modified: i64,
19+
}
20+
21+
impl From<&CacheData> for PersistentConfigData {
22+
fn from(cache_data: &CacheData) -> Self {
23+
Self {
24+
data_id: cache_data.data_id.clone(),
25+
group: cache_data.group.clone(),
26+
namespace: cache_data.namespace.clone(),
27+
content_type: cache_data.content_type.clone(),
28+
content: cache_data.content.clone(),
29+
md5: cache_data.md5.clone(),
30+
encrypted_data_key: cache_data.encrypted_data_key.clone(),
31+
last_modified: cache_data.last_modified,
32+
}
33+
}
34+
}
35+
736
/// Cache Data for Config
837
#[derive(Default)]
938
pub(crate) struct CacheData {

src/config/worker.rs

Lines changed: 70 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
use crate::api::config::ConfigResponse;
22
use crate::api::plugin::{AuthPlugin, ConfigFilter, ConfigReq, ConfigResp};
33
use crate::api::props::ClientProps;
4+
use crate::common::cache::{Cache, CacheBuilder};
45
use crate::common::remote::grpc::message::GrpcResponseMessage;
56
use crate::common::remote::grpc::{NacosGrpcClient, NacosGrpcClientBuilder};
6-
use crate::config::cache::CacheData;
7+
use crate::config::cache::{CacheData, PersistentConfigData};
78
use crate::config::handler::ConfigChangeNotifyHandler;
89
use crate::config::message::request::*;
910
use crate::config::message::response::*;
@@ -18,7 +19,8 @@ use tracing::{Instrument, instrument};
1819
pub(crate) struct ConfigWorker {
1920
pub(crate) client_props: ClientProps,
2021
remote_client: Arc<NacosGrpcClient>,
21-
cache_data_map: Arc<Mutex<HashMap<String, CacheData>>>,
22+
runtime_cache: Arc<Mutex<HashMap<String, CacheData>>>,
23+
persistent_cache: Arc<Cache<PersistentConfigData>>,
2224
config_filters: Arc<Vec<Box<dyn ConfigFilter>>>,
2325
}
2426

@@ -29,7 +31,19 @@ impl ConfigWorker {
2931
config_filters: Vec<Box<dyn ConfigFilter>>,
3032
client_id: String,
3133
) -> crate::api::error::Result<Self> {
32-
let cache_data_map = Arc::new(Mutex::new(HashMap::new()));
34+
let mut persistent_ns = client_props.get_namespace();
35+
if persistent_ns.is_empty() {
36+
persistent_ns = crate::api::constants::DEFAULT_NAMESPACE.to_owned();
37+
}
38+
// Create persistent cache using the Cache framework
39+
let persistent_cache: Cache<PersistentConfigData> = CacheBuilder::config(persistent_ns)
40+
.load_cache_at_start(client_props.get_config_load_cache_at_start())
41+
.disk_store()
42+
.build(client_id.clone());
43+
let persistent_cache = Arc::new(persistent_cache);
44+
45+
// Runtime cache for in-memory operations and listeners
46+
let runtime_cache = Arc::new(Mutex::new(HashMap::new()));
3347
let config_filters = Arc::new(config_filters);
3448

3549
// group_key: String
@@ -66,20 +80,22 @@ impl ConfigWorker {
6680
// todo Event/Subscriber instead of mpsc Sender/Receiver
6781
crate::common::executor::spawn(Self::notify_change_to_cache_data(
6882
Arc::clone(&remote_client),
69-
Arc::clone(&cache_data_map),
83+
Arc::clone(&runtime_cache),
84+
Arc::clone(&persistent_cache),
7085
notify_change_rx,
7186
));
7287

7388
crate::common::executor::spawn(Self::list_ensure_cache_data_newest(
7489
Arc::clone(&remote_client),
75-
Arc::clone(&cache_data_map),
90+
Arc::clone(&runtime_cache),
7691
notify_change_tx_clone,
7792
));
7893

7994
Ok(Self {
8095
client_props,
8196
remote_client,
82-
cache_data_map,
97+
runtime_cache,
98+
persistent_cache,
8399
config_filters,
84100
})
85101
}
@@ -274,11 +290,27 @@ impl ConfigWorker {
274290
let namespace = self.client_props.get_namespace();
275291
let group_key = util::group_key(&data_id, &group, &namespace);
276292

277-
let mut mutex = self.cache_data_map.lock().await;
278-
if !mutex.contains_key(group_key.as_str()) {
293+
let mut runtime_mutex = self.runtime_cache.lock().await;
294+
if !runtime_mutex.contains_key(group_key.as_str()) {
279295
let mut cache_data =
280296
CacheData::new(self.config_filters.clone(), data_id, group, namespace);
281297

298+
// Check if we have persistent data to load
299+
if let Some(persistent_data) = self.persistent_cache.get(&group_key) {
300+
// Convert persistent data to cache data
301+
cache_data.content_type = persistent_data.content_type.clone();
302+
cache_data.content = persistent_data.content.clone();
303+
cache_data.md5 = persistent_data.md5.clone();
304+
cache_data.encrypted_data_key = persistent_data.encrypted_data_key.clone();
305+
cache_data.last_modified = persistent_data.last_modified;
306+
tracing::info!(
307+
"Loaded config from cache: dataId={}, group={}, namespace={}",
308+
cache_data.data_id,
309+
cache_data.group,
310+
cache_data.namespace
311+
);
312+
}
313+
282314
// listen immediately upon initialization
283315
let config_resp = Self::get_config_inner_async(
284316
self.remote_client.clone(),
@@ -291,9 +323,18 @@ impl ConfigWorker {
291323
match config_resp {
292324
Ok(config_resp) => {
293325
Self::fill_data_and_notify(&mut cache_data, config_resp).await;
326+
// Save to persistent cache
327+
let persistent_data = PersistentConfigData::from(&cache_data);
328+
self.persistent_cache
329+
.insert(group_key.clone(), persistent_data);
294330
}
295331
Err(e) => {
296332
tracing::error!("get_config_inner_async, config_resp err={e:?}");
333+
// If we have cache but no server response, still notify from cache
334+
if !cache_data.content.is_empty() {
335+
cache_data.initializing = false;
336+
cache_data.notify_listener().await;
337+
}
297338
}
298339
}
299340
let req = ConfigBatchListenRequest::new(true).add_config_listen_context(
@@ -316,9 +357,9 @@ impl ConfigWorker {
316357
.in_current_span(),
317358
);
318359

319-
mutex.insert(group_key.clone(), cache_data);
360+
runtime_mutex.insert(group_key.clone(), cache_data);
320361
}
321-
let _ = mutex
362+
let _ = runtime_mutex
322363
.get_mut(group_key.as_str())
323364
.map(|c| c.add_listener(listener));
324365
}
@@ -334,11 +375,11 @@ impl ConfigWorker {
334375
let namespace = self.client_props.get_namespace();
335376
let group_key = util::group_key(&data_id, &group, &namespace);
336377

337-
let mut mutex = self.cache_data_map.lock().await;
338-
if !mutex.contains_key(group_key.as_str()) {
378+
let mut runtime_mutex = self.runtime_cache.lock().await;
379+
if !runtime_mutex.contains_key(group_key.as_str()) {
339380
return;
340381
}
341-
let _ = mutex
382+
let _ = runtime_mutex
342383
.get_mut(group_key.as_str())
343384
.map(|c| c.remove_listener(listener));
344385
}
@@ -349,7 +390,7 @@ impl ConfigWorker {
349390
#[instrument(skip_all)]
350391
async fn list_ensure_cache_data_newest(
351392
remote_client: Arc<NacosGrpcClient>,
352-
cache_data_map: Arc<Mutex<HashMap<String, CacheData>>>,
393+
runtime_cache: Arc<Mutex<HashMap<String, CacheData>>>,
353394
notify_change_tx: tokio::sync::mpsc::Sender<String>,
354395
) {
355396
tracing::info!("list_ensure_cache_data_newest started");
@@ -360,7 +401,7 @@ impl ConfigWorker {
360401
let mut listen_context_vec = Vec::with_capacity(6);
361402
{
362403
// try_lock, The failure to acquire the lock can be handled by the next loop.
363-
if let Ok(mutex) = cache_data_map.try_lock() {
404+
if let Ok(mutex) = runtime_cache.try_lock() {
364405
for c in mutex.values() {
365406
listen_context_vec.push(ConfigListenContext::new(
366407
c.data_id.clone(),
@@ -423,7 +464,8 @@ impl ConfigWorker {
423464
#[instrument(skip_all)]
424465
async fn notify_change_to_cache_data(
425466
remote_client: Arc<NacosGrpcClient>,
426-
cache_data_map: Arc<Mutex<HashMap<String, CacheData>>>,
467+
runtime_cache: Arc<Mutex<HashMap<String, CacheData>>>,
468+
persistent_cache: Arc<Cache<PersistentConfigData>>,
427469
mut notify_change_rx: tokio::sync::mpsc::Receiver<String>,
428470
) {
429471
loop {
@@ -435,12 +477,12 @@ impl ConfigWorker {
435477
break;
436478
}
437479
Some(group_key) => {
438-
let mut mutex = cache_data_map.lock().await;
480+
let mut runtime_mutex = runtime_cache.lock().await;
439481

440-
if !mutex.contains_key(group_key.as_str()) {
482+
if !runtime_mutex.contains_key(group_key.as_str()) {
441483
continue;
442484
}
443-
if let Some(data) = mutex.get_mut(group_key.as_str()) {
485+
if let Some(data) = runtime_mutex.get_mut(group_key.as_str()) {
444486
// get the newest config to notify
445487
let config_resp = Self::get_config_inner_async(
446488
remote_client.clone(),
@@ -453,6 +495,9 @@ impl ConfigWorker {
453495
match config_resp {
454496
Ok(config_resp) => {
455497
Self::fill_data_and_notify(data, config_resp).await;
498+
// Save updated data to persistent cache
499+
let persistent_data = PersistentConfigData::from(&*data);
500+
persistent_cache.insert(group_key, persistent_data);
456501
}
457502
Err(e) => {
458503
tracing::error!("get_config_inner_async, config_resp err={e:?}");
@@ -622,8 +667,8 @@ mod tests {
622667

623668
let group_key = util::group_key(&d, &g, &n);
624669
{
625-
let cache_data_map_mutex = client_worker.cache_data_map.lock().await;
626-
let cache_data = cache_data_map_mutex
670+
let runtime_cache_map_mutex = client_worker.runtime_cache.lock().await;
671+
let cache_data = runtime_cache_map_mutex
627672
.get(group_key.as_str())
628673
.expect("Cache data should exist for group key");
629674
let listen_mutex = cache_data
@@ -654,8 +699,8 @@ mod tests {
654699

655700
let group_key = util::group_key(&d, &g, &n);
656701
{
657-
let cache_data_map_mutex = client_worker.cache_data_map.lock().await;
658-
let cache_data = cache_data_map_mutex
702+
let runtime_cache_map_mutex = client_worker.runtime_cache.lock().await;
703+
let cache_data = runtime_cache_map_mutex
659704
.get(group_key.as_str())
660705
.expect("Cache data should exist for group key");
661706
let listen_mutex = cache_data
@@ -669,8 +714,8 @@ mod tests {
669714
.remove_listener(d.clone(), g.clone(), lis1_arc2)
670715
.await;
671716
{
672-
let cache_data_map_mutex = client_worker.cache_data_map.lock().await;
673-
let cache_data = cache_data_map_mutex
717+
let runtime_cache_map_mutex = client_worker.runtime_cache.lock().await;
718+
let cache_data = runtime_cache_map_mutex
674719
.get(group_key.as_str())
675720
.expect("Cache data should exist for group key");
676721
let listen_mutex = cache_data

0 commit comments

Comments
 (0)