diff --git a/.gitignore b/.gitignore
index fede54bbca..4560a9a519 100644
--- a/.gitignore
+++ b/.gitignore
@@ -35,7 +35,7 @@ rustc-ice-*.txt
 
 # Javascript dependencies
 **/node_modules/
-
+package-lock.json
 
 # Artillery reports
 benchmarking/reports/*.json
diff --git a/orchestrator/src/core/client/database/mod.rs b/orchestrator/src/core/client/database/mod.rs
index 944bb7be37..6ddbad94d8 100644
--- a/orchestrator/src/core/client/database/mod.rs
+++ b/orchestrator/src/core/client/database/mod.rs
@@ -73,6 +73,10 @@ pub trait DatabaseClient: Send + Sync {
        limit: Option<i64>,
    ) -> Result<Vec<JobItem>, DatabaseError>;

+    /// get_earliest_failed_block_number - Get the block number of the earliest failed job.
+    /// A job counts as failed when its status is either `Failed` or `VerificationTimeout`.
+    async fn get_earliest_failed_block_number(&self) -> Result<Option<u64>, DatabaseError>;
+
    /// get_latest_batch - Get the latest batch from DB. Returns `None` if the DB is empty
    async fn get_latest_batch(&self) -> Result<Option<Batch>, DatabaseError>;
    /// update_batch - Update the batch
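For orientation, the status test behind the new doc comment is a simple two-variant check; a small stand-alone sketch (`JobStatus` here is a local stand-in for the orchestrator's enum, not the real type):

```rust
enum JobStatus {
    Created,
    Completed,
    Failed,
    VerificationTimeout,
}

// A job is "failed" for this query iff its status is one of these two variants.
fn counts_as_failed(status: JobStatus) -> bool {
    matches!(status, JobStatus::Failed | JobStatus::VerificationTimeout)
}

fn main() {
    assert!(counts_as_failed(JobStatus::Failed));
    assert!(counts_as_failed(JobStatus::VerificationTimeout));
    assert!(!counts_as_failed(JobStatus::Created));
    assert!(!counts_as_failed(JobStatus::Completed));
}
```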
diff --git a/orchestrator/src/core/client/database/mongodb.rs b/orchestrator/src/core/client/database/mongodb.rs
index e8c8ac2eb0..7726a723db 100644
--- a/orchestrator/src/core/client/database/mongodb.rs
+++ b/orchestrator/src/core/client/database/mongodb.rs
@@ -372,11 +372,6 @@ impl DatabaseClient for MongoDbClient {
            }
        });

-        // throw an error if there's no field to be updated
-        if non_null_updates.is_empty() {
-            return Err(DatabaseError::NoUpdateFound("No field to be updated, likely a false call".to_string()));
-        }
-
        // Add additional fields that are always updated
        non_null_updates.insert("version", Bson::Int32(current_job.version + 1));
        non_null_updates.insert("updated_at", Bson::DateTime(Utc::now().round_subsecs(0).into()));
@@ -774,6 +769,74 @@ impl DatabaseClient for MongoDbClient {
        Ok(result)
    }

+    /// Gets the block number of the earliest failed job:
+    /// matches all jobs whose status is `Failed` or `VerificationTimeout`,
+    /// sorts them by their numeric internal id (the block number),
+    /// and returns the first one, if any.
+    #[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)]
+    async fn get_earliest_failed_block_number(&self) -> Result<Option<u64>, DatabaseError> {
+        let start = Instant::now();
+
+        // Convert job statuses to Bson
+        let failed_status_bson = mongodb::bson::to_bson(&JobStatus::Failed)?;
+        let timeout_status_bson = mongodb::bson::to_bson(&JobStatus::VerificationTimeout)?;
+
+        // Construct the aggregation pipeline
+        let pipeline = vec![
+            // Stage 1: Match jobs with status 'Failed' or 'VerificationTimeout'
+            doc! {
+                "$match": {
+                    "$or": [
+                        { "status": failed_status_bson },
+                        { "status": timeout_status_bson }
+                    ]
+                }
+            },
+            // Stage 2: Cast the string internal id to a number so the sort is numeric
+            doc! {
+                "$addFields": {
+                    "numeric_internal_id": { "$toLong": "$internal_id" }
+                }
+            },
+            // Stage 3: Sort ascending so the earliest failed block comes first
+            doc! {
+                "$sort": {
+                    "numeric_internal_id": 1
+                }
+            },
+            doc! {
+                "$limit": 1
+            },
+            // Stage 4: Project only the block_number field for efficiency
+            doc! {
+                "$project": {
+                    "block_number": "$numeric_internal_id"
+                }
+            },
+        ];
+
+        tracing::debug!(category = "db_call", "Fetching earliest failed block number");
+
+        let collection: Collection<JobItem> = self.get_job_collection();
+
+        // Define a simple struct for the projection result
+        #[derive(Deserialize)]
+        struct BlockNumberResult {
+            block_number: u64,
+        }
+
+        // Execute pipeline
+        let results = self.execute_pipeline::<BlockNumberResult>(collection, pipeline, None).await?;
+
+        let attributes = [KeyValue::new("db_operation_name", "get_earliest_failed_block_number")];
+
+        // Convert results to single result and extract block number
+        let result = vec_to_single_result(results, "get_earliest_failed_block_number")?;
+
+        let duration = start.elapsed();
+        ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes);
+
+        Ok(result.map(|block_result| block_result.block_number))
+    }
+
    async fn get_latest_batch(&self) -> Result<Option<Batch>, DatabaseError> {
        let start = Instant::now();
        let pipeline = vec![
@@ -830,11 +893,6 @@ impl DatabaseClient for MongoDbClient {
            }
        });

-        // throw an error if there's no field to be updated
-        if non_null_updates.is_empty() {
-            return Err(DatabaseError::NoUpdateFound("No field to be updated, likely a false call".to_string()));
-        }
-
        // Add additional fields that are always updated
        non_null_updates.insert("size", Bson::Int64(update.end_block as i64 - batch.start_block as i64 + 1));
        non_null_updates.insert("updated_at", Bson::DateTime(Utc::now().round_subsecs(0).into()));
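The `$toLong` cast is what makes the `$sort` meaningful: `internal_id` is stored as a string, and lexicographic order diverges from numeric order as soon as ids have different digit counts. A plain-Rust illustration (not part of the diff):

```rust
fn main() {
    // Lexicographic order on the stored string ids...
    let mut ids = vec!["9", "10", "100"];
    ids.sort();
    assert_eq!(ids, ["10", "100", "9"]); // "earliest" would wrongly be block 10

    // ...versus numeric order after the cast, as the pipeline sorts.
    let mut blocks: Vec<u64> = ids.iter().map(|s| s.parse().unwrap()).collect();
    blocks.sort();
    assert_eq!(blocks, [9, 10, 100]); // block 9 is correctly the earliest
}
```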
diff --git a/orchestrator/src/tests/database/mod.rs b/orchestrator/src/tests/database/mod.rs
index 21b7d058a2..9375a00c39 100644
--- a/orchestrator/src/tests/database/mod.rs
+++ b/orchestrator/src/tests/database/mod.rs
@@ -215,7 +215,8 @@ async fn database_test_update_job() {
            JobItemUpdates::new()
                .update_status(JobStatus::LockedForProcessing)
                .update_metadata(updated_job_metadata)
-                .build(),
+                .build()
+                .unwrap(),
        )
        .await;

diff --git a/orchestrator/src/tests/workers/data_submission/mod.rs b/orchestrator/src/tests/workers/data_submission/mod.rs
new file mode 100644
index 0000000000..a453b7b305
--- /dev/null
+++ b/orchestrator/src/tests/workers/data_submission/mod.rs
@@ -0,0 +1,234 @@
+#![allow(clippy::type_complexity)]
+use crate::core::client::database::MockDatabaseClient;
+use crate::core::client::queue::MockQueueClient;
+use crate::tests::config::TestConfigBuilder;
+use crate::tests::workers::utils::get_job_item_mock_by_id;
+use crate::types::constant::BLOB_DATA_FILE_NAME;
+use crate::types::jobs::metadata::{DaMetadata, JobSpecificMetadata, ProvingInputType, ProvingMetadata};
+use crate::types::jobs::types::{JobStatus, JobType};
+use crate::types::queue::QueueType;
+use crate::worker::event_handler::factory::mock_factory::get_job_handler_context;
+use crate::worker::event_handler::jobs::{JobHandlerTrait, MockJobHandlerTrait};
+use crate::worker::event_handler::triggers::JobTrigger;
+use mockall::predicate::eq;
+use orchestrator_da_client_interface::MockDaClient;
+use rstest::rstest;
+use std::error::Error;
+use std::sync::Arc;
+use uuid::Uuid;
+
+#[rstest]
+// Scenario 1: No completed proving jobs exist
+// Expected result: no data submission jobs created
+#[case(
+    None, // earliest_failed_block
+    vec![], // completed_proving_jobs (no completed proving jobs)
+    vec![], // expected_data_submission_jobs (no jobs to create)
+)]
+// Scenario 2: Single completed proving job with valid metadata
+// Expected result: one data submission job created
+#[case(
+    None, // earliest_failed_block
+    vec![
+        (0, Some("path/to/cairo_pie_0".to_string()), Some("valid_proof_path_0".to_string()), Some(1000))
+    ], // completed_proving_jobs (block_num, input_path, proof_path, n_steps)
+    vec![0], // expected_data_submission_jobs
+)]
+// Scenario 3: Multiple completed proving jobs with valid metadata
+// Expected result: data submission jobs created for all
+#[case(
+    None, // earliest_failed_block
+    vec![
+        (0, Some("path/to/cairo_pie_0".to_string()), Some("proof_path_0".to_string()), Some(1000)),
+        (1, Some("path/to/cairo_pie_1".to_string()), Some("proof_path_1".to_string()), Some(1500)),
+        (2, Some("path/to/cairo_pie_2".to_string()), Some("proof_path_2".to_string()), Some(2000))
+    ], // completed_proving_jobs
+    vec![0, 1, 2], // expected_data_submission_jobs
+)]
+// Scenario 4: Proving jobs without input_path (should still create data submission job)
+// Expected result: data submission jobs created (input_path is not required for data submission)
+#[case(
+    None, // earliest_failed_block
+    vec![
+        (0, None, Some("proof_path_0".to_string()), Some(1000)), // No input_path
+        (1, Some("path/to/cairo_pie_1".to_string()), None, Some(1500)) // No proof_path
+    ], // completed_proving_jobs
+    vec![0, 1], // expected_data_submission_jobs (both should be created)
+)]
+// Scenario 5: Proving jobs with earliest_failed_block constraint
+// Expected result: data submission jobs created only for blocks before failed block
+#[case(
+    Some(1), // earliest_failed_block (blocks >= 1 should be skipped)
+    vec![
+        (0, Some("path/to/cairo_pie_0".to_string()), Some("proof_path_0".to_string()), Some(1000)), // Valid (< failed block)
+        (1, Some("path/to/cairo_pie_1".to_string()), Some("proof_path_1".to_string()), Some(1500)), // Skipped (>= failed block)
+        (2, Some("path/to/cairo_pie_2".to_string()), Some("proof_path_2".to_string()), Some(2000)) // Skipped (>= failed block)
+    ], // completed_proving_jobs
+    vec![0], // expected_data_submission_jobs (only block 0)
+)]
+// Scenario 6: All proving jobs are beyond failed block
+// Expected result: no data submission jobs created (all skipped)
+#[case(
+    Some(0), // earliest_failed_block (all blocks >= 0 should be skipped)
+    vec![
+        (0, Some("path/to/cairo_pie_0".to_string()), Some("proof_path_0".to_string()), Some(1000)),
+        (1, Some("path/to/cairo_pie_1".to_string()), Some("proof_path_1".to_string()), Some(1500)),
+        (2, Some("path/to/cairo_pie_2".to_string()), Some("proof_path_2".to_string()), Some(2000))
+    ], // completed_proving_jobs
+    vec![], // expected_data_submission_jobs (all skipped)
+)]
+// Scenario 7: Large number of completed proving jobs
+// Expected result: data submission jobs created for all valid ones
+#[case(
+    None, // earliest_failed_block
+    vec![
+        (0, Some("pie_0".to_string()), Some("proof_path_0".to_string()), Some(1000)),
+        (1, Some("pie_1".to_string()), Some("proof_path_1".to_string()), Some(1100)),
+        (2, Some("pie_2".to_string()), Some("proof_path_2".to_string()), Some(1200)),
+        (3, Some("pie_3".to_string()), Some("proof_path_3".to_string()), Some(1300)),
+        (4, Some("pie_4".to_string()), Some("proof_path_4".to_string()), Some(1400)),
+        (5, Some("pie_5".to_string()), Some("proof_path_5".to_string()), Some(1500))
+    ], // completed_proving_jobs
+    vec![0, 1, 2, 3, 4, 5], // expected_data_submission_jobs
+)]
+// Scenario 8: Mix of valid proving jobs with failed block constraint
+// Expected result: data submission jobs created only for valid blocks before failed block
+#[case(
+    Some(3), // earliest_failed_block
+    vec![
+        (0, Some("pie_0".to_string()), Some("proof_path_0".to_string()), Some(1000)), // Valid
+        (1, Some("pie_1".to_string()), Some("proof_path_1".to_string()), Some(1100)), // Valid
+        (2, Some("pie_2".to_string()), Some("proof_path_2".to_string()), Some(1200)), // Valid
+        (3, Some("pie_3".to_string()), Some("proof_path_3".to_string()), Some(1300)), // Skipped - at failed block
+        (4, Some("pie_4".to_string()), Some("proof_path_4".to_string()), Some(1400)), // Skipped - beyond failed block
+        (5, Some("pie_5".to_string()), Some("proof_path_5".to_string()), Some(1500)) // Skipped - beyond failed block
+    ], // completed_proving_jobs
+    vec![0, 1, 2], // expected_data_submission_jobs (only blocks before failed block)
+)]
+// Scenario 9: Proving jobs with minimal metadata (testing edge cases)
+// Expected result: data submission jobs created for all (minimal metadata is acceptable)
+#[case(
+    None, // earliest_failed_block
+    vec![
+        (0, None, None, None), // Minimal metadata
+        (1, None, None, Some(0)), // Zero n_steps
+        (2, Some("".to_string()), Some("".to_string()), Some(1000)) // Empty strings
+    ], // completed_proving_jobs
+    vec![0, 1, 2], // expected_data_submission_jobs (all should be created)
+)]
+// Scenario 10: High block numbers with failed block constraint
+// Expected result: data submission jobs created only for blocks before failed block
+#[case(
+    Some(1000), // earliest_failed_block
+    vec![
+        (999, Some("pie_999".to_string()), Some("proof_path_999".to_string()), Some(10000)), // Valid
+        (1000, Some("pie_1000".to_string()), Some("proof_path_1000".to_string()), Some(11000)), // Skipped - at failed block
+        (1001, Some("pie_1001".to_string()), Some("proof_path_1001".to_string()), Some(12000)) // Skipped - beyond failed block
+    ], // completed_proving_jobs
+    vec![999], // expected_data_submission_jobs (only block 999)
+)]
+#[tokio::test]
+async fn test_data_submission_worker(
+    #[case] earliest_failed_block: Option<u64>,
+    #[case] completed_proving_jobs: Vec<(u64, Option<String>, Option<String>, Option<usize>)>, // (block_num, input_path, proof_path, n_steps)
+    #[case] expected_data_submission_jobs: Vec<u64>,
+) -> Result<(), Box<dyn Error>> {
+    dotenvy::from_filename_override(".env.test").expect("Failed to load the .env file");
+
+    // Setup mock clients
+    let da_client = MockDaClient::new();
+    let mut db = MockDatabaseClient::new();
+    let mut queue = MockQueueClient::new();
+    let mut job_handler = MockJobHandlerTrait::new();
+
+    // Mock earliest_failed_block_number query
+    db.expect_get_earliest_failed_block_number().returning(move || Ok(earliest_failed_block));
+
+    // Create completed proving job items
+    let proving_job_items: Vec<_> = completed_proving_jobs
+        .iter()
+        .map(|(block_num, input_path, proof_path, n_steps)| {
+            let mut job_item = get_job_item_mock_by_id(block_num.to_string(), Uuid::new_v4());
+            job_item.metadata.specific = JobSpecificMetadata::Proving(ProvingMetadata {
+                block_number: *block_num,
+                input_path: input_path.as_ref().map(|path| ProvingInputType::CairoPie(path.clone())),
+                download_proof: proof_path.clone(),
+                ensure_on_chain_registration: None, // Not needed for data submission
+                n_steps: *n_steps,
+            });
+            job_item.status = JobStatus::Completed;
+            job_item
+        })
+        .collect();
+
+    // Mock database call to get proving jobs without data submission jobs
+    let proving_jobs_clone = proving_job_items.clone();
+    db.expect_get_jobs_without_successor()
+        .with(eq(JobType::ProofCreation), eq(JobStatus::Completed), eq(JobType::DataSubmission))
+        .returning(move |_, _, _| Ok(proving_jobs_clone.clone()));
+
+    // Mock get_job_by_internal_id_and_type to always return None
+    db.expect_get_job_by_internal_id_and_type().returning(|_, _| Ok(None));
+
+    // Mock job creation for expected data submission jobs
+    for &block_num in &expected_data_submission_jobs {
+        let uuid = Uuid::new_v4();
+        let block_num_str = block_num.to_string();
+
+        // Only expect job creation for jobs that should actually be created
+        // (i.e., are not beyond failed block)
+        if earliest_failed_block.is_none() || block_num < earliest_failed_block.unwrap() {
+            let mut da_job_item = get_job_item_mock_by_id(block_num_str.clone(), uuid);
+            da_job_item.metadata.specific = JobSpecificMetadata::Da(DaMetadata {
+                block_number: block_num,
+                blob_data_path: Some(format!("{}/{}", block_num, BLOB_DATA_FILE_NAME)),
+                tx_hash: None,
+            });
+            da_job_item.status = JobStatus::Created;
+
+            let job_item_clone = da_job_item.clone();
+
+            job_handler
+                .expect_create_job()
+                .with(eq(block_num_str.clone()), mockall::predicate::always())
+                .returning(move |_, _| Ok(job_item_clone.clone()));
+
+            let block_num_str_for_db = block_num_str.clone();
+            db.expect_create_job()
+                .withf(move |item| {
+                    item.internal_id == block_num_str_for_db
+                        && matches!(item.metadata.specific, JobSpecificMetadata::Da(_))
+                })
+                .returning(move |_| Ok(da_job_item.clone()));
+        }
+    }
+
+    // Setup job handler context
+    let job_handler: Arc<Box<dyn JobHandlerTrait>> = Arc::new(Box::new(job_handler));
+    let ctx = get_job_handler_context();
+    ctx.expect().with(eq(JobType::DataSubmission)).returning(move |_| Arc::clone(&job_handler));
+
+    let expected_created_count = expected_data_submission_jobs.len();
+
+    // Mock queue operations for successful job creations
+    queue
+        .expect_send_message()
+        .times(expected_created_count)
+        .returning(|_, _, _| Ok(()))
+        .withf(|queue, _, _| *queue == QueueType::DataSubmissionJobProcessing);
+
+    // Build test configuration
+    let services = TestConfigBuilder::new()
+        .configure_database(db.into())
+        .configure_queue_client(queue.into())
+        .configure_da_client(da_client.into())
+        .build()
+        .await;
+
+    // Run the Data Submission worker
+    crate::worker::event_handler::triggers::data_submission::DataSubmissionJobTrigger
+        .run_worker(services.config)
+        .await?;
+
+    Ok(())
+}
diff --git a/orchestrator/src/tests/workers/mod.rs b/orchestrator/src/tests/workers/mod.rs
index c21dd3d096..93a81a7ceb 100644
--- a/orchestrator/src/tests/workers/mod.rs
+++ b/orchestrator/src/tests/workers/mod.rs
@@ -4,3 +4,6 @@ pub mod proving;
 pub mod snos;
 mod update_state;
 pub mod utils;
+
+#[cfg(test)]
+pub mod data_submission;
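Scenarios 5, 6, and 10 above all exercise the failed-block constraint. The predicate they rely on, `ProcessingContext::should_skip_block` from the trigger code later in this diff, is reproduced here as a self-contained sketch to make the boundary behavior explicit (the failed block itself is skipped):

```rust
// Body copied from `ProcessingContext::should_skip_block` in this diff.
fn should_skip_block(earliest_failed_block: Option<u64>, block_number: u64) -> bool {
    earliest_failed_block.is_some_and(|failed_block| block_number >= failed_block)
}

fn main() {
    assert!(!should_skip_block(None, 5)); // no failed blocks: never skip
    assert!(!should_skip_block(Some(1), 0)); // strictly before the failure: process
    assert!(should_skip_block(Some(1), 1)); // the failed block itself: skip
    assert!(should_skip_block(Some(1), 2)); // beyond the failure: skip
}
```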
diff --git a/orchestrator/src/tests/workers/proving/mod.rs b/orchestrator/src/tests/workers/proving/mod.rs
index a72050f70b..a1f9b9984c 100644
--- a/orchestrator/src/tests/workers/proving/mod.rs
+++ b/orchestrator/src/tests/workers/proving/mod.rs
@@ -1,123 +1,232 @@
+#![allow(clippy::type_complexity)]
 use std::error::Error;
 use std::sync::Arc;
 
 use crate::core::client::database::MockDatabaseClient;
 use crate::core::client::queue::MockQueueClient;
 use crate::tests::config::TestConfigBuilder;
-use crate::tests::workers::utils::{db_checks_proving_worker, get_job_by_mock_id_vector};
+use crate::tests::workers::utils::get_job_item_mock_by_id;
 use crate::types::jobs::metadata::JobSpecificMetadata;
+use crate::types::jobs::metadata::{ProvingInputType, ProvingMetadata, SnosMetadata};
 use crate::types::jobs::types::{JobStatus, JobType};
 use crate::types::queue::QueueType;
 use crate::worker::event_handler::factory::mock_factory::get_job_handler_context;
 use crate::worker::event_handler::jobs::{JobHandlerTrait, MockJobHandlerTrait};
-use crate::worker::event_handler::triggers::proving::ProvingJobTrigger;
 use crate::worker::event_handler::triggers::JobTrigger;
-use httpmock::MockServer;
 use mockall::predicate::eq;
 use orchestrator_da_client_interface::MockDaClient;
-use orchestrator_prover_client_interface::MockProverClient;
-use orchestrator_settlement_client_interface::MockSettlementClient;
 use rstest::rstest;
-use starknet::providers::jsonrpc::HttpTransport;
-use starknet::providers::JsonRpcClient;
-use url::Url;
+use uuid::Uuid;
 
 #[rstest]
-#[case(true)]
-#[case(false)]
+// Scenario 1: No completed SNOS jobs exist
+// Expected result: no proving jobs created
+#[case(
+    None, // earliest_failed_block
+    vec![], // completed_snos_jobs (no completed SNOS jobs)
+    vec![], // expected_proving_jobs (no jobs to create)
+)]
+// Scenario 2: Single completed SNOS job with valid snos_fact
+// Expected result: one proving job created
+#[case(
+    None, // earliest_failed_block
+    vec![
+        (0, Some("valid_snos_fact_block_0".to_string()), Some("path/to/cairo_pie_0".to_string()), Some(1000))
+    ], // completed_snos_jobs
+    vec![0], // expected_proving_jobs
+)]
+// Scenario 3: Multiple completed SNOS jobs with valid snos_facts
+// Expected result: proving jobs created for all
+#[case(
+    None, // earliest_failed_block
+    vec![
+        (0, Some("valid_snos_fact_block_0".to_string()), Some("path/to/cairo_pie_0".to_string()), Some(1000)),
+        (1, Some("valid_snos_fact_block_1".to_string()), Some("path/to/cairo_pie_1".to_string()), Some(1500)),
+        (2, Some("valid_snos_fact_block_2".to_string()), Some("path/to/cairo_pie_2".to_string()), Some(2000))
+    ], // completed_snos_jobs
+    vec![0, 1, 2], // expected_proving_jobs
+)]
+// Scenario 4: SNOS job without snos_fact (should be skipped)
+// Expected result: no proving jobs created
+#[case(
+    None, // earliest_failed_block
+    vec![
+        (0, None, Some("path/to/cairo_pie_0".to_string()), Some(1000)) // Missing snos_fact
+    ], // completed_snos_jobs
+    vec![], // expected_proving_jobs (skipped due to missing fact)
+)]
+// Scenario 5: Mix of valid and invalid SNOS jobs
+// Expected result: proving jobs created only for valid ones
+#[case(
+    None, // earliest_failed_block
+    vec![
+        (0, Some("valid_snos_fact_block_0".to_string()), Some("path/to/cairo_pie_0".to_string()), Some(1000)), // Valid
+        (1, None, Some("path/to/cairo_pie_1".to_string()), Some(1500)), // Invalid - no snos_fact
+        (2, Some("valid_snos_fact_block_2".to_string()), Some("path/to/cairo_pie_2".to_string()), Some(2000)) // Valid
+    ], // completed_snos_jobs
+    vec![0, 2], // expected_proving_jobs (skip block 1)
+)]
+// Scenario 6: Completed SNOS jobs but earliest_failed_block constraint blocks some
+// Expected result: proving jobs created only for blocks before failed block
+#[case(
+    Some(1), // earliest_failed_block (blocks >= 1 should be skipped)
+    vec![
+        (0, Some("valid_snos_fact_block_0".to_string()), Some("path/to/cairo_pie_0".to_string()), Some(1000)), // Valid (< failed block)
+        (1, Some("valid_snos_fact_block_1".to_string()), Some("path/to/cairo_pie_1".to_string()), Some(1500)), // Skipped (>= failed block)
+        (2, Some("valid_snos_fact_block_2".to_string()), Some("path/to/cairo_pie_2".to_string()), Some(2000)) // Skipped (>= failed block)
+    ], // completed_snos_jobs
+    vec![0], // expected_proving_jobs (only block 0)
+)]
+// Scenario 7: All SNOS jobs are beyond failed block
+// Expected result: no proving jobs created (all skipped)
+#[case(
+    Some(0), // earliest_failed_block (all blocks >= 0 should be skipped)
+    vec![
+        (0, Some("valid_snos_fact_block_0".to_string()), Some("path/to/cairo_pie_0".to_string()), Some(1000)),
+        (1, Some("valid_snos_fact_block_1".to_string()), Some("path/to/cairo_pie_1".to_string()), Some(1500)),
+        (2, Some("valid_snos_fact_block_2".to_string()), Some("path/to/cairo_pie_2".to_string()), Some(2000))
+    ], // completed_snos_jobs
+    vec![], // expected_proving_jobs (all skipped)
+)]
+// Scenario 8: Large number of completed SNOS jobs
+// Expected result: proving jobs created for all valid ones
+#[case(
+    None, // earliest_failed_block
+    vec![
+        (0, Some("fact_0".to_string()), Some("pie_0".to_string()), Some(1000)),
+        (1, Some("fact_1".to_string()), Some("pie_1".to_string()), Some(1100)),
+        (2, Some("fact_2".to_string()), Some("pie_2".to_string()), Some(1200)),
+        (3, Some("fact_3".to_string()), Some("pie_3".to_string()), Some(1300)),
+        (4, Some("fact_4".to_string()), Some("pie_4".to_string()), Some(1400))
+    ], // completed_snos_jobs
+    vec![0, 1, 2, 3, 4], // expected_proving_jobs
+)]
+// Scenario 9: SNOS jobs without cairo_pie_path (should still create proving job)
+// Expected result: proving job created with None input_path
+#[case(
+    None, // earliest_failed_block
+    vec![
+        (0, Some("valid_snos_fact_block_0".to_string()), None, Some(1000)) // No cairo_pie_path
+    ], // completed_snos_jobs
+    vec![0], // expected_proving_jobs (should still be created)
+)]
+// Scenario 10: Complex scenario with failed block constraint and mixed validity
+// Expected result: proving jobs created only for valid blocks before failed block
+#[case(
+    Some(3), // earliest_failed_block
+    vec![
+        (0, Some("fact_0".to_string()), Some("pie_0".to_string()), Some(1000)), // Valid
+        (1, None, Some("pie_1".to_string()), Some(1100)), // Invalid - no fact
+        (2, Some("fact_2".to_string()), Some("pie_2".to_string()), Some(1200)), // Valid
+        (3, Some("fact_3".to_string()), Some("pie_3".to_string()), Some(1300)), // Skipped - at failed block
+        (4, Some("fact_4".to_string()), Some("pie_4".to_string()), Some(1400)) // Skipped - beyond failed block
+    ], // completed_snos_jobs
+    vec![0, 2], // expected_proving_jobs (only valid blocks before failed block)
+)]
 #[tokio::test]
-async fn test_proving_worker(#[case] incomplete_runs: bool) -> Result<(), Box<dyn Error>> {
-    let num_jobs = 5;
-    // Choosing a random incomplete job ID out of the total number of jobs
-    let random_incomplete_job_id: u64 = 3;
-
-    let server = MockServer::start();
+async fn test_proving_worker(
+    #[case] earliest_failed_block: Option<u64>,
+    #[case] completed_snos_jobs: Vec<(u64, Option<String>, Option<String>, Option<usize>)>, // (block_num, snos_fact, cairo_pie_path, n_steps)
+    #[case] expected_proving_jobs: Vec<u64>,
+) -> Result<(), Box<dyn Error>> {
+    dotenvy::from_filename_override(".env.test").expect("Failed to load the .env file");
+
+    // Setup mock clients
     let da_client = MockDaClient::new();
     let mut db = MockDatabaseClient::new();
     let mut queue = MockQueueClient::new();
-    let prover_client = MockProverClient::new();
-    let settlement_client = MockSettlementClient::new();
-
-    // Mocking the get_job_handler function.
     let mut job_handler = MockJobHandlerTrait::new();
 
-    // Create mock SNOS jobs with snos_fact field set
-    let mut snos_jobs = Vec::new();
-
-    for i in 1..=num_jobs {
-        // Skip job with ID 3 if incomplete_runs is true
-        if incomplete_runs && i == random_incomplete_job_id {
-            continue;
-        }
-
-        // Create a SNOS job with snos_fact field set
-        let mut job = get_job_by_mock_id_vector(JobType::SnosRun, JobStatus::Completed, 1, i)[0].clone();
-
-        // Ensure the SNOS job has a snos_fact field
-        if let JobSpecificMetadata::Snos(ref mut snos_metadata) = job.metadata.specific {
-            snos_metadata.snos_fact = Some(format!("0x{:064x}", i));
-        }
-
-        snos_jobs.push(job);
-    }
-
-    // Mock db call for getting successful SNOS jobs without successor
-    db.expect_get_jobs_without_successor()
-        .times(1)
-        .withf(|job_type, job_status, successor_type| {
-            *job_type == JobType::SnosRun
-                && *job_status == JobStatus::Completed
-                && *successor_type == JobType::ProofCreation
+    // Mock earliest_failed_block_number query
+    db.expect_get_earliest_failed_block_number().returning(move || Ok(earliest_failed_block));
+
+    // Create completed SNOS job items
+    let snos_job_items: Vec<_> = completed_snos_jobs
+        .iter()
+        .map(|(block_num, snos_fact, cairo_pie_path, n_steps)| {
+            let mut job_item = get_job_item_mock_by_id(block_num.to_string(), Uuid::new_v4());
+            job_item.metadata.specific = JobSpecificMetadata::Snos(SnosMetadata {
+                block_number: *block_num,
+                snos_fact: snos_fact.clone(),
+                cairo_pie_path: cairo_pie_path.clone(),
+                snos_n_steps: *n_steps,
+                ..Default::default()
+            });
+            job_item.status = JobStatus::Completed;
+            job_item
        })
-        .returning(move |_, _, _| Ok(snos_jobs.clone()));
+        .collect();
 
-    // Set up expectations for each job
-    for i in 1..=num_jobs {
-        if incomplete_runs && i == random_incomplete_job_id {
-            continue;
+    // Mock database call to get SNOS jobs without proving jobs
+    let snos_jobs_clone = snos_job_items.clone();
+    db.expect_get_jobs_without_successor()
+        .with(eq(JobType::SnosRun), eq(JobStatus::Completed), eq(JobType::ProofCreation))
+        .returning(move |_, _, _| Ok(snos_jobs_clone.clone()));
+
+    // Mock get_job_by_internal_id_and_type to always return None
+    db.expect_get_job_by_internal_id_and_type().returning(|_, _| Ok(None));
+
+    // Mock job creation for expected proving jobs
+    for &block_num in &expected_proving_jobs {
+        let uuid = Uuid::new_v4();
+        let block_num_str = block_num.to_string();
+
+        // Find the corresponding SNOS job to get its metadata
+        let snos_job = completed_snos_jobs.iter().find(|(b, _, _, _)| *b == block_num).unwrap();
+        let (_, snos_fact, cairo_pie_path, n_steps) = snos_job;
+
+        // Only expect job creation for jobs that should actually be created
+        // (i.e., have snos_fact and are not beyond failed block)
+        if snos_fact.is_some() && (earliest_failed_block.is_none() || block_num < earliest_failed_block.unwrap()) {
+            let mut proving_job_item = get_job_item_mock_by_id(block_num_str.clone(), uuid);
+            proving_job_item.metadata.specific = JobSpecificMetadata::Proving(ProvingMetadata {
+                block_number: block_num,
+                input_path: cairo_pie_path.as_ref().map(|path| ProvingInputType::CairoPie(path.clone())),
+                download_proof: None,
+                ensure_on_chain_registration: snos_fact.clone(),
+                n_steps: *n_steps,
+            });
+            proving_job_item.status = JobStatus::Created;
+
+            let job_item_clone = proving_job_item.clone();
+
+            job_handler
+                .expect_create_job()
+                .with(eq(block_num_str.clone()), mockall::predicate::always())
+                .returning(move |_, _| Ok(job_item_clone.clone()));
+
+            let block_num_str_for_db = block_num_str.clone();
+            db.expect_create_job()
+                .withf(move |item| {
+                    item.internal_id == block_num_str_for_db
+                        && matches!(item.metadata.specific, JobSpecificMetadata::Proving(_))
+                })
+                .returning(move |_| Ok(proving_job_item.clone()));
         }
-        db_checks_proving_worker(i as i32, &mut db, &mut job_handler);
     }
 
-    // Queue function call simulations
-    if incomplete_runs {
-        queue
-            .expect_send_message()
-            .times(4)
-            .returning(|_, _, _| Ok(()))
-            .withf(|queue, _payload, _delay| *queue == QueueType::ProvingJobProcessing);
-    } else {
-        queue
-            .expect_send_message()
-            .times(5)
-            .returning(|_, _, _| Ok(()))
-            .withf(|queue, _payload, _delay| *queue == QueueType::ProvingJobProcessing);
-    }
-    let provider = JsonRpcClient::new(HttpTransport::new(
-        Url::parse(format!("http://localhost:{}", server.port()).as_str()).expect("Failed to parse URL"),
-    ));
+    // Setup job handler context
+    let job_handler: Arc<Box<dyn JobHandlerTrait>> = Arc::new(Box::new(job_handler));
+    let ctx = get_job_handler_context();
+    ctx.expect().with(eq(JobType::ProofCreation)).returning(move |_| Arc::clone(&job_handler));
+
+    // Mock queue operations for successful job creations
+    queue
+        .expect_send_message()
+        .times(expected_proving_jobs.len())
+        .returning(|_, _, _| Ok(()))
+        .withf(|queue, _, _| *queue == QueueType::ProvingJobProcessing);
 
+    // Build test configuration
     let services = TestConfigBuilder::new()
-        .configure_starknet_client(provider.into())
         .configure_database(db.into())
         .configure_queue_client(queue.into())
         .configure_da_client(da_client.into())
-        .configure_prover_client(prover_client.into())
-        .configure_settlement_client(settlement_client.into())
         .build()
         .await;
 
-    let job_handler: Arc<Box<dyn JobHandlerTrait>> = Arc::new(Box::new(job_handler));
-    let ctx = get_job_handler_context();
-
-    // Mocking the `get_job_handler` call in create_job function.
-    if incomplete_runs {
-        ctx.expect().times(4).with(eq(JobType::ProofCreation)).returning(move |_| Arc::clone(&job_handler));
-    } else {
-        ctx.expect().times(5).with(eq(JobType::ProofCreation)).returning(move |_| Arc::clone(&job_handler));
-    }
-
-    ProvingJobTrigger.run_worker(services.config).await?;
+    // Run the Proving worker
+    crate::worker::event_handler::triggers::proving::ProvingJobTrigger.run_worker(services.config).await?;
 
     Ok(())
 }
diff --git a/orchestrator/src/tests/workers/snos/mod.rs b/orchestrator/src/tests/workers/snos/mod.rs
index 4817b9c3e1..6ea463b756 100644
--- a/orchestrator/src/tests/workers/snos/mod.rs
+++ b/orchestrator/src/tests/workers/snos/mod.rs
@@ -1,3 +1,5 @@
+#![allow(clippy::too_many_arguments)]
+
 use crate::core::client::database::MockDatabaseClient;
 use crate::core::client::queue::MockQueueClient;
 use crate::tests::config::TestConfigBuilder;
@@ -25,6 +27,7 @@ use uuid::Uuid;
 // Scenario 1: Block 0 is Completed | Block 1 is PendingRetry | Max_concurrent_create_snos is 3
 // Expected result: create jobs for block 2,3 only
 #[case(
+    None,
     100, // latest_sequencer_block
     Some(0), // latest_snos_completed
     None, // latest_state_transition_completed
@@ -36,6 +39,7 @@ use uuid::Uuid;
 // Scenario 2: Block 0 is Completed | Max_concurrent_create_snos is 3
 // Expected result: create jobs for block 1,2,3 only
 #[case(
+    None,
     100, // latest_sequencer_block
     Some(0), // latest_snos_completed
     None, // latest_state_transition_completed
@@ -47,6 +51,7 @@ use uuid::Uuid;
 // Scenario 3: No SNOS job for any block exists | Max_concurrent_create_snos is 3
 // Expected result: create jobs for block 0,1,2 only
 #[case(
+    None,
     100, // latest_sequencer_block
     None, // latest_snos_completed
     None, // latest_state_transition_completed
@@ -58,6 +63,7 @@ use uuid::Uuid;
 // Scenario 4: Block 0,2 is Completed | Block 1 is Missed | Max_concurrent_create_snos is 3
 // Expected result: create jobs for block 1,3,4 only
 #[case(
+    None,
     100, // latest_sequencer_block
     Some(2), // latest_snos_completed
     None, // latest_state_transition_completed
@@ -69,6 +75,7 @@ use uuid::Uuid;
 // Scenario 5: Block 2 is Completed | Block 0 is PendingRetry | Block 1 is Missed | Max_concurrent_create_snos is 3
 // Expected result: create jobs for block 1,3 only
 #[case(
+    None,
     100, // latest_sequencer_block
     Some(2), // latest_snos_completed
     None, // latest_state_transition_completed
@@ -80,6 +87,7 @@ use uuid::Uuid;
 // Scenario 6: Block 2 is Completed | Block 0 is PendingRetry | Block 1 is Created | Max_concurrent_create_snos is 3
 // Expected result: create jobs for block 3 only
 #[case(
+    None,
     100, // latest_sequencer_block
     Some(2), // latest_snos_completed
     None, // latest_state_transition_completed
@@ -91,6 +99,7 @@ use uuid::Uuid;
 // Scenario 7: Block 4 is Created | latest_snos_completed & latest_state_transition_completed is 3 | Max_concurrent_create_snos is 3
 // Expected result: create jobs for block 3 only
 #[case(
+    None,
     100, // latest_sequencer_block
     Some(3), // latest_snos_completed
     Some(3), // latest_state_transition_completed
@@ -102,6 +111,7 @@ use uuid::Uuid;
 // Scenario 8: Block 1 is Created | latest_snos_completed is 2 & latest_state_transition_completed is None | Max_concurrent_create_snos is 3
 // Expected result: create jobs for blocks 0,3 only
 #[case(
+    None,
     3, // latest_sequencer_block
     Some(2), // latest_snos_completed
     None, // latest_state_transition_completed
@@ -110,8 +120,21 @@ use uuid::Uuid;
     vec![1], // pending_blocks (block 1 Created, consumes 1 slot)
     vec![0,3] // expected_jobs (only 2 slots left for new blocks)
 )]
+// Scenario 9: Block 1 is Created | earliest_failed_block is 4 | latest_snos_completed is 2 & latest_state_transition_completed is None | Max_concurrent_create_snos is 3
+// Expected result: create jobs for blocks 0,3 only
+#[case(
+    Some(4), // earliest_failed_block
+    5, // latest_sequencer_block
+    Some(2), // latest_snos_completed
+    None, // latest_state_transition_completed
+    vec![0], // missing_blocks_first_half
+    vec![3], // missing_blocks_second_half
+    vec![1], // pending_blocks (block 1 Created, consumes 1 slot)
+    vec![0,3] // expected_jobs (only 2 slots left for new blocks)
+)]
 #[tokio::test]
 async fn test_snos_worker(
+    #[case] earliest_failed_block: Option<u64>,
     #[case] latest_sequencer_block: u64,
     #[case] latest_snos_completed: Option<u64>,
     #[case] latest_state_transition_completed: Option<u64>,
@@ -154,6 +177,8 @@ async fn test_snos_worker(
         .with(eq(JobType::SnosRun), eq(JobStatus::Completed))
         .returning(move |_, _| Ok(latest_snos_job.clone()));
 
+    db.expect_get_earliest_failed_block_number().with().returning(move || Ok(earliest_failed_block));
+
     // Mock get_job_by_internal_id_and_type to always return None
     db.expect_get_job_by_internal_id_and_type().returning(|_, _| Ok(None));
 
diff --git a/orchestrator/src/types/jobs/job_updates.rs b/orchestrator/src/types/jobs/job_updates.rs
index 848059b683..3cbe67481f 100644
--- a/orchestrator/src/types/jobs/job_updates.rs
+++ b/orchestrator/src/types/jobs/job_updates.rs
@@ -1,3 +1,4 @@
+use crate::error::job::JobError;
 use crate::types::jobs::external_id::ExternalId;
 use crate::types::jobs::metadata::JobMetadata;
 use crate::types::jobs::types::JobStatus;
@@ -37,8 +38,11 @@ impl JobItemUpdates {
         self.metadata = Some(metadata);
         self
     }
-    // creating another type JobItemUpdatesBuilder would be an overkill
-    pub fn build(self) -> JobItemUpdates {
-        self
+    pub fn build(self) -> Result<JobItemUpdates, JobError> {
+        if self.status.is_none() && self.external_id.is_none() && self.metadata.is_none() {
+            Err(JobError::Other("No field to be updated, likely a false call".to_string().into()))
+        } else {
+            Ok(self)
+        }
     }
 }
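With `build()` now fallible, an empty update is rejected at the call site instead of deep inside the Mongo update path (the checks removed in the `mongodb.rs` hunks above). A self-contained model of the new contract, using simplified stand-in field types:

```rust
// Stand-in for the orchestrator's JobItemUpdates; only the shape matters here.
#[derive(Debug, Default)]
struct JobItemUpdates {
    status: Option<&'static str>,
    external_id: Option<&'static str>,
    metadata: Option<&'static str>,
}

impl JobItemUpdates {
    fn new() -> Self {
        Self::default()
    }
    fn update_status(mut self, status: &'static str) -> Self {
        self.status = Some(status);
        self
    }
    fn build(self) -> Result<JobItemUpdates, String> {
        if self.status.is_none() && self.external_id.is_none() && self.metadata.is_none() {
            Err("No field to be updated, likely a false call".to_string())
        } else {
            Ok(self)
        }
    }
}

fn main() {
    assert!(JobItemUpdates::new().update_status("LockedForProcessing").build().is_ok());
    // Previously this silently built an empty update and failed later in the DB layer.
    assert!(JobItemUpdates::new().build().is_err());
}
```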
diff --git a/orchestrator/src/worker/controller/event_worker.rs b/orchestrator/src/worker/controller/event_worker.rs
index e10f29c686..fc57bfad00 100644
--- a/orchestrator/src/worker/controller/event_worker.rs
+++ b/orchestrator/src/worker/controller/event_worker.rs
@@ -93,7 +93,7 @@ impl EventWorker {
        let worker_handler =
            JobHandlerService::get_worker_handler_from_worker_trigger_type(worker_mes.worker.clone());
        worker_handler
-            .run_worker_if_enabled(self.config.clone())
+            .run_worker(self.config.clone())
            .await
            .map_err(|e| ConsumptionError::Other(OtherError::from(e.to_string())))?;
        Ok(())
diff --git a/orchestrator/src/worker/event_handler/service.rs b/orchestrator/src/worker/event_handler/service.rs
index f101b5bea0..4a1494dc7e 100644
--- a/orchestrator/src/worker/event_handler/service.rs
+++ b/orchestrator/src/worker/event_handler/service.rs
@@ -20,7 +20,7 @@ use crate::utils::metrics::ORCHESTRATOR_METRICS;
 #[double]
 use crate::worker::event_handler::factory::factory;
 use crate::worker::event_handler::triggers::batching::BatchingTrigger;
-use crate::worker::event_handler::triggers::data_submission_worker::DataSubmissionJobTrigger;
+use crate::worker::event_handler::triggers::data_submission::DataSubmissionJobTrigger;
 use crate::worker::event_handler::triggers::proof_registration::ProofRegistrationJobTrigger;
 use crate::worker::event_handler::triggers::proving::ProvingJobTrigger;
 use crate::worker::event_handler::triggers::snos::SnosJobTrigger;
@@ -185,7 +185,7 @@ impl JobHandlerService {
                JobItemUpdates::new()
                    .update_status(JobStatus::LockedForProcessing)
                    .update_metadata(job.metadata.clone())
-                    .build(),
+                    .build()?,
            )
            .await
            .map_err(|e| {
@@ -240,7 +240,7 @@ impl JobHandlerService {
                    .update_status(JobStatus::PendingVerification)
                    .update_metadata(job.metadata.clone())
                    .update_external_id(external_id.clone().into())
-                    .build(),
+                    .build()?,
            )
            .await
            .map_err(|e| {
@@ -350,7 +350,7 @@ impl JobHandlerService {
        job.metadata.common.verification_started_at = Some(Utc::now());
        let mut job = config
            .database()
-            .update_job(&job, JobItemUpdates::new().update_metadata(job.metadata.clone()).build())
+            .update_job(&job, JobItemUpdates::new().update_metadata(job.metadata.clone()).build()?)
            .await
            .map_err(|e| {
                tracing::error!(job_id = ?id, error = ?e, "Failed to update job status");
@@ -390,7 +390,7 @@ impl JobHandlerService {
                JobItemUpdates::new()
                    .update_metadata(job.metadata.clone())
                    .update_status(JobStatus::Completed)
-                    .build(),
+                    .build()?,
            )
            .await
            .map_err(|e| {
@@ -420,7 +420,7 @@ impl JobHandlerService {
                JobItemUpdates::new()
                    .update_status(JobStatus::VerificationFailed)
                    .update_metadata(job.metadata.clone())
-                    .build(),
+                    .build()?,
            )
            .await
            .map_err(|e| {
@@ -448,7 +448,7 @@ impl JobHandlerService {
            tracing::warn!(job_id = ?id, "Max verification attempts reached. Marking job as timed out");
            config
                .database()
-                .update_job(&job, JobItemUpdates::new().update_status(JobStatus::VerificationTimeout).build())
+                .update_job(&job, JobItemUpdates::new().update_status(JobStatus::VerificationTimeout).build()?)
                .await
                .map_err(|e| {
                    tracing::error!(job_id = ?id, error = ?e, "Failed to update job status to VerificationTimeout");
@@ -461,7 +461,7 @@ impl JobHandlerService {
 
        config
            .database()
-            .update_job(&job, JobItemUpdates::new().update_metadata(job.metadata.clone()).build())
+            .update_job(&job, JobItemUpdates::new().update_metadata(job.metadata.clone()).build()?)
            .await
            .map_err(|e| {
                tracing::error!(job_id = ?id, error = ?e, "Failed to update job metadata");
@@ -585,7 +585,7 @@ impl JobHandlerService {
                JobItemUpdates::new()
                    .update_status(JobStatus::PendingRetry)
                    .update_metadata(job.metadata.clone())
-                    .build(),
+                    .build()?,
            )
            .await
            .map_err(|e| {
diff --git a/orchestrator/src/worker/event_handler/triggers/data_submission.rs b/orchestrator/src/worker/event_handler/triggers/data_submission.rs
new file mode 100644
index 0000000000..89e8d78030
--- /dev/null
+++ b/orchestrator/src/worker/event_handler/triggers/data_submission.rs
@@ -0,0 +1,186 @@
+use crate::core::client::database::DatabaseError;
+use crate::core::config::Config;
+use crate::error::job::JobError;
+use crate::types::constant::BLOB_DATA_FILE_NAME;
+use crate::types::jobs::job_item::JobItem;
+use crate::types::jobs::metadata::{CommonMetadata, DaMetadata, JobMetadata, JobSpecificMetadata, ProvingMetadata};
+use crate::types::jobs::types::{JobStatus, JobType};
+use crate::utils::metrics::ORCHESTRATOR_METRICS;
+use crate::worker::event_handler::service::JobHandlerService;
+use crate::worker::event_handler::triggers::{JobTrigger, ProcessingResult};
+use async_trait::async_trait;
+use color_eyre::eyre::Context;
+use opentelemetry::KeyValue;
+use std::sync::Arc;
+
+pub struct DataSubmissionJobTrigger;
+
+#[async_trait]
+impl JobTrigger for DataSubmissionJobTrigger {
+    /// Creates data submission jobs for all successful proving jobs that don't have one yet.
+    ///
+    /// This worker:
+    /// 1. Fetches all completed proving jobs without data submission jobs
+    /// 2. Validates each job can be processed (not beyond failed blocks)
+    /// 3. Creates corresponding data submission jobs with proper metadata
+    ///
+    async fn run_worker(&self, config: Arc<Config>) -> color_eyre::Result<()> {
+        tracing::trace!(log_type = "starting", category = "DataSubmissionWorker", "DataSubmissionWorker started.");
+
+        let processing_context = ProcessingContext::new(config.clone()).await?;
+        let eligible_proving_jobs = processing_context.get_eligible_proving_jobs().await?;
+
+        tracing::debug!("Found {} successful proving jobs without data submission jobs", eligible_proving_jobs.len());
+
+        let mut created_jobs = 0;
+        let mut skipped_jobs = 0;
+
+        for proving_job in eligible_proving_jobs {
+            match self.process_proving_job(&proving_job, &processing_context).await {
+                ProcessingResult::Created => created_jobs += 1,
+                ProcessingResult::Skipped => skipped_jobs += 1,
+                ProcessingResult::Failed => {
+                    // Error already logged in process_proving_job
+                    continue;
+                }
+            }
+        }
+
+        tracing::info!(created = created_jobs, skipped = skipped_jobs, "DataSubmissionWorker completed job processing");
+
+        tracing::trace!(log_type = "completed", category = "DataSubmissionWorker", "DataSubmissionWorker completed.");
+        Ok(())
+    }
+}
+
+impl DataSubmissionJobTrigger {
+    /// Processes a single proving job to create its corresponding data submission job
+    async fn process_proving_job(&self, proving_job: &JobItem, context: &ProcessingContext) -> ProcessingResult {
+        // Extract and validate proving metadata
+        let proving_metadata = match self.extract_proving_metadata(proving_job) {
+            Ok(metadata) => metadata,
+            Err(_) => return ProcessingResult::Failed,
+        };
+
+        // Check if job should be skipped due to failed block constraints
+        if context.should_skip_block(proving_metadata.block_number) {
+            tracing::debug!(
+                job_id = %proving_job.internal_id,
+                block_number = proving_metadata.block_number,
+                earliest_failed_block = context.earliest_failed_block,
+                "Skipping data submission job due to failed block constraint"
+            );
+            return ProcessingResult::Skipped;
+        }
+
+        // Create and submit data submission job
+        match self.create_data_submission_job(proving_job, &proving_metadata, context.config.clone()).await {
+            Ok(_) => {
+                tracing::info!(
+                    block_number = proving_metadata.block_number,
+                    job_id = %proving_job.internal_id,
+                    "Successfully created data submission job"
+                );
+                ProcessingResult::Created
+            }
+            Err(_) => ProcessingResult::Failed,
+        }
+    }
+
+    /// Extracts and validates proving metadata from the job
+    fn extract_proving_metadata(&self, proving_job: &JobItem) -> color_eyre::Result<ProvingMetadata> {
+        proving_job
+            .metadata
+            .specific
+            .clone()
+            .try_into()
+            .map_err(|e| {
+                tracing::error!(
+                    job_id = %proving_job.internal_id,
+                    error = %e,
+                    "Invalid metadata type for proving job"
+                );
+                e
+            })
+            .context("Unable to extract proving metadata")
+    }
+
+    /// Creates a data submission job with the appropriate metadata
+    async fn create_data_submission_job(
+        &self,
+        proving_job: &JobItem,
+        proving_metadata: &ProvingMetadata,
+        config: Arc<Config>,
+    ) -> Result<(), JobError> {
+        let da_metadata = self.build_da_metadata(proving_metadata);
+
+        tracing::debug!(
+            job_id = %proving_job.internal_id,
+            block_number = proving_metadata.block_number,
+            "Creating data submission job for proving job"
+        );
+
+        JobHandlerService::create_job(JobType::DataSubmission, proving_job.internal_id.clone(), da_metadata, config)
+            .await
+            .map_err(|e| {
+                tracing::warn!(
+                    job_id = %proving_job.internal_id,
+                    error = %e,
+                    "Failed to create data submission job"
+                );
+
+                let attributes = [
+                    KeyValue::new("operation_job_type", format!("{:?}", JobType::DataSubmission)),
+                    KeyValue::new("operation_type", "create_job"),
+                ];
+                ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &attributes);
+
+                e
+            })
+    }
+
+    /// Builds data submission job metadata from proving metadata
+    fn build_da_metadata(&self, proving_metadata: &ProvingMetadata) -> JobMetadata {
+        JobMetadata {
+            common: CommonMetadata::default(),
+            specific: JobSpecificMetadata::Da(DaMetadata {
+                block_number: proving_metadata.block_number,
+                blob_data_path: Some(self.build_blob_data_path(proving_metadata.block_number)),
+                tx_hash: None, // Will be populated during processing
+            }),
+        }
+    }
+
+    /// Builds the blob data path for a given block number
+    fn build_blob_data_path(&self, block_number: u64) -> String {
+        format!("{}/{BLOB_DATA_FILE_NAME}", block_number)
+    }
+}
+
+/// Context for processing jobs, containing shared data and configuration
+struct ProcessingContext {
+    config: Arc<Config>,
+    earliest_failed_block: Option<u64>,
+}
+
+impl ProcessingContext {
+    /// Creates a new processing context with necessary data fetched
+    async fn new(config: Arc<Config>) -> color_eyre::Result<Self> {
+        let earliest_failed_block = config.database().get_earliest_failed_block_number().await?;
+
+        Ok(Self { config, earliest_failed_block })
+    }
+
+    /// Fetches all proving jobs eligible for data submission job creation
+    async fn get_eligible_proving_jobs(&self) -> Result<Vec<JobItem>, DatabaseError> {
+        self.config
+            .database()
+            .get_jobs_without_successor(JobType::ProofCreation, JobStatus::Completed, JobType::DataSubmission)
+            .await
+    }
+
+    /// Determines if a block number should be skipped due to failed block constraints
+    fn should_skip_block(&self, block_number: u64) -> bool {
+        self.earliest_failed_block.is_some_and(|failed_block| block_number >= failed_block)
+    }
+}
diff --git a/orchestrator/src/worker/event_handler/triggers/data_submission_worker.rs b/orchestrator/src/worker/event_handler/triggers/data_submission_worker.rs
deleted file mode 100644
index 67539f1065..0000000000
--- a/orchestrator/src/worker/event_handler/triggers/data_submission_worker.rs
+++ /dev/null
@@ -1,80 +0,0 @@
-use crate::core::config::Config;
-use crate::types::constant::BLOB_DATA_FILE_NAME;
-use crate::types::jobs::metadata::{CommonMetadata, DaMetadata, JobMetadata, JobSpecificMetadata, ProvingMetadata};
-use crate::types::jobs::types::{JobStatus, JobType};
-use crate::utils::metrics::ORCHESTRATOR_METRICS;
-use crate::worker::event_handler::service::JobHandlerService;
-use crate::worker::event_handler::triggers::JobTrigger;
-use async_trait::async_trait;
-use opentelemetry::KeyValue;
-use std::sync::Arc;
-
-pub struct DataSubmissionJobTrigger;
-
-#[async_trait]
-impl JobTrigger for DataSubmissionJobTrigger {
-    // 0. All ids are assumed to be block numbers.
-    // 1. Fetch the latest completed Proving jobs without Data Submission jobs as successor jobs
-    // 2. Create jobs.
-    async fn run_worker(&self, config: Arc<Config>) -> color_eyre::Result<()> {
-        tracing::trace!(log_type = "starting", category = "DataSubmissionWorker", "DataSubmissionWorker started.");
-
-        let successful_proving_jobs = config
-            .database()
-            .get_jobs_without_successor(JobType::ProofCreation, JobStatus::Completed, JobType::DataSubmission)
-            .await?;
-
-        for proving_job in successful_proving_jobs {
-            // Extract proving metadata
-            let proving_metadata: ProvingMetadata = proving_job.metadata.specific.try_into().map_err(|e| {
-                tracing::error!(
-                    job_id = %proving_job.internal_id,
-                    error = %e,
-                    "Invalid metadata type for proving job"
-                );
-                e
-            })?;
-
-            // Create DA metadata
-            let da_metadata = JobMetadata {
-                common: CommonMetadata::default(),
-                specific: JobSpecificMetadata::Da(DaMetadata {
-                    block_number: proving_metadata.block_number,
-                    // Set the blob data path using block number
-                    blob_data_path: Some(format!("{}/{BLOB_DATA_FILE_NAME}", proving_metadata.block_number)),
-                    // These will be populated during processing
-                    tx_hash: None,
-                }),
-            };
-
-            match JobHandlerService::create_job(
-                JobType::DataSubmission,
-                proving_job.internal_id.clone(),
-                da_metadata,
-                config.clone(),
-            )
-            .await
-            {
-                Ok(_) => tracing::info!(
-                    block_id = %proving_job.internal_id,
-                    "Successfully created new data submission job"
-                ),
-                Err(e) => {
-                    tracing::warn!(
-                        block_id = %proving_job.internal_id,
-                        error = %e,
-                        "Failed to create new data submission job"
-                    );
-                    let attributes = [
-                        KeyValue::new("operation_job_type", format!("{:?}", JobType::DataSubmission)),
-                        KeyValue::new("operation_type", format!("{:?}", "create_job")),
-                    ];
-                    ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &attributes);
-                }
-            }
-        }
-
-        tracing::trace!(log_type = "completed", category = "DataSubmissionWorker", "DataSubmissionWorker completed.");
-        Ok(())
-    }
-}
diff --git a/orchestrator/src/worker/event_handler/triggers/mod.rs b/orchestrator/src/worker/event_handler/triggers/mod.rs
index d958352e8b..b7a8d10acc 100644
--- a/orchestrator/src/worker/event_handler/triggers/mod.rs
+++ b/orchestrator/src/worker/event_handler/triggers/mod.rs
@@ -1,46 +1,22 @@
 pub(crate) mod batching;
-pub(crate) mod data_submission_worker;
+pub(crate) mod data_submission;
 pub(crate) mod proof_registration;
 pub(crate) mod proving;
 pub(crate) mod snos;
 pub(crate) mod update_state;
 
 use crate::core::config::Config;
-use crate::types::jobs::types::JobStatus;
 use async_trait::async_trait;
 use std::sync::Arc;
 
+/// Result of a trigger processing a single job
+enum ProcessingResult {
+    Created,
+    Skipped,
+    Failed,
+}
+
 #[async_trait]
 pub trait JobTrigger: Send + Sync {
-    async fn run_worker_if_enabled(&self, config: Arc<Config>) -> color_eyre::Result<()> {
-        if !self.is_worker_enabled(config.clone()).await? {
-            return Ok(());
-        }
-        self.run_worker(config).await
-    }
-
     async fn run_worker(&self, config: Arc<Config>) -> color_eyre::Result<()>;
-
-    // Assumption
-    // If say a job for block X fails, we don't want the worker to respawn another job for the same
-    // block we will resolve the existing failed job first.
-
-    // We assume the system to keep working till a job hasn't failed,
-    // as soon as it fails we currently halt any more execution and wait for manual intervention.
-
-    // Checks if any of the jobs have failed
-    // Failure : JobStatus::VerificationFailed, JobStatus::VerificationTimeout, JobStatus::Failed
-    // Halts any new job creation till all the count of failed jobs is not Zero.
-    async fn is_worker_enabled(&self, config: Arc<Config>) -> color_eyre::Result<bool> {
-        let failed_jobs = config
-            .database()
-            .get_jobs_by_types_and_statuses(vec![], vec![JobStatus::Failed, JobStatus::VerificationTimeout], Some(1))
-            .await?;
-
-        if !failed_jobs.is_empty() {
-            return Ok(false);
-        }
-
-        Ok(true)
-    }
 }
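The deleted `is_worker_enabled` gate stopped a worker outright whenever any failed job existed; the replacement keeps workers running and fences off only blocks at or beyond the earliest failure (via `should_skip_block` in the triggers). A condensed, self-contained sketch of the behavioral difference:

```rust
// New behavior: blocks strictly below the earliest failed block still progress.
fn blocks_to_run(candidates: &[u64], earliest_failed: Option<u64>) -> Vec<u64> {
    candidates.iter().copied().filter(|b| earliest_failed.map_or(true, |f| *b < f)).collect()
}

fn main() {
    // With a failure at block 3, blocks 0..=2 keep making progress;
    // the old global gate would have produced an empty list here.
    assert_eq!(blocks_to_run(&[0, 1, 2, 3, 4], Some(3)), vec![0, 1, 2]);
    assert_eq!(blocks_to_run(&[0, 1, 2], None), vec![0, 1, 2]);
}
```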
snos_metadata.cairo_pie_path.map(ProvingInputType::CairoPie), - // Set a download path if needed - download_proof: None, - // Set SNOS fact for on-chain verification - ensure_on_chain_registration: Some(snos_fact), - n_steps: snos_metadata.snos_n_steps, - }), - }; - - tracing::debug!(job_id = %snos_job.internal_id, "Creating proof creation job for SNOS job"); - match JobHandlerService::create_job( - JobType::ProofCreation, - snos_job.internal_id.clone(), - proving_metadata, - config.clone(), - ) - .await - { - Ok(_) => tracing::info!(block_id = %snos_job.internal_id, "Successfully created new proving job"), - Err(e) => { - tracing::warn!(job_id = %snos_job.internal_id, error = %e, "Failed to create new proving job"); - let attributes = [ - KeyValue::new("operation_job_type", format!("{:?}", JobType::ProofCreation)), - KeyValue::new("operation_type", format!("{:?}", "create_job")), - ]; - ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &attributes); - } } } - tracing::trace!(log_type = "completed", category = "ProvingWorker", "ProvingWorker completed."); + tracing::info!(created = created_jobs, skipped = skipped_jobs, "ProvingWorker completed job processing"); + Ok(()) } } + +impl ProvingJobTrigger { + /// Processes a single SNOS job to create its corresponding proving job + async fn process_snos_job(&self, snos_job: &JobItem, context: &ProcessingContext) -> ProcessingResult { + // Extract and validate SNOS metadata + let snos_metadata = match self.extract_snos_metadata(snos_job) { + Ok(metadata) => metadata, + Err(_) => return ProcessingResult::Failed, + }; + + // Check if job should be skipped due to failed block constraints + if context.should_skip_block(snos_metadata.block_number) { + tracing::debug!( + job_id = %snos_job.internal_id, + block_number = snos_metadata.block_number, + earliest_failed_block = context.earliest_failed_block, + "Skipping proving job due to failed block constraint" + ); + return ProcessingResult::Skipped; + } + + // Validate SNOS fact availability + let snos_fact = match self.extract_snos_fact(&snos_metadata, &snos_job.internal_id) { + Ok(fact) => fact, + Err(_) => return ProcessingResult::Failed, + }; + + // Create and submit proving job + match self.create_proving_job(snos_job, &snos_metadata, snos_fact, context.config.clone()).await { + Ok(_) => { + tracing::info!( + block_number = snos_metadata.block_number, + job_id = %snos_job.internal_id, + "Successfully created proving job" + ); + ProcessingResult::Created + } + Err(_) => ProcessingResult::Failed, + } + } + + /// Extracts and validates SNOS metadata from the job + fn extract_snos_metadata(&self, snos_job: &JobItem) -> color_eyre::Result { + snos_job + .metadata + .specific + .clone() + .try_into() + .map_err(|e| { + tracing::error!( + job_id = %snos_job.internal_id, + error = %e, + "Invalid metadata type for SNOS job" + ); + e + }) + .context("Extracting Snos Metadata Failed") + } + + /// Extracts SNOS fact from metadata with proper error handling + fn extract_snos_fact(&self, snos_metadata: &SnosMetadata, job_id: &str) -> color_eyre::Result { + snos_metadata.snos_fact.clone().ok_or_else(|| { + tracing::error!(job_id = %job_id, "SNOS fact not found in metadata"); + color_eyre::eyre::eyre!("SNOS fact missing from metadata") + }) + } + + /// Creates a proving job with the appropriate metadata + async fn create_proving_job( + &self, + snos_job: &JobItem, + snos_metadata: &SnosMetadata, + snos_fact: String, + config: Arc, + ) -> color_eyre::Result<()> { + let proving_metadata = 
self.build_proving_metadata(snos_metadata, snos_fact); + + tracing::debug!( + job_id = %snos_job.internal_id, + block_number = snos_metadata.block_number, + "Creating proving job for SNOS job" + ); + + JobHandlerService::create_job(JobType::ProofCreation, snos_job.internal_id.clone(), proving_metadata, config) + .await + .map_err(|e| { + tracing::warn!( + job_id = %snos_job.internal_id, + error = %e, + "Failed to create proving job" + ); + + let attributes = [ + KeyValue::new("operation_job_type", format!("{:?}", JobType::ProofCreation)), + KeyValue::new("operation_type", "create_job"), + ]; + ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &attributes); + + e + }) + .context("Create Proving Job Failed.") + } + + /// Builds proving job metadata from SNOS metadata + fn build_proving_metadata(&self, snos_metadata: &SnosMetadata, snos_fact: String) -> JobMetadata { + JobMetadata { + common: CommonMetadata::default(), + specific: JobSpecificMetadata::Proving(ProvingMetadata { + block_number: snos_metadata.block_number, + input_path: snos_metadata.cairo_pie_path.as_ref().map(|path| ProvingInputType::CairoPie(path.clone())), + download_proof: None, + ensure_on_chain_registration: Some(snos_fact), + n_steps: snos_metadata.snos_n_steps, + }), + } + } +} + +/// Context for processing jobs, containing shared data and configuration +pub struct ProcessingContext { + pub config: Arc, + pub earliest_failed_block: Option, +} + +impl ProcessingContext { + /// Creates a new processing context with necessary data fetched + pub async fn new(config: Arc) -> color_eyre::Result { + let earliest_failed_block = config.database().get_earliest_failed_block_number().await?; + + Ok(Self { config, earliest_failed_block }) + } + + /// Fetches all SNOS jobs eligible for proving job creation + async fn get_eligible_snos_jobs(&self) -> color_eyre::Result> { + self.config + .database() + .get_jobs_without_successor(JobType::SnosRun, JobStatus::Completed, JobType::ProofCreation) + .await + .context("Failed to get Eligible SNOS Jobs") + } + + /// Determines if a block number should be skipped due to failed block constraints + fn should_skip_block(&self, block_number: u64) -> bool { + self.earliest_failed_block.is_some_and(|failed_block| block_number >= failed_block) + } +} diff --git a/orchestrator/src/worker/event_handler/triggers/snos.rs b/orchestrator/src/worker/event_handler/triggers/snos.rs index 41285ffcae..75f9356b6d 100644 --- a/orchestrator/src/worker/event_handler/triggers/snos.rs +++ b/orchestrator/src/worker/event_handler/triggers/snos.rs @@ -97,15 +97,17 @@ impl SnosJobTrigger { /// - Configuration limits (min/max block constraints) /// - Latest completed SNOS job (progress tracking) /// - Latest completed state update job (dependency requirement) + /// - Earliest failed block (processing safety boundary) /// /// # Processing Logic /// - `block_n_min`: Max of (latest state update block, configured minimum) /// - State updates must complete before SNOS processing /// - Respects configured minimum processing boundary /// - `block_n_completed`: Latest completed SNOS block (for gap filling) - /// - `block_n_max`: Min of (sequencer latest, configured maximum) + /// - `block_n_max`: Min of (sequencer latest, configured maximum, failed block - 1) /// - Cannot process blocks that don't exist yet /// - Respects configured maximum processing boundary + /// - Stops before any failed blocks to prevent processing beyond failure points /// /// # Arguments /// * `config` - Application configuration containing database and 
diff --git a/orchestrator/src/worker/event_handler/triggers/snos.rs b/orchestrator/src/worker/event_handler/triggers/snos.rs
index 41285ffcae..75f9356b6d 100644
--- a/orchestrator/src/worker/event_handler/triggers/snos.rs
+++ b/orchestrator/src/worker/event_handler/triggers/snos.rs
@@ -97,15 +97,17 @@ impl SnosJobTrigger {
     /// - Configuration limits (min/max block constraints)
     /// - Latest completed SNOS job (progress tracking)
     /// - Latest completed state update job (dependency requirement)
+    /// - Earliest failed block (processing safety boundary)
     ///
     /// # Processing Logic
    /// - `block_n_min`: Max of (latest state update block, configured minimum)
     ///   - State updates must complete before SNOS processing
     ///   - Respects configured minimum processing boundary
     /// - `block_n_completed`: Latest completed SNOS block (for gap filling)
-    /// - `block_n_max`: Min of (sequencer latest, configured maximum)
+    /// - `block_n_max`: Min of (sequencer latest, configured maximum, failed block - 1)
     ///   - Cannot process blocks that don't exist yet
     ///   - Respects configured maximum processing boundary
+    ///   - Stops before any failed blocks to prevent processing beyond failure points
     ///
     /// # Arguments
     /// * `config` - Application configuration containing database and client access
@@ -113,6 +115,7 @@ impl SnosJobTrigger {
     /// # Returns
     /// * `Result<ProcessingBounds>` - Calculated boundaries or error
     async fn calculate_processing_bounds(&self, config: &Arc<Config>) -> Result<ProcessingBounds> {
+        let earliest_failed_block = config.database().get_earliest_failed_block_number().await?;
         let latest_sequencer_block = self.fetch_latest_sequencer_block(config).await?;
         let service_config = config.service_config();
 
@@ -123,14 +126,17 @@ impl SnosJobTrigger {
             .map(|block| max(block, service_config.min_block_to_process))
             .unwrap_or(service_config.min_block_to_process);
 
-        let block_n_max = service_config
-            .max_block_to_process
-            .map(|bound| min(latest_sequencer_block, bound))
-            .unwrap_or(latest_sequencer_block);
+        let block_n_max = match (service_config.max_block_to_process, earliest_failed_block) {
+            (Some(config_max), Some(failed_block)) => {
+                min(min(latest_sequencer_block, config_max), failed_block.saturating_sub(1))
+            }
+            (Some(config_max), None) => min(latest_sequencer_block, config_max),
+            (None, Some(failed_block)) => min(latest_sequencer_block, failed_block.saturating_sub(1)),
+            (None, None) => latest_sequencer_block,
+        };
 
         Ok(ProcessingBounds { block_n_min, block_n_completed: latest_snos_completed, block_n_max })
     }
-
     /// Initializes the job scheduling context with available concurrency slots.
     ///
     /// This method sets up the scheduling context by:
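The four-way match above is easier to validate in isolation. A minimal sketch of the same clamping, assuming the illustrative name `compute_block_n_max` (not part of the patch):

use std::cmp::min;

// Upper bound for SNOS scheduling: sequencer head, optional config cap, and
// one block before the earliest failure, whichever is smallest.
fn compute_block_n_max(
    latest_sequencer_block: u64,
    config_max: Option<u64>,
    earliest_failed_block: Option<u64>,
) -> u64 {
    match (config_max, earliest_failed_block) {
        (Some(config_max), Some(failed_block)) => {
            min(min(latest_sequencer_block, config_max), failed_block.saturating_sub(1))
        }
        (Some(config_max), None) => min(latest_sequencer_block, config_max),
        (None, Some(failed_block)) => min(latest_sequencer_block, failed_block.saturating_sub(1)),
        (None, None) => latest_sequencer_block,
    }
}

fn main() {
    // A failure at block 100 caps processing at 99, whatever the other bounds allow.
    assert_eq!(compute_block_n_max(500, Some(200), Some(100)), 99);
    // saturating_sub keeps a failure at block 0 from underflowing the bound.
    assert_eq!(compute_block_n_max(500, None, Some(0)), 0);
}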
diff --git a/orchestrator/src/worker/event_handler/triggers/update_state.rs b/orchestrator/src/worker/event_handler/triggers/update_state.rs
index 23404fc29b..dc3cf93ebe 100644
--- a/orchestrator/src/worker/event_handler/triggers/update_state.rs
+++ b/orchestrator/src/worker/event_handler/triggers/update_state.rs
@@ -1,10 +1,11 @@
 use std::sync::Arc;
 
 use async_trait::async_trait;
-use color_eyre::eyre::eyre;
+use color_eyre::eyre::{eyre, Context};
 use opentelemetry::KeyValue;
 
 use crate::core::config::Config;
+use crate::types::jobs::job_item::JobItem;
 use crate::types::jobs::metadata::{
     CommonMetadata, DaMetadata, JobMetadata, JobSpecificMetadata, SnosMetadata, StateUpdateMetadata,
 };
@@ -17,102 +18,51 @@ pub struct UpdateStateJobTrigger;
 
 #[async_trait]
 impl JobTrigger for UpdateStateJobTrigger {
+    /// Creates state transition jobs by collecting completed DA jobs and building state updates.
+    ///
+    /// This worker:
+    /// 1. Checks for existing pending state transition jobs (prevents parallel execution)
+    /// 2. Determines blocks to process based on completed DA jobs and previous state transitions
+    /// 3. Validates block continuity to ensure proper ordering
+    /// 4. Collects metadata from SNOS and DA jobs for the selected blocks
+    /// 5. Creates a state transition job with all necessary paths and metadata
     async fn run_worker(&self, config: Arc<Config>) -> color_eyre::Result<()> {
         tracing::trace!(log_type = "starting", category = "UpdateStateWorker", "UpdateStateWorker started.");
 
-        let latest_job = config.database().get_latest_job_by_type(JobType::StateTransition).await?;
-        let (completed_da_jobs, last_block_processed_in_last_job) = match latest_job {
-            Some(job) => {
-                if job.status != JobStatus::Completed {
-                    tracing::warn!(
-                        "There's already a pending update state job. Parallel jobs can cause nonce issues or can \
-                         completely fail as the update logic needs to be strictly ordered. Returning safely..."
-                    );
-                    return Ok(());
-                }
-
-                // Extract blocks from state transition metadata
-                let state_metadata: StateUpdateMetadata = job.metadata.specific
-                    .try_into()
-                    .map_err(|e| {
-                        tracing::error!(job_id = %job.internal_id, error = %e, "Invalid metadata type for state transition job");
-                        e
-                    })?;
-
-                let mut blocks_processed = state_metadata.blocks_to_settle.clone();
-                blocks_processed.sort();
-
-                let last_block_processed = *blocks_processed
-                    .last()
-                    .ok_or_else(|| eyre!("No blocks found in previous state transition job"))?;
-
-                (
-                    config
-                        .database()
-                        .get_jobs_after_internal_id_by_job_type(
-                            JobType::DataSubmission,
-                            JobStatus::Completed,
-                            last_block_processed.to_string(),
-                        )
-                        .await?,
-                    Some(last_block_processed),
-                )
-            }
-            None => {
-                tracing::warn!("No previous state transition job found, fetching latest data submission job");
-                // Getting latest DA job in case no latest state update job is present
-                (
-                    config
-                        .database()
-                        .get_jobs_without_successor(
-                            JobType::DataSubmission,
-                            JobStatus::Completed,
-                            JobType::StateTransition,
-                        )
-                        .await?,
-                    None,
-                )
-            }
-        };
-
-        let mut blocks_to_process: Vec<u64> =
-            completed_da_jobs.iter().map(|j| j.internal_id.parse::<u64>().unwrap()).collect();
-        blocks_to_process.sort();
+        let processing_context = ProcessingContext::new(config.clone()).await?;
 
-        // no DA jobs completed after the last settled block
-        if blocks_to_process.is_empty() {
-            tracing::warn!("No DA jobs completed after the last settled block. Returning safely...");
+        // Check for existing pending jobs first
+        if let Some(pending_reason) = processing_context.check_pending_jobs().await? {
+            tracing::warn!("{}", pending_reason);
             return Ok(());
         }
 
-        // Verify block continuity
-        match last_block_processed_in_last_job {
-            Some(last_block) => {
-                if blocks_to_process[0] != last_block + 1 {
-                    tracing::warn!(
-                        "DA job for the block just after the last settled block is not yet completed. Returning \
-                         safely..."
-                    );
-                    return Ok(());
-                }
-            }
+        let blocks_to_process = match processing_context.determine_blocks_to_process().await? {
+            Some(blocks) => blocks,
             None => {
-                let min_block_to_process = config.service_config().min_block_to_process;
-                if blocks_to_process[0] != min_block_to_process {
-                    tracing::warn!("DA job for the first block is not yet completed. Returning safely...");
-                    return Ok(());
-                }
+                // Already logged the reason in determine_blocks_to_process
+                return Ok(());
             }
-        }
+        };
 
-        let mut blocks_to_process = find_successive_blocks_in_vector(blocks_to_process);
-        if blocks_to_process.len() > 10 {
-            blocks_to_process = blocks_to_process.into_iter().take(10).collect();
-        }
+        let state_metadata = self.build_state_metadata(&blocks_to_process, &processing_context).await?;
 
-        // Prepare state transition metadata
+        self.create_state_transition_job(&blocks_to_process, state_metadata, processing_context.config.clone()).await?;
+
+        tracing::trace!(log_type = "completed", category = "UpdateStateWorker", "UpdateStateWorker completed.");
+        Ok(())
+    }
+}
+
+impl UpdateStateJobTrigger {
+    /// Builds complete state metadata by collecting paths from SNOS and DA jobs
+    async fn build_state_metadata(
+        &self,
+        blocks_to_process: &[u64],
+        context: &ProcessingContext,
+    ) -> color_eyre::Result<StateUpdateMetadata> {
         let mut state_metadata = StateUpdateMetadata {
-            blocks_to_settle: blocks_to_process.clone(),
+            blocks_to_settle: blocks_to_process.to_vec(),
             snos_output_paths: Vec::new(),
             program_output_paths: Vec::new(),
             blob_data_paths: Vec::new(),
- .ok_or_else(|| eyre!("DA job not found for block {}", block_number))?; + Ok(DaPaths { blob_data_paths: da_metadata.blob_data_path.into_iter().collect() }) + } - let da_metadata: DaMetadata = da_job.metadata.specific.try_into().map_err(|e| { - tracing::error!(job_id = %da_job.internal_id, error = %e, "Invalid metadata type for DA job"); + /// Extracts SNOS metadata from a job with error handling + fn extract_snos_metadata(&self, snos_job: &JobItem) -> color_eyre::Result { + snos_job + .metadata + .specific + .clone() + .try_into() + .map_err(|e| { + tracing::error!( + job_id = %snos_job.internal_id, + error = %e, + "Invalid metadata type for SNOS job" + ); e - })?; + }) + .context("Unable to extract SNOS metadata") + } - if let Some(blob_path) = &da_metadata.blob_data_path { - state_metadata.blob_data_paths.push(blob_path.clone()); - } - } - // Create job metadata + /// Extracts DA metadata from a job with error handling + fn extract_da_metadata(&self, da_job: &JobItem) -> color_eyre::Result { + da_job + .metadata + .specific + .clone() + .try_into() + .map_err(|e| { + tracing::error!( + job_id = %da_job.internal_id, + error = %e, + "Invalid metadata type for DA job" + ); + e + }) + .context("Unable to extract SNOS metadata") + } + + /// Creates the actual state transition job + async fn create_state_transition_job( + &self, + blocks_to_process: &[u64], + state_metadata: StateUpdateMetadata, + config: Arc, + ) -> color_eyre::Result<()> { let metadata = JobMetadata { common: CommonMetadata::default(), specific: JobSpecificMetadata::StateUpdate(state_metadata), }; - // Create the state transition job let new_job_id = blocks_to_process[0].to_string(); - match JobHandlerService::create_job(JobType::StateTransition, new_job_id.clone(), metadata, config.clone()) - .await - { - Ok(_) => tracing::info!(block_id = %new_job_id, "Successfully created new state transition job"), - Err(e) => { - tracing::error!(job_id = %new_job_id, error = %e, "Failed to create new state transition job"); + + tracing::info!( + job_id = %new_job_id, + blocks_count = blocks_to_process.len(), + blocks = ?blocks_to_process, + "Creating state transition job" + ); + + JobHandlerService::create_job(JobType::StateTransition, new_job_id.clone(), metadata, config).await.map_err( + |e| { + tracing::error!( + job_id = %new_job_id, + error = %e, + "Failed to create state transition job" + ); + let attributes = [ KeyValue::new("operation_job_type", format!("{:?}", JobType::StateTransition)), - KeyValue::new("operation_type", format!("{:?}", "create_job")), + KeyValue::new("operation_type", "create_job"), ]; ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &attributes); - return Err(e.into()); + + e + }, + )?; + + tracing::info!(job_id = %new_job_id, "Successfully created state transition job"); + Ok(()) + } +} + +/// Context for processing state transitions, containing shared data and configuration +struct ProcessingContext { + config: Arc, +} + +impl ProcessingContext { + /// Creates a new processing context + async fn new(config: Arc) -> color_eyre::Result { + Ok(Self { config }) + } + + /// Checks if there are any pending state transition jobs that would prevent new job creation + async fn check_pending_jobs(&self) -> color_eyre::Result> { + let latest_job = self.config.database().get_latest_job_by_type(JobType::StateTransition).await?; + + if let Some(job) = latest_job { + if job.status != JobStatus::Completed { + return Ok(Some( + "There's already a pending update state job. 
Parallel jobs can cause nonce issues or can \ + completely fail as the update logic needs to be strictly ordered. Returning safely..." + .to_string(), + )); } } - tracing::trace!(log_type = "completed", category = "UpdateStateWorker", "UpdateStateWorker completed."); - Ok(()) + Ok(None) + } + + /// Determines which blocks should be processed based on completed DA jobs and previous state transitions + async fn determine_blocks_to_process(&self) -> color_eyre::Result>> { + let latest_job = self.config.database().get_latest_job_by_type(JobType::StateTransition).await?; + + let earliest_failed_block = self.config.database().get_earliest_failed_block_number().await?; + + let (completed_da_jobs, last_block_processed) = match latest_job { + Some(job) => { + let last_block = self.get_last_processed_block(&job).await?; + let da_jobs = self.get_da_jobs_after_block(last_block).await?; + (da_jobs, Some(last_block)) + } + None => { + tracing::warn!("No previous state transition job found, fetching latest data submission job"); + let da_jobs = self.get_all_unprocessed_da_jobs().await?; + (da_jobs, None) + } + }; + + let mut blocks_to_process = self.extract_block_numbers(&completed_da_jobs)?; + + if blocks_to_process.is_empty() { + tracing::warn!("No DA jobs completed after the last settled block. Returning safely..."); + return Ok(None); + } + + // Filter out blocks that are at or after the earliest failed block (if it exists) + if let Some(failed_block) = earliest_failed_block { + blocks_to_process.retain(|&block| block < failed_block); + + if blocks_to_process.is_empty() { + tracing::warn!( + "All blocks to process are at or after earliest failed block {}. Returning safely...", + failed_block + ); + return Ok(None); + } + } + + blocks_to_process.sort(); + + // Validate block continuity + if !self.validate_block_continuity(&blocks_to_process, last_block_processed).await? { + return Ok(None); + } + + // Find successive blocks and limit to maximum batch size + let successive_blocks = find_successive_blocks_in_vector(blocks_to_process); + let final_blocks = self.limit_batch_size(successive_blocks); + + Ok(Some(final_blocks)) + } + + /// Extracts the last processed block from a state transition job + async fn get_last_processed_block(&self, job: &JobItem) -> color_eyre::Result { + // TODO: This is what I don't like, we know we are working with StateTransition Job, still we have to treat it like a general job. 
+ let state_metadata: StateUpdateMetadata = job.metadata.specific.clone().try_into().map_err(|e| { + tracing::error!(job_id = %job.internal_id, error = %e, "Invalid metadata type for state transition job"); + e + })?; + + let mut blocks_processed = state_metadata.blocks_to_settle; + blocks_processed.sort(); + + blocks_processed.last().copied().ok_or_else(|| eyre!("No blocks found in previous state transition job")) + } + + /// Gets DA jobs processed after a specific block number + async fn get_da_jobs_after_block(&self, last_block: u64) -> color_eyre::Result> { + self.config + .database() + .get_jobs_after_internal_id_by_job_type( + JobType::DataSubmission, + JobStatus::Completed, + last_block.to_string(), + ) + .await + .context("Unable to get DA jobs after specified block") + } + + /// Gets all DA jobs that don't have state transition successors + async fn get_all_unprocessed_da_jobs(&self) -> color_eyre::Result> { + self.config + .database() + .get_jobs_without_successor(JobType::DataSubmission, JobStatus::Completed, JobType::StateTransition) + .await + .context("Unable to get all Unprocessed DA jobs") + } + + /// Extracts block numbers from DA jobs + fn extract_block_numbers(&self, da_jobs: &[JobItem]) -> color_eyre::Result> { + da_jobs + .iter() + .map(|job| { + job.internal_id.parse::().map_err(|e| { + tracing::error!(job_id = %job.internal_id, error = %e, "Failed to parse job ID as block number"); + eyre!("Invalid block number in job ID: {}", job.internal_id) + }) + }) + .collect() + } + + /// Validates that blocks are continuous from the expected starting point + async fn validate_block_continuity( + &self, + blocks: &[u64], + last_block_processed: Option, + ) -> color_eyre::Result { + let expected_first_block = match last_block_processed { + Some(last_block) => { + if blocks[0] != last_block + 1 { + tracing::warn!( + "DA job for the block just after the last settled block is not yet completed. Returning safely..." + ); + return Ok(false); + } + last_block + 1 + } + None => { + let min_block = self.config.service_config().min_block_to_process; + if blocks[0] != min_block { + tracing::warn!("DA job for the first block is not yet completed. Returning safely..."); + return Ok(false); + } + min_block + } + }; + + Ok(blocks[0] == expected_first_block) + } + + /// Limits the batch size to maximum allowed blocks + fn limit_batch_size(&self, mut blocks: Vec) -> Vec { + const MAX_BATCH_SIZE: usize = 10; + + if blocks.len() > MAX_BATCH_SIZE { + blocks.truncate(MAX_BATCH_SIZE); + tracing::info!("Limited batch size to {} blocks for processing efficiency", MAX_BATCH_SIZE); + } + + blocks } + + /// Gets a SNOS job for a specific block number + async fn get_snos_job(&self, block_number: u64) -> color_eyre::Result { + self.config + .database() + .get_job_by_internal_id_and_type(&block_number.to_string(), &JobType::SnosRun) + .await? + .ok_or_else(|| eyre!("SNOS job not found for block {}", block_number)) + } + + /// Gets a DA job for a specific block number + async fn get_da_job(&self, block_number: u64) -> color_eyre::Result { + self.config + .database() + .get_job_by_internal_id_and_type(&block_number.to_string(), &JobType::DataSubmission) + .await? 
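Taken together, `determine_blocks_to_process` filters by the earliest failed block and then enforces continuity via `validate_block_continuity`. A compact, self-contained approximation of that sequence; the function name and signature are illustrative, not the patch's API:

// Filter-then-validate: drop blocks at or after the earliest failure, then
// require the batch to start exactly where settlement left off.
fn plan_blocks(
    mut blocks: Vec<u64>,
    earliest_failed_block: Option<u64>,
    last_block_processed: Option<u64>,
    min_block_to_process: u64,
) -> Option<Vec<u64>> {
    if let Some(failed_block) = earliest_failed_block {
        blocks.retain(|&block| block < failed_block);
    }
    if blocks.is_empty() {
        return None;
    }
    blocks.sort();
    let expected_first = last_block_processed.map_or(min_block_to_process, |last| last + 1);
    (blocks[0] == expected_first).then_some(blocks)
}

fn main() {
    // A failure at block 8 trims the tail; blocks 5..=7 still settle after block 4.
    assert_eq!(plan_blocks(vec![5, 6, 7, 8, 9], Some(8), Some(4), 0), Some(vec![5, 6, 7]));
    // A gap (first block is 6 where 5 is expected) defers the whole batch.
    assert_eq!(plan_blocks(vec![6, 7], None, Some(4), 0), None);
}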
+ .ok_or_else(|| eyre!("DA job not found for block {}", block_number)) + } +} + +/// Paths collected from SNOS jobs +struct SnosPaths { + snos_output_paths: Vec, + program_output_paths: Vec, +} + +/// Paths collected from DA jobs +struct DaPaths { + blob_data_paths: Vec, } /// Gets the successive list of blocks from all the blocks processed in previous jobs diff --git a/orchestrator/src/worker/service.rs b/orchestrator/src/worker/service.rs index 3f3dc6fed7..bd54120ad7 100644 --- a/orchestrator/src/worker/service.rs +++ b/orchestrator/src/worker/service.rs @@ -129,7 +129,7 @@ impl JobService { JobItemUpdates::new() .update_status(JobStatus::PendingVerification) .update_metadata(job.metadata.clone()) - .build(), + .build()?, ) .await?; @@ -182,7 +182,7 @@ impl JobService { .database() .update_job( job, - JobItemUpdates::new().update_status(JobStatus::Failed).update_metadata(job_metadata).build(), + JobItemUpdates::new().update_status(JobStatus::Failed).update_metadata(job_metadata).build()?, ) .await {