Skip to content

Commit 2a99dd3

Browse files
JannikStpokgak
and authored
Release/0.3.3 (#549)
* refactor helm charts (#527) * orchestrator: refactor helm chart * validator: refactor helm chart * discovery: create new helm chart * fix helm chart (#537) * fix orchestrator validation * discovery: match existing secrets name * charts: add quotes (#538) * charts: add quotes * charts/validator: fix conditions for redis (#539) * fix validator redis chart (#540) * bump version (#541) * remove deprecated e2e folder (#543) * fix(validator) dockerfile env causing invalidations not to be triggered(#536) * imp(validator): ability to pick invalidation type for rejection (#542) * ability to pick invalidation type for rejection * imp(orchestrator): optimize heartbeat & metrics storage (#545) * optimize heartbeat & metrics store * imp(orchestrator): optimize orchestrator nodes / groups performance (#546) * optimize orchestrator nodes / groups performance * bump version (#548) * add missing toploc invalidation type * Merge pull request #552 from PrimeIntellect-ai/fix/migration-only-on-processor imp(orchestrator): restrict redis migrations to processor * improve node metrics migration (#553) * validator: fix fullname template (#554) * bump version to 0.3.3 --------- Co-authored-by: Aiman Ismail <[email protected]> Co-authored-by: Aiman Ismail <[email protected]>
2 parents 187af02 + 09c896c commit 2a99dd3

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

67 files changed

+2611
-2308
lines changed

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ iroh = "0.34.1"
4040
rand_v8 = { package = "rand", version = "0.8.5", features = ["std"] }
4141
rand_core_v6 = { package = "rand_core", version = "0.6.4", features = ["std"] }
4242
[workspace.package]
43-
version = "0.3.2"
43+
version = "0.3.3"
4444
edition = "2021"
4545

4646
[workspace.features]

crates/orchestrator/src/api/routes/heartbeat.rs

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -95,46 +95,54 @@ async fn heartbeat(
9595
error!("Heartbeat Error: {}", e);
9696
}
9797
if let Some(metrics) = heartbeat.metrics.clone() {
98-
// Get all previously reported metrics for this node
99-
let previous_metrics = match app_state
98+
// Get current metric keys for this node efficiently using HKEYS
99+
let previous_metric_keys = match app_state
100100
.store_context
101101
.metrics_store
102-
.get_metrics_for_node(node_address)
102+
.get_metric_keys_for_node(node_address)
103103
.await
104104
{
105-
Ok(metrics) => metrics,
105+
Ok(keys) => keys,
106106
Err(e) => {
107-
error!("Error getting metrics for node: {}", e);
108-
Default::default()
107+
error!("Error getting metric keys for node: {}", e);
108+
Vec::new()
109109
}
110110
};
111111

112-
// Create a HashSet of new metrics for efficient lookup
113-
let new_metrics_set: HashSet<_> = metrics
112+
// Create a HashSet of new metric keys for efficient lookup
113+
let new_metrics_set: HashSet<String> = metrics
114114
.iter()
115-
.map(|metric| (&metric.key.task_id, &metric.key.label))
115+
.map(|metric| {
116+
let task_id = if metric.key.task_id.is_empty() {
117+
"manual".to_string()
118+
} else {
119+
metric.key.task_id.clone()
120+
};
121+
format!("{}:{}", task_id, metric.key.label.replace(':', ""))
122+
})
116123
.collect();
117124

118-
// Clean up stale metrics from Redis only
119-
// The sync service will handle all Prometheus updates
120-
for (task_id, task_metrics) in previous_metrics {
121-
for (label, _value) in task_metrics {
122-
let prev_key = (&task_id, &label);
123-
if !new_metrics_set.contains(&prev_key) {
124-
// Remove from Redis metrics store
125-
if let Err(e) = app_state
126-
.store_context
127-
.metrics_store
128-
.delete_metric(&task_id, &label, &node_address.to_string())
129-
.await
130-
{
131-
error!("Error deleting metric: {}", e);
132-
}
125+
// Find stale metrics to delete
126+
let stale_metrics: Vec<String> = previous_metric_keys
127+
.into_iter()
128+
.filter(|key| !new_metrics_set.contains(key))
129+
.collect();
130+
131+
// Delete stale metrics efficiently
132+
for metric_key in stale_metrics {
133+
if let Some((task_id, label)) = metric_key.split_once(':') {
134+
if let Err(e) = app_state
135+
.store_context
136+
.metrics_store
137+
.delete_metric(task_id, label, &node_address.to_string())
138+
.await
139+
{
140+
error!("Error deleting metric: {}", e);
133141
}
134142
}
135143
}
136144

137-
// Store new metrics in Redis only
145+
// Store new metrics in Redis
138146
if let Err(e) = app_state
139147
.store_context
140148
.metrics_store

crates/orchestrator/src/api/routes/nodes.rs

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -68,22 +68,34 @@ async fn get_nodes(query: Query<NodeQuery>, app_state: Data<AppState>) -> HttpRe
6868
if let Some(node_groups_plugin) = &app_state.node_groups_plugin {
6969
let mut nodes_with_groups = Vec::new();
7070

71-
for node in &nodes {
72-
let mut node_json = json!(node);
71+
let node_addresses: Vec<String> =
72+
nodes.iter().map(|node| node.address.to_string()).collect();
7373

74-
if let Ok(Some(group)) = node_groups_plugin
75-
.get_node_group(&node.address.to_string())
76-
.await
77-
{
78-
node_json["group"] = json!({
79-
"id": group.id,
80-
"size": group.nodes.len(),
81-
"created_at": group.created_at,
82-
"topology_config": group.configuration_name
83-
});
84-
}
74+
match node_groups_plugin
75+
.get_node_groups_batch(&node_addresses)
76+
.await
77+
{
78+
Ok(node_groups) => {
79+
for node in &nodes {
80+
let mut node_json = json!(node);
8581

86-
nodes_with_groups.push(node_json);
82+
if let Some(Some(group)) = node_groups.get(&node.address.to_string()) {
83+
node_json["group"] = json!({
84+
"id": group.id,
85+
"size": group.nodes.len(),
86+
"created_at": group.created_at,
87+
"topology_config": group.configuration_name
88+
});
89+
}
90+
91+
nodes_with_groups.push(node_json);
92+
}
93+
}
94+
Err(e) => {
95+
error!("Error getting node groups batch: {}", e);
96+
// Fall back to nodes without group information
97+
nodes_with_groups = nodes.iter().map(|node| json!(node)).collect();
98+
}
8799
}
88100

89101
response["nodes"] = json!(nodes_with_groups);

crates/orchestrator/src/main.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,51 @@ struct Args {
126126
max_healthy_nodes_with_same_endpoint: u32,
127127
}
128128

129+
async fn run_inactive_node_metric_migration(store_context: Arc<StoreContext>) -> Result<()> {
130+
info!("Starting migration of inactive node metrics to new data model");
131+
132+
let node_addresses = match store_context.node_store.get_nodes().await {
133+
Ok(nodes) => {
134+
let addresses: Vec<_> = nodes.into_iter().map(|node| node.address).collect();
135+
info!("Found {} nodes to migrate", addresses.len());
136+
addresses
137+
}
138+
Err(e) => {
139+
error!("Error getting all nodes for migration: {}", e);
140+
return Ok(()); // Don't fail startup if migration can't get nodes
141+
}
142+
};
143+
144+
let mut migrated_count = 0;
145+
let mut error_count = 0;
146+
147+
for node_address in node_addresses {
148+
match store_context
149+
.metrics_store
150+
.migrate_node_metrics_if_needed(node_address)
151+
.await
152+
{
153+
Ok(()) => {
154+
migrated_count += 1;
155+
if migrated_count % 100 == 0 {
156+
info!("Migrated {} nodes so far...", migrated_count);
157+
}
158+
}
159+
Err(e) => {
160+
error!("Error migrating metrics for node {}: {}", node_address, e);
161+
error_count += 1;
162+
// Continue with other nodes even if one fails
163+
}
164+
}
165+
}
166+
167+
info!(
168+
"Migration completed. Successfully migrated {} nodes, {} errors",
169+
migrated_count, error_count
170+
);
171+
Ok(())
172+
}
173+
129174
#[tokio::main]
130175
async fn main() -> Result<()> {
131176
let args = Args::parse();
@@ -176,6 +221,17 @@ async fn main() -> Result<()> {
176221
let store = Arc::new(RedisStore::new(&args.redis_store_url));
177222
let store_context = Arc::new(StoreContext::new(store.clone()));
178223

224+
// Run one-time migration for inactive nodes
225+
let migration_store_context = store_context.clone();
226+
if matches!(server_mode, ServerMode::ProcessorOnly | ServerMode::Full) {
227+
let migration_store_context = migration_store_context.clone();
228+
tokio::spawn(async move {
229+
if let Err(e) = run_inactive_node_metric_migration(migration_store_context).await {
230+
error!("Failed to run inactive node metric migration: {}", e);
231+
}
232+
});
233+
}
234+
179235
let p2p_client = Arc::new(P2PClient::new(wallet.clone()).await.unwrap());
180236

181237
let contracts = ContractBuilder::new(wallet.provider())
@@ -256,6 +312,27 @@ async fn main() -> Result<()> {
256312
Some(node_groups_heartbeats.clone()),
257313
Some(webhook_plugins.clone()),
258314
);
315+
316+
// Run groups index migration on startup
317+
if matches!(server_mode, ServerMode::ProcessorOnly | ServerMode::Full) {
318+
match group_plugin.migrate_groups_index().await {
319+
Ok(count) => {
320+
if count > 0 {
321+
info!(
322+
"Groups index migration completed: {} groups migrated",
323+
count
324+
);
325+
} else {
326+
info!("Groups index migration: no groups to migrate");
327+
}
328+
}
329+
Err(e) => {
330+
error!("Groups index migration failed: {}", e);
331+
return Err(e);
332+
}
333+
}
334+
}
335+
259336
let status_group_plugin = group_plugin.clone();
260337
let group_plugin_for_server = group_plugin.clone();
261338
node_groups_plugin = Some(Arc::new(group_plugin_for_server));

0 commit comments

Comments
 (0)