Commits
3792297
Add package-lock to git-ignore
heemankv Jun 12, 2025
1efa322
updates: non_null_updates
heemankv Jun 12, 2025
314f8bc
update: Remove non_null handling from db
heemankv Jun 12, 2025
1689ce6
update: worker logic rework
Jun 17, 2025
eab7b63
update: fix filter pipeline
heemankv Jun 18, 2025
b9ccecf
Merge branch 'main' into fix/multiple-minors
heemankv Jun 18, 2025
d9ee194
update: workers re-write
heemankv Jun 18, 2025
09244f1
update: fix
heemankv Jun 18, 2025
7ae47f3
Merge branch 'main' into fix/multiple-minors
heemankv Jun 18, 2025
efba843
update: tests
heemankv Jun 18, 2025
d994268
update: lint
heemankv Jun 18, 2025
8063aed
update: fixing proving tests
heemankv Jun 24, 2025
f9fc200
Merge branch 'main' into fix/multiple-minors
heemankv Jun 24, 2025
e006895
update: tests
heemankv Jun 24, 2025
7cad8bc
Merge branch 'main' into fix/multiple-minors
heemankv Jun 24, 2025
e3bd4da
update: fixed test_proving_worker
heemankv Jun 25, 2025
36ff766
update: fixes test_data_submission_worker
heemankv Jun 25, 2025
b23ff65
Merge branch 'main' into fix/multiple-minors
heemankv Jun 28, 2025
87469b5
fixes
heemankv Jun 28, 2025
d06a676
update: lint fix
heemankv Jun 28, 2025
868fe0a
Merge branch 'fix/multiple-minors' of github.com:madara-alliance/mada…
heemankv Jun 28, 2025
0abb7cd
Merge branch 'main' into fix/multiple-minors
heemankv Jul 1, 2025
b0be317
Merge branch 'main' into fix/multiple-minors
Mohiiit Jul 2, 2025
9d5b450
Update orchestrator/src/worker/event_handler/triggers/update_state.rs
heemankv Jul 7, 2025
49e353b
update: PR review fixes
heemankv Jul 7, 2025
890f8ec
PR reviews
heemankv Jul 10, 2025
1e2a427
update: remove comment
heemankv Jul 10, 2025
b4e36f2
Update orchestrator/src/worker/event_handler/triggers/data_submission.rs
heemankv Jul 10, 2025
1297873
Update orchestrator/src/worker/event_handler/triggers/proving.rs
heemankv Jul 10, 2025
45bfd32
Merge branch 'main' into fix/multiple-minors
Mohiiit Jul 11, 2025
2 changes: 1 addition & 1 deletion .gitignore
@@ -35,7 +35,7 @@ rustc-ice-*.txt

# Javascript dependencies
**/node_modules/

package-lock.json
# Artillery reports
benchmarking/reports/*.json

4 changes: 4 additions & 0 deletions orchestrator/src/core/client/database/mod.rs
@@ -73,6 +73,10 @@ pub trait DatabaseClient: Send + Sync {
limit: Option<i64>,
) -> Result<Vec<u64>, DatabaseError>;

/// get_earliest_failed_block_number - Get the block number of the earliest failed job.
/// A job counts as failed when its status is either `Failed` or `VerificationTimeout`.
async fn get_earliest_failed_block_number(&self) -> Result<Option<u64>, DatabaseError>;

/// get_latest_batch - Get the latest batch from DB. Returns `None` if the DB is empty
async fn get_latest_batch(&self) -> Result<Option<Batch>, DatabaseError>;
/// update_batch - Update the batch
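The trait method returns `Ok(None)` when no job is in a failed state, so callers can treat the value as an optional bound rather than an error. A minimal sketch of how a caller might read it; apart from the trait method and types defined in this file, the helper name and log messages below are illustrative, not taken from this PR:

```rust
// Sketch only, assuming `DatabaseClient` and `DatabaseError` are in scope as in this module.
async fn log_failure_bound(db: &impl DatabaseClient) -> Result<(), DatabaseError> {
    match db.get_earliest_failed_block_number().await? {
        // A Failed or VerificationTimeout job exists; its block number acts as an
        // upper bound for the workers touched later in this PR.
        Some(block) => tracing::warn!("earliest failed block: {block}; later blocks are skipped"),
        // No failed job recorded, so nothing constrains processing.
        None => tracing::debug!("no failed jobs recorded"),
    }
    Ok(())
}
```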
78 changes: 68 additions & 10 deletions orchestrator/src/core/client/database/mongodb.rs
@@ -372,11 +372,6 @@ impl DatabaseClient for MongoDbClient {
}
});

// throw an error if there's no field to be updated
if non_null_updates.is_empty() {
return Err(DatabaseError::NoUpdateFound("No field to be updated, likely a false call".to_string()));
}

// Add additional fields that are always updated
non_null_updates.insert("version", Bson::Int32(current_job.version + 1));
non_null_updates.insert("updated_at", Bson::DateTime(Utc::now().round_subsecs(0).into()));
@@ -774,6 +769,74 @@
Ok(result)
}

/// Get the block number of the earliest failed job.
/// Matches all jobs with status `Failed` or `VerificationTimeout`, parses their
/// `internal_id` as a block number, and returns the lowest one (`None` if there are
/// no failed jobs).
#[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
async fn get_earliest_failed_block_number(&self) -> Result<Option<u64>, DatabaseError> {
let start = Instant::now();

// Convert job statuses to Bson
let failed_status_bson = mongodb::bson::to_bson(&JobStatus::Failed)?;
let timeout_status_bson = mongodb::bson::to_bson(&JobStatus::VerificationTimeout)?;

// Construct the aggregation pipeline
let pipeline = vec![
// Stage 1: Match jobs with status 'Failed' or 'VerificationTimeout'
doc! {
"$match": {
"$or": [
{ "status": failed_status_bson },
{ "status": timeout_status_bson }
]
}
},
// Stage 2: Parse the string `internal_id` into a numeric block number
doc! {
"$addFields": {
"numeric_internal_id": { "$toLong": "$internal_id" }
}
},
// Stage 3: Sort ascending so the earliest (lowest) block number comes first
doc! {
"$sort": {
"numeric_internal_id": 1
}
},
// Stage 4: Keep only that first document
doc! {
"$limit": 1
},
// Stage 5: Project only the block_number field for efficiency
doc! {
"$project": {
"block_number": "$numeric_internal_id"
}
},
];

tracing::debug!(category = "db_call", "Fetching earliest failed block number");

let collection: Collection<JobItem> = self.get_job_collection();

// Define a simple struct for the projection result
#[derive(Deserialize)]
struct BlockNumberResult {
block_number: u64,
}

// Execute pipeline
let results = self.execute_pipeline::<JobItem, BlockNumberResult>(collection, pipeline, None).await?;

let attributes = [KeyValue::new("db_operation_name", "get_earliest_failed_block_number")];

// Convert results to single result and extract block number
let result = vec_to_single_result(results, "get_earliest_failed_block_number")?;

let duration = start.elapsed();
ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes);

Ok(result.map(|block_result| block_result.block_number))
}

async fn get_latest_batch(&self) -> Result<Option<Batch>, DatabaseError> {
let start = Instant::now();
let pipeline = vec![
@@ -830,11 +893,6 @@ impl DatabaseClient for MongoDbClient {
}
});

// throw an error if there's no field to be updated
if non_null_updates.is_empty() {
return Err(DatabaseError::NoUpdateFound("No field to be updated, likely a false call".to_string()));
}

// Add additional fields that are always updated
non_null_updates.insert("size", Bson::Int64(update.end_block as i64 - batch.start_block as i64 + 1));
non_null_updates.insert("updated_at", Bson::DateTime(Utc::now().round_subsecs(0).into()));
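For readers less used to MongoDB aggregations, the pipeline above amounts to the following in-memory computation over the job collection. This is a sketch of the intent only, not code from the PR; `SimpleJob` is a stand-in for the two `JobItem` fields the pipeline actually touches:

```rust
use crate::types::jobs::types::JobStatus;

// Stand-in for the relevant `JobItem` fields.
struct SimpleJob {
    internal_id: String, // block number stored as a string
    status: JobStatus,
}

// Equivalent of $match + $toLong + ascending $sort + $limit 1 + $project.
fn earliest_failed_block(jobs: &[SimpleJob]) -> Option<u64> {
    jobs.iter()
        .filter(|job| matches!(job.status, JobStatus::Failed | JobStatus::VerificationTimeout))
        // `$toLong` corresponds to this parse; unparsable ids are simply ignored here.
        .filter_map(|job| job.internal_id.parse::<u64>().ok())
        .min()
}
```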
3 changes: 2 additions & 1 deletion orchestrator/src/tests/database/mod.rs
@@ -215,7 +215,8 @@ async fn database_test_update_job() {
JobItemUpdates::new()
.update_status(JobStatus::LockedForProcessing)
.update_metadata(updated_job_metadata)
.build(),
.build()
.unwrap(),
)
.await;

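The added `.unwrap()` shows that `JobItemUpdates::build()` is now fallible, which fits the removal of the empty-update (`NoUpdateFound`) check from the MongoDB client earlier in this diff: presumably the "nothing to update" validation now lives in the builder. A hedged sketch of how non-test code might handle that; the import path and the error type's `Debug` impl are assumptions, not shown in this diff:

```rust
use crate::types::jobs::job_updates::JobItemUpdates; // path assumed, not shown in this diff
use crate::types::jobs::types::JobStatus;

// Propagate the builder error instead of unwrapping, as production callers likely would.
fn build_update_example() -> Result<(), String> {
    let _updates = JobItemUpdates::new()
        .update_status(JobStatus::LockedForProcessing)
        .build()
        .map_err(|e| format!("no fields to update: {e:?}"))?;
    Ok(())
}
```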
234 changes: 234 additions & 0 deletions orchestrator/src/tests/workers/data_submission/mod.rs
@@ -0,0 +1,234 @@
#![allow(clippy::type_complexity)]
use crate::core::client::database::MockDatabaseClient;
use crate::core::client::queue::MockQueueClient;
use crate::tests::config::TestConfigBuilder;
use crate::tests::workers::utils::get_job_item_mock_by_id;
use crate::types::constant::BLOB_DATA_FILE_NAME;
use crate::types::jobs::metadata::{DaMetadata, JobSpecificMetadata, ProvingInputType, ProvingMetadata};
use crate::types::jobs::types::{JobStatus, JobType};
use crate::types::queue::QueueType;
use crate::worker::event_handler::factory::mock_factory::get_job_handler_context;
use crate::worker::event_handler::jobs::{JobHandlerTrait, MockJobHandlerTrait};
use crate::worker::event_handler::triggers::JobTrigger;
use mockall::predicate::eq;
use orchestrator_da_client_interface::MockDaClient;
use rstest::rstest;
use std::error::Error;
use std::sync::Arc;
use uuid::Uuid;

#[rstest]
// Scenario 1: No completed proving jobs exist
// Expected result: no data submission jobs created
#[case(
None, // earliest_failed_block
vec![], // completed_proving_jobs (no completed proving jobs)
vec![], // expected_data_submission_jobs (no jobs to create)
)]
// Scenario 2: Single completed proving job with valid metadata
// Expected result: one data submission job created
#[case(
None, // earliest_failed_block
vec![
(0, Some("path/to/cairo_pie_0".to_string()), Some("valid_proof_path_0".to_string()), Some(1000))
], // completed_proving_jobs (block_num, input_path, proof_path, n_steps)
vec![0], // expected_data_submission_jobs
)]
// Scenario 3: Multiple completed proving jobs with valid metadata
// Expected result: data submission jobs created for all
#[case(
None, // earliest_failed_block
vec![
(0, Some("path/to/cairo_pie_0".to_string()), Some("proof_path_0".to_string()), Some(1000)),
(1, Some("path/to/cairo_pie_1".to_string()), Some("proof_path_1".to_string()), Some(1500)),
(2, Some("path/to/cairo_pie_2".to_string()), Some("proof_path_2".to_string()), Some(2000))
], // completed_proving_jobs
vec![0, 1, 2], // expected_data_submission_jobs
)]
// Scenario 4: Proving jobs missing optional metadata (no input_path, or no proof path)
// Expected result: data submission jobs created (neither field is required for data submission)
#[case(
None, // earliest_failed_block
vec![
(0, None, Some("proof_path_0".to_string()), Some(1000)), // No input_path
(1, Some("path/to/cairo_pie_1".to_string()), None, Some(1500)) // No proof hash
], // completed_proving_jobs
vec![0, 1], // expected_data_submission_jobs (both should be created)
)]
// Scenario 5: Proving jobs with earliest_failed_block constraint
// Expected result: data submission jobs created only for blocks before failed block
#[case(
Some(1), // earliest_failed_block (blocks >= 1 should be skipped)
vec![
(0, Some("path/to/cairo_pie_0".to_string()), Some("proof_path_0".to_string()), Some(1000)), // Valid (< failed block)
(1, Some("path/to/cairo_pie_1".to_string()), Some("proof_path_1".to_string()), Some(1500)), // Skipped (>= failed block)
(2, Some("path/to/cairo_pie_2".to_string()), Some("proof_path_2".to_string()), Some(2000)) // Skipped (>= failed block)
], // completed_proving_jobs
vec![0], // expected_data_submission_jobs (only block 0)
)]
// Scenario 6: All proving jobs are beyond failed block
// Expected result: no data submission jobs created (all skipped)
#[case(
Some(0), // earliest_failed_block (all blocks >= 0 should be skipped)
vec![
(0, Some("path/to/cairo_pie_0".to_string()), Some("proof_path_0".to_string()), Some(1000)),
(1, Some("path/to/cairo_pie_1".to_string()), Some("proof_path_1".to_string()), Some(1500)),
(2, Some("path/to/cairo_pie_2".to_string()), Some("proof_path_2".to_string()), Some(2000))
], // completed_proving_jobs
vec![], // expected_data_submission_jobs (all skipped)
)]
// Scenario 7: Large number of completed proving jobs
// Expected result: data submission jobs created for all valid ones
#[case(
None, // earliest_failed_block
vec![
(0, Some("pie_0".to_string()), Some("proof_path_0".to_string()), Some(1000)),
(1, Some("pie_1".to_string()), Some("proof_path_1".to_string()), Some(1100)),
(2, Some("pie_2".to_string()), Some("proof_path_2".to_string()), Some(1200)),
(3, Some("pie_3".to_string()), Some("proof_path_3".to_string()), Some(1300)),
(4, Some("pie_4".to_string()), Some("proof_path_4".to_string()), Some(1400)),
(5, Some("pie_5".to_string()), Some("proof_path_5".to_string()), Some(1500))
], // completed_proving_jobs
vec![0, 1, 2, 3, 4, 5], // expected_data_submission_jobs
)]
// Scenario 8: Mix of valid proving jobs with failed block constraint
// Expected result: data submission jobs created only for valid blocks before failed block
#[case(
Some(3), // earliest_failed_block
vec![
(0, Some("pie_0".to_string()), Some("proof_path_0".to_string()), Some(1000)), // Valid
(1, Some("pie_1".to_string()), Some("proof_path_1".to_string()), Some(1100)), // Valid
(2, Some("pie_2".to_string()), Some("proof_path_2".to_string()), Some(1200)), // Valid
(3, Some("pie_3".to_string()), Some("proof_path_3".to_string()), Some(1300)), // Skipped - at failed block
(4, Some("pie_4".to_string()), Some("proof_path_4".to_string()), Some(1400)), // Skipped - beyond failed block
(5, Some("pie_5".to_string()), Some("proof_path_5".to_string()), Some(1500)) // Skipped - beyond failed block
], // completed_proving_jobs
vec![0, 1, 2], // expected_data_submission_jobs (only blocks before failed block)
)]
// Scenario 9: Proving jobs with minimal metadata (testing edge cases)
// Expected result: data submission jobs created for all (minimal metadata is acceptable)
#[case(
None, // earliest_failed_block
vec![
(0, None, None, None), // Minimal metadata
(1, None, None, Some(0)), // Zero n_steps
(2, Some("".to_string()), Some("".to_string()), Some(1000)) // Empty strings
], // completed_proving_jobs
vec![0, 1, 2], // expected_data_submission_jobs (all should be created)
)]
// Scenario 10: High block numbers with failed block constraint
// Expected result: data submission jobs created only for blocks before failed block
#[case(
Some(1000), // earliest_failed_block
vec![
(999, Some("pie_999".to_string()), Some("proof_path_999".to_string()), Some(10000)), // Valid
(1000, Some("pie_1000".to_string()), Some("proof_path_1000".to_string()), Some(11000)), // Skipped - at failed block
(1001, Some("pie_1001".to_string()), Some("proof_path_1001".to_string()), Some(12000)) // Skipped - beyond failed block
], // completed_proving_jobs
vec![999], // expected_data_submission_jobs (only block 999)
)]
#[tokio::test]
async fn test_data_submission_worker(
#[case] earliest_failed_block: Option<u64>,
#[case] completed_proving_jobs: Vec<(u64, Option<String>, Option<String>, Option<usize>)>, // (block_num, input_path, proof_path, n_steps)
#[case] expected_data_submission_jobs: Vec<u64>,
) -> Result<(), Box<dyn Error>> {
dotenvy::from_filename_override(".env.test").expect("Failed to load the .env file");

// Setup mock clients
let da_client = MockDaClient::new();
let mut db = MockDatabaseClient::new();
let mut queue = MockQueueClient::new();
let mut job_handler = MockJobHandlerTrait::new();

// Mock earliest_failed_block_number query
db.expect_get_earliest_failed_block_number().returning(move || Ok(earliest_failed_block));

// Create completed proving job items
let proving_job_items: Vec<_> = completed_proving_jobs
.iter()
.map(|(block_num, input_path, proof_path, n_steps)| {
let mut job_item = get_job_item_mock_by_id(block_num.to_string(), Uuid::new_v4());
job_item.metadata.specific = JobSpecificMetadata::Proving(ProvingMetadata {
block_number: *block_num,
input_path: input_path.as_ref().map(|path| ProvingInputType::CairoPie(path.clone())),
download_proof: proof_path.clone(),
ensure_on_chain_registration: None, // Not needed for data submission
n_steps: *n_steps,
});
job_item.status = JobStatus::Completed;
job_item
})
.collect();

// Mock database call to get proving jobs without data submission jobs
let proving_jobs_clone = proving_job_items.clone();
db.expect_get_jobs_without_successor()
.with(eq(JobType::ProofCreation), eq(JobStatus::Completed), eq(JobType::DataSubmission))
.returning(move |_, _, _| Ok(proving_jobs_clone.clone()));

// Mock get_job_by_internal_id_and_type to always return None
db.expect_get_job_by_internal_id_and_type().returning(|_, _| Ok(None));

// Mock job creation for expected data submission jobs
for &block_num in &expected_data_submission_jobs {
let uuid = Uuid::new_v4();
let block_num_str = block_num.to_string();

// Only expect job creation for jobs that should actually be created
// (i.e., are not beyond failed block)
if earliest_failed_block.is_none() || block_num < earliest_failed_block.unwrap() {
let mut da_job_item = get_job_item_mock_by_id(block_num_str.clone(), uuid);
da_job_item.metadata.specific = JobSpecificMetadata::Da(DaMetadata {
block_number: block_num,
blob_data_path: Some(format!("{}/{}", block_num, BLOB_DATA_FILE_NAME)),
tx_hash: None, // Will be populated during processing
});
da_job_item.status = JobStatus::Created;

let job_item_clone = da_job_item.clone();

job_handler
.expect_create_job()
.with(eq(block_num_str.clone()), mockall::predicate::always())
.returning(move |_, _| Ok(job_item_clone.clone()));

let block_num_str_for_db = block_num_str.clone();
db.expect_create_job()
.withf(move |item| {
item.internal_id == block_num_str_for_db
&& matches!(item.metadata.specific, JobSpecificMetadata::Da(_))
})
.returning(move |_| Ok(da_job_item.clone()));
}
}

// Setup job handler context
let job_handler: Arc<Box<dyn JobHandlerTrait>> = Arc::new(Box::new(job_handler));
let ctx = get_job_handler_context();
ctx.expect().with(eq(JobType::DataSubmission)).returning(move |_| Arc::clone(&job_handler));

let expected_created_count = expected_data_submission_jobs.len();

// Mock queue operations for successful job creations
queue
.expect_send_message()
.times(expected_created_count)
.returning(|_, _, _| Ok(()))
.withf(|queue, _, _| *queue == QueueType::DataSubmissionJobProcessing);

// Build test configuration
let services = TestConfigBuilder::new()
.configure_database(db.into())
.configure_queue_client(queue.into())
.configure_da_client(da_client.into())
.build()
.await;

// Run the Data Submission worker
crate::worker::event_handler::triggers::data_submission::DataSubmissionJobTrigger
.run_worker(services.config)
.await?;

Ok(())
}
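Scenarios 5, 6, 8 and 10 above pin down the threshold rule the trigger is expected to enforce: a completed proving job at block `b` should only produce a data-submission job when `b` is strictly below the earliest failed block, or when no failed block exists. For reference, the rule boils down to this predicate (a sketch of the expected behaviour, not the trigger's actual code, which is outside this diff):

```rust
/// Skip rule exercised by the failed-block scenarios above (sketch, not production code).
fn should_create_da_job(block_number: u64, earliest_failed_block: Option<u64>) -> bool {
    match earliest_failed_block {
        Some(failed) => block_number < failed, // blocks at or after the failure are skipped
        None => true,                          // no failure recorded, everything is eligible
    }
}
```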
3 changes: 3 additions & 0 deletions orchestrator/src/tests/workers/mod.rs
@@ -4,3 +4,6 @@ pub mod proving;
pub mod snos;
mod update_state;
pub mod utils;

#[cfg(test)]
pub mod data_submission;