Skip to content

Commit be374bb

Browse files
authored
Merge pull request #557 from PrimeIntellect-ai/release/0.3.4
Release/0.3.4
2 parents 2a99dd3 + 88a3649 commit be374bb

File tree

5 files changed

+6
-236
lines changed

5 files changed

+6
-236
lines changed

Cargo.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ iroh = "0.34.1"
4040
rand_v8 = { package = "rand", version = "0.8.5", features = ["std"] }
4141
rand_core_v6 = { package = "rand_core", version = "0.6.4", features = ["std"] }
4242
[workspace.package]
43-
version = "0.3.3"
43+
version = "0.3.4"
4444
edition = "2021"
4545

4646
[workspace.features]

crates/orchestrator/src/main.rs

Lines changed: 0 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -126,51 +126,6 @@ struct Args {
126126
max_healthy_nodes_with_same_endpoint: u32,
127127
}
128128

129-
async fn run_inactive_node_metric_migration(store_context: Arc<StoreContext>) -> Result<()> {
130-
info!("Starting migration of inactive node metrics to new data model");
131-
132-
let node_addresses = match store_context.node_store.get_nodes().await {
133-
Ok(nodes) => {
134-
let addresses: Vec<_> = nodes.into_iter().map(|node| node.address).collect();
135-
info!("Found {} nodes to migrate", addresses.len());
136-
addresses
137-
}
138-
Err(e) => {
139-
error!("Error getting all nodes for migration: {}", e);
140-
return Ok(()); // Don't fail startup if migration can't get nodes
141-
}
142-
};
143-
144-
let mut migrated_count = 0;
145-
let mut error_count = 0;
146-
147-
for node_address in node_addresses {
148-
match store_context
149-
.metrics_store
150-
.migrate_node_metrics_if_needed(node_address)
151-
.await
152-
{
153-
Ok(()) => {
154-
migrated_count += 1;
155-
if migrated_count % 100 == 0 {
156-
info!("Migrated {} nodes so far...", migrated_count);
157-
}
158-
}
159-
Err(e) => {
160-
error!("Error migrating metrics for node {}: {}", node_address, e);
161-
error_count += 1;
162-
// Continue with other nodes even if one fails
163-
}
164-
}
165-
}
166-
167-
info!(
168-
"Migration completed. Successfully migrated {} nodes, {} errors",
169-
migrated_count, error_count
170-
);
171-
Ok(())
172-
}
173-
174129
#[tokio::main]
175130
async fn main() -> Result<()> {
176131
let args = Args::parse();
@@ -221,17 +176,6 @@ async fn main() -> Result<()> {
221176
let store = Arc::new(RedisStore::new(&args.redis_store_url));
222177
let store_context = Arc::new(StoreContext::new(store.clone()));
223178

224-
// Run one-time migration for inactive nodes
225-
let migration_store_context = store_context.clone();
226-
if matches!(server_mode, ServerMode::ProcessorOnly | ServerMode::Full) {
227-
let migration_store_context = migration_store_context.clone();
228-
tokio::spawn(async move {
229-
if let Err(e) = run_inactive_node_metric_migration(migration_store_context).await {
230-
error!("Failed to run inactive node metric migration: {}", e);
231-
}
232-
});
233-
}
234-
235179
let p2p_client = Arc::new(P2PClient::new(wallet.clone()).await.unwrap());
236180

237181
let contracts = ContractBuilder::new(wallet.provider())
@@ -313,26 +257,6 @@ async fn main() -> Result<()> {
313257
Some(webhook_plugins.clone()),
314258
);
315259

316-
// Run groups index migration on startup
317-
if matches!(server_mode, ServerMode::ProcessorOnly | ServerMode::Full) {
318-
match group_plugin.migrate_groups_index().await {
319-
Ok(count) => {
320-
if count > 0 {
321-
info!(
322-
"Groups index migration completed: {} groups migrated",
323-
count
324-
);
325-
} else {
326-
info!("Groups index migration: no groups to migrate");
327-
}
328-
}
329-
Err(e) => {
330-
error!("Groups index migration failed: {}", e);
331-
return Err(e);
332-
}
333-
}
334-
}
335-
336260
let status_group_plugin = group_plugin.clone();
337261
let group_plugin_for_server = group_plugin.clone();
338262
node_groups_plugin = Some(Arc::new(group_plugin_for_server));

crates/orchestrator/src/plugins/node_groups/mod.rs

Lines changed: 0 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -1138,88 +1138,6 @@ impl NodeGroupsPlugin {
11381138
Ok(mappings)
11391139
}
11401140

1141-
/// Migrate existing group data to populate the groups index
1142-
/// This method should be called once after deploying the new groups index feature
1143-
pub async fn migrate_groups_index(&self) -> Result<usize, Error> {
1144-
debug!("Starting groups index migration");
1145-
let mut conn = self.store.client.get_multiplexed_async_connection().await?;
1146-
1147-
// Check if migration is needed by seeing if groups index is empty
1148-
let existing_groups_in_index: Vec<String> = conn.smembers(GROUPS_INDEX_KEY).await?;
1149-
if !existing_groups_in_index.is_empty() {
1150-
info!(
1151-
"Groups index already contains {} groups, migration appears completed",
1152-
existing_groups_in_index.len()
1153-
);
1154-
return Ok(existing_groups_in_index.len());
1155-
}
1156-
1157-
// Get all node-to-group mappings using the old method
1158-
let node_mappings: HashMap<String, String> = conn.hgetall(NODE_GROUP_MAP_KEY).await?;
1159-
1160-
if node_mappings.is_empty() {
1161-
info!("No existing groups found to migrate");
1162-
return Ok(0);
1163-
}
1164-
1165-
// Collect unique group IDs from node mappings
1166-
let existing_group_ids: HashSet<String> = node_mappings.values().cloned().collect();
1167-
info!(
1168-
"Found {} unique group IDs to migrate",
1169-
existing_group_ids.len()
1170-
);
1171-
1172-
// Verify these groups actually exist by checking their keys
1173-
let group_keys: Vec<String> = existing_group_ids
1174-
.iter()
1175-
.map(|id| Self::get_group_key(id))
1176-
.collect();
1177-
1178-
let group_values: Vec<Option<String>> = conn.mget(&group_keys).await?;
1179-
1180-
let mut valid_group_ids = Vec::new();
1181-
for (group_id, group_data) in existing_group_ids.iter().zip(group_values.iter()) {
1182-
if group_data.is_some() {
1183-
valid_group_ids.push(group_id.clone());
1184-
} else {
1185-
warn!(
1186-
"Group {} exists in mappings but has no data, skipping",
1187-
group_id
1188-
);
1189-
}
1190-
}
1191-
1192-
if valid_group_ids.is_empty() {
1193-
info!("No valid groups found to migrate");
1194-
return Ok(0);
1195-
}
1196-
1197-
// Add all valid group IDs to the groups index in a single operation
1198-
let _: () = conn.sadd(GROUPS_INDEX_KEY, &valid_group_ids).await?;
1199-
1200-
info!(
1201-
"Successfully migrated {} groups to groups index",
1202-
valid_group_ids.len()
1203-
);
1204-
1205-
// Verify the migration by checking the index
1206-
let migrated_count: usize = conn.scard(GROUPS_INDEX_KEY).await?;
1207-
if migrated_count != valid_group_ids.len() {
1208-
error!(
1209-
"Migration verification failed: expected {} groups in index, found {}",
1210-
valid_group_ids.len(),
1211-
migrated_count
1212-
);
1213-
} else {
1214-
info!(
1215-
"Migration verification successful: {} groups in index",
1216-
migrated_count
1217-
);
1218-
}
1219-
1220-
Ok(valid_group_ids.len())
1221-
}
1222-
12231141
/// Get all groups assigned to a specific task
12241142
/// Returns a list of group IDs that are currently working on the given task
12251143
pub async fn get_groups_for_task(&self, task_id: &str) -> Result<Vec<String>, Error> {

crates/orchestrator/src/store/domains/metrics_store.rs

Lines changed: 1 addition & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
use crate::store::core::RedisStore;
22
use alloy::primitives::Address;
33
use anyhow::{anyhow, Result};
4-
use log::{error, info};
4+
use log::error;
55
use redis::AsyncCommands;
66
use shared::models::metric::MetricEntry;
77
use std::collections::HashMap;
88
use std::sync::Arc;
99

1010
const ORCHESTRATOR_NODE_METRICS_STORE: &str = "orchestrator:node_metrics";
11-
const ORCHESTRATOR_METRICS_STORE: &str = "orchestrator:metrics";
1211

1312
pub struct MetricsStore {
1413
redis: Arc<RedisStore>,
@@ -23,77 +22,6 @@ impl MetricsStore {
2322
label.replace(':', "")
2423
}
2524

26-
pub async fn migrate_node_metrics_if_needed(&self, node_address: Address) -> Result<()> {
27-
let mut con = self.redis.client.get_multiplexed_async_connection().await?;
28-
let new_key = format!("{}:{}", ORCHESTRATOR_NODE_METRICS_STORE, node_address);
29-
30-
// Check if the new node-centric key already exists
31-
let exists: bool = con.exists(&new_key).await?;
32-
if exists {
33-
info!("Migration already complete for this node: {}", node_address);
34-
// Migration already complete for this node
35-
return Ok(());
36-
}
37-
38-
// Perform the slow SCAN to find all metrics for this node in the old data structure
39-
let pattern = format!("{}:*:*", ORCHESTRATOR_METRICS_STORE);
40-
let mut iter: redis::AsyncIter<String> = con.scan_match(&pattern).await?;
41-
let mut old_keys_to_migrate = Vec::new();
42-
43-
while let Some(key) = iter.next_item().await {
44-
old_keys_to_migrate.push(key);
45-
}
46-
drop(iter);
47-
48-
// Collect all metrics for this node from the old structure
49-
let mut node_metrics = HashMap::new();
50-
let mut keys_to_clean = Vec::new();
51-
52-
for old_key in old_keys_to_migrate {
53-
if let Some(value_str) = con
54-
.hget::<_, _, Option<String>>(&old_key, node_address.to_string())
55-
.await?
56-
{
57-
if let Ok(val) = value_str.parse::<f64>() {
58-
let parts: Vec<&str> = old_key.split(':').collect();
59-
if parts.len() >= 4 {
60-
let task_id = parts[2];
61-
let metric_name = parts[3];
62-
let new_metric_key = format!("{}:{}", task_id, metric_name);
63-
node_metrics.insert(new_metric_key, val);
64-
keys_to_clean.push(old_key);
65-
}
66-
}
67-
}
68-
}
69-
70-
// If we have metrics for this node, perform the atomic migration
71-
if !node_metrics.is_empty() {
72-
// Use Redis MULTI/EXEC transaction for atomicity
73-
let mut pipe = redis::pipe();
74-
pipe.atomic();
75-
76-
// Set all metrics in the new node-centric key
77-
for (metric_key, value) in &node_metrics {
78-
pipe.hset(&new_key, metric_key, value);
79-
}
80-
81-
// Clean up the old data structure by removing this node's fields
82-
for old_key in &keys_to_clean {
83-
pipe.hdel(old_key, node_address.to_string());
84-
}
85-
86-
pipe.query_async::<()>(&mut con).await?;
87-
}
88-
89-
// Always create the key to mark migration as complete, even if no metrics exist
90-
// This prevents future migration attempts for nodes without data
91-
let _: () = con.hset(&new_key, "_migrated", "true").await?;
92-
let _: () = con.hdel(&new_key, "_migrated").await?;
93-
94-
Ok(())
95-
}
96-
9725
pub async fn store_metrics(
9826
&self,
9927
metrics: Option<Vec<MetricEntry>>,

0 commit comments

Comments
 (0)